OSDN Git Service

Create ossClient.go (#574)
[bytom/vapor.git] / vendor / github.com / aliyun / aliyun-oss-go-sdk / oss / upload.go
1 package oss
2
3 import (
4         "crypto/md5"
5         "encoding/base64"
6         "encoding/hex"
7         "encoding/json"
8         "errors"
9         "fmt"
10         "io/ioutil"
11         "net/http"
12         "os"
13         "path/filepath"
14         "time"
15 )
16
17 // UploadFile is multipart file upload.
18 //
19 // objectKey    the object name.
20 // filePath    the local file path to upload.
21 // partSize    the part size in byte.
22 // options    the options for uploading object.
23 //
24 // error    it's nil if the operation succeeds, otherwise it's an error object.
25 //
26 func (bucket Bucket) UploadFile(objectKey, filePath string, partSize int64, options ...Option) error {
27         if partSize < MinPartSize || partSize > MaxPartSize {
28                 return errors.New("oss: part size invalid range (100KB, 5GB]")
29         }
30
31         cpConf := getCpConfig(options)
32         routines := getRoutines(options)
33
34         if cpConf != nil && cpConf.IsEnable {
35                 cpFilePath := getUploadCpFilePath(cpConf, filePath, bucket.BucketName, objectKey)
36                 if cpFilePath != "" {
37                         return bucket.uploadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines)
38                 }
39         }
40
41         return bucket.uploadFile(objectKey, filePath, partSize, options, routines)
42 }
43
44 func getUploadCpFilePath(cpConf *cpConfig, srcFile, destBucket, destObject string) string {
45         if cpConf.FilePath == "" && cpConf.DirPath != "" {
46                 dest := fmt.Sprintf("oss://%v/%v", destBucket, destObject)
47                 absPath, _ := filepath.Abs(srcFile)
48                 cpFileName := getCpFileName(absPath, dest, "")
49                 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
50         }
51         return cpConf.FilePath
52 }
53
54 // ----- concurrent upload without checkpoint  -----
55
56 // getCpConfig gets checkpoint configuration
57 func getCpConfig(options []Option) *cpConfig {
58         cpcOpt, err := FindOption(options, checkpointConfig, nil)
59         if err != nil || cpcOpt == nil {
60                 return nil
61         }
62
63         return cpcOpt.(*cpConfig)
64 }
65
66 // getCpFileName return the name of the checkpoint file
67 func getCpFileName(src, dest, versionId string) string {
68         md5Ctx := md5.New()
69         md5Ctx.Write([]byte(src))
70         srcCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
71
72         md5Ctx.Reset()
73         md5Ctx.Write([]byte(dest))
74         destCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
75
76         if versionId == "" {
77                 return fmt.Sprintf("%v-%v.cp", srcCheckSum, destCheckSum)
78         }
79
80         md5Ctx.Reset()
81         md5Ctx.Write([]byte(versionId))
82         versionCheckSum := hex.EncodeToString(md5Ctx.Sum(nil))
83         return fmt.Sprintf("%v-%v-%v.cp", srcCheckSum, destCheckSum, versionCheckSum)
84 }
85
86 // getRoutines gets the routine count. by default it's 1.
87 func getRoutines(options []Option) int {
88         rtnOpt, err := FindOption(options, routineNum, nil)
89         if err != nil || rtnOpt == nil {
90                 return 1
91         }
92
93         rs := rtnOpt.(int)
94         if rs < 1 {
95                 rs = 1
96         } else if rs > 100 {
97                 rs = 100
98         }
99
100         return rs
101 }
102
103 // getPayer return the payer of the request
104 func getPayer(options []Option) string {
105         payerOpt, err := FindOption(options, HTTPHeaderOssRequester, nil)
106         if err != nil || payerOpt == nil {
107                 return ""
108         }
109         return payerOpt.(string)
110 }
111
112 // GetProgressListener gets the progress callback
113 func GetProgressListener(options []Option) ProgressListener {
114         isSet, listener, _ := IsOptionSet(options, progressListener)
115         if !isSet {
116                 return nil
117         }
118         return listener.(ProgressListener)
119 }
120
121 // uploadPartHook is for testing usage
122 type uploadPartHook func(id int, chunk FileChunk) error
123
124 var uploadPartHooker uploadPartHook = defaultUploadPart
125
126 func defaultUploadPart(id int, chunk FileChunk) error {
127         return nil
128 }
129
130 // workerArg defines worker argument structure
131 type workerArg struct {
132         bucket   *Bucket
133         filePath string
134         imur     InitiateMultipartUploadResult
135         options  []Option
136         hook     uploadPartHook
137 }
138
139 // worker is the worker coroutine function
140 type defaultUploadProgressListener struct {
141 }
142
143 // ProgressChanged no-ops
144 func (listener *defaultUploadProgressListener) ProgressChanged(event *ProgressEvent) {
145 }
146
147 func worker(id int, arg workerArg, jobs <-chan FileChunk, results chan<- UploadPart, failed chan<- error, die <-chan bool) {
148         for chunk := range jobs {
149                 if err := arg.hook(id, chunk); err != nil {
150                         failed <- err
151                         break
152                 }
153                 var respHeader http.Header
154                 p := Progress(&defaultUploadProgressListener{})
155                 opts := make([]Option, len(arg.options)+2)
156                 opts = append(opts, arg.options...)
157
158                 // use defaultUploadProgressListener
159                 opts = append(opts, p, GetResponseHeader(&respHeader))
160
161                 startT := time.Now().UnixNano() / 1000 / 1000 / 1000
162                 part, err := arg.bucket.UploadPartFromFile(arg.imur, arg.filePath, chunk.Offset, chunk.Size, chunk.Number, opts...)
163                 endT := time.Now().UnixNano() / 1000 / 1000 / 1000
164                 if err != nil {
165                         arg.bucket.Client.Config.WriteLog(Debug, "upload part error,cost:%d second,part number:%d,request id:%s,error:%s\n", endT-startT, chunk.Number, GetRequestId(respHeader), err.Error())
166                         failed <- err
167                         break
168                 }
169                 select {
170                 case <-die:
171                         return
172                 default:
173                 }
174                 results <- part
175         }
176 }
177
178 // scheduler function
179 func scheduler(jobs chan FileChunk, chunks []FileChunk) {
180         for _, chunk := range chunks {
181                 jobs <- chunk
182         }
183         close(jobs)
184 }
185
186 func getTotalBytes(chunks []FileChunk) int64 {
187         var tb int64
188         for _, chunk := range chunks {
189                 tb += chunk.Size
190         }
191         return tb
192 }
193
194 // uploadFile is a concurrent upload, without checkpoint
195 func (bucket Bucket) uploadFile(objectKey, filePath string, partSize int64, options []Option, routines int) error {
196         listener := GetProgressListener(options)
197
198         chunks, err := SplitFileByPartSize(filePath, partSize)
199         if err != nil {
200                 return err
201         }
202
203         partOptions := ChoiceTransferPartOption(options)
204         completeOptions := ChoiceCompletePartOption(options)
205         abortOptions := ChoiceAbortPartOption(options)
206
207         // Initialize the multipart upload
208         imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
209         if err != nil {
210                 return err
211         }
212
213         jobs := make(chan FileChunk, len(chunks))
214         results := make(chan UploadPart, len(chunks))
215         failed := make(chan error)
216         die := make(chan bool)
217
218         var completedBytes int64
219         totalBytes := getTotalBytes(chunks)
220         event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
221         publishProgress(listener, event)
222
223         // Start the worker coroutine
224         arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
225         for w := 1; w <= routines; w++ {
226                 go worker(w, arg, jobs, results, failed, die)
227         }
228
229         // Schedule the jobs
230         go scheduler(jobs, chunks)
231
232         // Waiting for the upload finished
233         completed := 0
234         parts := make([]UploadPart, len(chunks))
235         for completed < len(chunks) {
236                 select {
237                 case part := <-results:
238                         completed++
239                         parts[part.PartNumber-1] = part
240                         completedBytes += chunks[part.PartNumber-1].Size
241
242                         // why RwBytes in ProgressEvent is 0 ?
243                         // because read or write event has been notified in teeReader.Read()
244                         event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, chunks[part.PartNumber-1].Size)
245                         publishProgress(listener, event)
246                 case err := <-failed:
247                         close(die)
248                         event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
249                         publishProgress(listener, event)
250                         bucket.AbortMultipartUpload(imur, abortOptions...)
251                         return err
252                 }
253
254                 if completed >= len(chunks) {
255                         break
256                 }
257         }
258
259         event = newProgressEvent(TransferStartedEvent, completedBytes, totalBytes, 0)
260         publishProgress(listener, event)
261
262         // Complete the multpart upload
263         _, err = bucket.CompleteMultipartUpload(imur, parts, completeOptions...)
264         if err != nil {
265                 bucket.AbortMultipartUpload(imur, abortOptions...)
266                 return err
267         }
268         return nil
269 }
270
271 // ----- concurrent upload with checkpoint  -----
272 const uploadCpMagic = "FE8BB4EA-B593-4FAC-AD7A-2459A36E2E62"
273
274 type uploadCheckpoint struct {
275         Magic     string   // Magic
276         MD5       string   // Checkpoint file content's MD5
277         FilePath  string   // Local file path
278         FileStat  cpStat   // File state
279         ObjectKey string   // Key
280         UploadID  string   // Upload ID
281         Parts     []cpPart // All parts of the local file
282 }
283
284 type cpStat struct {
285         Size         int64     // File size
286         LastModified time.Time // File's last modified time
287         MD5          string    // Local file's MD5
288 }
289
290 type cpPart struct {
291         Chunk       FileChunk  // File chunk
292         Part        UploadPart // Uploaded part
293         IsCompleted bool       // Upload complete flag
294 }
295
296 // isValid checks if the uploaded data is valid---it's valid when the file is not updated and the checkpoint data is valid.
297 func (cp uploadCheckpoint) isValid(filePath string) (bool, error) {
298         // Compare the CP's magic number and MD5.
299         cpb := cp
300         cpb.MD5 = ""
301         js, _ := json.Marshal(cpb)
302         sum := md5.Sum(js)
303         b64 := base64.StdEncoding.EncodeToString(sum[:])
304
305         if cp.Magic != uploadCpMagic || b64 != cp.MD5 {
306                 return false, nil
307         }
308
309         // Make sure if the local file is updated.
310         fd, err := os.Open(filePath)
311         if err != nil {
312                 return false, err
313         }
314         defer fd.Close()
315
316         st, err := fd.Stat()
317         if err != nil {
318                 return false, err
319         }
320
321         md, err := calcFileMD5(filePath)
322         if err != nil {
323                 return false, err
324         }
325
326         // Compare the file size, file's last modified time and file's MD5
327         if cp.FileStat.Size != st.Size() ||
328                 !cp.FileStat.LastModified.Equal(st.ModTime()) ||
329                 cp.FileStat.MD5 != md {
330                 return false, nil
331         }
332
333         return true, nil
334 }
335
336 // load loads from the file
337 func (cp *uploadCheckpoint) load(filePath string) error {
338         contents, err := ioutil.ReadFile(filePath)
339         if err != nil {
340                 return err
341         }
342
343         err = json.Unmarshal(contents, cp)
344         return err
345 }
346
347 // dump dumps to the local file
348 func (cp *uploadCheckpoint) dump(filePath string) error {
349         bcp := *cp
350
351         // Calculate MD5
352         bcp.MD5 = ""
353         js, err := json.Marshal(bcp)
354         if err != nil {
355                 return err
356         }
357         sum := md5.Sum(js)
358         b64 := base64.StdEncoding.EncodeToString(sum[:])
359         bcp.MD5 = b64
360
361         // Serialization
362         js, err = json.Marshal(bcp)
363         if err != nil {
364                 return err
365         }
366
367         // Dump
368         return ioutil.WriteFile(filePath, js, FilePermMode)
369 }
370
371 // updatePart updates the part status
372 func (cp *uploadCheckpoint) updatePart(part UploadPart) {
373         cp.Parts[part.PartNumber-1].Part = part
374         cp.Parts[part.PartNumber-1].IsCompleted = true
375 }
376
377 // todoParts returns unfinished parts
378 func (cp *uploadCheckpoint) todoParts() []FileChunk {
379         fcs := []FileChunk{}
380         for _, part := range cp.Parts {
381                 if !part.IsCompleted {
382                         fcs = append(fcs, part.Chunk)
383                 }
384         }
385         return fcs
386 }
387
388 // allParts returns all parts
389 func (cp *uploadCheckpoint) allParts() []UploadPart {
390         ps := []UploadPart{}
391         for _, part := range cp.Parts {
392                 ps = append(ps, part.Part)
393         }
394         return ps
395 }
396
397 // getCompletedBytes returns completed bytes count
398 func (cp *uploadCheckpoint) getCompletedBytes() int64 {
399         var completedBytes int64
400         for _, part := range cp.Parts {
401                 if part.IsCompleted {
402                         completedBytes += part.Chunk.Size
403                 }
404         }
405         return completedBytes
406 }
407
408 // calcFileMD5 calculates the MD5 for the specified local file
409 func calcFileMD5(filePath string) (string, error) {
410         return "", nil
411 }
412
413 // prepare initializes the multipart upload
414 func prepare(cp *uploadCheckpoint, objectKey, filePath string, partSize int64, bucket *Bucket, options []Option) error {
415         // CP
416         cp.Magic = uploadCpMagic
417         cp.FilePath = filePath
418         cp.ObjectKey = objectKey
419
420         // Local file
421         fd, err := os.Open(filePath)
422         if err != nil {
423                 return err
424         }
425         defer fd.Close()
426
427         st, err := fd.Stat()
428         if err != nil {
429                 return err
430         }
431         cp.FileStat.Size = st.Size()
432         cp.FileStat.LastModified = st.ModTime()
433         md, err := calcFileMD5(filePath)
434         if err != nil {
435                 return err
436         }
437         cp.FileStat.MD5 = md
438
439         // Chunks
440         parts, err := SplitFileByPartSize(filePath, partSize)
441         if err != nil {
442                 return err
443         }
444
445         cp.Parts = make([]cpPart, len(parts))
446         for i, part := range parts {
447                 cp.Parts[i].Chunk = part
448                 cp.Parts[i].IsCompleted = false
449         }
450
451         // Init load
452         imur, err := bucket.InitiateMultipartUpload(objectKey, options...)
453         if err != nil {
454                 return err
455         }
456         cp.UploadID = imur.UploadID
457
458         return nil
459 }
460
461 // complete completes the multipart upload and deletes the local CP files
462 func complete(cp *uploadCheckpoint, bucket *Bucket, parts []UploadPart, cpFilePath string, options []Option) error {
463         imur := InitiateMultipartUploadResult{Bucket: bucket.BucketName,
464                 Key: cp.ObjectKey, UploadID: cp.UploadID}
465         _, err := bucket.CompleteMultipartUpload(imur, parts, options...)
466         if err != nil {
467                 return err
468         }
469         os.Remove(cpFilePath)
470         return err
471 }
472
473 // uploadFileWithCp handles concurrent upload with checkpoint
474 func (bucket Bucket) uploadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int) error {
475         listener := GetProgressListener(options)
476
477         partOptions := ChoiceTransferPartOption(options)
478         completeOptions := ChoiceCompletePartOption(options)
479
480         // Load CP data
481         ucp := uploadCheckpoint{}
482         err := ucp.load(cpFilePath)
483         if err != nil {
484                 os.Remove(cpFilePath)
485         }
486
487         // Load error or the CP data is invalid.
488         valid, err := ucp.isValid(filePath)
489         if err != nil || !valid {
490                 if err = prepare(&ucp, objectKey, filePath, partSize, &bucket, options); err != nil {
491                         return err
492                 }
493                 os.Remove(cpFilePath)
494         }
495
496         chunks := ucp.todoParts()
497         imur := InitiateMultipartUploadResult{
498                 Bucket:   bucket.BucketName,
499                 Key:      objectKey,
500                 UploadID: ucp.UploadID}
501
502         jobs := make(chan FileChunk, len(chunks))
503         results := make(chan UploadPart, len(chunks))
504         failed := make(chan error)
505         die := make(chan bool)
506
507         completedBytes := ucp.getCompletedBytes()
508
509         // why RwBytes in ProgressEvent is 0 ?
510         // because read or write event has been notified in teeReader.Read()
511         event := newProgressEvent(TransferStartedEvent, completedBytes, ucp.FileStat.Size, 0)
512         publishProgress(listener, event)
513
514         // Start the workers
515         arg := workerArg{&bucket, filePath, imur, partOptions, uploadPartHooker}
516         for w := 1; w <= routines; w++ {
517                 go worker(w, arg, jobs, results, failed, die)
518         }
519
520         // Schedule jobs
521         go scheduler(jobs, chunks)
522
523         // Waiting for the job finished
524         completed := 0
525         for completed < len(chunks) {
526                 select {
527                 case part := <-results:
528                         completed++
529                         ucp.updatePart(part)
530                         ucp.dump(cpFilePath)
531                         completedBytes += ucp.Parts[part.PartNumber-1].Chunk.Size
532                         event = newProgressEvent(TransferDataEvent, completedBytes, ucp.FileStat.Size, ucp.Parts[part.PartNumber-1].Chunk.Size)
533                         publishProgress(listener, event)
534                 case err := <-failed:
535                         close(die)
536                         event = newProgressEvent(TransferFailedEvent, completedBytes, ucp.FileStat.Size, 0)
537                         publishProgress(listener, event)
538                         return err
539                 }
540
541                 if completed >= len(chunks) {
542                         break
543                 }
544         }
545
546         event = newProgressEvent(TransferCompletedEvent, completedBytes, ucp.FileStat.Size, 0)
547         publishProgress(listener, event)
548
549         // Complete the multipart upload
550         err = complete(&ucp, &bucket, ucp.allParts(), cpFilePath, completeOptions)
551         return err
552 }