From bb274ea8a377e52f0b9289ff2ccb5dda67eeb4cd Mon Sep 17 00:00:00 2001 From: "Amir H. Yeganemehr" Date: Mon, 13 Nov 2023 13:41:46 +0330 Subject: [PATCH] fs.mergeVolumes: Make a plan based on volumes size --- weed/shell/command_fs_merge_volumes.go | 213 ++++++++++++++++++------- 1 file changed, 152 insertions(+), 61 deletions(-) diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index beac6a0dd..9ed50a6cf 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -7,10 +7,13 @@ import ( "fmt" "io" "net/http" + "sort" "strings" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/wdclient" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -28,7 +31,8 @@ func init() { } type commandFsMergeVolumes struct { - volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage + volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage + volumeSizeLimit uint64 } func (c *commandFsMergeVolumes) Name() string { @@ -40,7 +44,7 @@ func (c *commandFsMergeVolumes) Help() string { This would help clear half-full volumes and let vacuum system to delete them later. 
- fs.mergeVolumes -toVolumeId=y [-fromVolumeId=x] [-apply] /dir/ + fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-apply] [/dir/] ` } @@ -50,10 +54,13 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer if err != nil { return err } - dir = strings.TrimRight(dir, "/") + if dir != "/" { + dir = strings.TrimRight(dir, "/") + } fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id") toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id") + collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge") apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes") if err = fsMergeVolumesCommand.Parse(args); err != nil { return err @@ -61,21 +68,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer fromVolumeId := needle.VolumeId(*fromVolumeArg) toVolumeId := needle.VolumeId(*toVolumeArg) - if toVolumeId == 0 { - return fmt.Errorf("volume id can not be zero") - } - c.reloadVolumesInfo(commandEnv.MasterClient) - toVolumeInfo, err := c.getVolumeInfoById(toVolumeId) - if err != nil { - return err - } - if toVolumeInfo.ReadOnly { - return fmt.Errorf("volume is readonly: %d", toVolumeId) - } - - if fromVolumeId != 0 { + if fromVolumeId != 0 && toVolumeId != 0 { if fromVolumeId == toVolumeId { return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId) } @@ -86,57 +81,62 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer if !compatible { return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId) } + fromSize := c.getVolumeSizeById(fromVolumeId) + toSize := c.getVolumeSizeById(toVolumeId) + if fromSize+toSize > c.volumeSizeLimit { + return fmt.Errorf( + "volume %d (%d MB) cannot merge into volume %d (%d MB)
due to volume size limit (%d MB)", + fromVolumeId, fromSize/1024/1024, + toVolumeId, toSize/1024/1024, + c.volumeSizeLimit/1024/1024, + ) + } } - defer client.CloseIdleConnections() - compatibility := make(map[string]bool) + plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId) + + if err != nil { + return err + } + c.printPlan(plan) + + if len(plan) == 0 { + return nil + } + + defer client.CloseIdleConnections() return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { - if !entry.IsDirectory { - for _, chunk := range entry.Chunks { - if chunk.IsChunkManifest { - fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name) - continue - } - chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId) - if chunkVolumeId == toVolumeId || (fromVolumeId != 0 && fromVolumeId != chunkVolumeId) { - continue - } - cacheKey := fmt.Sprintf("%d-%d", chunkVolumeId, toVolumeId) - compatible, cached := compatibility[cacheKey] - if !cached { - compatible, err = c.volumesAreCompatible(chunkVolumeId, toVolumeId) - if err != nil { - _ = fmt.Errorf("cannot determine volumes are compatible: %d and %d", chunkVolumeId, toVolumeId) - return - } - compatibility[cacheKey] = compatible - } - if !compatible { - if fromVolumeId != 0 { - _ = fmt.Errorf("volumes are incompatible: %d and %d", fromVolumeId, toVolumeId) - return - } - continue - } - path := parentPath.Child(entry.Name) + if entry.IsDirectory { + return + } + for _, chunk := range entry.Chunks { + if chunk.IsChunkManifest { + fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name) + continue + } + chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId) + toVolumeId, found := plan[chunkVolumeId] + if !found { + continue + } + path := parentPath.Child(entry.Name) - fmt.Printf("move 
%s(%s)\n", path, chunk.GetFileIdString()) - if !*apply { - continue - } - if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil { - fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err) - continue - } + fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString()) + if !*apply { + continue + } + if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil { + fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err) + continue + } - if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{ - Directory: string(parentPath), - Entry: entry, - }); err != nil { - fmt.Printf("failed to update %s: %v\n", path, err) - } + if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{ + Directory: string(parentPath), + Entry: entry, + }); err != nil { + fmt.Printf("failed to update %s: %v\n", path, err) } } }) @@ -174,6 +174,9 @@ func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterC if err != nil { return err } + + c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024 + for _, dc := range volumes.TopologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, node := range rack.DataNodeInfos { @@ -192,6 +195,94 @@ func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterC }) } +func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) { + plan := make(map[needle.VolumeId]needle.VolumeId) + volumes := maps.Keys(c.volumes) + sort.Slice(volumes, func(a, b int) bool { + return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size + }) + + l := len(volumes) + for i := 0; i < l; i++ { + volume := c.volumes[volumes[i]] + if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) { + volumes = slices.Delete(volumes, i, i+1) + i-- + l-- + } + } + 
for i := l - 1; i >= 0; i-- { + src := volumes[i] + if fromVolumeId != 0 && src != fromVolumeId { + continue + } + for j := 0; j < i; j++ { + candidate := volumes[j] + if toVolumeId != 0 && candidate != toVolumeId { + continue + } + if _, moving := plan[candidate]; moving { + continue + } + compatible, err := c.volumesAreCompatible(src, candidate) + if err != nil { + return nil, err + } + if !compatible { + continue + } + if c.getVolumeSizeBasedOnPlan(plan, candidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit { + continue + } + plan[src] = candidate + break + } + } + + return plan, nil +} + +func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 { + size := c.getVolumeSizeById(vid) + for src, dist := range plan { + if dist == vid { + size += c.getVolumeSizeById(src) + } + } + return size +} + +func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 { + return volume.Size - volume.DeletedByteCount +} + +func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 { + return c.getVolumeSize(c.volumes[vid]) +} + +func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) { + fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024) + reversePlan := make(map[needle.VolumeId][]needle.VolumeId) + for src, dist := range plan { + reversePlan[dist] = append(reversePlan[dist], src) + } + for dist, srcs := range reversePlan { + currentSize := c.getVolumeSizeById(dist) + for _, src := range srcs { + srcSize := c.getVolumeSizeById(src) + newSize := currentSize + srcSize + fmt.Printf( + "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n", + src, srcSize/1024/1024, + dist, currentSize/1024/1024, newSize/1024/1024, + ) + currentSize = newSize + + } + fmt.Println() + } +} + func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error { fromFid := 
needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie) toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie) @@ -201,7 +292,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie return err } - downloadURL := fmt.Sprintf("http://%s/%s", downloadURLs[0], fromFid.String()) + downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String()) uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String()) if err != nil {