OSDN Git Service

Create ossClient.go (#574)
[bytom/vapor.git] / vendor / github.com / aliyun / aliyun-oss-go-sdk / oss / limit_reader_1_7.go
1 // +build go1.7
2
3 package oss
4
5 import (
6         "fmt"
7         "io"
8         "math"
9         "time"
10
11         "golang.org/x/time/rate"
12 )
13
14 const (
15         perTokenBandwidthSize int = 1024
16 )
17
18 // OssLimiter wrapper rate.Limiter
19 type OssLimiter struct {
20         limiter *rate.Limiter
21 }
22
23 // GetOssLimiter create OssLimiter
24 // uploadSpeed KB/s
25 func GetOssLimiter(uploadSpeed int) (ossLimiter *OssLimiter, err error) {
26         limiter := rate.NewLimiter(rate.Limit(uploadSpeed), uploadSpeed)
27
28         // first consume the initial full token,the limiter will behave more accurately
29         limiter.AllowN(time.Now(), uploadSpeed)
30
31         return &OssLimiter{
32                 limiter: limiter,
33         }, nil
34 }
35
36 // LimitSpeedReader for limit bandwidth upload
37 type LimitSpeedReader struct {
38         io.ReadCloser
39         reader     io.Reader
40         ossLimiter *OssLimiter
41 }
42
43 // Read
44 func (r *LimitSpeedReader) Read(p []byte) (n int, err error) {
45         n = 0
46         err = nil
47         start := 0
48         burst := r.ossLimiter.limiter.Burst()
49         var end int
50         var tmpN int
51         var tc int
52         for start < len(p) {
53                 if start+burst*perTokenBandwidthSize < len(p) {
54                         end = start + burst*perTokenBandwidthSize
55                 } else {
56                         end = len(p)
57                 }
58
59                 tmpN, err = r.reader.Read(p[start:end])
60                 if tmpN > 0 {
61                         n += tmpN
62                         start = n
63                 }
64
65                 if err != nil {
66                         return
67                 }
68
69                 tc = int(math.Ceil(float64(tmpN) / float64(perTokenBandwidthSize)))
70                 now := time.Now()
71                 re := r.ossLimiter.limiter.ReserveN(now, tc)
72                 if !re.OK() {
73                         err = fmt.Errorf("LimitSpeedReader.Read() failure,ReserveN error,start:%d,end:%d,burst:%d,perTokenBandwidthSize:%d",
74                                 start, end, burst, perTokenBandwidthSize)
75                         return
76                 }
77                 timeDelay := re.Delay()
78                 time.Sleep(timeDelay)
79         }
80         return
81 }
82
83 // Close ...
84 func (r *LimitSpeedReader) Close() error {
85         rc, ok := r.reader.(io.ReadCloser)
86         if ok {
87                 return rc.Close()
88         }
89         return nil
90 }