OSDN Git Service

Create ossClient.go (#574)
[bytom/vapor.git] / vendor / github.com / aliyun / aliyun-oss-go-sdk / oss / download.go
1 package oss
2
3 import (
4         "crypto/md5"
5         "encoding/base64"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "hash"
10         "hash/crc64"
11         "io"
12         "io/ioutil"
13         "net/http"
14         "os"
15         "path/filepath"
16         "strconv"
17         "time"
18 )
19
20 // DownloadFile downloads files with multipart download.
21 //
22 // objectKey    the object key.
23 // filePath    the local file to download from objectKey in OSS.
24 // partSize    the part size in bytes.
25 // options    object's constraints, check out GetObject for the reference.
26 //
27 // error    it's nil when the call succeeds, otherwise it's an error object.
28 //
29 func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
30         if partSize < 1 {
31                 return errors.New("oss: part size smaller than 1")
32         }
33
34         uRange, err := GetRangeConfig(options)
35         if err != nil {
36                 return err
37         }
38
39         cpConf := getCpConfig(options)
40         routines := getRoutines(options)
41
42         var strVersionId string
43         versionId, _ := FindOption(options, "versionId", nil)
44         if versionId != nil {
45                 strVersionId = versionId.(string)
46         }
47
48         if cpConf != nil && cpConf.IsEnable {
49                 cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, strVersionId, filePath)
50                 if cpFilePath != "" {
51                         return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
52                 }
53         }
54
55         return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
56 }
57
58 func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
59         if cpConf.FilePath == "" && cpConf.DirPath != "" {
60                 src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
61                 absPath, _ := filepath.Abs(destFile)
62                 cpFileName := getCpFileName(src, absPath, versionId)
63                 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
64         }
65         return cpConf.FilePath
66 }
67
68 // downloadWorkerArg is download worker's parameters
69 type downloadWorkerArg struct {
70         bucket    *Bucket
71         key       string
72         filePath  string
73         options   []Option
74         hook      downloadPartHook
75         enableCRC bool
76 }
77
78 // downloadPartHook is hook for test
79 type downloadPartHook func(part downloadPart) error
80
81 var downloadPartHooker downloadPartHook = defaultDownloadPartHook
82
83 func defaultDownloadPartHook(part downloadPart) error {
84         return nil
85 }
86
87 // defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
88 type defaultDownloadProgressListener struct {
89 }
90
91 // ProgressChanged no-ops
92 func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
93 }
94
95 // downloadWorker
96 func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
97         for part := range jobs {
98                 if err := arg.hook(part); err != nil {
99                         failed <- err
100                         break
101                 }
102
103                 // Resolve options
104                 r := Range(part.Start, part.End)
105                 p := Progress(&defaultDownloadProgressListener{})
106
107                 var respHeader http.Header
108                 opts := make([]Option, len(arg.options)+3)
109                 // Append orderly, can not be reversed!
110                 opts = append(opts, arg.options...)
111                 opts = append(opts, r, p, GetResponseHeader(&respHeader))
112
113                 rd, err := arg.bucket.GetObject(arg.key, opts...)
114                 if err != nil {
115                         failed <- err
116                         break
117                 }
118                 defer rd.Close()
119
120                 var crcCalc hash.Hash64
121                 if arg.enableCRC {
122                         crcCalc = crc64.New(CrcTable())
123                         contentLen := part.End - part.Start + 1
124                         rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
125                 }
126                 defer rd.Close()
127
128                 select {
129                 case <-die:
130                         return
131                 default:
132                 }
133
134                 fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
135                 if err != nil {
136                         failed <- err
137                         break
138                 }
139
140                 _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
141                 if err != nil {
142                         fd.Close()
143                         failed <- err
144                         break
145                 }
146
147                 startT := time.Now().UnixNano() / 1000 / 1000 / 1000
148                 _, err = io.Copy(fd, rd)
149                 endT := time.Now().UnixNano() / 1000 / 1000 / 1000
150                 if err != nil {
151                         arg.bucket.Client.Config.WriteLog(Debug, "download part error,cost:%d second,part number:%d,request id:%s,error:%s.\n", endT-startT, part.Index, GetRequestId(respHeader), err.Error())
152                         fd.Close()
153                         failed <- err
154                         break
155                 }
156
157                 if arg.enableCRC {
158                         part.CRC64 = crcCalc.Sum64()
159                 }
160
161                 fd.Close()
162                 results <- part
163         }
164 }
165
166 // downloadScheduler
167 func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
168         for _, part := range parts {
169                 jobs <- part
170         }
171         close(jobs)
172 }
173
174 // downloadPart defines download part
175 type downloadPart struct {
176         Index  int    // Part number, starting from 0
177         Start  int64  // Start index
178         End    int64  // End index
179         Offset int64  // Offset
180         CRC64  uint64 // CRC check value of part
181 }
182
183 // getDownloadParts gets download parts
184 func getDownloadParts(objectSize, partSize int64, uRange *UnpackedRange) []downloadPart {
185         parts := []downloadPart{}
186         part := downloadPart{}
187         i := 0
188         start, end := AdjustRange(uRange, objectSize)
189         for offset := start; offset < end; offset += partSize {
190                 part.Index = i
191                 part.Start = offset
192                 part.End = GetPartEnd(offset, end, partSize)
193                 part.Offset = start
194                 part.CRC64 = 0
195                 parts = append(parts, part)
196                 i++
197         }
198         return parts
199 }
200
201 // getObjectBytes gets object bytes length
202 func getObjectBytes(parts []downloadPart) int64 {
203         var ob int64
204         for _, part := range parts {
205                 ob += (part.End - part.Start + 1)
206         }
207         return ob
208 }
209
210 // combineCRCInParts caculates the total CRC of continuous parts
211 func combineCRCInParts(dps []downloadPart) uint64 {
212         if dps == nil || len(dps) == 0 {
213                 return 0
214         }
215
216         crc := dps[0].CRC64
217         for i := 1; i < len(dps); i++ {
218                 crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
219         }
220
221         return crc
222 }
223
224 // downloadFile downloads file concurrently without checkpoint.
225 func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *UnpackedRange) error {
226         tempFilePath := filePath + TempFileSuffix
227         listener := GetProgressListener(options)
228
229         // If the file does not exist, create one. If exists, the download will overwrite it.
230         fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
231         if err != nil {
232                 return err
233         }
234         fd.Close()
235
236         // Get the object detailed meta for object whole size
237         // must delete header:range to get whole object size
238         skipOptions := DeleteOption(options, HTTPHeaderRange)
239         meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
240         if err != nil {
241                 return err
242         }
243
244         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
245         if err != nil {
246                 return err
247         }
248
249         enableCRC := false
250         expectedCRC := (uint64)(0)
251         if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
252                 if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
253                         enableCRC = true
254                         expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
255                 }
256         }
257
258         // Get the parts of the file
259         parts := getDownloadParts(objectSize, partSize, uRange)
260         jobs := make(chan downloadPart, len(parts))
261         results := make(chan downloadPart, len(parts))
262         failed := make(chan error)
263         die := make(chan bool)
264
265         var completedBytes int64
266         totalBytes := getObjectBytes(parts)
267         event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
268         publishProgress(listener, event)
269
270         // Start the download workers
271         arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
272         for w := 1; w <= routines; w++ {
273                 go downloadWorker(w, arg, jobs, results, failed, die)
274         }
275
276         // Download parts concurrently
277         go downloadScheduler(jobs, parts)
278
279         // Waiting for parts download finished
280         completed := 0
281         for completed < len(parts) {
282                 select {
283                 case part := <-results:
284                         completed++
285                         downBytes := (part.End - part.Start + 1)
286                         completedBytes += downBytes
287                         parts[part.Index].CRC64 = part.CRC64
288                         event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
289                         publishProgress(listener, event)
290                 case err := <-failed:
291                         close(die)
292                         event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
293                         publishProgress(listener, event)
294                         return err
295                 }
296
297                 if completed >= len(parts) {
298                         break
299                 }
300         }
301
302         event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
303         publishProgress(listener, event)
304
305         if enableCRC {
306                 actualCRC := combineCRCInParts(parts)
307                 err = CheckDownloadCRC(actualCRC, expectedCRC)
308                 if err != nil {
309                         return err
310                 }
311         }
312
313         return os.Rename(tempFilePath, filePath)
314 }
315
316 // ----- Concurrent download with chcekpoint  -----
317
318 const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
319
320 type downloadCheckpoint struct {
321         Magic     string         // Magic
322         MD5       string         // Checkpoint content MD5
323         FilePath  string         // Local file
324         Object    string         // Key
325         ObjStat   objectStat     // Object status
326         Parts     []downloadPart // All download parts
327         PartStat  []bool         // Parts' download status
328         Start     int64          // Start point of the file
329         End       int64          // End point of the file
330         enableCRC bool           // Whether has CRC check
331         CRC       uint64         // CRC check value
332 }
333
334 type objectStat struct {
335         Size         int64  // Object size
336         LastModified string // Last modified time
337         Etag         string // Etag
338 }
339
340 // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
341 func (cp downloadCheckpoint) isValid(meta http.Header, uRange *UnpackedRange) (bool, error) {
342         // Compare the CP's Magic and the MD5
343         cpb := cp
344         cpb.MD5 = ""
345         js, _ := json.Marshal(cpb)
346         sum := md5.Sum(js)
347         b64 := base64.StdEncoding.EncodeToString(sum[:])
348
349         if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
350                 return false, nil
351         }
352
353         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
354         if err != nil {
355                 return false, err
356         }
357
358         // Compare the object size, last modified time and etag
359         if cp.ObjStat.Size != objectSize ||
360                 cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
361                 cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
362                 return false, nil
363         }
364
365         // Check the download range
366         if uRange != nil {
367                 start, end := AdjustRange(uRange, objectSize)
368                 if start != cp.Start || end != cp.End {
369                         return false, nil
370                 }
371         }
372
373         return true, nil
374 }
375
376 // load checkpoint from local file
377 func (cp *downloadCheckpoint) load(filePath string) error {
378         contents, err := ioutil.ReadFile(filePath)
379         if err != nil {
380                 return err
381         }
382
383         err = json.Unmarshal(contents, cp)
384         return err
385 }
386
387 // dump funciton dumps to file
388 func (cp *downloadCheckpoint) dump(filePath string) error {
389         bcp := *cp
390
391         // Calculate MD5
392         bcp.MD5 = ""
393         js, err := json.Marshal(bcp)
394         if err != nil {
395                 return err
396         }
397         sum := md5.Sum(js)
398         b64 := base64.StdEncoding.EncodeToString(sum[:])
399         bcp.MD5 = b64
400
401         // Serialize
402         js, err = json.Marshal(bcp)
403         if err != nil {
404                 return err
405         }
406
407         // Dump
408         return ioutil.WriteFile(filePath, js, FilePermMode)
409 }
410
411 // todoParts gets unfinished parts
412 func (cp downloadCheckpoint) todoParts() []downloadPart {
413         dps := []downloadPart{}
414         for i, ps := range cp.PartStat {
415                 if !ps {
416                         dps = append(dps, cp.Parts[i])
417                 }
418         }
419         return dps
420 }
421
422 // getCompletedBytes gets completed size
423 func (cp downloadCheckpoint) getCompletedBytes() int64 {
424         var completedBytes int64
425         for i, part := range cp.Parts {
426                 if cp.PartStat[i] {
427                         completedBytes += (part.End - part.Start + 1)
428                 }
429         }
430         return completedBytes
431 }
432
433 // prepare initiates download tasks
434 func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *UnpackedRange) error {
435         // CP
436         cp.Magic = downloadCpMagic
437         cp.FilePath = filePath
438         cp.Object = objectKey
439
440         objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
441         if err != nil {
442                 return err
443         }
444
445         cp.ObjStat.Size = objectSize
446         cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
447         cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
448
449         if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
450                 if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
451                         cp.enableCRC = true
452                         cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
453                 }
454         }
455
456         // Parts
457         cp.Parts = getDownloadParts(objectSize, partSize, uRange)
458         cp.PartStat = make([]bool, len(cp.Parts))
459         for i := range cp.PartStat {
460                 cp.PartStat[i] = false
461         }
462
463         return nil
464 }
465
466 func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
467         err := os.Rename(downFilepath, cp.FilePath)
468         if err != nil {
469                 return err
470         }
471         return os.Remove(cpFilePath)
472 }
473
474 // downloadFileWithCp downloads files with checkpoint.
475 func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *UnpackedRange) error {
476         tempFilePath := filePath + TempFileSuffix
477         listener := GetProgressListener(options)
478
479         // Load checkpoint data.
480         dcp := downloadCheckpoint{}
481         err := dcp.load(cpFilePath)
482         if err != nil {
483                 os.Remove(cpFilePath)
484         }
485
486         // Get the object detailed meta for object whole size
487         // must delete header:range to get whole object size
488         skipOptions := DeleteOption(options, HTTPHeaderRange)
489         meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
490         if err != nil {
491                 return err
492         }
493
494         // Load error or data invalid. Re-initialize the download.
495         valid, err := dcp.isValid(meta, uRange)
496         if err != nil || !valid {
497                 if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
498                         return err
499                 }
500                 os.Remove(cpFilePath)
501         }
502
503         // Create the file if not exists. Otherwise the parts download will overwrite it.
504         fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
505         if err != nil {
506                 return err
507         }
508         fd.Close()
509
510         // Unfinished parts
511         parts := dcp.todoParts()
512         jobs := make(chan downloadPart, len(parts))
513         results := make(chan downloadPart, len(parts))
514         failed := make(chan error)
515         die := make(chan bool)
516
517         completedBytes := dcp.getCompletedBytes()
518         event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
519         publishProgress(listener, event)
520
521         // Start the download workers routine
522         arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
523         for w := 1; w <= routines; w++ {
524                 go downloadWorker(w, arg, jobs, results, failed, die)
525         }
526
527         // Concurrently downloads parts
528         go downloadScheduler(jobs, parts)
529
530         // Wait for the parts download finished
531         completed := 0
532         for completed < len(parts) {
533                 select {
534                 case part := <-results:
535                         completed++
536                         dcp.PartStat[part.Index] = true
537                         dcp.Parts[part.Index].CRC64 = part.CRC64
538                         dcp.dump(cpFilePath)
539                         downBytes := (part.End - part.Start + 1)
540                         completedBytes += downBytes
541                         event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
542                         publishProgress(listener, event)
543                 case err := <-failed:
544                         close(die)
545                         event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
546                         publishProgress(listener, event)
547                         return err
548                 }
549
550                 if completed >= len(parts) {
551                         break
552                 }
553         }
554
555         event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
556         publishProgress(listener, event)
557
558         if dcp.enableCRC {
559                 actualCRC := combineCRCInParts(dcp.Parts)
560                 err = CheckDownloadCRC(actualCRC, dcp.CRC)
561                 if err != nil {
562                         return err
563                 }
564         }
565
566         return dcp.complete(cpFilePath, tempFilePath)
567 }