OSDN Git Service

e81512ec8ef64ca6d72f8ce014751f29c189fdfc
[bytom/vapor.git] / toolbar / osssync / upload / upload.go
1 package upload
2
import (
	"errors"
	"strconv"
	"time"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"
	log "github.com/sirupsen/logrus"

	"github.com/bytom/vapor/protocol/bc/types"
	"github.com/bytom/vapor/toolbar/apinode"
	"github.com/bytom/vapor/toolbar/osssync/util"
)
14
// LOCALDIR is the local directory used to store temporary block files.
// It is handed to util.NewFileUtil when an UploadKeeper is created.
const LOCALDIR = "./blocks/"
16
17 // Run synchronize upload blocks from vapor to OSS
18 func Run() error {
19         uploadKeeper, err := NewUploadKeeper()
20         if err != nil {
21                 return err
22         }
23
24         uploadKeeper.Run()
25         return nil
26 }
27
28 // AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
29 func AddInterval(end, gzSize uint64) error {
30         uploadKeeper, err := NewUploadKeeper()
31         if err != nil {
32                 return err
33         }
34
35         return uploadKeeper.AddInterval(end, gzSize)
36 }
37
// UploadKeeper holds everything needed to upload blocks from a vapor node to
// OSS. Instances are built by NewUploadKeeper.
type UploadKeeper struct {
	// Node is the vapor node client; it is queried for the block count and
	// for blocks by height.
	Node *apinode.Node
	// OssClient is the OSS client created from the configured endpoint and
	// access key.
	OssClient *oss.Client
	// OssBucket is the OSS bucket that receives the uploaded files.
	OssBucket *oss.Bucket
	// FileUtil manages the local temporary block files (save, gzip, remove).
	FileUtil *util.FileUtil
}
45
46 // NewUploadKeeper return one new instance of UploadKeeper
47 func NewUploadKeeper() (*UploadKeeper, error) {
48         cfg := &Config{}
49         err := LoadConfig(&cfg)
50         if err != nil {
51                 return nil, err
52         }
53
54         node := apinode.NewNode(cfg.VaporURL)
55
56         ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
57         if err != nil {
58                 return nil, err
59         }
60
61         ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
62         if err != nil {
63                 return nil, err
64         }
65
66         fileUtil := util.NewFileUtil(LOCALDIR)
67
68         return &UploadKeeper{
69                 Node:      node,
70                 OssClient: ossClient,
71                 OssBucket: ossBucket,
72                 FileUtil:  fileUtil,
73         }, nil
74 }
75
76 // Run synchronize upload blocks from vapor to OSS
77 func (u *UploadKeeper) Run() {
78         ticker := time.NewTicker(time.Minute)
79         defer ticker.Stop()
80
81         for ; true; <-ticker.C {
82                 err := u.Upload()
83                 if err != nil {
84                         log.WithField("error", err).Errorln("blockKeeper fail on process block")
85                 }
86         }
87 }
88
89 // Upload find and upload blocks
90 func (u *UploadKeeper) Upload() error {
91         err := u.FileUtil.BlockDirInitial()
92         if err != nil {
93                 return err
94         }
95
96         currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
97         if err != nil {
98                 return err
99         }
100
101         infoJson, err := u.GetInfoJson()
102         if err != nil {
103                 return err
104         }
105
106         latestUp := infoJson.LatestBlockHeight // Latest uploaded block height
107         intervals := infoJson.Interval         // Interval array
108
109         var pos1, pos2 int // currBlockHeight interval, latestUp interval
110         for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
111         }
112         // Current Block Height is out of the range given by info.json
113         if currBlockHeight > intervals[pos1].EndBlockHeight {
114                 currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
115         }
116         for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
117         }
118
119         // Upload Whole Interval
120         for latestUp+1 < intervals[pos1].StartBlockHeight {
121                 err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
122                 if err != nil {
123                         return err
124                 }
125
126                 latestUp = intervals[pos2].EndBlockHeight
127                 pos2++
128         }
129
130         // Upload the last Interval
131         newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
132         if latestUp < newLatestUp {
133                 err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
134                 if err != nil {
135                         return err
136                 }
137         }
138         return err
139 }
140
141 // UploadFiles get block from vapor and upload files to OSS
142 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
143         for {
144                 if start > end {
145                         break
146                 }
147                 blocks, err := u.GetBlockArray(start, size)
148                 if err != nil {
149                         return err
150                 }
151
152                 filename := strconv.FormatUint(start, 10)
153                 filenameJson := filename + ".json"
154                 filenameGzip := filenameJson + ".gz"
155
156                 _, err = u.FileUtil.SaveBlockFile(filename, blocks)
157                 if err != nil {
158                         return err
159                 }
160
161                 err = u.FileUtil.GzipCompress(filename)
162                 if err != nil {
163                         return err
164                 }
165
166                 err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
167                 if err != nil {
168                         return err
169                 }
170
171                 err = u.SetLatestBlockHeight(start + size - 1)
172                 if err != nil {
173                         return err
174                 }
175
176                 err = u.FileUtil.RemoveLocal(filenameJson)
177                 if err != nil {
178                         return err
179                 }
180
181                 err = u.FileUtil.RemoveLocal(filenameGzip)
182                 if err != nil {
183                         return err
184                 }
185
186                 start += size
187         }
188         return nil
189 }
190
191 // GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
192 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
193         blockHeight := start
194         data := []*types.Block{}
195         for i := uint64(0); i < length; i++ {
196                 resp, err := u.Node.GetBlockByHeight(blockHeight)
197                 if err != nil {
198                         return nil, err
199                 }
200
201                 data = append(data, resp)
202                 blockHeight++
203         }
204         return data, nil
205 }