OSDN Git Service

up (#579)
authorOuter-God <39217235+Outer-God@users.noreply.github.com>
Mon, 22 Mar 2021 02:32:50 +0000 (10:32 +0800)
committerGitHub <noreply@github.com>
Mon, 22 Mar 2021 02:32:50 +0000 (10:32 +0800)
* up

* up

* Update upload.go

* Update upload.go

Co-authored-by: Welt <L5Accelerator@users.noreply.github.com>
node/node.go
toolbar/osssync/sync/blockkeeper.go [deleted file]
toolbar/osssync/sync/infofile.go
toolbar/osssync/sync/node.go [deleted file]
toolbar/osssync/sync/oss.go
toolbar/osssync/sync/sync.go
toolbar/osssync/sync/upload.go
toolbar/osssync/util/file.go
toolbar/osssync/util/json.go

index 61e2b8c..1835e34 100644 (file)
@@ -326,3 +326,8 @@ func (n *Node) RunForever() {
                n.Stop()
        })
 }
+
+// GetChain return the chain
+func (n *Node) GetChain() *protocol.Chain {
+       return n.chain
+}
diff --git a/toolbar/osssync/sync/blockkeeper.go b/toolbar/osssync/sync/blockkeeper.go
deleted file mode 100644 (file)
index dfd5c63..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-package sync
-
-import (
-       "github.com/aliyun/aliyun-oss-go-sdk/oss"
-
-       "github.com/bytom/vapor/toolbar/apinode"
-       "github.com/bytom/vapor/toolbar/osssync/config"
-       "github.com/bytom/vapor/toolbar/osssync/util"
-)
-
-// BlockKeeper the struct of the BlockKeeper
-type BlockKeeper struct {
-       Node      *apinode.Node
-       OssClient *oss.Client
-       OssBucket *oss.Bucket
-       FileUtil  *util.FileUtil
-}
-
-// NewBlockKeeper return one new instance of BlockKeeper
-func NewBlockKeeper() (*BlockKeeper, error) {
-       cfg := &config.Config{}
-       err := config.LoadConfig(&cfg)
-       if err != nil {
-               return nil, err
-       }
-
-       node := apinode.NewNode(cfg.VaporURL)
-
-       ossClient, err := oss.New(cfg.Oss.Endpoint, cfg.Oss.AccessKeyID, cfg.Oss.AccessKeySecret)
-       if err != nil {
-               return nil, err
-       }
-
-       ossBucket, err := ossClient.Bucket("bytom-seed")
-       if err != nil {
-               return nil, err
-       }
-
-       fileUtil := util.NewFileUtil("./blocks")
-
-       return &BlockKeeper{
-               Node:      node,
-               OssClient: ossClient,
-               OssBucket: ossBucket,
-               FileUtil:  fileUtil,
-       }, nil
-}
index e3dfc21..47860ec 100644 (file)
@@ -36,8 +36,8 @@ func NewInfo(end, gzSize uint64) *Info {
 }
 
 // GetInfoJson Download info.json
-func (b *BlockKeeper) GetInfoJson() (*Info, error) {
-       data, err := b.GetObjToData("info.json")
+func (s *Sync) GetInfoJson() (*Info, error) {
+       data, err := s.GetObjToData("info.json")
        if err != nil {
                return nil, err
        }
@@ -48,30 +48,30 @@ func (b *BlockKeeper) GetInfoJson() (*Info, error) {
 }
 
 // Upload info.json
-func (b *BlockKeeper) PutInfoJson(infoData *Info) error {
+func (s *Sync) PutInfoJson(infoData *Info) error {
        jsonData, err := util.Struct2Json(infoData)
        if err != nil {
                return err
        }
 
        // Upload
-       return b.PutObjByteArr("info.json", jsonData)
+       return s.PutObjByteArr("info.json", jsonData)
 }
 
 // SetLatestBlockHeight set new latest blockHeight on OSS
-func (b *BlockKeeper) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
-       info, err := b.GetInfoJson()
+func (s *Sync) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
+       info, err := s.GetInfoJson()
        if err != nil {
                return err
        }
 
        info.LatestBlockHeight = newLatestBlockHeight
-       return b.PutInfoJson(info)
+       return s.PutInfoJson(info)
 }
 
 // AddInterval adds an interval to the end of info.json
-func (b *BlockKeeper) AddInterval(end, gzSize uint64) error {
-       isJsonExist, err := b.OssBucket.IsObjectExist("info.json")
+func (s *Sync) AddInterval(end, gzSize uint64) error {
+       isJsonExist, err := s.OssBucket.IsObjectExist("info.json")
        if err != nil {
                return err
        }
@@ -79,7 +79,7 @@ func (b *BlockKeeper) AddInterval(end, gzSize uint64) error {
        var info *Info
        if isJsonExist {
                // Download info.json
-               info, err = b.GetInfoJson()
+               info, err = s.GetInfoJson()
                if err != nil {
                        return err
                }
@@ -91,5 +91,5 @@ func (b *BlockKeeper) AddInterval(end, gzSize uint64) error {
        } else {
                info = NewInfo(end, gzSize)
        }
-       return b.PutInfoJson(info)
+       return s.PutInfoJson(info)
 }
diff --git a/toolbar/osssync/sync/node.go b/toolbar/osssync/sync/node.go
deleted file mode 100644 (file)
index 951936d..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package sync
-
-import "github.com/bytom/vapor/protocol/bc/types"
-
-// GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
-func (b *BlockKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
-       blockHeight := start
-       data := []*types.Block{}
-       for i := uint64(0); i < length; i++ {
-               resp, err := b.Node.GetBlockByHeight(blockHeight)
-               if err != nil {
-                       return nil, err
-               }
-
-               data = append(data, resp)
-               blockHeight++
-       }
-       return data, nil
-}
index c89d862..74bed9c 100644 (file)
@@ -8,14 +8,14 @@ import (
 )
 
 // PutObjByteArr upload Byte Array object
-func (b *BlockKeeper) PutObjByteArr(objectName string, objectValue []byte) error {
+func (s *Sync) PutObjByteArr(objectName string, objectValue []byte) error {
        objectAcl := oss.ObjectACL(oss.ACLPublicRead)
-       return b.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
+       return s.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
 }
 
 // GetObjToData download object to stream
-func (b *BlockKeeper) GetObjToData(objectName string) ([]byte, error) {
-       body, err := b.OssBucket.GetObject(objectName)
+func (s *Sync) GetObjToData(objectName string) ([]byte, error) {
+       body, err := s.OssBucket.GetObject(objectName)
        if err != nil {
                return nil, err
        }
index 88ac2f3..1c1e75c 100644 (file)
@@ -1,22 +1,42 @@
 package sync
 
 import (
-       "github.com/bytom/vapor/protocol"
-       "github.com/bytom/vapor/protocol/bc/types"
+       "github.com/aliyun/aliyun-oss-go-sdk/oss"
+
+       "github.com/bytom/vapor/toolbar/osssync/config"
+       "github.com/bytom/vapor/toolbar/osssync/util"
 )
 
-// GetLatestDownloadBlockHeight returns the current height of the node wait for download.
-func GetLatestDownloadBlockHeight(c *protocol.Chain) uint64 {
-       return c.BestBlockHeight()
+// Sync the struct of the Sync
+type Sync struct {
+       OssClient *oss.Client
+       OssBucket *oss.Bucket
+       FileUtil  *util.FileUtil
 }
 
-// Sync
-func Sync(c *protocol.Chain, blocks []*types.Block) error {
-       for i := 0; i < len(blocks); i++ {
-               _, err := c.ProcessBlock(blocks[i])
-               if err != nil {
-                       return err
-               }
+// NewSync return one new instance of Sync
+func NewSync() (*Sync, error) {
+       cfg := &config.Config{}
+       err := config.LoadConfig(&cfg)
+       if err != nil {
+               return nil, err
+       }
+
+       ossClient, err := oss.New(cfg.Oss.Endpoint, cfg.Oss.AccessKeyID, cfg.Oss.AccessKeySecret)
+       if err != nil {
+               return nil, err
        }
-       return nil
+
+       ossBucket, err := ossClient.Bucket("bytom-seed")
+       if err != nil {
+               return nil, err
+       }
+
+       fileUtil := util.NewFileUtil("./blocks")
+
+       return &Sync{
+               OssClient: ossClient,
+               OssBucket: ossBucket,
+               FileUtil:  fileUtil,
+       }, nil
 }
index 2c9df91..32d5548 100644 (file)
@@ -5,15 +5,46 @@ import (
        "time"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/vapor/protocol/bc/types"
+       "github.com/bytom/vapor/toolbar/apinode"
+       "github.com/bytom/vapor/toolbar/osssync/config"
 )
 
+// UploadKeeper the struct for upload
+type UploadKeeper struct {
+       Sync *Sync
+       Node   *apinode.Node
+}
+
+// NewUploadKeeper return one new instance of UploadKeeper
+func NewUploadKeeper() (*UploadKeeper, error) {
+       cfg := &config.Config{}
+       err := config.LoadConfig(&cfg)
+       if err != nil {
+               return nil, err
+       }
+
+       node := apinode.NewNode(cfg.VaporURL)
+
+       sync, err := NewSync()
+       if err != nil {
+               return nil, err
+       }
+
+       return &UploadKeeper{
+               Sync: sync,
+               Node:   node,
+       }, nil
+}
+
 // RunSyncUp run synchronize upload to OSS
-func (b *BlockKeeper) RunSyncUp() {
+func (u *UploadKeeper) RunSyncUp() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
 
        for ; true; <-ticker.C {
-               err := b.Upload()
+               err := u.Upload()
                if err != nil {
                        log.WithField("error", err).Errorln("blockKeeper fail on process block")
                }
@@ -21,18 +52,18 @@ func (b *BlockKeeper) RunSyncUp() {
 }
 
 // Upload find and upload blocks
-func (b *BlockKeeper) Upload() error {
-       err := b.FileUtil.BlockDirInitial()
+func (u *UploadKeeper) Upload() error {
+       err := u.Sync.FileUtil.BlockDirInitial()
        if err != nil {
                return err
        }
 
-       currBlockHeight, err := b.Node.GetBlockCount() // Current block height on vapor
+       currBlockHeight, err := u.Node.GetBlockCount() // Current block height on vapor
        if err != nil {
                return err
        }
 
-       infoJson, err := b.GetInfoJson()
+       infoJson, err := u.Sync.GetInfoJson()
        if err != nil {
                return err
        }
@@ -53,9 +84,9 @@ func (b *BlockKeeper) Upload() error {
        // Upload Whole Interval
        for latestUp+1 < intervals[pos1].StartBlockHeight {
                if latestUp == 0 {
-                       err = b.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
+                       err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
                } else {
-                       err = b.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
+                       err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
                }
                if err != nil {
                        return err
@@ -69,9 +100,9 @@ func (b *BlockKeeper) Upload() error {
        newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
        if latestUp < newLatestUp {
                if latestUp == 0 {
-                       err = b.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize)
+                       err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize)
                } else {
-                       err = b.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
+                       err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
                }
                if err != nil {
                        return err
@@ -81,12 +112,12 @@ func (b *BlockKeeper) Upload() error {
 }
 
 // UploadFiles get block from vapor and upload files to OSS
-func (b *BlockKeeper) UploadFiles(start, end, size uint64) error {
+func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
        for {
                if start > end {
                        break
                }
-               blocks, err := b.GetBlockArray(start, size)
+               blocks, err := u.GetBlockArray(start, size)
                if err != nil {
                        return err
                }
@@ -95,32 +126,32 @@ func (b *BlockKeeper) UploadFiles(start, end, size uint64) error {
                filenameJson := filename + ".json"
                filenameGzip := filenameJson + ".gz"
 
-               _, err = b.FileUtil.SaveBlockFile(filename, blocks)
+               _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks)
                if err != nil {
                        return err
                }
 
-               err = b.FileUtil.GzipCompress(filename)
+               err = u.Sync.FileUtil.GzipCompress(filename)
                if err != nil {
                        return err
                }
 
-               err = b.OssBucket.PutObjectFromFile(filenameGzip, b.FileUtil.LocalDir+"/"+filenameGzip)
+               err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip)
                if err != nil {
                        return err
                }
 
-               err = b.SetLatestBlockHeight(start + size - 1)
+               err = u.Sync.SetLatestBlockHeight(start + size - 1)
                if err != nil {
                        return err
                }
 
-               err = b.FileUtil.RemoveLocal(filenameJson)
+               err = u.Sync.FileUtil.RemoveLocal(filenameJson)
                if err != nil {
                        return err
                }
 
-               err = b.FileUtil.RemoveLocal(filenameGzip)
+               err = u.Sync.FileUtil.RemoveLocal(filenameGzip)
                if err != nil {
                        return err
                }
@@ -129,3 +160,19 @@ func (b *BlockKeeper) UploadFiles(start, end, size uint64) error {
        }
        return nil
 }
+
+// GetBlockArray return the RawBlockArray by BlockHeight from start to start+length-1
+func (u *UploadKeeper) GetBlockArray(start, length uint64) ([]*types.Block, error) {
+       blockHeight := start
+       data := []*types.Block{}
+       for i := uint64(0); i < length; i++ {
+               resp, err := u.Node.GetBlockByHeight(blockHeight)
+               if err != nil {
+                       return nil, err
+               }
+
+               data = append(data, resp)
+               blockHeight++
+       }
+       return data, nil
+}
index 2f42549..928bc4d 100644 (file)
@@ -40,6 +40,11 @@ func PathExists(path string) (bool, error) {
        return false, err
 }
 
+// RemoveLocal deletes file
+func (f *FileUtil) RemoveLocal(filename string) error {
+       return os.Remove(f.LocalDir + "/" + filename)
+}
+
 // BlockDirInitial initializes the blocks directory
 func (f *FileUtil) BlockDirInitial() error {
        ifPathExist, err := PathExists(f.LocalDir)
index 3ebd6a7..91821ed 100644 (file)
@@ -3,7 +3,6 @@ package util
 import (
        "encoding/json"
        "io/ioutil"
-       "os"
 )
 
 // NewFileUtil creates new file util
@@ -33,11 +32,6 @@ func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) {
        return ioutil.ReadFile(filename)
 }
 
-// RemoveLocal deletes file
-func (f *FileUtil) RemoveLocal(filename string) error {
-       return os.Remove(f.LocalDir + "/" + filename)
-}
-
 // Json2Struct transform json to struct
 func Json2Struct(data json.RawMessage, resp interface{}) error {
        return json.Unmarshal(data, &resp)