OSDN Git Service

Osssync (#580)
authorOuter-God <39217235+Outer-God@users.noreply.github.com>
Wed, 24 Mar 2021 10:02:02 +0000 (18:02 +0800)
committerGitHub <noreply@github.com>
Wed, 24 Mar 2021 10:02:02 +0000 (18:02 +0800)
* up

* up

* up

* up

* Update upload.go

* Update upload.go

* Update upload.go

* Update upload.go

* Update upload.go

* Update oss.go

* Update upload.go

Co-authored-by: Welt <L5Accelerator@users.noreply.github.com>
12 files changed:
cmd/vapord/commands/run_node.go
config/config.go
toolbar/osssync/sync/infofile.go [deleted file]
toolbar/osssync/sync/oss.go [deleted file]
toolbar/osssync/sync/sync.go [deleted file]
toolbar/osssync/upload/config.go [moved from toolbar/osssync/config/config.go with 72% similarity]
toolbar/osssync/upload/oss.go [new file with mode: 0644]
toolbar/osssync/upload/upload.go [moved from toolbar/osssync/sync/upload.go with 62% similarity]
toolbar/osssync/util/file.go
toolbar/osssync/util/gzip.go
toolbar/osssync/util/infofile.go [new file with mode: 0644]
toolbar/osssync/util/json.go

index af75e32..18fab86 100644 (file)
@@ -55,6 +55,9 @@ func init() {
        runNodeCmd.Flags().Int("ws.max_num_websockets", config.Websocket.MaxNumWebsockets, "Max number of websocket connections")
        runNodeCmd.Flags().Int("ws.max_num_concurrent_reqs", config.Websocket.MaxNumConcurrentReqs, "Max number of concurrent websocket requests that may be processed concurrently")
 
+       // OSS
+       runNodeCmd.Flags().String("oss.endpoint", config.Oss.Endpoint, "Endpoint of OSS")
+
        RootCmd.AddCommand(runNodeCmd)
 }
 
index e9490f9..0ef07e5 100644 (file)
@@ -29,6 +29,7 @@ type Config struct {
        Websocket  *WebsocketConfig  `mapstructure:"ws"`
        Federation *FederationConfig `mapstructure:"federation"`
        CrossChain *CrossChainConfig `mapstructure:"cross_chain"`
+       Oss        *OssConfig        `mapstructure:"oss"`
 }
 
 // Default configurable parameters.
@@ -42,6 +43,7 @@ func DefaultConfig() *Config {
                Websocket:  DefaultWebsocketConfig(),
                Federation: DefaultFederationConfig(),
                CrossChain: DefaultCrossChainConfig(),
+               Oss:        DefaultOssConfig(),
        }
 }
 
@@ -220,6 +222,10 @@ type CrossChainConfig struct {
        AssetWhitelist string `mapstructure:"asset_whitelist"`
 }
 
+type OssConfig struct {
+       Endpoint string `mapstructure:"endpoint"`
+}
+
 // Default configurable rpc's auth parameters.
 func DefaultRPCAuthConfig() *RPCAuthConfig {
        return &RPCAuthConfig{
@@ -277,6 +283,11 @@ func xpub(str string) (xpub chainkd.XPub) {
        return xpub
 }
 
+// Default configurable oss parameters.
+func DefaultOssConfig() *OssConfig {
+       return &OssConfig{}
+}
+
 //-----------------------------------------------------------------------------
 // Utils
 
diff --git a/toolbar/osssync/sync/infofile.go b/toolbar/osssync/sync/infofile.go
deleted file mode 100644 (file)
index 47860ec..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-package sync
-
-import "github.com/bytom/vapor/toolbar/osssync/util"
-
-// Interval determines the number of blocks in a Gzip file in the Interval of blockHeight
-// StartBlockHeight is the start of the Interval
-// EndBlockHeight: the end of the Interval
-// GzSize is the number of blocks store in a Gzip file
-type Interval struct {
-       StartBlockHeight uint64
-       EndBlockHeight   uint64
-       GzSize           uint64
-}
-
-// NewInterval creates a new Interval from info.json
-func NewInterval(start, end, gzSize uint64) *Interval {
-       return &Interval{
-               StartBlockHeight: start,
-               EndBlockHeight:   end,
-               GzSize:           gzSize,
-       }
-}
-
-// Info is a struct for info.json
-type Info struct {
-       LatestBlockHeight uint64
-       Interval          []*Interval
-}
-
-// NewInfo creates a new Info for info.json
-func NewInfo(end, gzSize uint64) *Info {
-       newInvl := NewInterval(0, end, gzSize)
-       var arr []*Interval
-       arr = append(arr, newInvl)
-       return &Info{0, arr}
-}
-
-// GetInfoJson Download info.json
-func (s *Sync) GetInfoJson() (*Info, error) {
-       data, err := s.GetObjToData("info.json")
-       if err != nil {
-               return nil, err
-       }
-
-       info := new(Info)
-       err = util.Json2Struct(data, &info)
-       return info, err
-}
-
-// Upload info.json
-func (s *Sync) PutInfoJson(infoData *Info) error {
-       jsonData, err := util.Struct2Json(infoData)
-       if err != nil {
-               return err
-       }
-
-       // Upload
-       return s.PutObjByteArr("info.json", jsonData)
-}
-
-// SetLatestBlockHeight set new latest blockHeight on OSS
-func (s *Sync) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
-       info, err := s.GetInfoJson()
-       if err != nil {
-               return err
-       }
-
-       info.LatestBlockHeight = newLatestBlockHeight
-       return s.PutInfoJson(info)
-}
-
-// AddInterval adds an interval to the end of info.json
-func (s *Sync) AddInterval(end, gzSize uint64) error {
-       isJsonExist, err := s.OssBucket.IsObjectExist("info.json")
-       if err != nil {
-               return err
-       }
-
-       var info *Info
-       if isJsonExist {
-               // Download info.json
-               info, err = s.GetInfoJson()
-               if err != nil {
-                       return err
-               }
-
-               // Add Interval
-               prevInvl := info.Interval[len(info.Interval)-1]
-               newInvl := NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
-               info.Interval = append(info.Interval, newInvl)
-       } else {
-               info = NewInfo(end, gzSize)
-       }
-       return s.PutInfoJson(info)
-}
diff --git a/toolbar/osssync/sync/oss.go b/toolbar/osssync/sync/oss.go
deleted file mode 100644 (file)
index 74bed9c..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-package sync
-
-import (
-       "bytes"
-       "io/ioutil"
-
-       "github.com/aliyun/aliyun-oss-go-sdk/oss"
-)
-
-// PutObjByteArr upload Byte Array object
-func (s *Sync) PutObjByteArr(objectName string, objectValue []byte) error {
-       objectAcl := oss.ObjectACL(oss.ACLPublicRead)
-       return s.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
-}
-
-// GetObjToData download object to stream
-func (s *Sync) GetObjToData(objectName string) ([]byte, error) {
-       body, err := s.OssBucket.GetObject(objectName)
-       if err != nil {
-               return nil, err
-       }
-
-       defer body.Close()
-
-       data, err := ioutil.ReadAll(body)
-       if err != nil {
-               return nil, err
-       }
-
-       return data, err
-}
diff --git a/toolbar/osssync/sync/sync.go b/toolbar/osssync/sync/sync.go
deleted file mode 100644 (file)
index 1c1e75c..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-package sync
-
-import (
-       "github.com/aliyun/aliyun-oss-go-sdk/oss"
-
-       "github.com/bytom/vapor/toolbar/osssync/config"
-       "github.com/bytom/vapor/toolbar/osssync/util"
-)
-
-// Sync the struct of the Sync
-type Sync struct {
-       OssClient *oss.Client
-       OssBucket *oss.Bucket
-       FileUtil  *util.FileUtil
-}
-
-// NewSync return one new instance of Sync
-func NewSync() (*Sync, error) {
-       cfg := &config.Config{}
-       err := config.LoadConfig(&cfg)
-       if err != nil {
-               return nil, err
-       }
-
-       ossClient, err := oss.New(cfg.Oss.Endpoint, cfg.Oss.AccessKeyID, cfg.Oss.AccessKeySecret)
-       if err != nil {
-               return nil, err
-       }
-
-       ossBucket, err := ossClient.Bucket("bytom-seed")
-       if err != nil {
-               return nil, err
-       }
-
-       fileUtil := util.NewFileUtil("./blocks")
-
-       return &Sync{
-               OssClient: ossClient,
-               OssBucket: ossBucket,
-               FileUtil:  fileUtil,
-       }, nil
-}
similarity index 72%
rename from toolbar/osssync/config/config.go
rename to toolbar/osssync/upload/config.go
index be4c2a8..b7dcd42 100644 (file)
@@ -1,4 +1,4 @@
-package config
+package upload
 
 import (
        "encoding/json"
@@ -9,18 +9,24 @@ import (
 
 // Config represent root of config
 type Config struct {
-       Oss      Oss    `json:"oss"`
-       VaporURL string `json:"vapor_url"`
+       OssConfig *OssConfig `json:"oss_config"`
+       VaporURL  string     `json:"vapor_url"`
 }
 
 // Oss logs cfg
-type Oss struct {
+type Login struct {
        Endpoint        string `json:"endpoint"`
        AccessKeyID     string `json:"access_key_id"`
        AccessKeySecret string `json:"access_key_secret"`
 }
 
-// LoadConfig read path file to the config object
+// Oss cfg
+type OssConfig struct {
+       Login  *Login `json:"login"`
+       Bucket string `json:"bucket"`
+}
+
+// LoadConfig read path file to the config object for Upload from Vapor to OSS
 func LoadConfig(config interface{}) error {
        if len(os.Args) <= 1 {
                return errors.New("Please setup the config file path as Args[1]")
diff --git a/toolbar/osssync/upload/oss.go b/toolbar/osssync/upload/oss.go
new file mode 100644 (file)
index 0000000..e2fd6cf
--- /dev/null
@@ -0,0 +1,72 @@
+package upload
+
+import (
+       "bytes"
+
+       "github.com/aliyun/aliyun-oss-go-sdk/oss"
+
+       "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+// PutObjByteArr upload Byte Array object
+func (u *UploadKeeper) PutObjByteArr(objectName string, objectValue []byte) error {
+       objectAcl := oss.ObjectACL(oss.ACLPublicRead)
+       return u.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
+}
+
+// GetInfoJson Download info.json
+func (u *UploadKeeper) GetInfoJson() (*util.Info, error) {
+       body, err := u.OssBucket.GetObject("info.json")
+       if err != nil {
+               return nil, err
+       }
+
+       return util.GetInfoJson(body)
+}
+
+// Upload info.json
+func (u *UploadKeeper) PutInfoJson(infoData *util.Info) error {
+       jsonData, err := util.Struct2Json(infoData)
+       if err != nil {
+               return err
+       }
+
+       // Upload
+       return u.PutObjByteArr("info.json", jsonData)
+}
+
+// SetLatestBlockHeight set new latest blockHeight on OSS
+func (u *UploadKeeper) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
+       info, err := u.GetInfoJson()
+       if err != nil {
+               return err
+       }
+
+       info.LatestBlockHeight = newLatestBlockHeight
+       return u.PutInfoJson(info)
+}
+
+// AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
+func (u *UploadKeeper) AddInterval(end, gzSize uint64) error {
+       isJsonExist, err := u.OssBucket.IsObjectExist("info.json")
+       if err != nil {
+               return err
+       }
+
+       var info *util.Info
+       if isJsonExist {
+               // Download info.json
+               info, err = u.GetInfoJson()
+               if err != nil {
+                       return err
+               }
+
+               // Add Interval
+               prevInvl := info.Interval[len(info.Interval)-1]
+               newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
+               info.Interval = append(info.Interval, newInvl)
+       } else {
+               info = util.NewInfo(end, gzSize)
+       }
+       return u.PutInfoJson(info)
+}
similarity index 62%
rename from toolbar/osssync/sync/upload.go
rename to toolbar/osssync/upload/upload.go
index 32d5548..e81512e 100644 (file)
@@ -1,45 +1,80 @@
-package sync
+package upload
 
 import (
        "strconv"
        "time"
 
+       "github.com/aliyun/aliyun-oss-go-sdk/oss"
        log "github.com/sirupsen/logrus"
 
        "github.com/bytom/vapor/protocol/bc/types"
        "github.com/bytom/vapor/toolbar/apinode"
-       "github.com/bytom/vapor/toolbar/osssync/config"
+       "github.com/bytom/vapor/toolbar/osssync/util"
 )
 
+const LOCALDIR = "./blocks/" // Local directory to store temp blocks files
+
+// Run synchronize upload blocks from vapor to OSS
+func Run() error {
+       uploadKeeper, err := NewUploadKeeper()
+       if err != nil {
+               return err
+       }
+
+       uploadKeeper.Run()
+       return nil
+}
+
+// AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval
+func AddInterval(end, gzSize uint64) error {
+       uploadKeeper, err := NewUploadKeeper()
+       if err != nil {
+               return err
+       }
+
+       return uploadKeeper.AddInterval(end, gzSize)
+}
+
 // UploadKeeper the struct for upload
 type UploadKeeper struct {
-       Sync *Sync
-       Node   *apinode.Node
+       Node      *apinode.Node
+       OssClient *oss.Client
+       OssBucket *oss.Bucket
+       FileUtil  *util.FileUtil
 }
 
 // NewUploadKeeper return one new instance of UploadKeeper
 func NewUploadKeeper() (*UploadKeeper, error) {
-       cfg := &config.Config{}
-       err := config.LoadConfig(&cfg)
+       cfg := &Config{}
+       err := LoadConfig(&cfg)
        if err != nil {
                return nil, err
        }
 
        node := apinode.NewNode(cfg.VaporURL)
 
-       sync, err := NewSync()
+       ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
+       if err != nil {
+               return nil, err
+       }
+
+       ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
        if err != nil {
                return nil, err
        }
 
+       fileUtil := util.NewFileUtil(LOCALDIR)
+
        return &UploadKeeper{
-               Sync: sync,
-               Node:   node,
+               Node:      node,
+               OssClient: ossClient,
+               OssBucket: ossBucket,
+               FileUtil:  fileUtil,
        }, nil
 }
 
-// RunSyncUp run synchronize upload to OSS
-func (u *UploadKeeper) RunSyncUp() {
+// Run synchronize upload blocks from vapor to OSS
+func (u *UploadKeeper) Run() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
 
@@ -53,7 +88,7 @@ func (u *UploadKeeper) RunSyncUp() {
 
 // Upload find and upload blocks
 func (u *UploadKeeper) Upload() error {
-       err := u.Sync.FileUtil.BlockDirInitial()
+       err := u.FileUtil.BlockDirInitial()
        if err != nil {
                return err
        }
@@ -63,7 +98,7 @@ func (u *UploadKeeper) Upload() error {
                return err
        }
 
-       infoJson, err := u.Sync.GetInfoJson()
+       infoJson, err := u.GetInfoJson()
        if err != nil {
                return err
        }
@@ -83,11 +118,7 @@ func (u *UploadKeeper) Upload() error {
 
        // Upload Whole Interval
        for latestUp+1 < intervals[pos1].StartBlockHeight {
-               if latestUp == 0 {
-                       err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
-               } else {
-                       err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
-               }
+               err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
                if err != nil {
                        return err
                }
@@ -99,11 +130,7 @@ func (u *UploadKeeper) Upload() error {
        // Upload the last Interval
        newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
        if latestUp < newLatestUp {
-               if latestUp == 0 {
-                       err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize)
-               } else {
-                       err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
-               }
+               err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
                if err != nil {
                        return err
                }
@@ -126,32 +153,32 @@ func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
                filenameJson := filename + ".json"
                filenameGzip := filenameJson + ".gz"
 
-               _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks)
+               _, err = u.FileUtil.SaveBlockFile(filename, blocks)
                if err != nil {
                        return err
                }
 
-               err = u.Sync.FileUtil.GzipCompress(filename)
+               err = u.FileUtil.GzipCompress(filename)
                if err != nil {
                        return err
                }
 
-               err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip)
+               err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
                if err != nil {
                        return err
                }
 
-               err = u.Sync.SetLatestBlockHeight(start + size - 1)
+               err = u.SetLatestBlockHeight(start + size - 1)
                if err != nil {
                        return err
                }
 
-               err = u.Sync.FileUtil.RemoveLocal(filenameJson)
+               err = u.FileUtil.RemoveLocal(filenameJson)
                if err != nil {
                        return err
                }
 
-               err = u.Sync.FileUtil.RemoveLocal(filenameGzip)
+               err = u.FileUtil.RemoveLocal(filenameGzip)
                if err != nil {
                        return err
                }
index 928bc4d..880c72c 100644 (file)
@@ -16,20 +16,6 @@ func IsExists(path string) bool {
        return true
 }
 
-// IfNoFileToCreate if the file is not exist, create the file
-func IfNoFileToCreate(fileName string) (file *os.File) {
-       var f *os.File
-       var err error
-       if !IsExists(fileName) {
-               f, err = os.Create(fileName)
-               if err != nil {
-                       return
-               }
-               defer f.Close()
-       }
-       return f
-}
-
 // PathExists return if path exists
 func PathExists(path string) (bool, error) {
        _, err := os.Stat(path)
@@ -42,7 +28,7 @@ func PathExists(path string) (bool, error) {
 
 // RemoveLocal deletes file
 func (f *FileUtil) RemoveLocal(filename string) error {
-       return os.Remove(f.LocalDir + "/" + filename)
+       return os.Remove(f.LocalDir + filename)
 }
 
 // BlockDirInitial initializes the blocks directory
index 45c397b..99d9b2a 100644 (file)
@@ -9,7 +9,7 @@ const READ_SIZE = 1024 * 1024 * 500
 
 // GzipCompress compress file to Gzip
 func (f *FileUtil) GzipCompress(fileName string) error {
-       filePath := f.LocalDir + "/" + fileName + ".json.gz"
+       filePath := f.LocalDir + fileName + ".json.gz"
        fw, err := os.Create(filePath)
        if err != nil {
                return err
@@ -20,7 +20,7 @@ func (f *FileUtil) GzipCompress(fileName string) error {
        gw := gzip.NewWriter(fw)
        defer gw.Close()
 
-       filePath = f.LocalDir + "/" + fileName + ".json"
+       filePath = f.LocalDir + fileName + ".json"
        fr, err := os.Open(filePath)
        if err != nil {
                return err
@@ -51,7 +51,7 @@ func (f *FileUtil) GzipCompress(fileName string) error {
 
 // GzipUncompress uncompress Gzip file
 func (f *FileUtil) GzipUncompress(fileName string) error {
-       filedirname := f.LocalDir + "/" + fileName + ".json.gz"
+       filedirname := f.LocalDir + fileName + ".json.gz"
        fr, err := os.Open(filedirname)
        if err != nil {
                return err
@@ -69,7 +69,7 @@ func (f *FileUtil) GzipUncompress(fileName string) error {
        buf := make([]byte, READ_SIZE)
        n, err := gr.Read(buf)
 
-       filedirname = f.LocalDir + "/" + gr.Header.Name
+       filedirname = f.LocalDir + gr.Header.Name
        fw, err := os.Create(filedirname)
        if err != nil {
                return err
diff --git a/toolbar/osssync/util/infofile.go b/toolbar/osssync/util/infofile.go
new file mode 100644 (file)
index 0000000..afc42cc
--- /dev/null
@@ -0,0 +1,53 @@
+package util
+
+import (
+       "io"
+       "io/ioutil"
+)
+
+// Interval determines the number of blocks in a Gzip file in the Interval of blockHeight
+// StartBlockHeight is the start of the Interval
+// EndBlockHeight: the end of the Interval
+// GzSize is the number of blocks store in a Gzip file
+type Interval struct {
+       StartBlockHeight uint64
+       EndBlockHeight   uint64
+       GzSize           uint64
+}
+
+// NewInterval creates a new Interval from info.json
+func NewInterval(start, end, gzSize uint64) *Interval {
+       return &Interval{
+               StartBlockHeight: start,
+               EndBlockHeight:   end,
+               GzSize:           gzSize,
+       }
+}
+
+// Info is a struct for info.json
+type Info struct {
+       LatestBlockHeight uint64
+       Interval          []*Interval
+}
+
+// NewInfo creates a new Info for info.json
+func NewInfo(end, gzSize uint64) *Info {
+       newInvl := NewInterval(0, end, gzSize)
+       var arr []*Interval
+       arr = append(arr, newInvl)
+       return &Info{0, arr}
+}
+
+// GetInfoJson from stream
+func GetInfoJson(body io.ReadCloser) (*Info, error) {
+       defer body.Close()
+
+       data, err := ioutil.ReadAll(body)
+       if err != nil {
+               return nil, err
+       }
+
+       info := new(Info)
+       err = Json2Struct(data, &info)
+       return info, err
+}
index 91821ed..49a07dd 100644 (file)
@@ -12,7 +12,7 @@ func NewFileUtil(localDir string) *FileUtil {
 
 // SaveBlockFile saves block file
 func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error) {
-       filename = f.LocalDir + "/" + filename + ".json"
+       filename = f.LocalDir + filename + ".json"
        saveData, err := json.Marshal(data)
        if err != nil {
                return false, err
@@ -28,7 +28,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error
 
 // GetJson read json file
 func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) {
-       filename = f.LocalDir + "/" + filename + ".json"
+       filename = f.LocalDir + filename + ".json"
        return ioutil.ReadFile(filename)
 }