7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/vapor/protocol/bc/types"
10 "github.com/bytom/vapor/toolbar/apinode"
11 "github.com/bytom/vapor/toolbar/osssync/config"
14 // UploadKeeper is the struct that drives block uploads; the methods in this file use its Node and Sync fields.
15 type UploadKeeper struct {
20 // NewUploadKeeper returns a new instance of UploadKeeper, or an error. It loads the configuration, creates an API node for cfg.VaporURL and creates a Sync via NewSync.
21 func NewUploadKeeper() (*UploadKeeper, error) {
22 cfg := &config.Config{}
23 err := config.LoadConfig(&cfg) // NOTE(review): cfg is already a pointer, so this passes a **config.Config; confirm LoadConfig expects that
28 node := apinode.NewNode(cfg.VaporURL) // node client for the vapor URL taken from the loaded config
30 sync, err := NewSync()
41 // RunSyncUp runs the synchronizing upload to OSS in an endless loop paced by a one-minute ticker. A failing pass is logged.
42 func (u *UploadKeeper) RunSyncUp() {
43 ticker := time.NewTicker(time.Minute)
46 for ; true; <-ticker.C { // the tick is awaited in the post statement, so the first pass runs immediately
49 log.WithField("error", err).Errorln("blockKeeper fail on process block") // presumably err comes from the upload call on a line not visible here; confirm
54 // Upload finds the blocks that have not been uploaded yet and uploads them, first the whole intervals and then the complete files of the last interval.
55 func (u *UploadKeeper) Upload() error {
56 err := u.Sync.FileUtil.BlockDirInitial() // presumably prepares the local block directory; confirm against FileUtil
61 currBlockHeight, err := u.Node.GetBlockCount() // current block height reported by the vapor node
66 infoJson, err := u.Sync.GetInfoJson()
71 latestUp := infoJson.LatestBlockHeight // height of the latest block already uploaded
72 intervals := infoJson.Interval // interval array; each entry has StartBlockHeight, EndBlockHeight and GzSize
74 var pos1, pos2 int // pos1: index of the interval for currBlockHeight; pos2: index of the interval for latestUp
75 for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- { // scan backwards from the last interval. NOTE(review): no lower bound on pos1 is visible; confirm it cannot go below 0
77 // The current block height is beyond the range described by info.json
78 if currBlockHeight > intervals[pos1].EndBlockHeight {
79 currBlockHeight = intervals[pos1].EndBlockHeight // clamp, so only the part covered by info.json is uploaded
81 for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- { // scan backwards from pos1. NOTE(review): no lower bound on pos2 is visible; confirm it cannot go below 0
84 // Upload whole intervals while the latest uploaded block is still before the interval at pos1
85 for latestUp+1 < intervals[pos1].StartBlockHeight {
87 err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) // variant starting at latestUp itself; the selecting condition is not visible here
89 err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) // variant starting right after the latest uploaded block
95 latestUp = intervals[pos2].EndBlockHeight // the interval at pos2 is now uploaded up to its end
99 // Upload the last interval: step back from currBlockHeight to a file boundary (a multiple of GzSize above the interval start); the block before that boundary ends the last complete file
100 newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
101 if latestUp < newLatestUp {
103 err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize) // variant starting at latestUp itself; the selecting condition is not visible here
105 err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize) // variant starting right after the latest uploaded block
114 // UploadFiles gets blocks from vapor, saves them to a local file named after the start height, gzip-compresses it, uploads the .json.gz file to OSS, records the new latest height and removes the local files. How start advances toward end is on lines not visible here.
115 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
120 blocks, err := u.GetBlockArray(start, size) // size blocks beginning at height start
125 filename := strconv.FormatUint(start, 10) // base name is the decimal start height
126 filenameJson := filename + ".json"
127 filenameGzip := filenameJson + ".gz"
129 _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks)
134 err = u.Sync.FileUtil.GzipCompress(filename)
139 err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip) // object key is the gzip file name; the source is that file under LocalDir
144 err = u.Sync.SetLatestBlockHeight(start + size - 1) // last height contained in the file just uploaded
149 err = u.Sync.FileUtil.RemoveLocal(filenameJson)
154 err = u.Sync.FileUtil.RemoveLocal(filenameGzip)
164 // GetBlockArray returns the blocks fetched from the node for heights start through start+length-1, or an error.
165 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
167 data := []*types.Block{}
168 for i := uint64(0); i < length; i++ {
169 resp, err := u.Node.GetBlockByHeight(blockHeight) // blockHeight is declared and advanced on lines not visible here; presumably it starts at start (confirm)
174 data = append(data, resp)