7 "github.com/aliyun/aliyun-oss-go-sdk/oss"
8 log "github.com/sirupsen/logrus"
10 "github.com/bytom/vapor/errors"
11 "github.com/bytom/vapor/protocol/bc/types"
12 "github.com/bytom/vapor/toolbar/apinode"
13 "github.com/bytom/vapor/toolbar/osssync/util"
16 const LOCALDIR = "./blocks/" // Local directory to store temp blocks files
18 // Run synchronize upload blocks from vapor to OSS
20 uploadKeeper, err := NewUploadKeeper()
29 // AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
30 func AddInterval(end, gzSize uint64) error {
31 uploadKeeper, err := NewUploadKeeper()
36 return uploadKeeper.AddInterval(end, gzSize)
39 // UploadKeeper the struct for upload
40 type UploadKeeper struct {
44 FileUtil *util.FileUtil
47 // NewUploadKeeper return one new instance of UploadKeeper
48 func NewUploadKeeper() (*UploadKeeper, error) {
50 if err := LoadConfig(&cfg); err != nil {
54 node := apinode.NewNode(cfg.VaporURL)
56 ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
61 ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
66 fileUtil := util.NewFileUtil(LOCALDIR)
76 // Run synchronize upload blocks from vapor to OSS
77 func (u *UploadKeeper) Run() {
78 ticker := time.NewTicker(time.Minute)
81 for ; true; <-ticker.C {
82 if err := u.Upload(); err != nil {
83 log.WithField("error", err).Errorln("blockKeeper fail")
88 // Upload find and upload blocks
89 func (u *UploadKeeper) Upload() error {
90 if err := u.FileUtil.BlockDirInitial(); err != nil {
94 currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
99 if currBlockHeight == 0 {
100 return errors.New("Current block height is 0.")
103 infoJson, err := u.GetInfoJson()
108 latestUp := infoJson.LatestBlockHeight // Latest uploaded block height
109 intervals := infoJson.Interval // Interval array
111 var pos1, pos2 int // currBlockHeight interval, latestUp interval
113 for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
115 // Current Block Height is out of the range given by info.json
116 if currBlockHeight > intervals[pos1].EndBlockHeight {
117 currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
123 for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
127 // Upload Whole Interval
128 for latestUp+1 < intervals[pos1].StartBlockHeight {
129 if err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize); err != nil {
133 latestUp = intervals[pos2].EndBlockHeight
137 // Upload the last Interval
138 newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight + 1) % intervals[pos1].GzSize)
139 if latestUp < newLatestUp {
140 if err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize); err != nil {
147 // UploadFiles get block from vapor and upload files to OSS
148 func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
153 blocks, err := u.GetBlockArray(start, size)
158 filename := strconv.FormatUint(start, 10)
159 filenameJson := filename + ".json"
160 filenameGzip := filenameJson + ".gz"
162 if _, err = u.FileUtil.SaveBlockFile(filename, blocks); err != nil {
166 if err = u.FileUtil.GzipCompress(filename); err != nil {
170 if err = u.FileUtil.RemoveLocal(filenameJson); err != nil {
174 if err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip); err != nil {
178 if err = u.SetLatestBlockHeight(start + size - 1); err != nil {
182 if err = u.FileUtil.RemoveLocal(filenameGzip); err != nil {
191 // GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
192 func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
194 data := []*types.Block{}
195 for i := uint64(0); i < length; i++ {
196 resp, err := u.Node.GetBlockByHeight(blockHeight)
201 data = append(data, resp)