OSDN Git Service

32d5548aba7fd3e35ba1e634a5cf21cb6d57bcbc
[bytom/vapor.git] / toolbar / osssync / sync / upload.go
1 package sync
2
3 import (
4         "strconv"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/vapor/protocol/bc/types"
10         "github.com/bytom/vapor/toolbar/apinode"
11         "github.com/bytom/vapor/toolbar/osssync/config"
12 )
13
14 // UploadKeeper the struct for upload
15 type UploadKeeper struct {
16         Sync *Sync
17         Node   *apinode.Node
18 }
19
20 // NewUploadKeeper return one new instance of UploadKeeper
21 func NewUploadKeeper() (*UploadKeeper, error) {
22         cfg := &config.Config{}
23         err := config.LoadConfig(&cfg)
24         if err != nil {
25                 return nil, err
26         }
27
28         node := apinode.NewNode(cfg.VaporURL)
29
30         sync, err := NewSync()
31         if err != nil {
32                 return nil, err
33         }
34
35         return &UploadKeeper{
36                 Sync: sync,
37                 Node:   node,
38         }, nil
39 }
40
41 // RunSyncUp run synchronize upload to OSS
42 func (u *UploadKeeper) RunSyncUp() {
43         ticker := time.NewTicker(time.Minute)
44         defer ticker.Stop()
45
46         for ; true; <-ticker.C {
47                 err := u.Upload()
48                 if err != nil {
49                         log.WithField("error", err).Errorln("blockKeeper fail on process block")
50                 }
51         }
52 }
53
54 // Upload find and upload blocks
55 func (u *UploadKeeper) Upload() error {
56         err := u.Sync.FileUtil.BlockDirInitial()
57         if err != nil {
58                 return err
59         }
60
61         currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
62         if err != nil {
63                 return err
64         }
65
66         infoJson, err := u.Sync.GetInfoJson()
67         if err != nil {
68                 return err
69         }
70
71         latestUp := infoJson.LatestBlockHeight // Latest uploaded block height
72         intervals := infoJson.Interval         // Interval array
73
74         var pos1, pos2 int // currBlockHeight interval, latestUp interval
75         for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
76         }
77         // Current Block Height is out of the range given by info.json
78         if currBlockHeight > intervals[pos1].EndBlockHeight {
79                 currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
80         }
81         for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
82         }
83
84         // Upload Whole Interval
85         for latestUp+1 < intervals[pos1].StartBlockHeight {
86                 if latestUp == 0 {
87                         err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
88                 } else {
89                         err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
90                 }
91                 if err != nil {
92                         return err
93                 }
94
95                 latestUp = intervals[pos2].EndBlockHeight
96                 pos2++
97         }
98
99         // Upload the last Interval
100         newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
101         if latestUp < newLatestUp {
102                 if latestUp == 0 {
103                         err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize)
104                 } else {
105                         err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
106                 }
107                 if err != nil {
108                         return err
109                 }
110         }
111         return err
112 }
113
114 // UploadFiles get block from vapor and upload files to OSS
115 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
116         for {
117                 if start > end {
118                         break
119                 }
120                 blocks, err := u.GetBlockArray(start, size)
121                 if err != nil {
122                         return err
123                 }
124
125                 filename := strconv.FormatUint(start, 10)
126                 filenameJson := filename + ".json"
127                 filenameGzip := filenameJson + ".gz"
128
129                 _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks)
130                 if err != nil {
131                         return err
132                 }
133
134                 err = u.Sync.FileUtil.GzipCompress(filename)
135                 if err != nil {
136                         return err
137                 }
138
139                 err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip)
140                 if err != nil {
141                         return err
142                 }
143
144                 err = u.Sync.SetLatestBlockHeight(start + size - 1)
145                 if err != nil {
146                         return err
147                 }
148
149                 err = u.Sync.FileUtil.RemoveLocal(filenameJson)
150                 if err != nil {
151                         return err
152                 }
153
154                 err = u.Sync.FileUtil.RemoveLocal(filenameGzip)
155                 if err != nil {
156                         return err
157                 }
158
159                 start += size
160         }
161         return nil
162 }
163
164 // GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
165 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
166         blockHeight := start
167         data := []*types.Block{}
168         for i := uint64(0); i < length; i++ {
169                 resp, err := u.Node.GetBlockByHeight(blockHeight)
170                 if err != nil {
171                         return nil, err
172                 }
173
174                 data = append(data, resp)
175                 blockHeight++
176         }
177         return data, nil
178 }