OSDN Git Service

Create ossClient.go (#574)
[bytom/vapor.git] / vendor / github.com / aliyun / aliyun-oss-go-sdk / oss / multicopy.go
1 package oss
2
3 import (
4         "crypto/md5"
5         "encoding/base64"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "strconv"
13 )
14
15 // CopyFile is multipart copy object
16 //
17 // srcBucketName    source bucket name
18 // srcObjectKey    source object name
19 // destObjectKey    target object name in the form of bucketname.objectkey
20 // partSize    the part size in byte.
21 // options    object's contraints. Check out function InitiateMultipartUpload.
22 //
23 // error    it's nil if the operation succeeds, otherwise it's an error object.
24 //
25 func (bucket Bucket) CopyFile(srcBucketName, srcObjectKey, destObjectKey string, partSize int64, options ...Option) error {
26         destBucketName := bucket.BucketName
27         if partSize < MinPartSize || partSize > MaxPartSize {
28                 return errors.New("oss: part size invalid range (1024KB, 5GB]")
29         }
30
31         cpConf := getCpConfig(options)
32         routines := getRoutines(options)
33
34         var strVersionId string
35         versionId, _ := FindOption(options, "versionId", nil)
36         if versionId != nil {
37                 strVersionId = versionId.(string)
38         }
39
40         if cpConf != nil && cpConf.IsEnable {
41                 cpFilePath := getCopyCpFilePath(cpConf, srcBucketName, srcObjectKey, destBucketName, destObjectKey, strVersionId)
42                 if cpFilePath != "" {
43                         return bucket.copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey, partSize, options, cpFilePath, routines)
44                 }
45         }
46
47         return bucket.copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey,
48                 partSize, options, routines)
49 }
50
51 func getCopyCpFilePath(cpConf *cpConfig, srcBucket, srcObject, destBucket, destObject, versionId string) string {
52         if cpConf.FilePath == "" && cpConf.DirPath != "" {
53                 dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
54                 src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
55                 cpFileName := getCpFileName(src, dest, versionId)
56                 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
57         }
58         return cpConf.FilePath
59 }
60
61 // ----- Concurrently copy without checkpoint ---------
62
63 // copyWorkerArg defines the copy worker arguments
64 type copyWorkerArg struct {
65         bucket        *Bucket
66         imur          InitiateMultipartUploadResult
67         srcBucketName string
68         srcObjectKey  string
69         options       []Option
70         hook          copyPartHook
71 }
72
73 // copyPartHook is the hook for testing purpose
74 type copyPartHook func(part copyPart) error
75
76 var copyPartHooker copyPartHook = defaultCopyPartHook
77
78 func defaultCopyPartHook(part copyPart) error {
79         return nil
80 }
81
82 // copyWorker copies worker
83 func copyWorker(id int, arg copyWorkerArg, jobs <-chan copyPart, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
84         for chunk := range jobs {
85                 if err := arg.hook(chunk); err != nil {
86                         failed <- err
87                         break
88                 }
89                 chunkSize := chunk.End - chunk.Start + 1
90                 part, err := arg.bucket.UploadPartCopy(arg.imur, arg.srcBucketName, arg.srcObjectKey,
91                         chunk.Start, chunkSize, chunk.Number, arg.options...)
92                 if err != nil {
93                         failed <- err
94                         break
95                 }
96                 select {
97                 case <-die:
98                         return
99                 default:
100                 }
101                 results <- part
102         }
103 }
104
105 // copyScheduler
106 func copyScheduler(jobs chan copyPart, parts []copyPart) {
107         for _, part := range parts {
108                 jobs <- part
109         }
110         close(jobs)
111 }
112
113 // copyPart structure
114 type copyPart struct {
115         Number int   // Part number (from 1 to 10,000)
116         Start  int64 // The start index in the source file.
117         End    int64 // The end index in the source file
118 }
119
120 // getCopyParts calculates copy parts
121 func getCopyParts(objectSize, partSize int64) []copyPart {
122         parts := []copyPart{}
123         part := copyPart{}
124         i := 0
125         for offset := int64(0); offset < objectSize; offset += partSize {
126                 part.Number = i + 1
127                 part.Start = offset
128                 part.End = GetPartEnd(offset, objectSize, partSize)
129                 parts = append(parts, part)
130                 i++
131         }
132         return parts
133 }
134
135 // getSrcObjectBytes gets the source file size
136 func getSrcObjectBytes(parts []copyPart) int64 {
137         var ob int64
138         for _, part := range parts {
139                 ob += (part.End - part.Start + 1)
140         }
141         return ob
142 }
143
144 // copyFile is a concurrently copy without checkpoint
145 func (bucket Bucket) copyFile(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
146         partSize int64, options []Option, routines int) error {
147         descBucket, err := bucket.Client.Bucket(destBucketName)
148         srcBucket, err := bucket.Client.Bucket(srcBucketName)
149         listener := GetProgressListener(options)
150
151         // choice valid options
152         headerOptions := ChoiceHeadObjectOption(options)
153         partOptions := ChoiceTransferPartOption(options)
154         completeOptions := ChoiceCompletePartOption(options)
155         abortOptions := ChoiceAbortPartOption(options)
156
157         meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
158         if err != nil {
159                 return err
160         }
161
162         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 0)
163         if err != nil {
164                 return err
165         }
166
167         // Get copy parts
168         parts := getCopyParts(objectSize, partSize)
169         // Initialize the multipart upload
170         imur, err := descBucket.InitiateMultipartUpload(destObjectKey, options...)
171         if err != nil {
172                 return err
173         }
174
175         jobs := make(chan copyPart, len(parts))
176         results := make(chan UploadPart, len(parts))
177         failed := make(chan error)
178         die := make(chan bool)
179
180         var completedBytes int64
181         totalBytes := getSrcObjectBytes(parts)
182         event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
183         publishProgress(listener, event)
184
185         // Start to copy workers
186         arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
187         for w := 1; w <= routines; w++ {
188                 go copyWorker(w, arg, jobs, results, failed, die)
189         }
190
191         // Start the scheduler
192         go copyScheduler(jobs, parts)
193
194         // Wait for the parts finished.
195         completed := 0
196         ups := make([]UploadPart, len(parts))
197         for completed < len(parts) {
198                 select {
199                 case part := <-results:
200                         completed++
201                         ups[part.PartNumber-1] = part
202                         copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
203                         completedBytes += copyBytes
204                         event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, copyBytes)
205                         publishProgress(listener, event)
206                 case err := <-failed:
207                         close(die)
208                         descBucket.AbortMultipartUpload(imur, abortOptions...)
209                         event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
210                         publishProgress(listener, event)
211                         return err
212                 }
213
214                 if completed >= len(parts) {
215                         break
216                 }
217         }
218
219         event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
220         publishProgress(listener, event)
221
222         // Complete the multipart upload
223         _, err = descBucket.CompleteMultipartUpload(imur, ups, completeOptions...)
224         if err != nil {
225                 bucket.AbortMultipartUpload(imur, abortOptions...)
226                 return err
227         }
228         return nil
229 }
230
231 // ----- Concurrently copy with checkpoint  -----
232
233 const copyCpMagic = "84F1F18C-FF1D-403B-A1D8-9DEB5F65910A"
234
235 type copyCheckpoint struct {
236         Magic          string       // Magic
237         MD5            string       // CP content MD5
238         SrcBucketName  string       // Source bucket
239         SrcObjectKey   string       // Source object
240         DestBucketName string       // Target bucket
241         DestObjectKey  string       // Target object
242         CopyID         string       // Copy ID
243         ObjStat        objectStat   // Object stat
244         Parts          []copyPart   // Copy parts
245         CopyParts      []UploadPart // The uploaded parts
246         PartStat       []bool       // The part status
247 }
248
249 // isValid checks if the data is valid which means CP is valid and object is not updated.
250 func (cp copyCheckpoint) isValid(meta http.Header) (bool, error) {
251         // Compare CP's magic number and the MD5.
252         cpb := cp
253         cpb.MD5 = ""
254         js, _ := json.Marshal(cpb)
255         sum := md5.Sum(js)
256         b64 := base64.StdEncoding.EncodeToString(sum[:])
257
258         if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
259                 return false, nil
260         }
261
262         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
263         if err != nil {
264                 return false, err
265         }
266
267         // Compare the object size and last modified time and etag.
268         if cp.ObjStat.Size != objectSize ||
269                 cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
270                 cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
271                 return false, nil
272         }
273
274         return true, nil
275 }
276
277 // load loads from the checkpoint file
278 func (cp *copyCheckpoint) load(filePath string) error {
279         contents, err := ioutil.ReadFile(filePath)
280         if err != nil {
281                 return err
282         }
283
284         err = json.Unmarshal(contents, cp)
285         return err
286 }
287
288 // update updates the parts status
289 func (cp *copyCheckpoint) update(part UploadPart) {
290         cp.CopyParts[part.PartNumber-1] = part
291         cp.PartStat[part.PartNumber-1] = true
292 }
293
294 // dump dumps the CP to the file
295 func (cp *copyCheckpoint) dump(filePath string) error {
296         bcp := *cp
297
298         // Calculate MD5
299         bcp.MD5 = ""
300         js, err := json.Marshal(bcp)
301         if err != nil {
302                 return err
303         }
304         sum := md5.Sum(js)
305         b64 := base64.StdEncoding.EncodeToString(sum[:])
306         bcp.MD5 = b64
307
308         // Serialization
309         js, err = json.Marshal(bcp)
310         if err != nil {
311                 return err
312         }
313
314         // Dump
315         return ioutil.WriteFile(filePath, js, FilePermMode)
316 }
317
318 // todoParts returns unfinished parts
319 func (cp copyCheckpoint) todoParts() []copyPart {
320         dps := []copyPart{}
321         for i, ps := range cp.PartStat {
322                 if !ps {
323                         dps = append(dps, cp.Parts[i])
324                 }
325         }
326         return dps
327 }
328
329 // getCompletedBytes returns finished bytes count
330 func (cp copyCheckpoint) getCompletedBytes() int64 {
331         var completedBytes int64
332         for i, part := range cp.Parts {
333                 if cp.PartStat[i] {
334                         completedBytes += (part.End - part.Start + 1)
335                 }
336         }
337         return completedBytes
338 }
339
340 // prepare initializes the multipart upload
341 func (cp *copyCheckpoint) prepare(meta http.Header, srcBucket *Bucket, srcObjectKey string, destBucket *Bucket, destObjectKey string,
342         partSize int64, options []Option) error {
343         // CP
344         cp.Magic = copyCpMagic
345         cp.SrcBucketName = srcBucket.BucketName
346         cp.SrcObjectKey = srcObjectKey
347         cp.DestBucketName = destBucket.BucketName
348         cp.DestObjectKey = destObjectKey
349
350         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
351         if err != nil {
352                 return err
353         }
354
355         cp.ObjStat.Size = objectSize
356         cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
357         cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
358
359         // Parts
360         cp.Parts = getCopyParts(objectSize, partSize)
361         cp.PartStat = make([]bool, len(cp.Parts))
362         for i := range cp.PartStat {
363                 cp.PartStat[i] = false
364         }
365         cp.CopyParts = make([]UploadPart, len(cp.Parts))
366
367         // Init copy
368         imur, err := destBucket.InitiateMultipartUpload(destObjectKey, options...)
369         if err != nil {
370                 return err
371         }
372         cp.CopyID = imur.UploadID
373
374         return nil
375 }
376
377 func (cp *copyCheckpoint) complete(bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
378         imur := InitiateMultipartUploadResult{Bucket: cp.DestBucketName,
379                 Key: cp.DestObjectKey, UploadID: cp.CopyID}
380         _, err := bucket.CompleteMultipartUpload(imur, parts, options...)
381         if err != nil {
382                 return err
383         }
384         os.Remove(cpFilePath)
385         return err
386 }
387
388 // copyFileWithCp is concurrently copy with checkpoint
389 func (bucket Bucket) copyFileWithCp(srcBucketName, srcObjectKey, destBucketName, destObjectKey string,
390         partSize int64, options []Option, cpFilePath string, routines int) error {
391         descBucket, err := bucket.Client.Bucket(destBucketName)
392         srcBucket, err := bucket.Client.Bucket(srcBucketName)
393         listener := GetProgressListener(options)
394
395         // Load CP data
396         ccp := copyCheckpoint{}
397         err = ccp.load(cpFilePath)
398         if err != nil {
399                 os.Remove(cpFilePath)
400         }
401
402         // choice valid options
403         headerOptions := ChoiceHeadObjectOption(options)
404         partOptions := ChoiceTransferPartOption(options)
405         completeOptions := ChoiceCompletePartOption(options)
406
407         meta, err := srcBucket.GetObjectDetailedMeta(srcObjectKey, headerOptions...)
408         if err != nil {
409                 return err
410         }
411
412         // Load error or the CP data is invalid---reinitialize
413         valid, err := ccp.isValid(meta)
414         if err != nil || !valid {
415                 if err = ccp.prepare(meta, srcBucket, srcObjectKey, descBucket, destObjectKey, partSize, options); err != nil {
416                         return err
417                 }
418                 os.Remove(cpFilePath)
419         }
420
421         // Unfinished parts
422         parts := ccp.todoParts()
423         imur := InitiateMultipartUploadResult{
424                 Bucket:   destBucketName,
425                 Key:      destObjectKey,
426                 UploadID: ccp.CopyID}
427
428         jobs := make(chan copyPart, len(parts))
429         results := make(chan UploadPart, len(parts))
430         failed := make(chan error)
431         die := make(chan bool)
432
433         completedBytes := ccp.getCompletedBytes()
434         event := newProgressEvent(TransferStartedEvent, completedBytes, ccp.ObjStat.Size, 0)
435         publishProgress(listener, event)
436
437         // Start the worker coroutines
438         arg := copyWorkerArg{descBucket, imur, srcBucketName, srcObjectKey, partOptions, copyPartHooker}
439         for w := 1; w <= routines; w++ {
440                 go copyWorker(w, arg, jobs, results, failed, die)
441         }
442
443         // Start the scheduler
444         go copyScheduler(jobs, parts)
445
446         // Wait for the parts completed.
447         completed := 0
448         for completed < len(parts) {
449                 select {
450                 case part := <-results:
451                         completed++
452                         ccp.update(part)
453                         ccp.dump(cpFilePath)
454                         copyBytes := (parts[part.PartNumber-1].End - parts[part.PartNumber-1].Start + 1)
455                         completedBytes += copyBytes
456                         event = newProgressEvent(TransferDataEvent, completedBytes, ccp.ObjStat.Size, copyBytes)
457                         publishProgress(listener, event)
458                 case err := <-failed:
459                         close(die)
460                         event = newProgressEvent(TransferFailedEvent, completedBytes, ccp.ObjStat.Size, 0)
461                         publishProgress(listener, event)
462                         return err
463                 }
464
465                 if completed >= len(parts) {
466                         break
467                 }
468         }
469
470         event = newProgressEvent(TransferCompletedEvent, completedBytes, ccp.ObjStat.Size, 0)
471         publishProgress(listener, event)
472
473         return ccp.complete(descBucket, ccp.CopyParts, cpFilePath, completeOptions)
474 }