package commands
import (
+ "fmt"
"strings"
"time"
"github.com/spf13/cobra"
"github.com/bytom/vapor/node"
+ "github.com/bytom/vapor/toolbar/osssync/download"
)
const logModule = "cmd"
// Create & start node
n := node.NewNode(config)
+
+ // Get blocks from OSS
+ if err := download.Run(n, config.Oss.Endpoint); err != nil {
+ fmt.Println("Failed to get blocks from oss: ", err)
+ }
+
+ // Start node
if _, err := n.Start(); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("failed to start node")
}
--- /dev/null
+package download
+
+import (
+ "strconv"
+
+ "github.com/bytom/vapor/errors"
+ "github.com/bytom/vapor/node"
+ "github.com/bytom/vapor/protocol/bc/types"
+ "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+const LOCALDIR = "./toolbar/osssync/blocks/" // Local directory to store temp blocks files
+
+// Run synchronize download from OSS to local node.
+// node is the running vapor node whose chain will be advanced; ossEndpoint is
+// the bare OSS host (NewDownloadKeeper prepends "http://" and appends "/").
+// Returns an error when the endpoint is empty or any download step fails.
+func Run(node *node.Node, ossEndpoint string) error {
+	if ossEndpoint == "" {
+		return errors.New("OSS Endpoint is empty")
+	}
+
+	downloadKeeper, err := NewDownloadKeeper(node, ossEndpoint)
+	if err != nil {
+		return err
+	}
+
+	if err = downloadKeeper.Download(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// DownloadKeeper the struct for download
+type DownloadKeeper struct {
+	Node        *node.Node     // local vapor node whose chain receives the downloaded blocks
+	OssEndpoint string         // full endpoint URL, "http://<endpoint>/" (set by NewDownloadKeeper)
+	FileUtil    *util.FileUtil // helper bound to LOCALDIR for temp block files
+}
+
+// NewDownloadKeeper return one new instance of DownloadKeeper.
+// The error result is currently always nil; the signature is kept so the
+// constructor can grow validation without breaking callers.
+// NOTE(review): the scheme is hardcoded to plain "http://" — confirm the OSS
+// endpoint is not expected to be reachable over HTTPS.
+func NewDownloadKeeper(node *node.Node, ossEndpoint string) (*DownloadKeeper, error) {
+	fileUtil := util.NewFileUtil(LOCALDIR)
+
+	return &DownloadKeeper{
+		Node:        node,
+		OssEndpoint: "http://" + ossEndpoint + "/",
+		FileUtil:    fileUtil,
+	}, nil
+}
+
+// Download get blocks from OSS and update the node.
+// It reads info.json to learn the latest uploaded height and the interval
+// table, locates the intervals covering [syncStart, latestUp], then downloads
+// and applies each interval's block files in order.
+func (d *DownloadKeeper) Download() error {
+	if err := d.FileUtil.BlockDirInitial(); err != nil {
+		return err
+	}
+
+	syncStart := d.Node.GetChain().BestBlockHeight() + 1 // block height which the synchronization start from
+
+	infoJson, err := d.GetInfoJson()
+	if err != nil {
+		return err
+	}
+
+	latestUp := infoJson.LatestBlockHeight // Latest uploaded block height on OSS
+	intervals := infoJson.Interval         // Interval array
+	if latestUp == 0 || latestUp < syncStart {
+		return errors.New("No new blocks on OSS.")
+	}
+
+	var pos1, pos2 int // syncStart interval, latestUp interval
+	// Find pos2
+	// NOTE(review): these search loops index intervals with no bounds check —
+	// if intervals is empty, or latestUp/syncStart precede
+	// intervals[0].StartBlockHeight, the index goes negative and this panics.
+	// Confirm info.json always guarantees interval coverage from height 1.
+	for pos2 = len(intervals) - 1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+	}
+	// Find pos1
+	// syncStart is BestBlockHeight()+1, so it is >= 1; this branch looks
+	// unreachable and is presumably defensive — verify before removing.
+	if syncStart == 0 {
+		pos1 = 0
+	} else {
+		for pos1 = pos2; syncStart < intervals[pos1].StartBlockHeight; pos1-- {
+		}
+	}
+
+	// Download Whole Interval: consume every interval strictly before pos2,
+	// advancing syncStart to the first height of the next interval each time.
+	for pos1 < pos2 {
+		if err = d.DownloadFiles(syncStart, intervals[pos1].EndBlockHeight, intervals[pos1]); err != nil {
+			return err
+		}
+		syncStart = intervals[pos1].EndBlockHeight + 1
+		pos1++
+	}
+	// Download the last Interval, stopping at latestUp rather than the
+	// interval's declared end (the tail may not be uploaded yet).
+	if pos1 == pos2 {
+		if err = d.DownloadFiles(syncStart, latestUp, intervals[pos2]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// DownloadFiles get block files from OSS, and update the node.
+// start/end are inclusive block heights within the given interval; each OSS
+// object is a gzip'd JSON array of GzSize blocks named after its first height.
+// NOTE(review): assumes interval.GzSize > 0 — a zero GzSize divides by zero
+// below; confirm info.json validation guarantees this.
+func (d *DownloadKeeper) DownloadFiles(start, end uint64, interval *util.Interval) error {
+	size := interval.GzSize
+	for {
+		if start > end {
+			break
+		}
+
+		intervalStart := interval.StartBlockHeight
+		startInFile := start - intervalStart
+		n := startInFile / size
+		filenameNum := n*size + intervalStart // first block height stored in this file
+
+		filename := strconv.FormatUint(filenameNum, 10)
+		filenameJson := filename + ".json"
+		filenameGzip := filenameJson + ".gz"
+
+		// Fetch the gzip object, decode it to <filename>.json, and drop the archive.
+		if err := d.GetObjectToFile(filenameGzip); err != nil {
+			return err
+		}
+
+		if err := d.FileUtil.GzipDecode(filename); err != nil {
+			return err
+		}
+
+		if err := d.FileUtil.RemoveLocal(filenameGzip); err != nil {
+			return err
+		}
+
+		blocksJson, err := d.FileUtil.GetJson(filenameJson)
+		if err != nil {
+			return err
+		}
+
+		blocks := []*types.Block{}
+		if err = util.Json2Struct(blocksJson, &blocks); err != nil {
+			return err
+		}
+
+		// Re-read the chain tip: earlier iterations (or concurrent sync) may
+		// have advanced it past this file's first heights.
+		latestDown := d.Node.GetChain().BestBlockHeight()
+		if latestDown+1 > start {
+			// NOTE(review): the comment says "resume from latestDown+1", but the
+			// offset of that height inside this file is latestDown+1-filenameNum,
+			// while startInFile is start-intervalStart — these only agree when
+			// this is the interval's first file. Confirm the intended index.
+			blocks = blocks[startInFile:] // start from latestDown+1
+		} else if latestDown+1 < start {
+			return errors.New("Wrong interval")
+		}
+		if err = d.SyncToNode(blocks); err != nil {
+			return err
+		}
+
+		if err = d.FileUtil.RemoveLocal(filenameJson); err != nil {
+			return err
+		}
+
+		start = filenameNum + size // jump to the first height of the next file
+	}
+	return nil
+}
+
+// SyncToNode replays the given blocks into the local chain, in order,
+// stopping at the first block the chain rejects.
+func (d *DownloadKeeper) SyncToNode(blocks []*types.Block) error {
+	chain := d.Node.GetChain()
+	for _, block := range blocks {
+		if _, err := chain.ProcessBlock(block); err != nil {
+			return err
+		}
+	}
+	return nil
+}
--- /dev/null
+package download
+
+import (
+ "io"
+ "net/http"
+ "os"
+
+ "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+// GetObject download the file object from OSS.
+// The caller owns the returned body and must Close it.
+// NOTE(review): the HTTP status code is never checked, so a 404/403 error page
+// is handed to the caller as if it were the object — downstream gzip decoding
+// will fail with a confusing error. Consider rejecting non-200 responses.
+// Returning *io.ReadCloser (pointer to an interface) is non-idiomatic Go; a
+// plain io.ReadCloser would do, but the signature is kept for existing callers.
+func (d *DownloadKeeper) GetObject(filename string) (*io.ReadCloser, error) {
+	url := d.OssEndpoint + filename
+	res, err := http.Get(url)
+	if err != nil {
+		return nil, err
+	}
+
+	return &res.Body, nil
+}
+
+// GetObjectToFile download the file object from OSS to local.
+// The object is streamed into FileUtil.LocalDir + filename.
+func (d *DownloadKeeper) GetObjectToFile(filename string) error {
+	f, err := os.Create(d.FileUtil.LocalDir + filename)
+	if err != nil {
+		return err
+	}
+	// Close the destination file; the original leaked the descriptor.
+	defer f.Close()
+
+	body, err := d.GetObject(filename)
+	if err != nil {
+		return err
+	}
+
+	defer (*body).Close()
+
+	// Propagate copy failures; the original discarded this error, which could
+	// leave a silently truncated file on disk.
+	if _, err = io.Copy(f, *body); err != nil {
+		return err
+	}
+	return nil
+}
+
+// GetInfoJson Download info.json, the OSS-side index recording the latest
+// uploaded block height and the interval table, and parse it.
+func (d *DownloadKeeper) GetInfoJson() (*util.Info, error) {
+	body, err := d.GetObject("info.json")
+	if err != nil {
+		return nil, err
+	}
+	// Close the HTTP response body; the original never closed it and leaked
+	// the connection. (Assumes util.GetInfoJson does not close it itself —
+	// closing an http body twice is harmless in practice, but confirm.)
+	defer (*body).Close()
+
+	return util.GetInfoJson(*body)
+}
--- /dev/null
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/bytom/vapor/toolbar/osssync/upload"
+)
+
+// main runs the OSS upload tool; on failure it reports the error and exits
+// non-zero so scripts and supervisors can detect the failure (the original
+// printed the error but still exited with status 0).
+func main() {
+	if err := upload.Run(); err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+}
"encoding/json"
"os"
- "github.com/bytom/bytom/errors"
+ "github.com/bytom/vapor/errors"
)
// Config represent root of config
"github.com/aliyun/aliyun-oss-go-sdk/oss"
+ "github.com/bytom/vapor/errors"
"github.com/bytom/vapor/toolbar/osssync/util"
)
// Add Interval
prevInvl := info.Interval[len(info.Interval)-1]
+ if prevInvl.EndBlockHeight >= end {
+ return errors.New("New interval is included in previous intervals.")
+ }
+
+ if (end-prevInvl.EndBlockHeight)%gzSize != 0 {
+ return errors.New("New interval is invalid.")
+ }
+
newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
info.Interval = append(info.Interval, newInvl)
} else {
"github.com/aliyun/aliyun-oss-go-sdk/oss"
log "github.com/sirupsen/logrus"
+ "github.com/bytom/vapor/errors"
"github.com/bytom/vapor/protocol/bc/types"
"github.com/bytom/vapor/toolbar/apinode"
"github.com/bytom/vapor/toolbar/osssync/util"
// NewUploadKeeper return one new instance of UploadKeeper
func NewUploadKeeper() (*UploadKeeper, error) {
cfg := &Config{}
- err := LoadConfig(&cfg)
- if err != nil {
+ if err := LoadConfig(&cfg); err != nil {
return nil, err
}
defer ticker.Stop()
for ; true; <-ticker.C {
- err := u.Upload()
- if err != nil {
- log.WithField("error", err).Errorln("blockKeeper fail on process block")
+ if err := u.Upload(); err != nil {
+ log.WithField("error", err).Errorln("blockKeeper fail")
}
}
}
// Upload find and upload blocks
func (u *UploadKeeper) Upload() error {
- err := u.FileUtil.BlockDirInitial()
- if err != nil {
+ if err := u.FileUtil.BlockDirInitial(); err != nil {
return err
}
return err
}
+ if currBlockHeight == 0 {
+ return errors.New("Current block height is 0.")
+ }
+
infoJson, err := u.GetInfoJson()
if err != nil {
return err
intervals := infoJson.Interval // Interval array
var pos1, pos2 int // currBlockHeight interval, latestUp interval
+ // Find pos1
for pos1 = len(intervals) - 1; currBlockHeight < intervals[pos1].StartBlockHeight; pos1-- {
}
// Current Block Height is out of the range given by info.json
if currBlockHeight > intervals[pos1].EndBlockHeight {
currBlockHeight = intervals[pos1].EndBlockHeight // Upload the part which contained by info.json
}
- for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+ // Find pos2
+ if latestUp == 0 {
+ pos2 = 0
+ } else {
+ for pos2 = pos1; latestUp < intervals[pos2].StartBlockHeight; pos2-- {
+ }
}
// Upload Whole Interval
for latestUp+1 < intervals[pos1].StartBlockHeight {
- err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
- if err != nil {
+ if err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize); err != nil {
return err
}
}
// Upload the last Interval
- newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
+ newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight + 1) % intervals[pos1].GzSize)
if latestUp < newLatestUp {
- err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
- if err != nil {
+ if err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize); err != nil {
return err
}
}
- return err
+ return nil
}
// UploadFiles get block from vapor and upload files to OSS
filenameJson := filename + ".json"
filenameGzip := filenameJson + ".gz"
- _, err = u.FileUtil.SaveBlockFile(filename, blocks)
- if err != nil {
+ if _, err = u.FileUtil.SaveBlockFile(filename, blocks); err != nil {
return err
}
- err = u.FileUtil.GzipCompress(filename)
- if err != nil {
+ if err = u.FileUtil.GzipCompress(filename); err != nil {
return err
}
- err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
- if err != nil {
+ if err = u.FileUtil.RemoveLocal(filenameJson); err != nil {
return err
}
- err = u.SetLatestBlockHeight(start + size - 1)
- if err != nil {
+ if err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip); err != nil {
return err
}
- err = u.FileUtil.RemoveLocal(filenameJson)
- if err != nil {
+ if err = u.SetLatestBlockHeight(start + size - 1); err != nil {
return err
}
- err = u.FileUtil.RemoveLocal(filenameGzip)
- if err != nil {
+ if err = u.FileUtil.RemoveLocal(filenameGzip); err != nil {
return err
}
// IsExists if file or directory exist
func IsExists(path string) bool {
-	_, err := os.Stat(path)
-	if err != nil && !os.IsExist(err) {
+	// NOTE(review): os.IsExist is never true for errors returned by os.Stat,
+	// so this effectively returns "stat succeeded" and reports false for any
+	// stat failure (including permission errors). os.IsNotExist was probably
+	// the intended check — confirm before changing, callers may rely on this.
+	if _, err := os.Stat(path); err != nil && !os.IsExist(err) {
		return false
	}
+
	return true
}
// PathExists return if path exists
func PathExists(path string) (bool, error) {
-	_, err := os.Stat(path)
-	if err == nil {
-		return true, nil
+	// Behavior-preserving refactor: (true, nil) on a successful stat,
+	// (false, err) otherwise; callers can test os.IsNotExist on the error.
+	if _, err := os.Stat(path); err != nil {
+		return false, err
	}
-	return false, err
+	return true, nil
}
// RemoveLocal deletes file
}
if ifPathExist {
- err = os.RemoveAll(f.LocalDir)
- if err != nil {
+ if err = os.RemoveAll(f.LocalDir); err != nil {
return err
}
}
- err = os.Mkdir(f.LocalDir, 0755)
- return err
+ return os.Mkdir(f.LocalDir, 0755)
}
import (
"compress/gzip"
+ "io/ioutil"
"os"
)
-const READ_SIZE = 1024 * 1024 * 500
-
-// GzipCompress compress file to Gzip
+// GzipCompress Encode file to Gzip and save to the same directory
func (f *FileUtil) GzipCompress(fileName string) error {
- filePath := f.LocalDir + fileName + ".json.gz"
- fw, err := os.Create(filePath)
+ fw, err := os.Create(f.LocalDir + fileName + ".json.gz")
if err != nil {
return err
}
gw := gzip.NewWriter(fw)
defer gw.Close()
- filePath = f.LocalDir + fileName + ".json"
- fr, err := os.Open(filePath)
+ fr, err := os.Open(f.LocalDir + fileName + ".json")
if err != nil {
return err
}
gw.Header.Name = fi.Name()
buf := make([]byte, fi.Size())
- _, err = fr.Read(buf)
- if err != nil {
+ if _, err = fr.Read(buf); err != nil {
return err
}
- _, err = gw.Write(buf)
- if err != nil {
+ if _, err = gw.Write(buf); err != nil {
return err
}
- return err
+ return nil
}
-// GzipUncompress uncompress Gzip file
-func (f *FileUtil) GzipUncompress(fileName string) error {
-	filedirname := f.LocalDir + fileName + ".json.gz"
-	fr, err := os.Open(filedirname)
+// GzipDecode Decode Gzip file and save to the same directory.
+// Reads <LocalDir>/<fileName>.json.gz and writes <LocalDir>/<fileName>.json.
+func (f *FileUtil) GzipDecode(fileName string) error {
+	fr, err := os.Open(f.LocalDir + fileName + ".json.gz")
	if err != nil {
		return err
	}
	defer fr.Close()
-	gr, err := gzip.NewReader(fr)
+	reader, err := gzip.NewReader(fr)
	if err != nil {
		return err
	}
-	defer gr.Close()
-
-	buf := make([]byte, READ_SIZE)
-	n, err := gr.Read(buf)
-
-	filedirname = f.LocalDir + gr.Header.Name
-	fw, err := os.Create(filedirname)
-	if err != nil {
-		return err
-	}
+	defer reader.Close()
-	_, err = fw.Write(buf[:n])
+	// ReadAll fixes the old single-Read bug (a lone Read may return fewer
+	// bytes than the payload, truncating large files) and drops the fixed
+	// 500 MB scratch buffer the old code allocated.
+	// NOTE(review): io/ioutil is deprecated since Go 1.16; io.ReadAll and
+	// os.WriteFile are the modern spellings if the module targets >= 1.16.
+	json, err := ioutil.ReadAll(reader)
	if err != nil {
		return err
	}
-	return err
+	return ioutil.WriteFile(f.LocalDir+fileName+".json", json, 0644)
}
// NewInfo creates a new Info for info.json
func NewInfo(end, gzSize uint64) *Info {
- newInvl := NewInterval(0, end, gzSize)
+ newInvl := NewInterval(1, end, gzSize)
var arr []*Interval
arr = append(arr, newInvl)
return &Info{0, arr}
}
info := new(Info)
- err = Json2Struct(data, &info)
- return info, err
+ return info, Json2Struct(data, &info)
}
return false, err
}
- err = ioutil.WriteFile(filename, saveData, 0644)
- if err != nil {
+ if err = ioutil.WriteFile(filename, saveData, 0644); err != nil {
return false, err
}
// GetJson read json file
func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) {
- filename = f.LocalDir + filename + ".json"
+ filename = f.LocalDir + filename
return ioutil.ReadFile(filename)
}