From f25f0e6bd27a62a00a8ce0a8d90a211e1fdba2ac Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 25 Mar 2025 16:03:37 -0700 Subject: [PATCH] it compiles --- weed/s3api/s3api_object_handlers_copy.go | 147 +++++++++++++++++++---- 1 file changed, 126 insertions(+), 21 deletions(-) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 4ca8010d2..d96faea27 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -1,7 +1,9 @@ package s3api import ( + "context" "fmt" + "io" "net/http" "net/url" "strconv" @@ -10,7 +12,11 @@ import ( "modernc.org/strutil" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util" @@ -73,7 +79,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) dir, name := srcPath.DirAndName() - if entry, err := s3a.getEntry(dir, name); err != nil || entry.IsDirectory { + entry, err := s3a.getEntry(dir, name) + if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } @@ -83,36 +90,134 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request return } - dstUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlEscapeObject(dstObject)) - srcUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject)) - - _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) - return - } - defer util_http.CloseResponse(resp) - - tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name) + // Process metadata and tags + tagErr := processMetadata(r.Header, nil, replaceMeta, replaceTagging, s3a.getTags, dir, name) if tagErr != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) - destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject) - etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination, dstBucket) - if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, r, errCode) + // Create new entry for destination + dstEntry := &filer_pb.Entry{ + Attributes: &filer_pb.FuseAttributes{ + FileSize: entry.Attributes.FileSize, + Mtime: time.Now().Unix(), + Crtime: entry.Attributes.Crtime, + Mime: entry.Attributes.Mime, + }, + Extended: make(map[string][]byte), + } + + // Copy extended attributes + for k, v := range entry.Extended { + dstEntry.Extended[k] = v + } + + // Replicate chunks + dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) + for i, chunk := range entry.GetChunks() { + // Create a new chunk with same properties but new file ID + dstChunk := &filer_pb.FileChunk{ + Offset: chunk.Offset, + Size: chunk.Size, + ModifiedTsNs: time.Now().UnixNano(), + ETag: chunk.ETag, + IsCompressed: chunk.IsCompressed, + CipherKey: chunk.CipherKey, + } + + // Get new file ID for the chunk + assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { + return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress()) + }, s3a.option.GrpcDialOption, &operation.VolumeAssignRequest{ + Count: 1, + Replication: "", + Collection: "", + Ttl: "", + DiskType: "", + DataCenter: s3a.option.DataCenter, + }) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + dstChunk.FileId = assignResult.Fid + fid, err := filer_pb.ToFileIdObject(assignResult.Fid) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + dstChunk.Fid = fid + + // Copy chunk data + srcUrl := fmt.Sprintf("http://%s/%s", s3a.option.Filer.ToHttpAddress(), chunk.GetFileIdString()) + dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + + _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Read response body into byte slice + chunkData, err := io.ReadAll(resp.Body) + util_http.CloseResponse(resp) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } + + // Upload chunk to new location + uploadOption := &operation.UploadOption{ + UploadUrl: dstUrl, + Filename: dstObject, + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: assignResult.Auth, + } + uploader, err := operation.NewUploader() + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + _, err = uploader.UploadData(chunkData, uploadOption) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + dstChunks[i] = dstChunk + } + + dstEntry.Chunks = dstChunks + + // Save the new entry + dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) + dstDir, dstName := dstPath.DirAndName() + if err := s3a.touch(dstDir, dstName, dstEntry); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - setEtag(w, etag) + // Convert filer_pb.Entry to filer.Entry for ETag calculation + filerEntry := &filer.Entry{ + FullPath: dstPath, + Attr: filer.Attr{ + FileSize: dstEntry.Attributes.FileSize, + Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), + Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), + Mime: dstEntry.Attributes.Mime, + }, + Chunks: dstEntry.Chunks, + } + + setEtag(w, filer.ETagEntry(filerEntry)) response := CopyObjectResult{ - ETag: etag, + ETag: filer.ETagEntry(filerEntry), LastModified: time.Now().UTC(), }