From d923544284a6397f34a60259b6ba9b700e9c3715 Mon Sep 17 00:00:00 2001 From: Outer-God <39217235+Outer-God@users.noreply.github.com> Date: Wed, 31 Mar 2021 10:35:59 +0800 Subject: [PATCH] Osssync (#581) * download * Update run_node.go * down * Create main.go * Update config.go * Update download.go * Update oss.go * up * u * Update oss.go * Update main.go * Update upload.go * err * up * if=0 * Update upload.go * down * Update oss.go Co-authored-by: Welt --- cmd/vapord/commands/run_node.go | 9 ++ toolbar/osssync/download/download.go | 164 +++++++++++++++++++++++++++++++++++ toolbar/osssync/download/oss.go | 48 ++++++++++ toolbar/osssync/main/main.go | 13 +++ toolbar/osssync/upload/config.go | 2 +- toolbar/osssync/upload/oss.go | 9 ++ toolbar/osssync/upload/upload.go | 52 +++++------ toolbar/osssync/util/file.go | 17 ++-- toolbar/osssync/util/gzip.go | 43 +++------ toolbar/osssync/util/infofile.go | 5 +- toolbar/osssync/util/json.go | 5 +- 11 files changed, 295 insertions(+), 72 deletions(-) create mode 100644 toolbar/osssync/download/download.go create mode 100644 toolbar/osssync/download/oss.go create mode 100644 toolbar/osssync/main/main.go diff --git a/cmd/vapord/commands/run_node.go b/cmd/vapord/commands/run_node.go index 18fab866..19abf09e 100644 --- a/cmd/vapord/commands/run_node.go +++ b/cmd/vapord/commands/run_node.go @@ -1,6 +1,7 @@ package commands import ( + "fmt" "strings" "time" @@ -8,6 +9,7 @@ import ( "github.com/spf13/cobra" "github.com/bytom/vapor/node" + "github.com/bytom/vapor/toolbar/osssync/download" ) const logModule = "cmd" @@ -84,6 +86,13 @@ func runNode(cmd *cobra.Command, args []string) error { // Create & start node n := node.NewNode(config) + + // Get blocks from OSS + if err := download.Run(n, config.Oss.Endpoint); err != nil { + fmt.Println("Failed to get blocks from oss: ", err) + } + + // Start node if _, err := n.Start(); err != nil { log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("failed to start node") } diff --git a/toolbar/osssync/download/download.go b/toolbar/osssync/download/download.go new file mode 100644 index 00000000..4be5b30a --- /dev/null +++ b/toolbar/osssync/download/download.go @@ -0,0 +1,164 @@ +package download + +import ( + "strconv" + + "github.com/bytom/vapor/errors" + "github.com/bytom/vapor/node" + "github.com/bytom/vapor/protocol/bc/types" + "github.com/bytom/vapor/toolbar/osssync/util" +) + +const LOCALDIR = "./toolbar/osssync/blocks/" // Local directory to store temp blocks files + +// Run synchronize download from OSS to local node +func Run(node *node.Node, ossEndpoint string) error { + if ossEndpoint == "" { + return errors.New("OSS Endpoint is empty") + } + + downloadKeeper, err := NewDownloadKeeper(node, ossEndpoint) + if err != nil { + return err + } + + if err = downloadKeeper.Download(); err != nil { + return err + } + + return nil +} + +// DownloadKeeper the struct for download +type DownloadKeeper struct { + Node *node.Node + OssEndpoint string + FileUtil *util.FileUtil +} + +// NewDownloadKeeper return one new instance of DownloadKeeper +func NewDownloadKeeper(node *node.Node, ossEndpoint string) (*DownloadKeeper, error) { + fileUtil := util.NewFileUtil(LOCALDIR) + + return &DownloadKeeper{ + Node: node, + OssEndpoint: "http://" + ossEndpoint + "/", + FileUtil: fileUtil, + }, nil +} + +// Download get blocks from OSS and update the node +func (d *DownloadKeeper) Download() error { + if err := d.FileUtil.BlockDirInitial(); err != nil { + return err + } + + syncStart := d.Node.GetChain().BestBlockHeight() + 1 // block height which the synchronization start from + + infoJson, err := d.GetInfoJson() + if err != nil { + return err + } + + latestUp := infoJson.LatestBlockHeight // Latest uploaded block height on OSS + intervals := infoJson.Interval // Interval array + if latestUp == 0 || latestUp < syncStart { + return errors.New("No new blocks on OSS.") + } + + var pos1, pos2 int // syncStart interval, latestUp interval + // Find pos2 + for pos2 = len(intervals) - 1; latestUp < intervals[pos2].StartBlockHeight; pos2-- { + } + // Find pos1 + if syncStart == 0 { + pos1 = 0 + } else { + for pos1 = pos2; syncStart < intervals[pos1].StartBlockHeight; pos1-- { + } + } + + // Download Whole Interval + for pos1 < pos2 { + if err = d.DownloadFiles(syncStart, intervals[pos1].EndBlockHeight, intervals[pos1]); err != nil { + return err + } + syncStart = intervals[pos1].EndBlockHeight + 1 + pos1++ + } + // Download the last Interval + if pos1 == pos2 { + if err = d.DownloadFiles(syncStart, latestUp, intervals[pos2]); err != nil { + return err + } + } + return nil +} + +// DownloadFiles get block files from OSS, and update the node +func (d *DownloadKeeper) DownloadFiles(start, end uint64, interval *util.Interval) error { + size := interval.GzSize + for { + if start > end { + break + } + + intervalStart := interval.StartBlockHeight + startInFile := start - intervalStart + n := startInFile / size + filenameNum := n*size + intervalStart + + filename := strconv.FormatUint(filenameNum, 10) + filenameJson := filename + ".json" + filenameGzip := filenameJson + ".gz" + + if err := d.GetObjectToFile(filenameGzip); err != nil { + return err + } + + if err := d.FileUtil.GzipDecode(filename); err != nil { + return err + } + + if err := d.FileUtil.RemoveLocal(filenameGzip); err != nil { + return err + } + + blocksJson, err := d.FileUtil.GetJson(filenameJson) + if err != nil { + return err + } + + blocks := []*types.Block{} + if err = util.Json2Struct(blocksJson, &blocks); err != nil { + return err + } + + latestDown := d.Node.GetChain().BestBlockHeight() + if latestDown+1 > start { + blocks = blocks[startInFile:] // start from latestDown+1 + } else if latestDown+1 < start { + return errors.New("Wrong interval") + } + if err = d.SyncToNode(blocks); err != nil { + return err + } + + if err = d.FileUtil.RemoveLocal(filenameJson); err != nil { + return err + } + + start = filenameNum + size + } + return nil +} + +// SyncToNode synchronize blocks to local node +func (d *DownloadKeeper) SyncToNode(blocks []*types.Block) error { + for i := 0; i < len(blocks); i++ { + if _, err := d.Node.GetChain().ProcessBlock(blocks[i]); err != nil { + return err + } + } + return nil +} diff --git a/toolbar/osssync/download/oss.go b/toolbar/osssync/download/oss.go new file mode 100644 index 00000000..66a84861 --- /dev/null +++ b/toolbar/osssync/download/oss.go @@ -0,0 +1,48 @@ +package download + +import ( + "io" + "net/http" + "os" + + "github.com/bytom/vapor/toolbar/osssync/util" +) + +// GetObject download the file object from OSS +func (d *DownloadKeeper) GetObject(filename string) (*io.ReadCloser, error) { + url := d.OssEndpoint + filename + res, err := http.Get(url) + if err != nil { + return nil, err + } + + return &res.Body, nil +} + +// GetObjectToFile download the file object from OSS to local +func (d *DownloadKeeper) GetObjectToFile(filename string) error { + f, err := os.Create(d.FileUtil.LocalDir + filename) + if err != nil { + return err + } + + body, err := d.GetObject(filename) + if err != nil { + return err + } + + defer (*body).Close() + + io.Copy(f, *body) + return nil +} + +// GetInfoJson Download info.json +func (d *DownloadKeeper) GetInfoJson() (*util.Info, error) { + body, err := d.GetObject("info.json") + if err != nil { + return nil, err + } + + return util.GetInfoJson(*body) +} diff --git a/toolbar/osssync/main/main.go b/toolbar/osssync/main/main.go new file mode 100644 index 00000000..04573d41 --- /dev/null +++ b/toolbar/osssync/main/main.go @@ -0,0 +1,13 @@ +package main + +import ( + "fmt" + + "github.com/bytom/vapor/toolbar/osssync/upload" +) + +func main() { + if err := upload.Run(); err != nil { + fmt.Println(err) + } +} diff --git a/toolbar/osssync/upload/config.go b/toolbar/osssync/upload/config.go index b7dcd429..d0f0dc61 100644 --- a/toolbar/osssync/upload/config.go +++ b/toolbar/osssync/upload/config.go @@ -4,7 +4,7 @@ import ( "encoding/json" "os" - "github.com/bytom/bytom/errors" + "github.com/bytom/vapor/errors" ) // Config represent root of config diff --git a/toolbar/osssync/upload/oss.go b/toolbar/osssync/upload/oss.go index e2fd6cf4..e833d99b 100644 --- a/toolbar/osssync/upload/oss.go +++ b/toolbar/osssync/upload/oss.go @@ -5,6 +5,7 @@ import ( "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/bytom/vapor/errors" "github.com/bytom/vapor/toolbar/osssync/util" ) @@ -63,6 +64,14 @@ func (u *UploadKeeper) AddInterval(end, gzSize uint64) error { // Add Interval prevInvl := info.Interval[len(info.Interval)-1] + if prevInvl.EndBlockHeight >= end { + return errors.New("New interval is included in previous intervals.") + } + + if (end-prevInvl.EndBlockHeight)%gzSize != 0 { + return errors.New("New interval is invalid.") + } + newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize) info.Interval = append(info.Interval, newInvl) } else { diff --git a/toolbar/osssync/upload/upload.go b/toolbar/osssync/upload/upload.go index e81512ec..a0960ff1 100644 --- a/toolbar/osssync/upload/upload.go +++ b/toolbar/osssync/upload/upload.go @@ -7,6 +7,7 @@ import ( "github.com/aliyun/aliyun-oss-go-sdk/oss" log "github.com/sirupsen/logrus" + "github.com/bytom/vapor/errors" "github.com/bytom/vapor/protocol/bc/types" "github.com/bytom/vapor/toolbar/apinode" "github.com/bytom/vapor/toolbar/osssync/util" @@ -46,8 +47,7 @@ type UploadKeeper struct { // NewUploadKeeper return one new instance of UploadKeeper func NewUploadKeeper() (*UploadKeeper, error) { cfg := &Config{} - err := LoadConfig(&cfg) - if err != nil { + if err := LoadConfig(&cfg); err != nil { return nil, err } @@ -79,17 +79,15 @@ func (u *UploadKeeper) Run() { defer ticker.Stop() for ; true; <-ticker.C { - err := u.Upload() - if err != nil { - log.WithField("error", err).Errorln("blockKeeper fail on process block") + if err := u.Upload(); err != nil { + log.WithField("error", err).Errorln("blockKeeper fail") } } } // Upload find and upload blocks func (u *UploadKeeper) Upload() error { - err := u.FileUtil.BlockDirInitial() - if err != nil { + if err := u.FileUtil.BlockDirInitial(); err != nil { return err } @@ -98,6 +96,10 @@ func (u *UploadKeeper) Upload() error { return err } + if currBlockHeight == 0 { + return errors.New("Current block height is 0.") + } + infoJson, err := u.GetInfoJson() if err != nil { return err @@ -107,19 +109,24 @@ func (u *UploadKeeper) Upload() error { intervals := infoJson.Interval // Interval array var pos1, pos2 int // currBlockHeight interval, latestUp interval + // Find pos1 for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- { } // Current Block Height is out of the range given by info.json if currBlockHeight > intervals[pos1].EndBlockHeight { currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json } - for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- { + // Find pos2 + if latestUp == 0 { + pos2 = 0 + } else { + for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- { + } } // Upload Whole Interval for latestUp+1 < intervals[pos1].StartBlockHeight { - err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize) - if err != nil { + if err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize); err != nil { return err } @@ -128,14 +135,13 @@ func (u *UploadKeeper) Upload() error { } // Upload the last Interval - newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1 + newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight + 1) % intervals[pos1].GzSize) if latestUp < newLatestUp { - err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize) - if err != nil { + if err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize); err != nil { return err } } - return err + return nil } // UploadFiles get block from vapor and upload files to OSS @@ -153,33 +159,27 @@ func (u *UploadKeeper) UploadFiles(start, end, size uint64) error { filenameJson := filename + ".json" filenameGzip := filenameJson + ".gz" - _, err = u.FileUtil.SaveBlockFile(filename, blocks) - if err != nil { + if _, err = u.FileUtil.SaveBlockFile(filename, blocks); err != nil { return err } - err = u.FileUtil.GzipCompress(filename) - if err != nil { + if err = u.FileUtil.GzipCompress(filename); err != nil { return err } - err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip) - if err != nil { + if err = u.FileUtil.RemoveLocal(filenameJson); err != nil { return err } - err = u.SetLatestBlockHeight(start + size - 1) - if err != nil { + if err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip); err != nil { return err } - err = u.FileUtil.RemoveLocal(filenameJson) - if err != nil { + if err = u.SetLatestBlockHeight(start + size - 1); err != nil { return err } - err = u.FileUtil.RemoveLocal(filenameGzip) - if err != nil { + if err = u.FileUtil.RemoveLocal(filenameGzip); err != nil { return err } diff --git a/toolbar/osssync/util/file.go b/toolbar/osssync/util/file.go index 880c72c9..46a30f54 100644 --- a/toolbar/osssync/util/file.go +++ b/toolbar/osssync/util/file.go @@ -9,21 +9,20 @@ type FileUtil struct { // IsExists if file or directory exist func IsExists(path string) bool { - _, err := os.Stat(path) - if err != nil && !os.IsExist(err) { + if _, err := os.Stat(path); err != nil && !os.IsExist(err) { return false } + return true } // PathExists return if path exists func PathExists(path string) (bool, error) { - _, err := os.Stat(path) - if err == nil { - return true, nil + if _, err := os.Stat(path); err != nil { + return false, err } - return false, err + return true, nil } // RemoveLocal deletes file @@ -39,12 +38,10 @@ func (f *FileUtil) BlockDirInitial() error { } if ifPathExist { - err = os.RemoveAll(f.LocalDir) - if err != nil { + if err = os.RemoveAll(f.LocalDir); err != nil { return err } } - err = os.Mkdir(f.LocalDir, 0755) - return err + return os.Mkdir(f.LocalDir, 0755) } diff --git a/toolbar/osssync/util/gzip.go b/toolbar/osssync/util/gzip.go index 99d9b2ad..1fe23c79 100644 --- a/toolbar/osssync/util/gzip.go +++ b/toolbar/osssync/util/gzip.go @@ -2,15 +2,13 @@ package util import ( "compress/gzip" + "io/ioutil" "os" ) -const READ_SIZE = 1024 * 1024 * 500 - -// GzipCompress compress file to Gzip +// GzipCompress Encode file to Gzip and save to the same directory func (f *FileUtil) GzipCompress(fileName string) error { - filePath := f.LocalDir + fileName + ".json.gz" - fw, err := os.Create(filePath) + fw, err := os.Create(f.LocalDir + fileName + ".json.gz") if err != nil { return err } @@ -20,8 +18,7 @@ func (f *FileUtil) GzipCompress(fileName string) error { gw := gzip.NewWriter(fw) defer gw.Close() - filePath = f.LocalDir + fileName + ".json" - fr, err := os.Open(filePath) + fr, err := os.Open(f.LocalDir + fileName + ".json") if err != nil { return err } @@ -36,49 +33,37 @@ func (f *FileUtil) GzipCompress(fileName string) error { gw.Header.Name = fi.Name() buf := make([]byte, fi.Size()) - _, err = fr.Read(buf) - if err != nil { + if _, err = fr.Read(buf); err != nil { return err } - _, err = gw.Write(buf) - if err != nil { + if _, err = gw.Write(buf); err != nil { return err } - return err + return nil } -// GzipUncompress uncompress Gzip file -func (f *FileUtil) GzipUncompress(fileName string) error { - filedirname := f.LocalDir + fileName + ".json.gz" - fr, err := os.Open(filedirname) +// GzipDecode Decode Gzip file and save to the same directory +func (f *FileUtil) GzipDecode(fileName string) error { + fr, err := os.Open(f.LocalDir + fileName + ".json.gz") if err != nil { return err } defer fr.Close() - gr, err := gzip.NewReader(fr) + reader, err := gzip.NewReader(fr) if err != nil { return err } - defer gr.Close() - - buf := make([]byte, READ_SIZE) - n, err := gr.Read(buf) - - filedirname = f.LocalDir + gr.Header.Name - fw, err := os.Create(filedirname) - if err != nil { - return err - } + defer reader.Close() - _, err = fw.Write(buf[:n]) + json, err := ioutil.ReadAll(reader) if err != nil { return err } - return err + return ioutil.WriteFile(f.LocalDir+fileName+".json", json, 0644) } diff --git a/toolbar/osssync/util/infofile.go b/toolbar/osssync/util/infofile.go index afc42cc5..82f15d2b 100644 --- a/toolbar/osssync/util/infofile.go +++ b/toolbar/osssync/util/infofile.go @@ -32,7 +32,7 @@ type Info struct { // NewInfo creates a new Info for info.json func NewInfo(end, gzSize uint64) *Info { - newInvl := NewInterval(0, end, gzSize) + newInvl := NewInterval(1, end, gzSize) var arr []*Interval arr = append(arr, newInvl) return &Info{0, arr} @@ -48,6 +48,5 @@ func GetInfoJson(body io.ReadCloser) (*Info, error) { } info := new(Info) - err = Json2Struct(data, &info) - return info, err + return info, Json2Struct(data, &info) } diff --git a/toolbar/osssync/util/json.go b/toolbar/osssync/util/json.go index 49a07dd4..c3795cca 100644 --- a/toolbar/osssync/util/json.go +++ b/toolbar/osssync/util/json.go @@ -18,8 +18,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error return false, err } - err = ioutil.WriteFile(filename, saveData, 0644) - if err != nil { + if err = ioutil.WriteFile(filename, saveData, 0644); err != nil { return false, err } @@ -28,7 +27,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error // GetJson read json file func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) { - filename = f.LocalDir + filename + ".json" + filename = f.LocalDir + filename return ioutil.ReadFile(filename) } -- 2.11.0