runNodeCmd.Flags().Int("ws.max_num_websockets", config.Websocket.MaxNumWebsockets, "Max number of websocket connections")
runNodeCmd.Flags().Int("ws.max_num_concurrent_reqs", config.Websocket.MaxNumConcurrentReqs, "Max number of concurrent websocket requests that may be processed concurrently")
+ // OSS
+ runNodeCmd.Flags().String("oss.endpoint", config.Oss.Endpoint, "Endpoint of OSS")
+
RootCmd.AddCommand(runNodeCmd)
}
Websocket *WebsocketConfig `mapstructure:"ws"`
Federation *FederationConfig `mapstructure:"federation"`
CrossChain *CrossChainConfig `mapstructure:"cross_chain"`
+ Oss *OssConfig `mapstructure:"oss"`
}
// Default configurable parameters.
Websocket: DefaultWebsocketConfig(),
Federation: DefaultFederationConfig(),
CrossChain: DefaultCrossChainConfig(),
+ Oss: DefaultOssConfig(),
}
}
AssetWhitelist string `mapstructure:"asset_whitelist"`
}
+// OssConfig is the OSS (Object Storage Service) node configuration.
+type OssConfig struct {
+ Endpoint string `mapstructure:"endpoint"` // OSS service endpoint; set via config file or the "oss.endpoint" flag
+}
+
// Default configurable rpc's auth parameters.
func DefaultRPCAuthConfig() *RPCAuthConfig {
return &RPCAuthConfig{
return xpub
}
+// Default configurable oss parameters. The endpoint is empty by default and
+// is expected to be provided via the config file or the "oss.endpoint" flag.
+func DefaultOssConfig() *OssConfig {
+ return &OssConfig{}
+}
+
//-----------------------------------------------------------------------------
// Utils
+++ /dev/null
-package sync
-
-import "github.com/bytom/vapor/toolbar/osssync/util"
-
-// Interval determines the number of blocks in a Gzip file in the Interval of blockHeight
-// StartBlockHeight is the start of the Interval
-// EndBlockHeight: the end of the Interval
-// GzSize is the number of blocks store in a Gzip file
-type Interval struct {
- StartBlockHeight uint64
- EndBlockHeight uint64
- GzSize uint64
-}
-
-// NewInterval creates a new Interval from info.json
-func NewInterval(start, end, gzSize uint64) *Interval {
- return &Interval{
- StartBlockHeight: start,
- EndBlockHeight: end,
- GzSize: gzSize,
- }
-}
-
-// Info is a struct for info.json
-type Info struct {
- LatestBlockHeight uint64
- Interval []*Interval
-}
-
-// NewInfo creates a new Info for info.json
-func NewInfo(end, gzSize uint64) *Info {
- newInvl := NewInterval(0, end, gzSize)
- var arr []*Interval
- arr = append(arr, newInvl)
- return &Info{0, arr}
-}
-
-// GetInfoJson Download info.json
-func (s *Sync) GetInfoJson() (*Info, error) {
- data, err := s.GetObjToData("info.json")
- if err != nil {
- return nil, err
- }
-
- info := new(Info)
- err = util.Json2Struct(data, &info)
- return info, err
-}
-
-// Upload info.json
-func (s *Sync) PutInfoJson(infoData *Info) error {
- jsonData, err := util.Struct2Json(infoData)
- if err != nil {
- return err
- }
-
- // Upload
- return s.PutObjByteArr("info.json", jsonData)
-}
-
-// SetLatestBlockHeight set new latest blockHeight on OSS
-func (s *Sync) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
- info, err := s.GetInfoJson()
- if err != nil {
- return err
- }
-
- info.LatestBlockHeight = newLatestBlockHeight
- return s.PutInfoJson(info)
-}
-
-// AddInterval adds an interval to the end of info.json
-func (s *Sync) AddInterval(end, gzSize uint64) error {
- isJsonExist, err := s.OssBucket.IsObjectExist("info.json")
- if err != nil {
- return err
- }
-
- var info *Info
- if isJsonExist {
- // Download info.json
- info, err = s.GetInfoJson()
- if err != nil {
- return err
- }
-
- // Add Interval
- prevInvl := info.Interval[len(info.Interval)-1]
- newInvl := NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
- info.Interval = append(info.Interval, newInvl)
- } else {
- info = NewInfo(end, gzSize)
- }
- return s.PutInfoJson(info)
-}
+++ /dev/null
-package sync
-
-import (
- "bytes"
- "io/ioutil"
-
- "github.com/aliyun/aliyun-oss-go-sdk/oss"
-)
-
-// PutObjByteArr upload Byte Array object
-func (s *Sync) PutObjByteArr(objectName string, objectValue []byte) error {
- objectAcl := oss.ObjectACL(oss.ACLPublicRead)
- return s.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
-}
-
-// GetObjToData download object to stream
-func (s *Sync) GetObjToData(objectName string) ([]byte, error) {
- body, err := s.OssBucket.GetObject(objectName)
- if err != nil {
- return nil, err
- }
-
- defer body.Close()
-
- data, err := ioutil.ReadAll(body)
- if err != nil {
- return nil, err
- }
-
- return data, err
-}
+++ /dev/null
-package sync
-
-import (
- "github.com/aliyun/aliyun-oss-go-sdk/oss"
-
- "github.com/bytom/vapor/toolbar/osssync/config"
- "github.com/bytom/vapor/toolbar/osssync/util"
-)
-
-// Sync the struct of the Sync
-type Sync struct {
- OssClient *oss.Client
- OssBucket *oss.Bucket
- FileUtil *util.FileUtil
-}
-
-// NewSync return one new instance of Sync
-func NewSync() (*Sync, error) {
- cfg := &config.Config{}
- err := config.LoadConfig(&cfg)
- if err != nil {
- return nil, err
- }
-
- ossClient, err := oss.New(cfg.Oss.Endpoint, cfg.Oss.AccessKeyID, cfg.Oss.AccessKeySecret)
- if err != nil {
- return nil, err
- }
-
- ossBucket, err := ossClient.Bucket("bytom-seed")
- if err != nil {
- return nil, err
- }
-
- fileUtil := util.NewFileUtil("./blocks")
-
- return &Sync{
- OssClient: ossClient,
- OssBucket: ossBucket,
- FileUtil: fileUtil,
- }, nil
-}
-package config
+package upload
import (
"encoding/json"
-// Config represent root of config
+// Config represents the root of the osssync upload configuration.
type Config struct {
- Oss Oss `json:"oss"`
- VaporURL string `json:"vapor_url"`
+ OssConfig *OssConfig `json:"oss_config"` // OSS login credential and target bucket
+ VaporURL  string     `json:"vapor_url"`  // vapor node API base URL
}
-// Oss logs cfg
-type Oss struct {
+// Login is the OSS access credential: service endpoint plus API key pair.
+type Login struct {
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
}
-// LoadConfig read path file to the config object
+// OssConfig groups the OSS login credential with the target bucket name.
+type OssConfig struct {
+ Login *Login `json:"login"`
+ Bucket string `json:"bucket"`
+}
+
+// LoadConfig reads the config file at os.Args[1] into the config object
+// used by the Vapor-to-OSS upload.
func LoadConfig(config interface{}) error {
if len(os.Args) <= 1 {
return errors.New("Please setup the config file path as Args[1]")
--- /dev/null
+package upload
+
+import (
+ "bytes"
+
+ "github.com/aliyun/aliyun-oss-go-sdk/oss"
+
+ "github.com/bytom/vapor/toolbar/osssync/util"
+)
+
+// PutObjByteArr uploads a byte-array object to the OSS bucket under
+// objectName, with public-read ACL.
+func (u *UploadKeeper) PutObjByteArr(objectName string, objectValue []byte) error {
+ objectAcl := oss.ObjectACL(oss.ACLPublicRead)
+ return u.OssBucket.PutObject(objectName, bytes.NewReader(objectValue), objectAcl)
+}
+
+// GetInfoJson downloads "info.json" from the OSS bucket and decodes it into
+// a util.Info; the response body is closed inside util.GetInfoJson.
+func (u *UploadKeeper) GetInfoJson() (*util.Info, error) {
+ body, err := u.OssBucket.GetObject("info.json")
+ if err != nil {
+ return nil, err
+ }
+
+ return util.GetInfoJson(body)
+}
+
+// PutInfoJson serializes infoData to JSON and uploads it to the OSS bucket
+// as "info.json".
+func (u *UploadKeeper) PutInfoJson(infoData *util.Info) error {
+ jsonData, err := util.Struct2Json(infoData)
+ if err != nil {
+ return err
+ }
+
+ // Upload
+ return u.PutObjByteArr("info.json", jsonData)
+}
+
+// SetLatestBlockHeight sets a new latest blockHeight on OSS.
+// NOTE(review): this is a read-modify-write of info.json and is not atomic
+// if several uploaders share the same bucket — confirm single-writer usage.
+func (u *UploadKeeper) SetLatestBlockHeight(newLatestBlockHeight uint64) error {
+ info, err := u.GetInfoJson()
+ if err != nil {
+ return err
+ }
+
+ info.LatestBlockHeight = newLatestBlockHeight
+ return u.PutInfoJson(info)
+}
+
+// AddInterval appends an interval ending at end (with gzip batch size gzSize)
+// to info.json on OSS; if info.json does not exist yet, it is created with a
+// single interval starting at height 0.
+func (u *UploadKeeper) AddInterval(end, gzSize uint64) error {
+ isJsonExist, err := u.OssBucket.IsObjectExist("info.json")
+ if err != nil {
+ return err
+ }
+
+ var info *util.Info
+ if isJsonExist {
+ // Download info.json
+ info, err = u.GetInfoJson()
+ if err != nil {
+ return err
+ }
+
+ // The new interval starts right after the end of the previous one.
+ prevInvl := info.Interval[len(info.Interval)-1]
+ newInvl := util.NewInterval(prevInvl.EndBlockHeight+1, end, gzSize)
+ info.Interval = append(info.Interval, newInvl)
+ } else {
+ info = util.NewInfo(end, gzSize)
+ }
+ return u.PutInfoJson(info)
+}
-package sync
+package upload
import (
"strconv"
"time"
+ "github.com/aliyun/aliyun-oss-go-sdk/oss"
log "github.com/sirupsen/logrus"
"github.com/bytom/vapor/protocol/bc/types"
"github.com/bytom/vapor/toolbar/apinode"
- "github.com/bytom/vapor/toolbar/osssync/config"
+ "github.com/bytom/vapor/toolbar/osssync/util"
)
+// LOCALDIR is the local directory for temporary block files. Callers
+// concatenate LocalDir directly with file names, so the trailing "/" is
+// required.
+// NOTE(review): Go naming prefers mixedCaps (e.g. localDir) over ALL_CAPS;
+// renaming would also touch NewUploadKeeper — confirm before changing.
+const LOCALDIR = "./blocks/" // Local directory to store temp blocks files
+
+// Run synchronize upload blocks from vapor to OSS
+func Run() error {
+ uploadKeeper, err := NewUploadKeeper()
+ if err != nil {
+ return err
+ }
+
+ uploadKeeper.Run()
+ return nil
+}
+
+// AddInterval creates an UploadKeeper and appends an interval ending at end
+// (gzip batch size gzSize) to info.json on OSS, creating the file if absent.
+func AddInterval(end, gzSize uint64) error {
+ uploadKeeper, err := NewUploadKeeper()
+ if err != nil {
+ return err
+ }
+
+ return uploadKeeper.AddInterval(end, gzSize)
+}
+
-// UploadKeeper the struct for upload
+// UploadKeeper uploads vapor blocks to an OSS bucket.
type UploadKeeper struct {
- Sync *Sync
- Node *apinode.Node
+ Node      *apinode.Node // vapor node API client
+ OssClient *oss.Client
+ OssBucket *oss.Bucket // target bucket for block archives and info.json
+ FileUtil  *util.FileUtil // helper for local temp block files
}
-// NewUploadKeeper return one new instance of UploadKeeper
+// NewUploadKeeper returns a new instance of UploadKeeper built from the
+// config file: vapor API client, OSS client/bucket, and local file helper.
func NewUploadKeeper() (*UploadKeeper, error) {
- cfg := &config.Config{}
- err := config.LoadConfig(&cfg)
+ cfg := &Config{}
+ // NOTE(review): cfg is already a *Config, so &cfg passes a **Config;
+ // encoding/json copes with the double pointer, but LoadConfig(cfg) would
+ // be clearer — confirm LoadConfig's decode path before changing.
+ err := LoadConfig(&cfg)
if err != nil {
return nil, err
}
node := apinode.NewNode(cfg.VaporURL)
- sync, err := NewSync()
+ ossClient, err := oss.New(cfg.OssConfig.Login.Endpoint, cfg.OssConfig.Login.AccessKeyID, cfg.OssConfig.Login.AccessKeySecret)
+ if err != nil {
+ return nil, err
+ }
+
+ ossBucket, err := ossClient.Bucket(cfg.OssConfig.Bucket)
if err != nil {
return nil, err
}
+ fileUtil := util.NewFileUtil(LOCALDIR)
+
return &UploadKeeper{
- Sync: sync,
- Node: node,
+ Node:      node,
+ OssClient: ossClient,
+ OssBucket: ossBucket,
+ FileUtil:  fileUtil,
}, nil
}
-// RunSyncUp run synchronize upload to OSS
-func (u *UploadKeeper) RunSyncUp() {
+// Run uploads blocks from vapor to OSS on a one-minute ticker.
+func (u *UploadKeeper) Run() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
-// Upload find and upload blocks
+// Upload finds and uploads blocks.
func (u *UploadKeeper) Upload() error {
- err := u.Sync.FileUtil.BlockDirInitial()
+ err := u.FileUtil.BlockDirInitial()
if err != nil {
return err
}
return err
}
- infoJson, err := u.Sync.GetInfoJson()
+ infoJson, err := u.GetInfoJson()
if err != nil {
return err
}
// Upload Whole Interval
for latestUp+1 < intervals[pos1].StartBlockHeight {
- if latestUp == 0 {
- err = u.UploadFiles(latestUp, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
- } else {
- err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
- }
+ // NOTE(review): the removed branch started from block 0 when latestUp == 0;
+ // always using latestUp+1 skips block 0 on a fresh bucket unless height 0
+ // is intentionally excluded — confirm.
+ err = u.UploadFiles(latestUp+1, intervals[pos2].EndBlockHeight, intervals[pos2].GzSize)
if err != nil {
return err
}
// Upload the last Interval
newLatestUp := currBlockHeight - ((currBlockHeight - intervals[pos1].StartBlockHeight) % intervals[pos1].GzSize) - 1
if latestUp < newLatestUp {
- if latestUp == 0 {
- err = u.UploadFiles(latestUp, newLatestUp, intervals[pos1].GzSize)
- } else {
- err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
- }
+ // NOTE(review): the removed branch started from block 0 when latestUp == 0;
+ // always using latestUp+1 may skip block 0 on a fresh bucket — confirm.
+ err = u.UploadFiles(latestUp+1, newLatestUp, intervals[pos1].GzSize)
if err != nil {
return err
}
filenameJson := filename + ".json"
filenameGzip := filenameJson + ".gz"
- _, err = u.Sync.FileUtil.SaveBlockFile(filename, blocks)
+ _, err = u.FileUtil.SaveBlockFile(filename, blocks)
if err != nil {
return err
}
- err = u.Sync.FileUtil.GzipCompress(filename)
+ err = u.FileUtil.GzipCompress(filename)
if err != nil {
return err
}
- err = u.Sync.OssBucket.PutObjectFromFile(filenameGzip, u.Sync.FileUtil.LocalDir+"/"+filenameGzip)
+ // NOTE(review): LocalDir+filenameGzip relies on LocalDir ending with "/"
+ // (LOCALDIR = "./blocks/"); filepath.Join would be more robust.
+ err = u.OssBucket.PutObjectFromFile(filenameGzip, u.FileUtil.LocalDir+filenameGzip)
if err != nil {
return err
}
- err = u.Sync.SetLatestBlockHeight(start + size - 1)
+ err = u.SetLatestBlockHeight(start + size - 1)
if err != nil {
return err
}
- err = u.Sync.FileUtil.RemoveLocal(filenameJson)
+ // Clean up the local temp files after a successful upload.
+ err = u.FileUtil.RemoveLocal(filenameJson)
if err != nil {
return err
}
- err = u.Sync.FileUtil.RemoveLocal(filenameGzip)
+ err = u.FileUtil.RemoveLocal(filenameGzip)
if err != nil {
return err
}
return true
}
-// IfNoFileToCreate if the file is not exist, create the file
-func IfNoFileToCreate(fileName string) (file *os.File) {
- var f *os.File
- var err error
- if !IsExists(fileName) {
- f, err = os.Create(fileName)
- if err != nil {
- return
- }
- defer f.Close()
- }
- return f
-}
-
// PathExists return if path exists
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
-// RemoveLocal deletes file
+// RemoveLocal deletes a file from the local temp directory.
func (f *FileUtil) RemoveLocal(filename string) error {
- return os.Remove(f.LocalDir + "/" + filename)
+ // LocalDir is expected to end with "/" (LOCALDIR = "./blocks/").
+ return os.Remove(f.LocalDir + filename)
}
// BlockDirInitial initializes the blocks directory
-// GzipCompress compress file to Gzip
+// GzipCompress compresses <LocalDir><fileName>.json into <LocalDir><fileName>.json.gz.
func (f *FileUtil) GzipCompress(fileName string) error {
- filePath := f.LocalDir + "/" + fileName + ".json.gz"
+ filePath := f.LocalDir + fileName + ".json.gz"
fw, err := os.Create(filePath)
if err != nil {
return err
gw := gzip.NewWriter(fw)
defer gw.Close()
- filePath = f.LocalDir + "/" + fileName + ".json"
+ filePath = f.LocalDir + fileName + ".json"
fr, err := os.Open(filePath)
if err != nil {
return err
-// GzipUncompress uncompress Gzip file
+// GzipUncompress uncompresses <LocalDir><fileName>.json.gz into the file
+// named by the gzip header, inside LocalDir.
func (f *FileUtil) GzipUncompress(fileName string) error {
- filedirname := f.LocalDir + "/" + fileName + ".json.gz"
+ filedirname := f.LocalDir + fileName + ".json.gz"
fr, err := os.Open(filedirname)
if err != nil {
return err
buf := make([]byte, READ_SIZE)
n, err := gr.Read(buf)
- filedirname = f.LocalDir + "/" + gr.Header.Name
+ filedirname = f.LocalDir + gr.Header.Name
fw, err := os.Create(filedirname)
if err != nil {
return err
--- /dev/null
+package util
+
+import (
+ "io"
+ "io/ioutil"
+)
+
+// Interval describes one contiguous range of block heights
+// [StartBlockHeight, EndBlockHeight] and the number of blocks stored per
+// gzip file (GzSize) within that range.
+type Interval struct {
+ StartBlockHeight uint64
+ EndBlockHeight uint64
+ GzSize uint64
+}
+
+// NewInterval creates a new Interval with the given bounds and gzip batch size.
+func NewInterval(start, end, gzSize uint64) *Interval {
+ return &Interval{
+ StartBlockHeight: start,
+ EndBlockHeight: end,
+ GzSize: gzSize,
+ }
+}
+
+// Info is the in-memory form of info.json.
+type Info struct {
+ LatestBlockHeight uint64 // latest block height recorded as uploaded
+ Interval []*Interval // height intervals, ordered; appended by AddInterval
+}
+
+// NewInfo creates a new Info for info.json covering [0, end] with gzip batch
+// size gzSize; LatestBlockHeight starts at 0.
+func NewInfo(end, gzSize uint64) *Info {
+ newInvl := NewInterval(0, end, gzSize)
+ var arr []*Interval
+ arr = append(arr, newInvl)
+ return &Info{0, arr}
+}
+
+// GetInfoJson reads the whole stream (e.g. an OSS object body), closes it,
+// and decodes the JSON content into an Info.
+func GetInfoJson(body io.ReadCloser) (*Info, error) {
+ defer body.Close()
+
+ data, err := ioutil.ReadAll(body)
+ if err != nil {
+ return nil, err
+ }
+
+ info := new(Info)
+ // NOTE(review): &info is a **Info; if Json2Struct wraps json.Unmarshal
+ // this works, but passing info directly would be clearer — confirm.
+ err = Json2Struct(data, &info)
+ return info, err
+}
-// SaveBlockFile saves block file
+// SaveBlockFile marshals data to JSON and writes it to <LocalDir><filename>.json.
func (f *FileUtil) SaveBlockFile(filename string, data interface{}) (bool, error) {
- filename = f.LocalDir + "/" + filename + ".json"
+ filename = f.LocalDir + filename + ".json"
saveData, err := json.Marshal(data)
if err != nil {
return false, err
-// GetJson read json file
+// GetJson reads <LocalDir><filename>.json and returns its raw JSON bytes.
func (f *FileUtil) GetJson(filename string) (json.RawMessage, error) {
- filename = f.LocalDir + "/" + filename + ".json"
+ filename = f.LocalDir + filename + ".json"
return ioutil.ReadFile(filename)
}