mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-04-05 20:52:50 +08:00
filer.remote.sync: exit when directory is unmounted
this will not propagate the deletions back to the cloud
This commit is contained in:
parent
3faaa6e360
commit
3bd48c4f29
@ -12,7 +12,10 @@ import (
|
|||||||
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
||||||
"github.com/chrislusf/seaweedfs/weed/security"
|
"github.com/chrislusf/seaweedfs/weed/security"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -102,8 +105,6 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
|||||||
return fmt.Errorf("read mount info: %v", detectErr)
|
return fmt.Errorf("read mount info: %v", detectErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
dirHash := util.HashStringToLong(mountedDir)
|
|
||||||
|
|
||||||
// 1. specified by timeAgo
|
// 1. specified by timeAgo
|
||||||
// 2. last offset timestamp for this directory
|
// 2. last offset timestamp for this directory
|
||||||
// 3. directory creation time
|
// 3. directory creation time
|
||||||
@ -114,7 +115,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
|||||||
return fmt.Errorf("lookup %s: %v", mountedDir, err)
|
return fmt.Errorf("lookup %s: %v", mountedDir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
|
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
|
||||||
if mountedDirEntry != nil {
|
if mountedDirEntry != nil {
|
||||||
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
|
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
|
||||||
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
|
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
|
||||||
@ -134,8 +135,44 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
|
message := resp.EventNotification
|
||||||
|
if message.NewEntry == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
|
||||||
|
mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
|
||||||
|
if readErr != nil {
|
||||||
|
return fmt.Errorf("unmarshal mappings: %v", readErr)
|
||||||
|
}
|
||||||
|
if remoteLoc, found := mappings.Mappings[mountedDir]; found {
|
||||||
|
if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
|
||||||
|
glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if message.NewEntry.Name == remoteStorage.Name + filer.REMOTE_STORAGE_CONF_SUFFIX {
|
||||||
|
conf := &remote_pb.RemoteConf{}
|
||||||
|
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
|
||||||
|
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
|
||||||
|
}
|
||||||
|
remoteStorage = conf
|
||||||
|
client, err = remote_storage.GetRemoteStorage(remoteStorage)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||||
message := resp.EventNotification
|
message := resp.EventNotification
|
||||||
|
if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
|
||||||
|
return handleEtcRemoteChanges(resp)
|
||||||
|
}
|
||||||
|
|
||||||
if message.OldEntry == nil && message.NewEntry == nil {
|
if message.OldEntry == nil && message.NewEntry == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -207,11 +244,11 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
|
|||||||
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
||||||
lastTime := time.Unix(0, lastTsNs)
|
lastTime := time.Unix(0, lastTsNs)
|
||||||
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
|
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
|
||||||
return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
|
return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
|
||||||
})
|
})
|
||||||
|
|
||||||
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
|
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
|
||||||
mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
|
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
|
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
|
||||||
|
73
weed/remote_storage/track_sync_offset.go
Normal file
73
weed/remote_storage/track_sync_offset.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package remote_storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
SyncKeyPrefix = "remote.sync."
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string) (lastOffsetTsNs int64, readErr error) {
|
||||||
|
|
||||||
|
dirHash := uint32(util.HashStringToLong(dir))
|
||||||
|
|
||||||
|
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
syncKey := []byte(SyncKeyPrefix + "____")
|
||||||
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
|
||||||
|
|
||||||
|
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Error) != 0 {
|
||||||
|
return errors.New(resp.Error)
|
||||||
|
}
|
||||||
|
if len(resp.Value) < 8 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetSyncOffset(grpcDialOption grpc.DialOption, filer string, dir string, offsetTsNs int64) error {
|
||||||
|
|
||||||
|
dirHash := uint32(util.HashStringToLong(dir))
|
||||||
|
|
||||||
|
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
|
syncKey := []byte(SyncKeyPrefix + "____")
|
||||||
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
|
||||||
|
|
||||||
|
valueBuf := make([]byte, 8)
|
||||||
|
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
|
||||||
|
|
||||||
|
resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
|
||||||
|
Key: syncKey,
|
||||||
|
Value: valueBuf,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Error) != 0 {
|
||||||
|
return errors.New(resp.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
@ -6,8 +6,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/remote_storage"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -30,9 +32,7 @@ func (c *commandRemoteUnmount) Help() string {
|
|||||||
remote.mount -dir=/xxx -remote=s3_1/bucket
|
remote.mount -dir=/xxx -remote=s3_1/bucket
|
||||||
|
|
||||||
# unmount the mounted directory and remove its cache
|
# unmount the mounted directory and remove its cache
|
||||||
# Make sure you have stopped "weed filer.remote.sync" first!
|
remote.unmount -dir=/xxx
|
||||||
# Otherwise, the deletion will also be propagated to the remote storage!!!
|
|
||||||
remote.unmount -dir=/xxx -iHaveStoppedRemoteSync
|
|
||||||
|
|
||||||
`
|
`
|
||||||
}
|
}
|
||||||
@ -42,7 +42,6 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
|
|
||||||
dir := remoteMountCommand.String("dir", "", "a directory in filer")
|
dir := remoteMountCommand.String("dir", "", "a directory in filer")
|
||||||
hasStoppedRemoteSync := remoteMountCommand.Bool("iHaveStoppedRemoteSync", false, "confirm to stop weed filer.remote.sync first")
|
|
||||||
|
|
||||||
if err = remoteMountCommand.Parse(args); err != nil {
|
if err = remoteMountCommand.Parse(args); err != nil {
|
||||||
return nil
|
return nil
|
||||||
@ -61,17 +60,21 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
return fmt.Errorf("directory %s is not mounted", *dir)
|
return fmt.Errorf("directory %s is not mounted", *dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !*hasStoppedRemoteSync {
|
// store a mount configuration in filer
|
||||||
return fmt.Errorf("make sure \"weed filer.remote.sync\" is stopped to avoid data loss")
|
fmt.Fprintf(writer, "deleting mount for %s ...\n", *dir)
|
||||||
|
if err = c.deleteMountMapping(commandEnv, *dir); err != nil {
|
||||||
|
return fmt.Errorf("delete mount mapping: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// purge mounted data
|
// purge mounted data
|
||||||
|
fmt.Fprintf(writer, "purge %s ...\n", *dir)
|
||||||
if err = c.purgeMountedData(commandEnv, *dir); err != nil {
|
if err = c.purgeMountedData(commandEnv, *dir); err != nil {
|
||||||
return fmt.Errorf("purge mounted data: %v", err)
|
return fmt.Errorf("purge mounted data: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// store a mount configuration in filer
|
// reset remote sync offset in case the folder is mounted again
|
||||||
if err = c.deleteMountMapping(commandEnv, *dir); err != nil {
|
if err = remote_storage.SetSyncOffset(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, *dir, time.Now().UnixNano()); err != nil {
|
||||||
return fmt.Errorf("delete mount mapping: %v", err)
|
return fmt.Errorf("reset remote.sync offset for %s: %v", *dir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -100,6 +103,8 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri
|
|||||||
mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) {
|
mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) {
|
||||||
entry.Attributes = oldEntry.Attributes
|
entry.Attributes = oldEntry.Attributes
|
||||||
entry.Extended = oldEntry.Extended
|
entry.Extended = oldEntry.Extended
|
||||||
|
entry.Attributes.Crtime = time.Now().Unix()
|
||||||
|
entry.Attributes.Mtime = time.Now().Unix()
|
||||||
})
|
})
|
||||||
if mkdirErr != nil {
|
if mkdirErr != nil {
|
||||||
return fmt.Errorf("mkdir %s: %v", dir, mkdirErr)
|
return fmt.Errorf("mkdir %s: %v", dir, mkdirErr)
|
||||||
|
Loading…
Reference in New Issue
Block a user