refactored

This commit is contained in:
chrislu 2025-03-25 16:48:02 -07:00
parent f25f0e6bd2
commit 1989b601fc

View File

@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
@ -114,82 +115,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
// Replicate chunks
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
// Create a new chunk with same properties but new file ID
dstChunk := &filer_pb.FileChunk{
Offset: chunk.Offset,
Size: chunk.Size,
ModifiedTsNs: time.Now().UnixNano(),
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
CipherKey: chunk.CipherKey,
}
// Get new file ID for the chunk
assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress())
}, s3a.option.GrpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: "",
Collection: "",
Ttl: "",
DiskType: "",
DataCenter: s3a.option.DataCenter,
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstChunk.FileId = assignResult.Fid
fid, err := filer_pb.ToFileIdObject(assignResult.Fid)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstChunk.Fid = fid
// Copy chunk data
srcUrl := fmt.Sprintf("http://%s/%s", s3a.option.Filer.ToHttpAddress(), chunk.GetFileIdString())
dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
_, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Read response body into byte slice
chunkData, err := io.ReadAll(resp.Body)
util_http.CloseResponse(resp)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Upload chunk to new location
uploadOption := &operation.UploadOption{
UploadUrl: dstUrl,
Filename: dstObject,
Cipher: false,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: assignResult.Auth,
}
uploader, err := operation.NewUploader()
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
_, err = uploader.UploadData(chunkData, uploadOption)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstChunks[i] = dstChunk
dstChunks, err := s3a.copyChunks(entry, r.URL.Path)
if err != nil {
glog.Errorf("CopyObjectHandler copy chunks error: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
dstEntry.Chunks = dstChunks
@ -415,3 +345,137 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep
return
}
// copyChunks replicates chunks from source entry to destination entry
func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) {
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
executor := util.NewLimitedConcurrentExecutor(8) // Limit to 8 concurrent operations
errChan := make(chan error, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
chunkIndex := i
executor.Execute(func() {
dstChunk, err := s3a.copySingleChunk(chunk, chunkIndex, dstPath)
if err != nil {
errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
return
}
dstChunks[chunkIndex] = dstChunk
errChan <- nil
})
}
// Wait for all operations to complete and check for errors
for i := 0; i < len(entry.GetChunks()); i++ {
if err := <-errChan; err != nil {
return nil, err
}
}
return dstChunks, nil
}
// copySingleChunk copies a single chunk from source to destination
func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, chunkIndex int, dstPath string) (*filer_pb.FileChunk, error) {
// Create a new chunk with same properties but new file ID
dstChunk := &filer_pb.FileChunk{
Offset: chunk.Offset,
Size: chunk.Size,
ModifiedTsNs: time.Now().UnixNano(),
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
CipherKey: chunk.CipherKey,
}
// Get new file ID using filer's AssignVolume
assignResult, err := s3a.assignNewVolume(dstPath)
if err != nil {
return nil, fmt.Errorf("assign volume: %v", err)
}
dstChunk.FileId = assignResult.FileId
fid, err := filer_pb.ToFileIdObject(assignResult.FileId)
if err != nil {
return nil, fmt.Errorf("parse file ID: %v", err)
}
dstChunk.Fid = fid
// Get source URL using LookupFileId
srcUrl, _, err := operation.LookupFileId(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(s3a.option.Filer.ToGrpcAddress())
}, s3a.option.GrpcDialOption, chunk.GetFileIdString())
if err != nil {
return nil, fmt.Errorf("lookup source file ID: %v", err)
}
// Download and upload the chunk
if err := s3a.transferChunkData(srcUrl, assignResult); err != nil {
return nil, fmt.Errorf("transfer chunk data: %v", err)
}
return dstChunk, nil
}
// assignNewVolume assigns a new volume for the chunk
func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) {
var assignResult *filer_pb.AssignVolumeResponse
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: "",
DiskType: "",
DataCenter: s3a.option.DataCenter,
Path: dstPath,
})
if err != nil {
return fmt.Errorf("assign volume: %v", err)
}
if resp.Error != "" {
return fmt.Errorf("assign volume: %v", resp.Error)
}
assignResult = resp
return nil
})
if err != nil {
return nil, err
}
return assignResult, nil
}
// transferChunkData downloads the chunk from source and uploads it to destination
func (s3a *S3ApiServer) transferChunkData(srcUrl string, assignResult *filer_pb.AssignVolumeResponse) error {
dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
_, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
if err != nil {
return fmt.Errorf("download chunk: %v", err)
}
// Read response body into byte slice
chunkData, err := io.ReadAll(resp.Body)
util_http.CloseResponse(resp)
if err != nil {
return fmt.Errorf("read chunk data: %v", err)
}
// Upload chunk to new location
uploadOption := &operation.UploadOption{
UploadUrl: dstUrl,
Cipher: false,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("create uploader: %v", err)
}
_, err = uploader.UploadData(chunkData, uploadOption)
if err != nil {
return fmt.Errorf("upload chunk: %v", err)
}
return nil
}