From 34cb08961967eadae6ce01b0898cae97a197b37e Mon Sep 17 00:00:00 2001 From: Outer-God <39217235+Outer-God@users.noreply.github.com> Date: Wed, 24 Mar 2021 18:02:02 +0800 Subject: [PATCH] Osssync (#580) * up * up * up * up * Update upload.go * Update upload.go * Update upload.go * Update upload.go * Update upload.go * Update oss.go * Update upload.go Co-authored-by: Welt --- cmd/vapord/commands/run_node.go | 3 + config/config.go | 11 ++++ toolbar/osssync/sync/infofile.go | 95 ---------------------------- toolbar/osssync/sync/oss.go | 31 --------- toolbar/osssync/sync/sync.go | 42 ------------ toolbar/osssync/{config => upload}/config.go | 16 +++-- toolbar/osssync/upload/oss.go | 72 +++++++++++++++++++++ toolbar/osssync/{sync => upload}/upload.go | 85 ++++++++++++++++--------- toolbar/osssync/util/file.go | 16 +---- toolbar/osssync/util/gzip.go | 8 +-- toolbar/osssync/util/infofile.go | 53 ++++++++++++++++ toolbar/osssync/util/json.go | 4 +- 12 files changed, 213 insertions(+), 223 deletions(-) delete mode 100644 toolbar/osssync/sync/infofile.go delete mode 100644 toolbar/osssync/sync/oss.go delete mode 100644 toolbar/osssync/sync/sync.go rename toolbar/osssync/{config => upload}/config.go (72%) create mode 100644 toolbar/osssync/upload/oss.go rename toolbar/osssync/{sync => upload}/upload.go (62%) create mode 100644 toolbar/osssync/util/infofile.go diff --git a/cmd/vapord/commands/run_node.go b/cmd/vapord/commands/run_node.go index af75e323..18fab866 100644 --- a/cmd/vapord/commands/run_node.go +++ b/cmd/vapord/commands/run_node.go @@ -55,6 +55,9 @@ func init() { runNodeCmd.Flags().Int("ws.max_num_websockets", config.Websocket.MaxNumWebsockets, "Max number of websocket connections") runNodeCmd.Flags().Int("ws.max_num_concurrent_reqs", config.Websocket.MaxNumConcurrentReqs, "Max number of concurrent websocket requests that may be processed concurrently") + // OSS + runNodeCmd.Flags().String("oss.endpoint", config.Oss.Endpoint, "Endpoint of OSS") + RootCmd.AddCommand(runNodeCmd) } diff --git a/config/config.go b/config/config.go index e9490f91..0ef07e55 100644 --- a/config/config.go +++ b/config/config.go @@ -29,6 +29,7 @@ type Config struct { Websocket *WebsocketConfig `mapstructure:"ws"` Federation *FederationConfig `mapstructure:"federation"` CrossChain *CrossChainConfig `mapstructure:"cross_chain"` + Oss *OssConfig `mapstructure:"oss"` } // Default configurable parameters. @@ -42,6 +43,7 @@ func DefaultConfig() *Config { Websocket: DefaultWebsocketConfig(), Federation: DefaultFederationConfig(), CrossChain: DefaultCrossChainConfig(), + Oss: DefaultOssConfig(), } } @@ -220,6 +222,10 @@ type CrossChainConfig struct { AssetWhitelist string `mapstructure:"asset_whitelist"` } +type OssConfig struct { + Endpoint string `mapstructure:"endpoint"` +} + // Default configurable rpc's auth parameters. func DefaultRPCAuthConfig() *RPCAuthConfig { return &RPCAuthConfig{ @@ -277,6 +283,11 @@ func xpub(str string) (xpub chainkd.XPub) { return xpub } +// Default configurable oss parameters. +func DefaultOssConfig() *OssConfig { + return &OssConfig{} +} + //----------------------------------------------------------------------------- // Utils diff --git a/toolbar/osssync/sync/infofile.go b/toolbar/osssync/sync/infofile.go deleted file mode 100644 index 47860ecf..00000000 --- a/toolbar/osssync/sync/infofile.go +++ /dev/null @@ -1,95 +0,0 @@ -package sync - -import "github.com/bytom/vapor/toolbar/osssync/util" - -// Interval determines the number of blocks in a Gzip file in the Interval of blockHeight -// StartBlockHeight is the start of the Interval -// EndBlockHeight: the end of the Interval -// GzSize is the number of blocks store in a Gzip file -type Interval struct { - StartBlockHeight uint64 - EndBlockHeight uint64 - GzSize uint64 -} - -// NewInterval creates a new Interval from info.json -func NewInterval(start, end, gzSize uint64) *Interval { - return &Interval{ - StartBlockHeight: start, - EndBlockHeight: end, - GzSize: gzSize, - } -} - -// Info is a struct for info.json -type Info struct { - LatestBlockHeight uint64 - Interval []*Interval -} - -// NewInfo creates a new Info for info.json -func NewInfo(end, gzSize uint64) *Info { - newInvl := NewInterval(0, end, gzSize) - var arr []*Interval - arr = append(arr, newInvl) - return &Info{0, arr} -} - -// GetInfoJson Download info.json -func (s *Sync) GetInfoJson() (*Info, error) { - data, err := s.GetObjToData("info.json") - if err != nil { - return nil, err - } - - info := new(Info) - err = util.Json2Struct(data, &info) - return info, err -} - -// Upload info.json -func (s *Sync) PutInfoJson(infoData *Info) error { - jsonData, err := util.Struct2Json(infoData) - if err != nil { - return err - } - - // Upload - return s.PutObjByteArr("info.json", jsonData) -} - -// SetLatestBlockHeight set new latest blockHeight on OSS -func (s *Sync) SetLatestBlockHeight(newLatestBlockHeight uint64) error { - info, err := s.GetInfoJson() - if err != nil { - return err - } - - info.LatestBlockHeight = newLatestBlockHeight - return s.PutInfoJson(info) -} - -// AddInterval adds an interval to the end of info.json -func (s *Sync) AddInterval(end, gzSize uint64) error { - isJsonExist, err := s.OssBucket.IsObjectExist("info.json") - if err != nil { - return err - } - - var info *Info - if isJsonExist { - // Download info.json - info, err = s.GetInfoJson() - if err != nil { - return err - } - - // Add Interval - prevInvl := info.Interval[len(info.Interval)-1] - newInvl := NewInterval(prevInvl.EndBlockHeight+1, end, gzSize) - info.Interval = append(info.Interval, newInvl) - } else { - info = NewInfo(end, gzSize) - } - return s.PutInfoJson(info) -} diff --git a/toolbar/osssync/sync/oss.go b/toolbar/osssync/sync/oss.go deleted file mode 100644 index 74bed9c0..00000000 --- a/toolbar/osssync/sync/oss.go +++ /dev/null @@ -1,31 +0,0 @@ -package sync - -import ( - "bytes" - "io/ioutil" - - "github.com/aliyun/aliyun-oss-go-sdk/oss" -) - -// PutObjByteArr upload Byte Array object -func (s *Sync) PutObjByteArr(objectName string, objectValue []byte) error { - objectAcl := oss.ObjectACL(oss.ACLPublicRead) - return s.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl) -} - -// GetObjToData download object to stream -func (s *Sync) GetObjToData(objectName string) ([]byte, error) { - body, err := s.OssBucket.GetObject(objectName) - if err != nil { - return nil, err - } - - defer body.Close() - - data, err := ioutil.ReadAll(body) - if err != nil { - return nil, err - } - - return data, err -} diff --git a/toolbar/osssync/sync/sync.go b/toolbar/osssync/sync/sync.go deleted file mode 100644 index 1c1e75cc..00000000 --- a/toolbar/osssync/sync/sync.go +++ /dev/null @@ -1,42 +0,0 @@ -package sync - -import ( - "github.com/aliyun/aliyun-oss-go-sdk/oss" - - "github.com/bytom/vapor/toolbar/osssync/config" - "github.com/bytom/vapor/toolbar/osssync/util" -) - -// Sync the struct of the Sync -type Sync struct { - OssClient *oss.Client - OssBucket *oss.Bucket - FileUtil *util.FileUtil -} - -// NewSync return one new instance of Sync -func NewSync() (*Sync, error) { - cfg := &config.Config{} - err := config.LoadConfig(&cfg) - if err != nil { - return nil, err - } - - ossClient, err := oss.New(cfg.Oss.Endpoint, cfg.Oss.AccessKeyID, cfg.Oss.AccessKeySecret) - if err != nil { - return nil, err - } - - ossBucket, err := ossClient.Bucket("bytom-seed") - if err != nil { - return nil, err - } - - fileUtil := util.NewFileUtil("./blocks") - - return &Sync{ - OssClient: ossClient, - OssBucket: ossBucket, - FileUtil: fileUtil, - }, nil -} diff --git a/toolbar/osssync/config/config.go b/toolbar/osssync/upload/config.go similarity index 72% rename from toolbar/osssync/config/config.go rename to toolbar/osssync/upload/config.go index be4c2a82..b7dcd429 100644 --- a/toolbar/osssync/config/config.go +++ b/toolbar/osssync/upload/config.go @@ -1,4 +1,4 @@ -package config +package upload import ( "encoding/json" @@ -9,18 +9,24 @@ import ( // Config represent root of config type Config struct { - Oss Oss `json:"oss"` - VaporURL string `json:"vapor_url"` + OssConfig *OssConfig `json:"oss_config"` + VaporURL string `json:"vapor_url"` } // Oss logs cfg -type Oss struct { +type Login struct { Endpoint string `json:"endpoint"` AccessKeyID string `json:"access_key_id"` AccessKeySecret string `json:"access_key_secret"` } -// LoadConfig read path file to the config object +// Oss cfg +type OssConfig struct { + Login *Login `json:"login"` + Bucket string `json:"bucket"` +} + +// LoadConfig read path file to the config object for Upload from Vapor to OSS func LoadConfig(config interface{}) error { if len(os.Args) <= 1 { return errors.New("Please setup the config file path as Args[1]") diff --git a/toolbar/osssync/upload/oss.go b/toolbar/osssync/upload/oss.go new file mode 100644 index 00000000..e2fd6cf4 --- /dev/null +++ b/toolbar/osssync/upload/oss.go @@ -0,0 +1,72 @@ +package upload + +import ( + "bytes" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + + "github.com/bytom/vapor/toolbar/osssync/util" +) + +// PutObjByteArr upload Byte Array object +func (u *UploadKeeper) PutObjByteArr(objectName string, objectValue []byte) error { + objectAcl := oss.ObjectACL(oss.ACLPublicRead) + return u.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl) +} + +// GetInfoJson Download info.json +func (u *UploadKeeper) GetInfoJson() (*util.Info, error) { + body, err := u.OssBucket.GetObject("info.json") + if err != nil { + return nil, err + } + + return util.GetInfoJson(body) +} + +// Upload info.json +func (u *UploadKeeper) PutInfoJson(infoData *util.Info) error { + jsonData, err := util.Struct2Json(infoData) + if err != nil { + return err + } + + // Upload + return u.PutObjByteArr("info.json", jsonData) +} + +// SetLatestBlockHeight set new latest blockHeight on OSS +func (u *UploadKeeper) SetLatestBlockHeight(newLatestBlockHeight uint64) error { + info, err := u.GetInfoJson() + if err != nil { + return err + } + + info.LatestBlockHeight = newLatestBlockHeight + return u.PutInfoJson(info) +} + +// AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval +func (u *UploadKeeper) AddInterval(end, gzSize uint64) error { + isJsonExist, err := u.OssBucket.IsObjectExist("info.json") + if err != nil { + return err + } + + var info *util.Info + if isJsonExist { + // Download info.json + info, err = u.GetInfoJson() + if err != nil { + return err + } + + // Add Interval + prevInvl := info.Interval[len(info.Interval)-1] + newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize) + info.Interval = append(info.Interval, newInvl) + } else { + info = util.NewInfo(end, gzSize) + } + return u.PutInfoJson(info) +} diff --git a/toolbar/osssync/sync/upload.go b/toolbar/osssync/upload/upload.go similarity index 62% rename from toolbar/osssync/sync/upload.go rename to toolbar/osssync/upload/upload.go index 32d5548a..e81512ec 100644 --- a/toolbar/osssync/sync/upload.go +++ b/toolbar/osssync/upload/upload.go @@ -1,45 +1,80 @@ -package sync +package upload import ( "strconv" "time" + "github.com/aliyun/aliyun-oss-go-sdk/oss" log "github.com/sirupsen/logrus" "github.com/bytom/vapor/protocol/bc/types" "github.com/bytom/vapor/toolbar/apinode" - "github.com/bytom/vapor/toolbar/osssync/config" + "github.com/bytom/vapor/toolbar/osssync/util" ) +const LOCALDIR = "./blocks/" // Local directory to store temp blocks files + +// Run synchronize upload blocks from vapor to OSS +func Run() error { + uploadKeeper, err := NewUploadKeeper() + if err != nil { + return err + } + + uploadKeeper.Run() + return nil +} + +// AddInterval if "info.json" exists on OSS, add Interval to the end; if not exist, create "info.json" with Interval +func AddInterval(end, gzSize uint64) error { + uploadKeeper, err := NewUploadKeeper() + if err != nil { + return err + } + + return uploadKeeper.AddInterval(end, gzSize) +} + // UploadKeeper the struct for upload type UploadKeeper struct { - Sync *Sync - Node *apinode.Node + Node *apinode.Node + OssClient *oss.Client + OssBucket *oss.Bucket + FileUtil *util.FileUtil } // NewUploadKeeper return one new instance of UploadKeeper func NewUploadKeeper() (*UploadKeeper, error) { - cfg := &config.Config{} - err := config.LoadConfig(&cfg) + cfg := &Config{} + err := LoadConfig(&cfg) if err != nil { return nil, err } node := apinode.NewNode(cfg.VaporURL) - sync, err := NewSync() + ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret) + if err != nil { + return nil, err + } + + ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket) if err != nil { return nil, err } + fileUtil := util.NewFileUtil(LOCALDIR) + return &UploadKeeper{ - Sync: sync, - Node: node, + Node: node, + OssClient: ossClient, + OssBucket: ossBucket, + FileUtil: fileUtil, }, nil } -// RunSyncUp run synchronize upload to OSS -func (u *UploadKeeper) RunSyncUp() { +// Run synchronize upload blocks from vapor to OSS +func (u *UploadKeeper) Run() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() @@ -53,7 +88,7 @@ func (u *UploadKeeper) RunSyncUp() { // Upload find and upload blocks func (u *UploadKeeper) Upload() error { - err := u.Sync.FileUtil.BlockDirInitial() + err := u.FileUtil.BlockDirInitial() if err != nil { return err } @@ -63,7 +98,7 @@ func (u *UploadKeeper) Upload() error { return err } - infoJson, err := u.Sync.GetInfoJson() + infoJson, err := u.GetInfoJson() if err != nil { return err } @@ -83,11 +118,7 @@ func (u *UploadKeeper) Upload() error { // Upload Whole Interval for latestUp+1 < intervals[pos1].StartBlockHeight { - if latestUp == 0 { - err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) - } else { - err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) - } + err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) if err != nil { return err } @@ -99,11 +130,7 @@ func (u *UploadKeeper) Upload() error { // Upload the last Interval newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1 if latestUp < newLatestUp { - if latestUp == 0 { - err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize) - } else { - err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize) - } + err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize) if err != nil { return err } @@ -126,32 +153,32 @@ func (u *UploadKeeper) UploadFiles(start, end, size uint64) error { filenameJson := filename + ".json" filenameGzip := filenameJson + ".gz" - _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks) + _, err = u.FileUtil.SaveBlockFile(filename, blocks) if err != nil { return err } - err = u.Sync.FileUtil.GzipCompress(filename) + err = u.FileUtil.GzipCompress(filename) if err != nil { return err } - err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip) + err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip) if err != nil { return err } - err = u.Sync.SetLatestBlockHeight(start + size - 1) + err = u.SetLatestBlockHeight(start + size - 1) if err != nil { return err } - err = u.Sync.FileUtil.RemoveLocal(filenameJson) + err = u.FileUtil.RemoveLocal(filenameJson) if err != nil { return err } - err = u.Sync.FileUtil.RemoveLocal(filenameGzip) + err = u.FileUtil.RemoveLocal(filenameGzip) if err != nil { return err } diff --git a/toolbar/osssync/util/file.go b/toolbar/osssync/util/file.go index 928bc4d5..880c72c9 100644 --- a/toolbar/osssync/util/file.go +++ b/toolbar/osssync/util/file.go @@ -16,20 +16,6 @@ func IsExists(path string) bool { return true } -// IfNoFileToCreate if the file is not exist, create the file -func IfNoFileToCreate(fileName string) (file *os.File) { - var f *os.File - var err error - if !IsExists(fileName) { - f, err = os.Create(fileName) - if err != nil { - return - } - defer f.Close() - } - return f -} - // PathExists return if path exists func PathExists(path string) (bool, error) { _, err := os.Stat(path) @@ -42,7 +28,7 @@ func PathExists(path string) (bool, error) { // RemoveLocal deletes file func (f *FileUtil) RemoveLocal(filename string) error { - return os.Remove(f.LocalDir + "/" + filename) + return os.Remove(f.LocalDir + filename) } // BlockDirInitial initializes the blocks directory diff --git a/toolbar/osssync/util/gzip.go b/toolbar/osssync/util/gzip.go index 45c397b1..99d9b2ad 100644 --- a/toolbar/osssync/util/gzip.go +++ b/toolbar/osssync/util/gzip.go @@ -9,7 +9,7 @@ const READ_SIZE = 1024 * 1024 * 500 // GzipCompress compress file to Gzip func (f *FileUtil) GzipCompress(fileName string) error { - filePath := f.LocalDir + "/" + fileName + ".json.gz" + filePath := f.LocalDir + fileName + ".json.gz" fw, err := os.Create(filePath) if err != nil { return err @@ -20,7 +20,7 @@ func (f *FileUtil) GzipCompress(fileName string) error { gw := gzip.NewWriter(fw) defer gw.Close() - filePath = f.LocalDir + "/" + fileName + ".json" + filePath = f.LocalDir + fileName + ".json" fr, err := os.Open(filePath) if err != nil { return err @@ -51,7 +51,7 @@ func (f *FileUtil) GzipCompress(fileName string) error { // GzipUncompress uncompress Gzip file func (f *FileUtil) GzipUncompress(fileName string) error { - filedirname := f.LocalDir + "/" + fileName + ".json.gz" + filedirname := f.LocalDir + fileName + ".json.gz" fr, err := os.Open(filedirname) if err != nil { return err @@ -69,7 +69,7 @@ func (f *FileUtil) GzipUncompress(fileName string) error { buf := make([]byte, READ_SIZE) n, err := gr.Read(buf) - filedirname = f.LocalDir + "/" + gr.Header.Name + filedirname = f.LocalDir + gr.Header.Name fw, err := os.Create(filedirname) if err != nil { return err diff --git a/toolbar/osssync/util/infofile.go b/toolbar/osssync/util/infofile.go new file mode 100644 index 00000000..afc42cc5 --- /dev/null +++ b/toolbar/osssync/util/infofile.go @@ -0,0 +1,53 @@ +package util + +import ( + "io" + "io/ioutil" +) + +// Interval determines the number of blocks in a Gzip file in the Interval of blockHeight +// StartBlockHeight is the start of the Interval +// EndBlockHeight: the end of the Interval +// GzSize is the number of blocks store in a Gzip file +type Interval struct { + StartBlockHeight uint64 + EndBlockHeight uint64 + GzSize uint64 +} + +// NewInterval creates a new Interval from info.json +func NewInterval(start, end, gzSize uint64) *Interval { + return &Interval{ + StartBlockHeight: start, + EndBlockHeight: end, + GzSize: gzSize, + } +} + +// Info is a struct for info.json +type Info struct { + LatestBlockHeight uint64 + Interval []*Interval +} + +// NewInfo creates a new Info for info.json +func NewInfo(end, gzSize uint64) *Info { + newInvl := NewInterval(0, end, gzSize) + var arr []*Interval + arr = append(arr, newInvl) + return &Info{0, arr} +} + +// GetInfoJson from stream +func GetInfoJson(body io.ReadCloser) (*Info, error) { + defer body.Close() + + data, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + info := new(Info) + err = Json2Struct(data, &info) + return info, err +} diff --git a/toolbar/osssync/util/json.go b/toolbar/osssync/util/json.go index 91821edd..49a07dd4 100644 --- a/toolbar/osssync/util/json.go +++ b/toolbar/osssync/util/json.go @@ -12,7 +12,7 @@ func NewFileUtil(localDir string) *FileUtil { // SaveBlockFile saves block file func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error) { - filename = f.LocalDir + "/" + filename + ".json" + filename = f.LocalDir + filename + ".json" saveData, err := json.Marshal(data) if err != nil { return false, err @@ -28,7 +28,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error // GetJson read json file func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) { - filename = f.LocalDir + "/" + filename + ".json" + filename = f.LocalDir + filename + ".json" return ioutil.ReadFile(filename) } -- 2.11.0