7 "github.com/aliyun/aliyun-oss-go-sdk/oss"
8 log "github.com/sirupsen/logrus"
10 "github.com/bytom/vapor/protocol/bc/types"
11 "github.com/bytom/vapor/toolbar/apinode"
12 "github.com/bytom/vapor/toolbar/osssync/util"
15 const LOCALDIR = "./blocks/" // Local directory to store temp blocks files
17 // Run synchronize upload blocks from vapor to OSS
19 uploadKeeper, err := NewUploadKeeper()
28 // AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
29 func AddInterval(end, gzSize uint64) error {
30 uploadKeeper, err := NewUploadKeeper()
35 return uploadKeeper.AddInterval(end, gzSize)
38 // UploadKeeper the struct for upload
39 type UploadKeeper struct {
43 FileUtil *util.FileUtil
46 // NewUploadKeeper return one new instance of UploadKeeper
47 func NewUploadKeeper() (*UploadKeeper, error) {
49 err := LoadConfig(&cfg)
54 node := apinode.NewNode(cfg.VaporURL)
56 ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
61 ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
66 fileUtil := util.NewFileUtil(LOCALDIR)
76 // Run synchronize upload blocks from vapor to OSS
77 func (u *UploadKeeper) Run() {
78 ticker := time.NewTicker(time.Minute)
81 for ; true; <-ticker.C {
84 log.WithField("error", err).Errorln("blockKeeper fail on process block")
89 // Upload find and upload blocks
90 func (u *UploadKeeper) Upload() error {
91 err := u.FileUtil.BlockDirInitial()
96 currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
101 infoJson, err := u.GetInfoJson()
106 latestUp := infoJson.LatestBlockHeight // Latest uploaded block height
107 intervals := infoJson.Interval // Interval array
109 var pos1, pos2 int // currBlockHeight interval, latestUp interval
110 for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
112 // Current Block Height is out of the range given by info.json
113 if currBlockHeight > intervals[pos1].EndBlockHeight {
114 currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
116 for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
119 // Upload Whole Interval
120 for latestUp+1 < intervals[pos1].StartBlockHeight {
121 err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
126 latestUp = intervals[pos2].EndBlockHeight
130 // Upload the last Interval
131 newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
132 if latestUp < newLatestUp {
133 err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
141 // UploadFiles get block from vapor and upload files to OSS
142 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
147 blocks, err := u.GetBlockArray(start, size)
152 filename := strconv.FormatUint(start, 10)
153 filenameJson := filename + ".json"
154 filenameGzip := filenameJson + ".gz"
156 _, err = u.FileUtil.SaveBlockFile(filename, blocks)
161 err = u.FileUtil.GzipCompress(filename)
166 err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
171 err = u.SetLatestBlockHeight(start + size - 1)
176 err = u.FileUtil.RemoveLocal(filenameJson)
181 err = u.FileUtil.RemoveLocal(filenameGzip)
191 // GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
192 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
194 data := []*types.Block{}
195 for i := uint64(0); i < length; i++ {
196 resp, err := u.Node.GetBlockByHeight(blockHeight)
201 data = append(data, resp)