20 // DownloadFile downloads files with multipart download.
22 // objectKey the object key.
23 // filePath the local file to download from objectKey in OSS.
24 // partSize the part size in bytes.
25 // options object's constraints, check out GetObject for the reference.
27 // error it's nil when the call succeeds, otherwise it's an error object.
29 func (bucket Bucket) DownloadFile(objectKey, filePath string, partSize int64, options ...Option) error {
31 return errors.New("oss: part size smaller than 1")
34 uRange, err := GetRangeConfig(options)
39 cpConf := getCpConfig(options)
40 routines := getRoutines(options)
42 var strVersionId string
43 versionId, _ := FindOption(options, "versionId", nil)
45 strVersionId = versionId.(string)
48 if cpConf != nil && cpConf.IsEnable {
49 cpFilePath := getDownloadCpFilePath(cpConf, bucket.BucketName, objectKey, strVersionId, filePath)
51 return bucket.downloadFileWithCp(objectKey, filePath, partSize, options, cpFilePath, routines, uRange)
55 return bucket.downloadFile(objectKey, filePath, partSize, options, routines, uRange)
58 func getDownloadCpFilePath(cpConf *cpConfig, srcBucket, srcObject, versionId, destFile string) string {
59 if cpConf.FilePath == "" && cpConf.DirPath != "" {
60 src := fmt.Sprintf("oss://%v/%v", srcBucket, srcObject)
61 absPath, _ := filepath.Abs(destFile)
62 cpFileName := getCpFileName(src, absPath, versionId)
63 cpConf.FilePath = cpConf.DirPath + string(os.PathSeparator) + cpFileName
65 return cpConf.FilePath
68 // downloadWorkerArg is download worker's parameters
69 type downloadWorkerArg struct {
78 // downloadPartHook is hook for test
79 type downloadPartHook func(part downloadPart) error
81 var downloadPartHooker downloadPartHook = defaultDownloadPartHook
83 func defaultDownloadPartHook(part downloadPart) error {
87 // defaultDownloadProgressListener defines default ProgressListener, shields the ProgressListener in options of GetObject.
88 type defaultDownloadProgressListener struct {
91 // ProgressChanged no-ops
92 func (listener *defaultDownloadProgressListener) ProgressChanged(event *ProgressEvent) {
96 func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool) {
97 for part := range jobs {
98 if err := arg.hook(part); err != nil {
104 r := Range(part.Start, part.End)
105 p := Progress(&defaultDownloadProgressListener{})
107 var respHeader http.Header
108 opts := make([]Option, len(arg.options)+3)
109 // Append orderly, can not be reversed!
110 opts = append(opts, arg.options...)
111 opts = append(opts, r, p, GetResponseHeader(&respHeader))
113 rd, err := arg.bucket.GetObject(arg.key, opts...)
120 var crcCalc hash.Hash64
122 crcCalc = crc64.New(CrcTable())
123 contentLen := part.End - part.Start + 1
124 rd = ioutil.NopCloser(TeeReader(rd, crcCalc, contentLen, nil, nil))
134 fd, err := os.OpenFile(arg.filePath, os.O_WRONLY, FilePermMode)
140 _, err = fd.Seek(part.Start-part.Offset, os.SEEK_SET)
147 startT := time.Now().UnixNano() / 1000 / 1000 / 1000
148 _, err = io.Copy(fd, rd)
149 endT := time.Now().UnixNano() / 1000 / 1000 / 1000
151 arg.bucket.Client.Config.WriteLog(Debug, "download part error,cost:%d second,part number:%d,request id:%s,error:%s.\n", endT-startT, part.Index, GetRequestId(respHeader), err.Error())
158 part.CRC64 = crcCalc.Sum64()
167 func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
168 for _, part := range parts {
// downloadPart defines one ranged slice of the object to download.
type downloadPart struct {
	Index  int    // Part number, starting from 0
	Start  int64  // Start index (inclusive byte offset in the object)
	End    int64  // End index (inclusive byte offset in the object)
	Offset int64  // Offset: start of the whole requested range, for local-file positioning
	CRC64  uint64 // CRC check value of part
}
183 // getDownloadParts gets download parts
184 func getDownloadParts(objectSize, partSize int64, uRange *UnpackedRange) []downloadPart {
185 parts := []downloadPart{}
186 part := downloadPart{}
188 start, end := AdjustRange(uRange, objectSize)
189 for offset := start; offset < end; offset += partSize {
192 part.End = GetPartEnd(offset, end, partSize)
195 parts = append(parts, part)
201 // getObjectBytes gets object bytes length
202 func getObjectBytes(parts []downloadPart) int64 {
204 for _, part := range parts {
205 ob += (part.End - part.Start + 1)
210 // combineCRCInParts caculates the total CRC of continuous parts
211 func combineCRCInParts(dps []downloadPart) uint64 {
212 if dps == nil || len(dps) == 0 {
217 for i := 1; i < len(dps); i++ {
218 crc = CRC64Combine(crc, dps[i].CRC64, (uint64)(dps[i].End-dps[i].Start+1))
224 // downloadFile downloads file concurrently without checkpoint.
225 func (bucket Bucket) downloadFile(objectKey, filePath string, partSize int64, options []Option, routines int, uRange *UnpackedRange) error {
226 tempFilePath := filePath + TempFileSuffix
227 listener := GetProgressListener(options)
229 // If the file does not exist, create one. If exists, the download will overwrite it.
230 fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
236 // Get the object detailed meta for object whole size
237 // must delete header:range to get whole object size
238 skipOptions := DeleteOption(options, HTTPHeaderRange)
239 meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
244 objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
250 expectedCRC := (uint64)(0)
251 if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
252 if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
254 expectedCRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
258 // Get the parts of the file
259 parts := getDownloadParts(objectSize, partSize, uRange)
260 jobs := make(chan downloadPart, len(parts))
261 results := make(chan downloadPart, len(parts))
262 failed := make(chan error)
263 die := make(chan bool)
265 var completedBytes int64
266 totalBytes := getObjectBytes(parts)
267 event := newProgressEvent(TransferStartedEvent, 0, totalBytes, 0)
268 publishProgress(listener, event)
270 // Start the download workers
271 arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, enableCRC}
272 for w := 1; w <= routines; w++ {
273 go downloadWorker(w, arg, jobs, results, failed, die)
276 // Download parts concurrently
277 go downloadScheduler(jobs, parts)
279 // Waiting for parts download finished
281 for completed < len(parts) {
283 case part := <-results:
285 downBytes := (part.End - part.Start + 1)
286 completedBytes += downBytes
287 parts[part.Index].CRC64 = part.CRC64
288 event = newProgressEvent(TransferDataEvent, completedBytes, totalBytes, downBytes)
289 publishProgress(listener, event)
290 case err := <-failed:
292 event = newProgressEvent(TransferFailedEvent, completedBytes, totalBytes, 0)
293 publishProgress(listener, event)
297 if completed >= len(parts) {
302 event = newProgressEvent(TransferCompletedEvent, completedBytes, totalBytes, 0)
303 publishProgress(listener, event)
306 actualCRC := combineCRCInParts(parts)
307 err = CheckDownloadCRC(actualCRC, expectedCRC)
313 return os.Rename(tempFilePath, filePath)
316 // ----- Concurrent download with chcekpoint -----
318 const downloadCpMagic = "92611BED-89E2-46B6-89E5-72F273D4B0A3"
320 type downloadCheckpoint struct {
321 Magic string // Magic
322 MD5 string // Checkpoint content MD5
323 FilePath string // Local file
325 ObjStat objectStat // Object status
326 Parts []downloadPart // All download parts
327 PartStat []bool // Parts' download status
328 Start int64 // Start point of the file
329 End int64 // End point of the file
330 enableCRC bool // Whether has CRC check
331 CRC uint64 // CRC check value
// objectStat records the remote object's size, mtime and etag, used to detect
// whether the object changed since the checkpoint was written.
type objectStat struct {
	Size         int64  // Object size
	LastModified string // Last modified time
	Etag         string // Etag
}
340 // isValid flags of checkpoint data is valid. It returns true when the data is valid and the checkpoint is valid and the object is not updated.
341 func (cp downloadCheckpoint) isValid(meta http.Header, uRange *UnpackedRange) (bool, error) {
342 // Compare the CP's Magic and the MD5
345 js, _ := json.Marshal(cpb)
347 b64 := base64.StdEncoding.EncodeToString(sum[:])
349 if cp.Magic != downloadCpMagic || b64 != cp.MD5 {
353 objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
358 // Compare the object size, last modified time and etag
359 if cp.ObjStat.Size != objectSize ||
360 cp.ObjStat.LastModified != meta.Get(HTTPHeaderLastModified) ||
361 cp.ObjStat.Etag != meta.Get(HTTPHeaderEtag) {
365 // Check the download range
367 start, end := AdjustRange(uRange, objectSize)
368 if start != cp.Start || end != cp.End {
376 // load checkpoint from local file
377 func (cp *downloadCheckpoint) load(filePath string) error {
378 contents, err := ioutil.ReadFile(filePath)
383 err = json.Unmarshal(contents, cp)
387 // dump funciton dumps to file
388 func (cp *downloadCheckpoint) dump(filePath string) error {
393 js, err := json.Marshal(bcp)
398 b64 := base64.StdEncoding.EncodeToString(sum[:])
402 js, err = json.Marshal(bcp)
408 return ioutil.WriteFile(filePath, js, FilePermMode)
411 // todoParts gets unfinished parts
412 func (cp downloadCheckpoint) todoParts() []downloadPart {
413 dps := []downloadPart{}
414 for i, ps := range cp.PartStat {
416 dps = append(dps, cp.Parts[i])
422 // getCompletedBytes gets completed size
423 func (cp downloadCheckpoint) getCompletedBytes() int64 {
424 var completedBytes int64
425 for i, part := range cp.Parts {
427 completedBytes += (part.End - part.Start + 1)
430 return completedBytes
433 // prepare initiates download tasks
434 func (cp *downloadCheckpoint) prepare(meta http.Header, bucket *Bucket, objectKey, filePath string, partSize int64, uRange *UnpackedRange) error {
436 cp.Magic = downloadCpMagic
437 cp.FilePath = filePath
438 cp.Object = objectKey
440 objectSize, err := strconv.ParseInt(meta.Get(HTTPHeaderContentLength), 10, 64)
445 cp.ObjStat.Size = objectSize
446 cp.ObjStat.LastModified = meta.Get(HTTPHeaderLastModified)
447 cp.ObjStat.Etag = meta.Get(HTTPHeaderEtag)
449 if bucket.GetConfig().IsEnableCRC && meta.Get(HTTPHeaderOssCRC64) != "" {
450 if uRange == nil || (!uRange.HasStart && !uRange.HasEnd) {
452 cp.CRC, _ = strconv.ParseUint(meta.Get(HTTPHeaderOssCRC64), 10, 64)
457 cp.Parts = getDownloadParts(objectSize, partSize, uRange)
458 cp.PartStat = make([]bool, len(cp.Parts))
459 for i := range cp.PartStat {
460 cp.PartStat[i] = false
466 func (cp *downloadCheckpoint) complete(cpFilePath, downFilepath string) error {
467 err := os.Rename(downFilepath, cp.FilePath)
471 return os.Remove(cpFilePath)
474 // downloadFileWithCp downloads files with checkpoint.
475 func (bucket Bucket) downloadFileWithCp(objectKey, filePath string, partSize int64, options []Option, cpFilePath string, routines int, uRange *UnpackedRange) error {
476 tempFilePath := filePath + TempFileSuffix
477 listener := GetProgressListener(options)
479 // Load checkpoint data.
480 dcp := downloadCheckpoint{}
481 err := dcp.load(cpFilePath)
483 os.Remove(cpFilePath)
486 // Get the object detailed meta for object whole size
487 // must delete header:range to get whole object size
488 skipOptions := DeleteOption(options, HTTPHeaderRange)
489 meta, err := bucket.GetObjectDetailedMeta(objectKey, skipOptions...)
494 // Load error or data invalid. Re-initialize the download.
495 valid, err := dcp.isValid(meta, uRange)
496 if err != nil || !valid {
497 if err = dcp.prepare(meta, &bucket, objectKey, filePath, partSize, uRange); err != nil {
500 os.Remove(cpFilePath)
503 // Create the file if not exists. Otherwise the parts download will overwrite it.
504 fd, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE, FilePermMode)
511 parts := dcp.todoParts()
512 jobs := make(chan downloadPart, len(parts))
513 results := make(chan downloadPart, len(parts))
514 failed := make(chan error)
515 die := make(chan bool)
517 completedBytes := dcp.getCompletedBytes()
518 event := newProgressEvent(TransferStartedEvent, completedBytes, dcp.ObjStat.Size, 0)
519 publishProgress(listener, event)
521 // Start the download workers routine
522 arg := downloadWorkerArg{&bucket, objectKey, tempFilePath, options, downloadPartHooker, dcp.enableCRC}
523 for w := 1; w <= routines; w++ {
524 go downloadWorker(w, arg, jobs, results, failed, die)
527 // Concurrently downloads parts
528 go downloadScheduler(jobs, parts)
530 // Wait for the parts download finished
532 for completed < len(parts) {
534 case part := <-results:
536 dcp.PartStat[part.Index] = true
537 dcp.Parts[part.Index].CRC64 = part.CRC64
539 downBytes := (part.End - part.Start + 1)
540 completedBytes += downBytes
541 event = newProgressEvent(TransferDataEvent, completedBytes, dcp.ObjStat.Size, downBytes)
542 publishProgress(listener, event)
543 case err := <-failed:
545 event = newProgressEvent(TransferFailedEvent, completedBytes, dcp.ObjStat.Size, 0)
546 publishProgress(listener, event)
550 if completed >= len(parts) {
555 event = newProgressEvent(TransferCompletedEvent, completedBytes, dcp.ObjStat.Size, 0)
556 publishProgress(listener, event)
559 actualCRC := combineCRCInParts(dcp.Parts)
560 err = CheckDownloadCRC(actualCRC, dcp.CRC)
566 return dcp.complete(cpFilePath, tempFilePath)