OSDN Git Service

Osssync (#581)
[bytom/vapor.git] / toolbar / osssync / upload / upload.go
1 package upload
2
3 import (
4         "strconv"
5         "time"
6
7         "github.com/aliyun/aliyun-oss-go-sdk/oss"
8         log "github.com/sirupsen/logrus"
9
10         "github.com/bytom/vapor/errors"
11         "github.com/bytom/vapor/protocol/bc/types"
12         "github.com/bytom/vapor/toolbar/apinode"
13         "github.com/bytom/vapor/toolbar/osssync/util"
14 )
15
16 const LOCALDIR = "./blocks/" // Local directory to store temp blocks files
17
18 // Run synchronize upload blocks from vapor to OSS
19 func Run() error {
20         uploadKeeper, err := NewUploadKeeper()
21         if err != nil {
22                 return err
23         }
24
25         uploadKeeper.Run()
26         return nil
27 }
28
29 // AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
30 func AddInterval(end, gzSize uint64) error {
31         uploadKeeper, err := NewUploadKeeper()
32         if err != nil {
33                 return err
34         }
35
36         return uploadKeeper.AddInterval(end, gzSize)
37 }
38
39 // UploadKeeper the struct for upload
40 type UploadKeeper struct {
41         Node      *apinode.Node
42         OssClient *oss.Client
43         OssBucket *oss.Bucket
44         FileUtil  *util.FileUtil
45 }
46
47 // NewUploadKeeper return one new instance of UploadKeeper
48 func NewUploadKeeper() (*UploadKeeper, error) {
49         cfg := &Config{}
50         if err := LoadConfig(&cfg); err != nil {
51                 return nil, err
52         }
53
54         node := apinode.NewNode(cfg.VaporURL)
55
56         ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
57         if err != nil {
58                 return nil, err
59         }
60
61         ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
62         if err != nil {
63                 return nil, err
64         }
65
66         fileUtil := util.NewFileUtil(LOCALDIR)
67
68         return &UploadKeeper{
69                 Node:      node,
70                 OssClient: ossClient,
71                 OssBucket: ossBucket,
72                 FileUtil:  fileUtil,
73         }, nil
74 }
75
76 // Run synchronize upload blocks from vapor to OSS
77 func (u *UploadKeeper) Run() {
78         ticker := time.NewTicker(time.Minute)
79         defer ticker.Stop()
80
81         for ; true; <-ticker.C {
82                 if err := u.Upload(); err != nil {
83                         log.WithField("error", err).Errorln("blockKeeper fail")
84                 }
85         }
86 }
87
88 // Upload find and upload blocks
89 func (u *UploadKeeper) Upload() error {
90         if err := u.FileUtil.BlockDirInitial(); err != nil {
91                 return err
92         }
93
94         currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
95         if err != nil {
96                 return err
97         }
98
99         if currBlockHeight == 0 {
100                 return errors.New("Current block height is 0.")
101         }
102
103         infoJson, err := u.GetInfoJson()
104         if err != nil {
105                 return err
106         }
107
108         latestUp := infoJson.LatestBlockHeight // Latest uploaded block height
109         intervals := infoJson.Interval         // Interval array
110
111         var pos1, pos2 int // currBlockHeight interval, latestUp interval
112         // Find pos1
113         for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
114         }
115         // Current Block Height is out of the range given by info.json
116         if currBlockHeight > intervals[pos1].EndBlockHeight {
117                 currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
118         }
119         // Find pos2
120         if latestUp == 0 {
121                 pos2 = 0
122         } else {
123                 for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
124                 }
125         }
126
127         // Upload Whole Interval
128         for latestUp+1 < intervals[pos1].StartBlockHeight {
129                 if err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize); err != nil {
130                         return err
131                 }
132
133                 latestUp = intervals[pos2].EndBlockHeight
134                 pos2++
135         }
136
137         // Upload the last Interval
138         newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight + 1) % intervals[pos1].GzSize)
139         if latestUp < newLatestUp {
140                 if err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize); err != nil {
141                         return err
142                 }
143         }
144         return nil
145 }
146
147 // UploadFiles get block from vapor and upload files to OSS
148 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
149         for {
150                 if start > end {
151                         break
152                 }
153                 blocks, err := u.GetBlockArray(start, size)
154                 if err != nil {
155                         return err
156                 }
157
158                 filename := strconv.FormatUint(start, 10)
159                 filenameJson := filename + ".json"
160                 filenameGzip := filenameJson + ".gz"
161
162                 if _, err = u.FileUtil.SaveBlockFile(filename, blocks); err != nil {
163                         return err
164                 }
165
166                 if err = u.FileUtil.GzipCompress(filename); err != nil {
167                         return err
168                 }
169
170                 if err = u.FileUtil.RemoveLocal(filenameJson); err != nil {
171                         return err
172                 }
173
174                 if err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip); err != nil {
175                         return err
176                 }
177
178                 if err = u.SetLatestBlockHeight(start + size - 1); err != nil {
179                         return err
180                 }
181
182                 if err = u.FileUtil.RemoveLocal(filenameGzip); err != nil {
183                         return err
184                 }
185
186                 start += size
187         }
188         return nil
189 }
190
191 // GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
192 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
193         blockHeight := start
194         data := []*types.Block{}
195         for i := uint64(0); i < length; i++ {
196                 resp, err := u.Node.GetBlockByHeight(blockHeight)
197                 if err != nil {
198                         return nil, err
199                 }
200
201                 data = append(data, resp)
202                 blockHeight++
203         }
204         return data, nil
205 }