OSDN Git Service

Osssync (#581)
authorOuter-God <39217235+Outer-God@users.noreply.github.com>
Wed, 31 Mar 2021 02:35:59 +0000 (10:35 +0800)
committerGitHub <noreply@github.com>
Wed, 31 Mar 2021 02:35:59 +0000 (10:35 +0800)
* download

* Update run_node.go

* down

* Create main.go

* Update config.go

* Update download.go

* Update oss.go

* up

* u

* Update oss.go

* Update main.go

* Update upload.go

* err

* up

* if=0

* Update upload.go

* down

* Update oss.go

Co-authored-by: Welt <L5Accelerator@users.noreply.github.com>
cmd/vapord/commands/run_node.go
toolbar/osssync/download/download.go [new file with mode: 0644]
toolbar/osssync/download/oss.go [new file with mode: 0644]
toolbar/osssync/main/main.go [new file with mode: 0644]
toolbar/osssync/upload/config.go
toolbar/osssync/upload/oss.go
toolbar/osssync/upload/upload.go
toolbar/osssync/util/file.go
toolbar/osssync/util/gzip.go
toolbar/osssync/util/infofile.go
toolbar/osssync/util/json.go

index 18fab86..19abf09 100644 (file)
@@ -1,6 +1,7 @@
 package commands
 
 import (
+       "fmt"
        "strings"
        "time"
 
@@ -8,6 +9,7 @@ import (
        "github.com/spf13/cobra"
 
        "github.com/bytom/vapor/node"
+       "github.com/bytom/vapor/toolbar/osssync/download"
 )
 
 const logModule = "cmd"
@@ -84,6 +86,13 @@ func runNode(cmd *cobra.Command, args []string) error {
 
        // Create & start node
        n := node.NewNode(config)
+
+       // Get blocks from OSS
+       if err := download.Run(n, config.Oss.Endpoint); err != nil {
+               fmt.Println("Failed to get blocks from oss: ", err)
+       }
+
+       // Start node
        if _, err := n.Start(); err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("failed to start node")
        }
diff --git a/toolbar/osssync/download/download.go b/toolbar/osssync/download/download.go
new file mode 100644 (file)
index 0000000..4be5b30
--- /dev/null
@@ -0,0 +1,164 @@
+package download
+
+import (
+       "strconv"
+
+       "github.com/bytom/vapor/errors"
+       "github.com/bytom/vapor/node"
+       "github.com/bytom/vapor/protocol/bc/types"
+       "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+const LOCALDIR = "./toolbar/osssync/blocks/" // Local directory to store temp blocks files
+
+// Run synchronize download from OSS to local node
+func Run(node *node.Node, ossEndpoint string) error {
+       if ossEndpoint == "" {
+               return errors.New("OSS Endpoint is empty")
+       }
+
+       downloadKeeper, err := NewDownloadKeeper(node, ossEndpoint)
+       if err != nil {
+               return err
+       }
+
+       if err = downloadKeeper.Download(); err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// DownloadKeeper the struct for download
+type DownloadKeeper struct {
+       Node        *node.Node
+       OssEndpoint string
+       FileUtil    *util.FileUtil
+}
+
+// NewDownloadKeeper return one new instance of DownloadKeeper
+func NewDownloadKeeper(node *node.Node, ossEndpoint string) (*DownloadKeeper, error) {
+       fileUtil := util.NewFileUtil(LOCALDIR)
+
+       return &DownloadKeeper{
+               Node:        node,
+               OssEndpoint: "http://" + ossEndpoint + "/",
+               FileUtil:    fileUtil,
+       }, nil
+}
+
+// Download get blocks from OSS and update the node
+func (d *DownloadKeeper) Download() error {
+       if err := d.FileUtil.BlockDirInitial(); err != nil {
+               return err
+       }
+
+       syncStart := d.Node.GetChain().BestBlockHeight() + 1 // block height which the synchronization start from
+
+       infoJson, err := d.GetInfoJson()
+       if err != nil {
+               return err
+       }
+
+       latestUp := infoJson.LatestBlockHeight // Latest uploaded block height on OSS
+       intervals := infoJson.Interval         // Interval array
+       if latestUp == 0 || latestUp < syncStart {
+               return errors.New("No new blocks on OSS.")
+       }
+
+       var pos1, pos2 int // syncStart interval, latestUp interval
+       // Find pos2
+       for pos2 = len(intervals) - 1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+       }
+       // Find pos1
+       if syncStart == 0 {
+               pos1 = 0
+       } else {
+               for pos1 = pos2; syncStart < intervals[pos1].StartBlockHeight; pos1-- {
+               }
+       }
+
+       // Download Whole Interval
+       for pos1 < pos2 {
+               if err = d.DownloadFiles(syncStart, intervals[pos1].EndBlockHeight, intervals[pos1]); err != nil {
+                       return err
+               }
+               syncStart = intervals[pos1].EndBlockHeight + 1
+               pos1++
+       }
+       // Download the last Interval
+       if pos1 == pos2 {
+               if err = d.DownloadFiles(syncStart, latestUp, intervals[pos2]); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// DownloadFiles get block files from OSS, and update the node
+func (d *DownloadKeeper) DownloadFiles(start, end uint64, interval *util.Interval) error {
+       size := interval.GzSize
+       for {
+               if start > end {
+                       break
+               }
+
+               intervalStart := interval.StartBlockHeight
+               startInFile := start - intervalStart
+               n := startInFile / size
+               filenameNum := n*size + intervalStart
+
+               filename := strconv.FormatUint(filenameNum, 10)
+               filenameJson := filename + ".json"
+               filenameGzip := filenameJson + ".gz"
+
+               if err := d.GetObjectToFile(filenameGzip); err != nil {
+                       return err
+               }
+
+               if err := d.FileUtil.GzipDecode(filename); err != nil {
+                       return err
+               }
+
+               if err := d.FileUtil.RemoveLocal(filenameGzip); err != nil {
+                       return err
+               }
+
+               blocksJson, err := d.FileUtil.GetJson(filenameJson)
+               if err != nil {
+                       return err
+               }
+
+               blocks := []*types.Block{}
+               if err = util.Json2Struct(blocksJson, &blocks); err != nil {
+                       return err
+               }
+
+               latestDown := d.Node.GetChain().BestBlockHeight()
+               if latestDown+1 > start {
+                       blocks = blocks[startInFile:] // start from latestDown+1
+               } else if latestDown+1 < start {
+                       return errors.New("Wrong interval")
+               }
+               if err = d.SyncToNode(blocks); err != nil {
+                       return err
+               }
+
+               if err = d.FileUtil.RemoveLocal(filenameJson); err != nil {
+                       return err
+               }
+
+               start = filenameNum + size
+       }
+       return nil
+}
+
+// SyncToNode synchronize blocks to local node
+func (d *DownloadKeeper) SyncToNode(blocks []*types.Block) error {
+       for i := 0; i < len(blocks); i++ {
+               if _, err := d.Node.GetChain().ProcessBlock(blocks[i]); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/toolbar/osssync/download/oss.go b/toolbar/osssync/download/oss.go
new file mode 100644 (file)
index 0000000..66a8486
--- /dev/null
@@ -0,0 +1,48 @@
+package download
+
+import (
+       "io"
+       "net/http"
+       "os"
+
+       "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+// GetObject download the file object from OSS
+func (d *DownloadKeeper) GetObject(filename string) (*io.ReadCloser, error) {
+       url := d.OssEndpoint + filename
+       res, err := http.Get(url)
+       if err != nil {
+               return nil, err
+       }
+
+       return &res.Body, nil
+}
+
+// GetObjectToFile download the file object from OSS to local
+func (d *DownloadKeeper) GetObjectToFile(filename string) error {
+       f, err := os.Create(d.FileUtil.LocalDir + filename)
+       if err != nil {
+               return err
+       }
+
+       body, err := d.GetObject(filename)
+       if err != nil {
+               return err
+       }
+
+       defer (*body).Close()
+
+       io.Copy(f, *body)
+       return nil
+}
+
+// GetInfoJson Download info.json
+func (d *DownloadKeeper) GetInfoJson() (*util.Info, error) {
+       body, err := d.GetObject("info.json")
+       if err != nil {
+               return nil, err
+       }
+
+       return util.GetInfoJson(*body)
+}
diff --git a/toolbar/osssync/main/main.go b/toolbar/osssync/main/main.go
new file mode 100644 (file)
index 0000000..04573d4
--- /dev/null
@@ -0,0 +1,13 @@
+package main
+
+import (
+       "fmt"
+
+       "github.com/bytom/vapor/toolbar/osssync/upload"
+)
+
+func main() {
+       if err := upload.Run(); err != nil {
+               fmt.Println(err)
+       }
+}
index b7dcd42..d0f0dc6 100644 (file)
@@ -4,7 +4,7 @@ import (
        "encoding/json"
        "os"
 
-       "github.com/bytom/bytom/errors"
+       "github.com/bytom/vapor/errors"
 )
 
 // Config represent root of config
index e2fd6cf..e833d99 100644 (file)
@@ -5,6 +5,7 @@ import (
 
        "github.com/aliyun/aliyun-oss-go-sdk/oss"
 
+       "github.com/bytom/vapor/errors"
        "github.com/bytom/vapor/toolbar/osssync/util"
 )
 
@@ -63,6 +64,14 @@ func (u *UploadKeeper) AddInterval(end, gzSize uint64) error {
 
                // Add Interval
                prevInvl := info.Interval[len(info.Interval)-1]
+               if prevInvl.EndBlockHeight >= end {
+                       return errors.New("New interval is included in previous intervals.")
+               }
+
+               if (end-prevInvl.EndBlockHeight)%gzSize != 0 {
+                       return errors.New("New interval is invalid.")
+               }
+
                newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
                info.Interval = append(info.Interval, newInvl)
        } else {
index e81512e..a0960ff 100644 (file)
@@ -7,6 +7,7 @@ import (
        "github.com/aliyun/aliyun-oss-go-sdk/oss"
        log "github.com/sirupsen/logrus"
 
+       "github.com/bytom/vapor/errors"
        "github.com/bytom/vapor/protocol/bc/types"
        "github.com/bytom/vapor/toolbar/apinode"
        "github.com/bytom/vapor/toolbar/osssync/util"
@@ -46,8 +47,7 @@ type UploadKeeper struct {
 // NewUploadKeeper return one new instance of UploadKeeper
 func NewUploadKeeper() (*UploadKeeper, error) {
        cfg := &Config{}
-       err := LoadConfig(&cfg)
-       if err != nil {
+       if err := LoadConfig(&cfg); err != nil {
                return nil, err
        }
 
@@ -79,17 +79,15 @@ func (u *UploadKeeper) Run() {
        defer ticker.Stop()
 
        for ; true; <-ticker.C {
-               err := u.Upload()
-               if err != nil {
-                       log.WithField("error", err).Errorln("blockKeeper fail on process block")
+               if err := u.Upload(); err != nil {
+                       log.WithField("error", err).Errorln("blockKeeper fail")
                }
        }
 }
 
 // Upload find and upload blocks
 func (u *UploadKeeper) Upload() error {
-       err := u.FileUtil.BlockDirInitial()
-       if err != nil {
+       if err := u.FileUtil.BlockDirInitial(); err != nil {
                return err
        }
 
@@ -98,6 +96,10 @@ func (u *UploadKeeper) Upload() error {
                return err
        }
 
+       if currBlockHeight == 0 {
+               return errors.New("Current block height is 0.")
+       }
+
        infoJson, err := u.GetInfoJson()
        if err != nil {
                return err
@@ -107,19 +109,24 @@ func (u *UploadKeeper) Upload() error {
        intervals := infoJson.Interval         // Interval array
 
        var pos1, pos2 int // currBlockHeight interval, latestUp interval
+       // Find pos1
        for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
        }
        // Current Block Height is out of the range given by info.json
        if currBlockHeight > intervals[pos1].EndBlockHeight {
                currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
        }
-       for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+       // Find pos2
+       if latestUp == 0 {
+               pos2 = 0
+       } else {
+               for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+               }
        }
 
        // Upload Whole Interval
        for latestUp+1 < intervals[pos1].StartBlockHeight {
-               err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
-               if err != nil {
+               if err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize); err != nil {
                        return err
                }
 
@@ -128,14 +135,13 @@ func (u *UploadKeeper) Upload() error {
        }
 
        // Upload the last Interval
-       newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
+       newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight + 1) % intervals[pos1].GzSize)
        if latestUp < newLatestUp {
-               err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
-               if err != nil {
+               if err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize); err != nil {
                        return err
                }
        }
-       return err
+       return nil
 }
 
 // UploadFiles get block from vapor and upload files to OSS
@@ -153,33 +159,27 @@ func (u *UploadKeeper) UploadFiles(start, end, size uint64) error {
                filenameJson := filename + ".json"
                filenameGzip := filenameJson + ".gz"
 
-               _, err = u.FileUtil.SaveBlockFile(filename, blocks)
-               if err != nil {
+               if _, err = u.FileUtil.SaveBlockFile(filename, blocks); err != nil {
                        return err
                }
 
-               err = u.FileUtil.GzipCompress(filename)
-               if err != nil {
+               if err = u.FileUtil.GzipCompress(filename); err != nil {
                        return err
                }
 
-               err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
-               if err != nil {
+               if err = u.FileUtil.RemoveLocal(filenameJson); err != nil {
                        return err
                }
 
-               err = u.SetLatestBlockHeight(start + size - 1)
-               if err != nil {
+               if err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip); err != nil {
                        return err
                }
 
-               err = u.FileUtil.RemoveLocal(filenameJson)
-               if err != nil {
+               if err = u.SetLatestBlockHeight(start + size - 1); err != nil {
                        return err
                }
 
-               err = u.FileUtil.RemoveLocal(filenameGzip)
-               if err != nil {
+               if err = u.FileUtil.RemoveLocal(filenameGzip); err != nil {
                        return err
                }
 
index 880c72c..46a30f5 100644 (file)
@@ -9,21 +9,20 @@ type FileUtil struct {
 
 // IsExists if file or directory exist
 func IsExists(path string) bool {
-       _, err := os.Stat(path)
-       if err != nil && !os.IsExist(err) {
+       if _, err := os.Stat(path); err != nil && !os.IsExist(err) {
                return false
        }
+
        return true
 }
 
 // PathExists return if path exists
 func PathExists(path string) (bool, error) {
-       _, err := os.Stat(path)
-       if err == nil {
-               return true, nil
+       if _, err := os.Stat(path); err != nil {
+               return false, err
        }
 
-       return false, err
+       return true, nil
 }
 
 // RemoveLocal deletes file
@@ -39,12 +38,10 @@ func (f *FileUtil) BlockDirInitial() error {
        }
 
        if ifPathExist {
-               err = os.RemoveAll(f.LocalDir)
-               if err != nil {
+               if err = os.RemoveAll(f.LocalDir); err != nil {
                        return err
                }
        }
 
-       err = os.Mkdir(f.LocalDir, 0755)
-       return err
+       return os.Mkdir(f.LocalDir, 0755)
 }
index 99d9b2a..1fe23c7 100644 (file)
@@ -2,15 +2,13 @@ package util
 
 import (
        "compress/gzip"
+       "io/ioutil"
        "os"
 )
 
-const READ_SIZE = 1024 * 1024 * 500
-
-// GzipCompress compress file to Gzip
+// GzipCompress Encode file to Gzip and save to the same directory
 func (f *FileUtil) GzipCompress(fileName string) error {
-       filePath := f.LocalDir + fileName + ".json.gz"
-       fw, err := os.Create(filePath)
+       fw, err := os.Create(f.LocalDir + fileName + ".json.gz")
        if err != nil {
                return err
        }
@@ -20,8 +18,7 @@ func (f *FileUtil) GzipCompress(fileName string) error {
        gw := gzip.NewWriter(fw)
        defer gw.Close()
 
-       filePath = f.LocalDir + fileName + ".json"
-       fr, err := os.Open(filePath)
+       fr, err := os.Open(f.LocalDir + fileName + ".json")
        if err != nil {
                return err
        }
@@ -36,49 +33,37 @@ func (f *FileUtil) GzipCompress(fileName string) error {
        gw.Header.Name = fi.Name()
 
        buf := make([]byte, fi.Size())
-       _, err = fr.Read(buf)
-       if err != nil {
+       if _, err = fr.Read(buf); err != nil {
                return err
        }
 
-       _, err = gw.Write(buf)
-       if err != nil {
+       if _, err = gw.Write(buf); err != nil {
                return err
        }
 
-       return err
+       return nil
 }
 
-// GzipUncompress uncompress Gzip file
-func (f *FileUtil) GzipUncompress(fileName string) error {
-       filedirname := f.LocalDir + fileName + ".json.gz"
-       fr, err := os.Open(filedirname)
+// GzipDecode Decode Gzip file and save to the same directory
+func (f *FileUtil) GzipDecode(fileName string) error {
+       fr, err := os.Open(f.LocalDir + fileName + ".json.gz")
        if err != nil {
                return err
        }
 
        defer fr.Close()
 
-       gr, err := gzip.NewReader(fr)
+       reader, err := gzip.NewReader(fr)
        if err != nil {
                return err
        }
 
-       defer gr.Close()
-
-       buf := make([]byte, READ_SIZE)
-       n, err := gr.Read(buf)
-
-       filedirname = f.LocalDir + gr.Header.Name
-       fw, err := os.Create(filedirname)
-       if err != nil {
-               return err
-       }
+       defer reader.Close()
 
-       _, err = fw.Write(buf[:n])
+       json, err := ioutil.ReadAll(reader)
        if err != nil {
                return err
        }
 
-       return err
+       return ioutil.WriteFile(f.LocalDir+fileName+".json", json, 0644)
 }
index afc42cc..82f15d2 100644 (file)
@@ -32,7 +32,7 @@ type Info struct {
 
 // NewInfo creates a new Info for info.json
 func NewInfo(end, gzSize uint64) *Info {
-       newInvl := NewInterval(0, end, gzSize)
+       newInvl := NewInterval(1, end, gzSize)
        var arr []*Interval
        arr = append(arr, newInvl)
        return &Info{0, arr}
@@ -48,6 +48,5 @@ func GetInfoJson(body io.ReadCloser) (*Info, error) {
        }
 
        info := new(Info)
-       err = Json2Struct(data, &info)
-       return info, err
+       return info, Json2Struct(data, &info)
 }
index 49a07dd..c3795cc 100644 (file)
@@ -18,8 +18,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error
                return false, err
        }
 
-       err = ioutil.WriteFile(filename, saveData, 0644)
-       if err != nil {
+       if err = ioutil.WriteFile(filename, saveData, 0644); err != nil {
                return false, err
        }
 
@@ -28,7 +27,7 @@ func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error
 
 // GetJson read json file
 func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) {
-       filename = f.LocalDir + filename + ".json"
+       filename = f.LocalDir + filename
        return ioutil.ReadFile(filename)
 }