// CopyFile copies an object in parts (multipart copy).
//
// srcBucketName    source bucket name
// srcObjectKey     source object key
// destObjectKey    target object key; the target bucket is the bucket CopyFile is called on
// partSize         the part size, in bytes
// options          the object's constraints. Check out function InitiateMultipartUpload.
//
// error    it's nil if the operation succeeds, otherwise it's an error object.
//
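// A minimal usage sketch (endpoint, credentials, bucket and object names
// below are illustrative placeholders, not values from this SDK):
//
//	client, err := oss.New("https://oss-cn-hangzhou.aliyuncs.com", "<accessKeyID>", "<accessKeySecret>")
//	if err != nil {
//		// handle error
//	}
//	bucket, err := client.Bucket("dest-bucket")
//	if err != nil {
//		// handle error
//	}
//	// Copy oss://src-bucket/src-object to oss://dest-bucket/dest-object,
//	// 100MB per part, 3 concurrent workers, resumable via a checkpoint file.
//	err = bucket.CopyFile("src-bucket", "src-object", "dest-object",
//		100*1024*1024, oss.Routines(3), oss.Checkpoint(true, "copy.cp"))
//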
func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
	destBucketName := bucket.BucketName
	if partSize < MinPartSize || partSize > MaxPartSize {
		return errors.New("oss: part size invalid range (1024KB, 5GB]")
	}

	cpConf := getCpConfig(options)
	routines := getRoutines(options)

	var strVersionId string
	versionId, _ := FindOption(options, "versionId", nil)
	if versionId != nil {
		strVersionId = versionId.(string)
	}

	if cpConf != nil && cpConf.IsEnable {
		cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey, strVersionId)
		if cpFilePath != "" {
			return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
		}
	}

	return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
		partSize, options, routines)
}
func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject, versionId string) string {
	if cpConf.FilePath == "" && cpConf.DirPath != "" {
		dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
		src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
		cpFileName := getCpFileName(src, dest, versionId)
		cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
	}
	return cpConf.FilePath
}
// ----- Concurrently copy without checkpoint ---------

// copyWorkerArg defines the copy worker arguments
type copyWorkerArg struct {
	bucket        *Bucket
	imur          InitiateMultipartUploadResult
	srcBucketName string
	srcObjectKey  string
	options       []Option
	hook          copyPartHook
}

// copyPartHook is a hook for testing purposes
type copyPartHook func(part copyPart) error

var copyPartHooker copyPartHook = defaultCopyPartHook

func defaultCopyPartHook(part copyPart) error {
	return nil
}
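// copyPartHooker exists so tests can swap in a failing hook; a sketch of that
// pattern (the error text is illustrative):
//
//	copyPartHooker = func(part copyPart) error {
//		if part.Number == 1 {
//			return errors.New("copyPartHook: injected failure")
//		}
//		return nil
//	}
//	defer func() { copyPartHooker = defaultCopyPartHook }()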
// copyWorker is the worker routine: it consumes parts from jobs and reports
// each copied part on results, or the first error on failed.
func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
	for chunk := range jobs {
		if err := arg.hook(chunk); err != nil {
			failed <- err
			break
		}
		chunkSize := chunk.End - chunk.Start + 1
		part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
			chunk.Start, chunkSize, chunk.Number, arg.options...)
		if err != nil {
			failed <- err
			break
		}
		select {
		case <-die:
			return
		default:
		}
		results <- part
	}
}

// copyScheduler feeds all parts into the jobs channel, then closes it
func copyScheduler(jobs chan copyPart, parts []copyPart) {
	for _, part := range parts {
		jobs <- part
	}
	close(jobs)
}
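// Channel protocol shared by copyFile and copyFileWithCp below: the scheduler
// fills jobs and closes it; each worker drains jobs, sending one UploadPart
// per part on results and the first error on failed; closing die makes a
// worker that has just finished a part exit instead of sending its result.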
// copyPart structure
type copyPart struct {
	Number int   // Part number (from 1 to 10,000)
	Start  int64 // The start index in the source file
	End    int64 // The end index in the source file
}
// getCopyParts calculates the copy parts
func getCopyParts(objectSize, partSize int64) []copyPart {
	parts := []copyPart{}
	part := copyPart{}
	i := 0
	for offset := int64(0); offset < objectSize; offset += partSize {
		part.Number = i + 1
		part.Start = offset
		part.End = GetPartEnd(offset, objectSize, partSize)
		parts = append(parts, part)
		i++
	}
	return parts
}
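// As a worked example, a 250MB object with partSize = 100MB yields three
// parts (GetPartEnd caps the last part at the end of the object):
//
//	{Number: 1, Start: 0,         End: 104857599} // 100MB
//	{Number: 2, Start: 104857600, End: 209715199} // 100MB
//	{Number: 3, Start: 209715200, End: 262143999} // the remaining 50MB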
// getSrcObjectBytes gets the source object size by summing the parts
func getSrcObjectBytes(parts []copyPart) int64 {
	var ob int64
	for _, part := range parts {
		ob += (part.End - part.Start + 1)
	}
	return ob
}
// copyFile copies the object concurrently, without a checkpoint
func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := GetProgressListener(options)

	// Choose the valid options for each request
	headerOptions := ChoiceHeadObjectOption(options)
	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)
	abortOptions := ChoiceAbortPartOption(options)

	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
	if err != nil {
		return err
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	// Get copy parts
	parts := getCopyParts(objectSize, partSize)

	// Initialize the multipart upload
	imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	var completedBytes int64
	totalBytes := getSrcObjectBytes(parts)
	event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
	publishProgress(listener, event)

	// Start the copy workers
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to finish
	completed := 0
	ups := make([]UploadPart, len(parts))
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ups[part.PartNumber-1] = part
			copyBytes := parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1
			completedBytes += copyBytes
			event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, copyBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			descBucket.AbortMultipartUpload(imur, abortOptions...)
			event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
	publishProgress(listener, event)

	// Complete the multipart upload
	_, err = descBucket.CompleteMultipartUpload(imur, ups, completeOptions...)
	if err != nil {
		bucket.AbortMultipartUpload(imur, abortOptions...)
		return err
	}
	return nil
}
// ----- Concurrently copy with checkpoint -----

const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"

type copyCheckpoint struct {
	Magic          string       // Magic
	MD5            string       // CP content MD5
	SrcBucketName  string       // Source bucket
	SrcObjectKey   string       // Source object
	DestBucketName string       // Target bucket
	DestObjectKey  string       // Target object
	CopyID         string       // Copy ID
	ObjStat        objectStat   // Object stat
	Parts          []copyPart   // Copy parts
	CopyParts      []UploadPart // The uploaded parts
	PartStat       []bool       // The part status
}
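// dump below serializes this struct as JSON, so an illustrative (not literal)
// checkpoint file looks like:
//
//	{"Magic":"84F1F18C-...","MD5":"<base64 MD5 of the CP>","SrcBucketName":"src-bucket",
//	 "SrcObjectKey":"src-object","DestBucketName":"dest-bucket","DestObjectKey":"dest-object",
//	 "CopyID":"<upload id>","ObjStat":{...},"Parts":[...],"CopyParts":[...],"PartStat":[true,false]}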
// isValid checks if the data is valid, which means the CP is valid and the object is not updated.
func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
	// Compare the CP's magic number and MD5
	cpb := cp
	cpb.MD5 = ""
	js, _ := json.Marshal(cpb)
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])

	if cp.Magic != copyCpMagic || b64 != cp.MD5 {
		return false, nil
	}

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return false, err
	}

	// Compare the object size, last modified time and etag
	if cp.ObjStat.Size != objectSize ||
		cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
		cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
		return false, nil
	}

	return true, nil
}
// load loads the checkpoint from the checkpoint file
func (cp *copyCheckpoint) load(filePath string) error {
	contents, err := ioutil.ReadFile(filePath)
	if err != nil {
		return err
	}

	err = json.Unmarshal(contents, cp)
	return err
}
// update updates the parts status
func (cp *copyCheckpoint) update(part UploadPart) {
	cp.CopyParts[part.PartNumber-1] = part
	cp.PartStat[part.PartNumber-1] = true
}
// dump dumps the CP to the file
func (cp *copyCheckpoint) dump(filePath string) error {
	bcp := *cp

	// Calculate the MD5
	bcp.MD5 = ""
	js, err := json.Marshal(bcp)
	if err != nil {
		return err
	}
	sum := md5.Sum(js)
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	bcp.MD5 = b64

	// Serialize
	js, err = json.Marshal(bcp)
	if err != nil {
		return err
	}

	// Dump
	return ioutil.WriteFile(filePath, js, FilePermMode)
}
// todoParts returns the unfinished parts
func (cp copyCheckpoint) todoParts() []copyPart {
	dps := []copyPart{}
	for i, ps := range cp.PartStat {
		if !ps {
			dps = append(dps, cp.Parts[i])
		}
	}
	return dps
}
// getCompletedBytes returns the number of bytes already copied
func (cp copyCheckpoint) getCompletedBytes() int64 {
	var completedBytes int64
	for i, part := range cp.Parts {
		if cp.PartStat[i] {
			completedBytes += (part.End - part.Start + 1)
		}
	}
	return completedBytes
}
// prepare initializes the multipart copy: it records the source/target and the
// source object's stat, splits the parts and initiates the multipart upload.
func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
	partSize int64, options []Option) error {
	// CP
	cp.Magic = copyCpMagic
	cp.SrcBucketName = srcBucket.BucketName
	cp.SrcObjectKey = srcObjectKey
	cp.DestBucketName = destBucket.BucketName
	cp.DestObjectKey = destObjectKey

	objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
	if err != nil {
		return err
	}

	cp.ObjStat.Size = objectSize
	cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
	cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)

	// Parts
	cp.Parts = getCopyParts(objectSize, partSize)
	cp.PartStat = make([]bool, len(cp.Parts))
	for i := range cp.PartStat {
		cp.PartStat[i] = false
	}
	cp.CopyParts = make([]UploadPart, len(cp.Parts))

	// Initiate the multipart upload on the target
	imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
	if err != nil {
		return err
	}
	cp.CopyID = imur.UploadID

	return nil
}
// complete completes the multipart upload and removes the checkpoint file
func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
	imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
		Key: cp.DestObjectKey, UploadID: cp.CopyID}
	_, err := bucket.CompleteMultipartUpload(imur, parts, options...)
	if err != nil {
		return err
	}
	os.Remove(cpFilePath)
	return err
}
// copyFileWithCp copies the object concurrently, with a checkpoint
func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
	partSize int64, options []Option, cpFilePath string, routines int) error {
	descBucket, err := bucket.Client.Bucket(destBucketName)
	if err != nil {
		return err
	}
	srcBucket, err := bucket.Client.Bucket(srcBucketName)
	if err != nil {
		return err
	}
	listener := GetProgressListener(options)

	// Load the CP data
	ccp := copyCheckpoint{}
	err = ccp.load(cpFilePath)
	if err != nil {
		os.Remove(cpFilePath)
	}

	// Choose the valid options for each request
	headerOptions := ChoiceHeadObjectOption(options)
	partOptions := ChoiceTransferPartOption(options)
	completeOptions := ChoiceCompletePartOption(options)

	meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
	if err != nil {
		return err
	}

	// Load error or the CP data is invalid---reinitialize
	valid, err := ccp.isValid(meta)
	if err != nil || !valid {
		if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
			return err
		}
		os.Remove(cpFilePath)
	}

	// Unfinished parts
	parts := ccp.todoParts()
	imur := InitiateMultipartUploadResult{
		Bucket:   destBucketName,
		Key:      destObjectKey,
		UploadID: ccp.CopyID}

	jobs := make(chan copyPart, len(parts))
	results := make(chan UploadPart, len(parts))
	failed := make(chan error)
	die := make(chan bool)

	completedBytes := ccp.getCompletedBytes()
	event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size, 0)
	publishProgress(listener, event)

	// Start the worker coroutines
	arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
	for w := 1; w <= routines; w++ {
		go copyWorker(w, arg, jobs, results, failed, die)
	}

	// Start the scheduler
	go copyScheduler(jobs, parts)

	// Wait for the parts to complete
	completed := 0
	for completed < len(parts) {
		select {
		case part := <-results:
			completed++
			ccp.update(part)
			ccp.dump(cpFilePath)
			copyBytes := parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1
			completedBytes += copyBytes
			event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size, copyBytes)
			publishProgress(listener, event)
		case err := <-failed:
			close(die)
			event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size, 0)
			publishProgress(listener, event)
			return err
		}

		if completed >= len(parts) {
			break
		}
	}

	event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size, 0)
	publishProgress(listener, event)

	return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, completeOptions)
}