it compiles

chrislu 2025-03-25 16:03:37 -07:00
parent 9d8b975ff9
commit f25f0e6bd2

@@ -1,7 +1,9 @@
package s3api
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
@@ -10,7 +12,11 @@ import (
"modernc.org/strutil"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -73,7 +79,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
@@ -83,36 +90,134 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
return
}
// Process metadata and tags
tagErr := processMetadata(r.Header, nil, replaceMeta, replaceTagging, s3a.getTags, dir, name)
if tagErr != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Create new entry for destination
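// The destination entry keeps the source's size, creation time, and MIME type, with a fresh modification time.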
dstEntry := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
FileSize: entry.Attributes.FileSize,
Mtime: time.Now().Unix(),
Crtime: entry.Attributes.Crtime,
Mime: entry.Attributes.Mime,
},
Extended: make(map[string][]byte),
}
// Copy extended attributes
for k, v := range entry.Extended {
dstEntry.Extended[k] = v
}
// Replicate chunks
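// Each chunk is copied by assigning a new file ID, downloading the chunk data from the filer, and re-uploading it to the assigned volume server.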
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
// Create a new chunk with same properties but new file ID
dstChunk := &filer_pb.FileChunk{
Offset: chunk.Offset,
Size: chunk.Size,
ModifiedTsNs: time.Now().UnixNano(),
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
CipherKey: chunk.CipherKey,
}
// Get new file ID for the chunk
assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress())
}, s3a.option.GrpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: "",
Collection: "",
Ttl: "",
DiskType: "",
DataCenter: s3a.option.DataCenter,
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
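// Record the newly assigned file ID in both its string and structured forms.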
dstChunk.FileId = assignResult.Fid
fid, err := filer_pb.ToFileIdObject(assignResult.Fid)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstChunk.Fid = fid
// Copy chunk data
srcUrl := fmt.Sprintf("http://%s/%s", s3a.option.Filer.ToHttpAddress(), chunk.GetFileIdString())
dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
_, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Read response body into byte slice
chunkData, err := io.ReadAll(resp.Body)
util_http.CloseResponse(resp)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Upload chunk to new location
uploadOption := &operation.UploadOption{
UploadUrl: dstUrl,
Filename: dstObject,
Cipher: false,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: assignResult.Auth,
}
uploader, err := operation.NewUploader()
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
_, err = uploader.UploadData(chunkData, uploadOption)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstChunks[i] = dstChunk
}
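// Attach the replicated chunks to the destination entry before saving it.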
dstEntry.Chunks = dstChunks
// Save the new entry
dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dstDir, dstName := dstPath.DirAndName()
if err := s3a.touch(dstDir, dstName, dstEntry); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Convert filer_pb.Entry to filer.Entry for ETag calculation
filerEntry := &filer.Entry{
FullPath: dstPath,
Attr: filer.Attr{
FileSize: dstEntry.Attributes.FileSize,
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
Mime: dstEntry.Attributes.Mime,
},
Chunks: dstEntry.Chunks,
}
setEtag(w, filer.ETagEntry(filerEntry))
response := CopyObjectResult{
ETag: filer.ETagEntry(filerEntry),
LastModified: time.Now().UTC(),
}