mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-04-05 20:52:50 +08:00
re-lock if the lock owner is not found
This commit is contained in:
parent
0b2e5ddc7c
commit
ac50d8a822
@ -57,27 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
|
||||
lc: lc,
|
||||
}
|
||||
go func() {
|
||||
util.RetryUntil("create lock:"+key, func() error {
|
||||
errorMessage, err := lock.doLock(lock_manager.MaxDuration)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("create lock %s: %s", key, err)
|
||||
time.Sleep(time.Second)
|
||||
return err
|
||||
}
|
||||
if errorMessage != "" {
|
||||
glog.V(4).Infof("create lock %s: %s", key, errorMessage)
|
||||
time.Sleep(time.Second)
|
||||
return fmt.Errorf("%v", errorMessage)
|
||||
}
|
||||
lock.isLocked = true
|
||||
return nil
|
||||
}, func(err error) (shouldContinue bool) {
|
||||
if err != nil {
|
||||
glog.Warningf("create lock %s: %s", key, err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return lock.renewToken == ""
|
||||
})
|
||||
lock.CreateLock(lock_manager.MaxDuration)
|
||||
lc.keepLock(lock)
|
||||
}()
|
||||
return
|
||||
@ -98,24 +78,8 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
|
||||
lockDuration = lc.maxLockDuration
|
||||
needRenewal = true
|
||||
}
|
||||
util.RetryUntil("create lock:"+key, func() error {
|
||||
errorMessage, err := lock.doLock(lockDuration)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
return err
|
||||
}
|
||||
if errorMessage != "" {
|
||||
time.Sleep(time.Second)
|
||||
return fmt.Errorf("%v", errorMessage)
|
||||
}
|
||||
lock.isLocked = true
|
||||
return nil
|
||||
}, func(err error) (shouldContinue bool) {
|
||||
if err != nil {
|
||||
glog.Warningf("create lock %s: %s", key, err)
|
||||
}
|
||||
return lock.renewToken == ""
|
||||
})
|
||||
|
||||
lock.CreateLock(lockDuration)
|
||||
|
||||
if needRenewal {
|
||||
go lc.keepLock(lock)
|
||||
@ -124,6 +88,31 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
|
||||
return
|
||||
}
|
||||
|
||||
func (lock *LiveLock) CreateLock(lockDuration time.Duration) {
|
||||
util.RetryUntil("create lock:"+lock.key, func() error {
|
||||
return lock.DoLock(lockDuration)
|
||||
}, func(err error) (shouldContinue bool) {
|
||||
if err != nil {
|
||||
glog.Warningf("create lock %s: %s", lock.key, err)
|
||||
}
|
||||
return lock.renewToken == ""
|
||||
})
|
||||
}
|
||||
|
||||
func (lock *LiveLock) DoLock(lockDuration time.Duration) error {
|
||||
errorMessage, err := lock.doLock(lockDuration)
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
return err
|
||||
}
|
||||
if errorMessage != "" {
|
||||
time.Sleep(time.Second)
|
||||
return fmt.Errorf("%v", errorMessage)
|
||||
}
|
||||
lock.isLocked = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lock *LiveLock) IsLocked() bool {
|
||||
return lock!=nil && lock.isLocked
|
||||
}
|
||||
@ -161,12 +150,14 @@ func (lc *LockClient) keepLock(lock *LiveLock) {
|
||||
if err != nil {
|
||||
lock.isLocked = false
|
||||
time.Sleep(time.Second)
|
||||
glog.V(0).Infof("keep lock %s: %v", lock.key, err)
|
||||
return err
|
||||
}
|
||||
if errorMessage != "" {
|
||||
lock.isLocked = false
|
||||
time.Sleep(time.Second)
|
||||
return fmt.Errorf("%v", errorMessage)
|
||||
glog.V(4).Infof("keep lock message %s: %v", lock.key, errorMessage)
|
||||
return fmt.Errorf("keep lock error: %v", errorMessage)
|
||||
}
|
||||
return nil
|
||||
}, func(err error) (shouldContinue bool) {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/puzpuzpuz/xsync/v2"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -11,6 +12,7 @@ var LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new
|
||||
var LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock")
|
||||
var LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch")
|
||||
var UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch")
|
||||
var LockNotFound = fmt.Errorf("lock not found")
|
||||
|
||||
// LockManager local lock manager, used by distributed lock manager
|
||||
type LockManager struct {
|
||||
@ -138,13 +140,11 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string,
|
||||
}
|
||||
|
||||
func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
|
||||
lm.locks.Range(func(k string, lock *Lock) bool {
|
||||
if k == key && lock != nil {
|
||||
owner = lock.Owner
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
lock, _ := lm.locks.Load(key)
|
||||
if lock != nil {
|
||||
return lock.Owner, nil
|
||||
}
|
||||
glog.V(0).Infof("get lock %s %+v", key, lock)
|
||||
err = LockNotFound
|
||||
return
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
|
||||
@ -88,11 +88,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
|
||||
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
|
||||
mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
|
||||
for {
|
||||
err := mqBroker.BrokerConnectToBalancer(string(self))
|
||||
if err != nil {
|
||||
fmt.Printf("BrokerConnectToBalancer: %v\n", err)
|
||||
if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil {
|
||||
glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil {
|
||||
glog.V(0).Infof("DoLock: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -3,6 +3,7 @@ package weed_server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@ -81,13 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo
|
||||
|
||||
func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
|
||||
owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
|
||||
if owner == "" {
|
||||
glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if !req.IsMoved && movedTo != "" {
|
||||
if !req.IsMoved && movedTo != "" && err == lock_manager.LockNotFound {
|
||||
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
|
||||
Name: req.Name,
|
||||
@ -103,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if owner == "" {
|
||||
glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
|
||||
return nil, status.Error(codes.NotFound, err.Error())
|
||||
}
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &filer_pb.FindLockOwnerResponse{
|
||||
Owner: owner,
|
||||
}, nil
|
||||
|
Loading…
Reference in New Issue
Block a user