diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index d96faea27..f7f8bc06e 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) @@ -114,82 +115,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } // Replicate chunks - dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) - for i, chunk := range entry.GetChunks() { - // Create a new chunk with same properties but new file ID - dstChunk := &filer_pb.FileChunk{ - Offset: chunk.Offset, - Size: chunk.Size, - ModifiedTsNs: time.Now().UnixNano(), - ETag: chunk.ETag, - IsCompressed: chunk.IsCompressed, - CipherKey: chunk.CipherKey, - } - - // Get new file ID for the chunk - assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { - return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress()) - }, s3a.option.GrpcDialOption, &operation.VolumeAssignRequest{ - Count: 1, - Replication: "", - Collection: "", - Ttl: "", - DiskType: "", - DataCenter: s3a.option.DataCenter, - }) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - dstChunk.FileId = assignResult.Fid - fid, err := filer_pb.ToFileIdObject(assignResult.Fid) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - dstChunk.Fid = fid - - // Copy chunk data - srcUrl := fmt.Sprintf("http://%s/%s", s3a.option.Filer.ToHttpAddress(), chunk.GetFileIdString()) - dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) - - _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) - return - } - - // Read response body into byte slice - chunkData, err := io.ReadAll(resp.Body) - util_http.CloseResponse(resp) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) - return - } - - // Upload chunk to new location - uploadOption := &operation.UploadOption{ - UploadUrl: dstUrl, - Filename: dstObject, - Cipher: false, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: assignResult.Auth, - } - uploader, err := operation.NewUploader() - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - _, err = uploader.UploadData(chunkData, uploadOption) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - dstChunks[i] = dstChunk + dstChunks, err := s3a.copyChunks(entry, r.URL.Path) + if err != nil { + glog.Errorf("CopyObjectHandler copy chunks error: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } dstEntry.Chunks = dstChunks @@ -415,3 +345,137 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep return } + +// copyChunks replicates chunks from source entry to destination entry +func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) { + dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) + executor := util.NewLimitedConcurrentExecutor(8) // Limit to 8 concurrent operations + errChan := make(chan error, len(entry.GetChunks())) + + for i, chunk := range entry.GetChunks() { + chunkIndex := i + executor.Execute(func() { + dstChunk, err := s3a.copySingleChunk(chunk, chunkIndex, dstPath) + if err != nil { + errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err) + return + } + dstChunks[chunkIndex] = dstChunk + errChan <- nil + }) + } + + // Wait for all operations to complete and check for errors + for i := 0; i < len(entry.GetChunks()); i++ { + if err := <-errChan; err != nil { + return nil, err + } + } + + return dstChunks, nil +} + +// copySingleChunk copies a single chunk from source to destination +func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, chunkIndex int, dstPath string) (*filer_pb.FileChunk, error) { + // Create a new chunk with same properties but new file ID + dstChunk := &filer_pb.FileChunk{ + Offset: chunk.Offset, + Size: chunk.Size, + ModifiedTsNs: time.Now().UnixNano(), + ETag: chunk.ETag, + IsCompressed: chunk.IsCompressed, + CipherKey: chunk.CipherKey, + } + + // Get new file ID using filer's AssignVolume + assignResult, err := s3a.assignNewVolume(dstPath) + if err != nil { + return nil, fmt.Errorf("assign volume: %v", err) + } + + dstChunk.FileId = assignResult.FileId + fid, err := filer_pb.ToFileIdObject(assignResult.FileId) + if err != nil { + return nil, fmt.Errorf("parse file ID: %v", err) + } + dstChunk.Fid = fid + + // Get source URL using LookupFileId + srcUrl, _, err := operation.LookupFileId(func(_ context.Context) pb.ServerAddress { + return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress()) + }, s3a.option.GrpcDialOption, chunk.GetFileIdString()) + if err != nil { + return nil, fmt.Errorf("lookup source file ID: %v", err) + } + + // Download and upload the chunk + if err := s3a.transferChunkData(srcUrl, assignResult); err != nil { + return nil, fmt.Errorf("transfer chunk data: %v", err) + } + + return dstChunk, nil +} + +// assignNewVolume assigns a new volume for the chunk +func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) { + var assignResult *filer_pb.AssignVolumeResponse + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: "", + Collection: "", + DiskType: "", + DataCenter: s3a.option.DataCenter, + Path: dstPath, + }) + if err != nil { + return fmt.Errorf("assign volume: %v", err) + } + if resp.Error != "" { + return fmt.Errorf("assign volume: %v", resp.Error) + } + assignResult = resp + return nil + }) + if err != nil { + return nil, err + } + return assignResult, nil +} + +// transferChunkData downloads the chunk from source and uploads it to destination +func (s3a *S3ApiServer) transferChunkData(srcUrl string, assignResult *filer_pb.AssignVolumeResponse) error { + dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId) + + _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) + if err != nil { + return fmt.Errorf("download chunk: %v", err) + } + + // Read response body into byte slice + chunkData, err := io.ReadAll(resp.Body) + util_http.CloseResponse(resp) + if err != nil { + return fmt.Errorf("read chunk data: %v", err) + } + + // Upload chunk to new location + uploadOption := &operation.UploadOption{ + UploadUrl: dstUrl, + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploader, err := operation.NewUploader() + if err != nil { + return fmt.Errorf("create uploader: %v", err) + } + _, err = uploader.UploadData(chunkData, uploadOption) + if err != nil { + return fmt.Errorf("upload chunk: %v", err) + } + + return nil +}