OSDN Git Service

versoin1.1.9 (#594)
[bytom/vapor.git] / node / node.go
index 3dc3190..87d7456 100644 (file)
@@ -1,37 +1,39 @@
 package node
 
 import (
-       "context"
        "encoding/hex"
        "errors"
        "net"
        "net/http"
+       // debug tool
        _ "net/http/pprof"
-       "os"
        "path/filepath"
+       "reflect"
 
        "github.com/prometheus/prometheus/util/flock"
        log "github.com/sirupsen/logrus"
        cmn "github.com/tendermint/tmlibs/common"
        browser "github.com/toqueteos/webbrowser"
 
-       "github.com/vapor/accesstoken"
-       "github.com/vapor/account"
-       "github.com/vapor/api"
-       "github.com/vapor/asset"
-       "github.com/vapor/blockchain/pseudohsm"
-       "github.com/vapor/blockchain/txfeed"
-       cfg "github.com/vapor/config"
-       "github.com/vapor/consensus"
-       "github.com/vapor/database"
-       dbm "github.com/vapor/database/leveldb"
-       "github.com/vapor/env"
-       "github.com/vapor/event"
-       "github.com/vapor/net/websocket"
-       "github.com/vapor/netsync"
-       "github.com/vapor/proposal/blockproposer"
-       "github.com/vapor/protocol"
-       w "github.com/vapor/wallet"
+       "github.com/bytom/vapor/accesstoken"
+       "github.com/bytom/vapor/account"
+       "github.com/bytom/vapor/api"
+       "github.com/bytom/vapor/application/mov"
+       "github.com/bytom/vapor/asset"
+       "github.com/bytom/vapor/blockchain/pseudohsm"
+       cfg "github.com/bytom/vapor/config"
+       "github.com/bytom/vapor/consensus"
+       "github.com/bytom/vapor/database"
+       dbm "github.com/bytom/vapor/database/leveldb"
+       "github.com/bytom/vapor/env"
+       "github.com/bytom/vapor/event"
+       vaporLog "github.com/bytom/vapor/log"
+       "github.com/bytom/vapor/net/websocket"
+       "github.com/bytom/vapor/netsync"
+       "github.com/bytom/vapor/proposal/blockproposer"
+       "github.com/bytom/vapor/protocol"
+       "github.com/bytom/vapor/protocol/bc/types"
+       w "github.com/bytom/vapor/wallet"
 )
 
 const (
@@ -52,21 +54,16 @@ type Node struct {
        notificationMgr *websocket.WSNotificationManager
        api             *api.API
        chain           *protocol.Chain
-       txfeed          *txfeed.Tracker
        cpuMiner        *blockproposer.BlockProposer
        miningEnable    bool
 }
 
 // NewNode create bytom node
 func NewNode(config *cfg.Config) *Node {
-       ctx := context.Background()
+       initNodeConfig(config)
 
-       if err := lockDataDirectory(config); err != nil {
-               cmn.Exit("Error: " + err.Error())
-       }
-
-       if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
-               cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error()))
+       if err := vaporLog.InitLogFile(config); err != nil {
+               log.WithField("err", err).Fatalln("InitLogFile failed")
        }
 
        log.WithFields(log.Fields{
@@ -74,13 +71,9 @@ func NewNode(config *cfg.Config) *Node {
                "pubkey":             config.PrivateKey().XPub(),
                "fed_xpubs":          config.Federation.Xpubs,
                "fed_quorum":         config.Federation.Quorum,
-               "fed_controlprogram": hex.EncodeToString(cfg.FederationProgrom(config)),
+               "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config, 0)),
        }).Info()
 
-       initLogFile(config)
-       initActiveNetParams(config)
-       initCommonConfig(config)
-
        // Get store
        if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
                cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
@@ -92,24 +85,21 @@ func NewNode(config *cfg.Config) *Node {
        accessTokens := accesstoken.NewStore(tokenDB)
 
        dispatcher := event.NewDispatcher()
-       txPool := protocol.NewTxPool(store, dispatcher)
-       chain, err := protocol.NewChain(store, txPool, dispatcher)
+       movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
+       assetFilter := protocol.NewAssetFilter(config.CrossChain.AssetWhitelist)
+       txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore, assetFilter}, dispatcher)
+       chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
 
+       if err := checkConfig(chain, config); err != nil {
+               panic(err)
+       }
+
        var accounts *account.Manager
        var assets *asset.Registry
        var wallet *w.Wallet
-       var txFeed *txfeed.Tracker
-
-       txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
-       txFeed = txfeed.NewTracker(txFeedDB, chain)
-
-       if err = txFeed.Prepare(ctx); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed")
-               return nil
-       }
 
        hsm, err := pseudohsm.New(config.KeysDir())
        if err != nil {
@@ -118,20 +108,26 @@ func NewNode(config *cfg.Config) *Node {
 
        if !config.Wallet.Disable {
                walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
-               accounts = account.NewManager(walletDB, chain)
+               walletStore := database.NewWalletStore(walletDB)
+               accountStore := database.NewAccountStore(walletDB)
+               accounts = account.NewManager(accountStore, chain)
                assets = asset.NewRegistry(walletDB, chain)
-               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
+               wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
                }
 
+               if err = wallet.Run(); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet work running thread")
+               }
+
                // trigger rescan wallet
                if config.Wallet.Rescan {
                        wallet.RescanBlocks()
                }
        }
-
-       syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher)
+       fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
+       syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
        }
@@ -141,7 +137,7 @@ func NewNode(config *cfg.Config) *Node {
        // run the profile server
        profileHost := config.ProfListenAddress
        if profileHost != "" {
-               // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
+               // Profiling vapord programs.see (https://blog.golang.org/profiling-go-programs)
                // go tool pprof http://profileHose/debug/pprof/heap
                go func() {
                        if err = http.ListenAndServe(profileHost, nil); err != nil {
@@ -157,7 +153,6 @@ func NewNode(config *cfg.Config) *Node {
                accessTokens:    accessTokens,
                wallet:          wallet,
                chain:           chain,
-               txfeed:          txFeed,
                miningEnable:    config.Mining,
 
                notificationMgr: notificationMgr,
@@ -168,35 +163,92 @@ func NewNode(config *cfg.Config) *Node {
        return node
 }
 
-// Lock data directory after daemonization
-func lockDataDirectory(config *cfg.Config) error {
-       _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
+// Rollback rollback chain from one height to targetHeight
+func Rollback(config *cfg.Config, targetHeight uint64) error {
+       if err := initNodeConfig(config); err != nil {
+               return err
+       }
+
+       // Get store
+       if config.DBBackend != "leveldb" {
+               return errors.New("Param db_backend is invalid, use leveldb")
+       }
+
+       coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+       store := database.NewStore(coreDB)
+
+       dispatcher := event.NewDispatcher()
+       movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
+       txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore}, dispatcher)
+       chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher)
        if err != nil {
-               return errors.New("datadir already used by another process")
+               return err
        }
-       return nil
+
+       hsm, err := pseudohsm.New(config.KeysDir())
+       if err != nil {
+               return err
+       }
+
+       walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+       walletStore := database.NewWalletStore(walletDB)
+       accountStore := database.NewAccountStore(walletDB)
+       accounts := account.NewManager(accountStore, chain)
+       assets := asset.NewRegistry(walletDB, chain)
+       wallet, err := w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
+       if err != nil {
+               return err
+       }
+
+       if err := wallet.Rollback(targetHeight); err != nil {
+               return err
+       }
+
+       return chain.Rollback(targetHeight)
 }
 
-func initActiveNetParams(config *cfg.Config) {
-       var exist bool
-       consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
-       if !exist {
-               cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
+func initNodeConfig(config *cfg.Config) error {
+       if err := lockDataDirectory(config); err != nil {
+               log.WithField("err", err).Info("Error: " + err.Error())
+               return err
+       }
+
+       if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
+               log.WithField("err", err).Info("Failed to load federated information")
+               return err
+       }
+
+       if err := consensus.InitActiveNetParams(config.ChainID); err != nil {
+               log.Fatalf("Failed to init ActiveNetParams:[%s]", err.Error())
        }
+
+       cfg.CommonConfig = config
+       return nil
 }
 
-func initLogFile(config *cfg.Config) {
-       if config.LogFile == "" {
-               return
+// find whether config xpubs equal genesis block xpubs
+func checkConfig(chain *protocol.Chain, config *cfg.Config) error {
+       fedpegScript := cfg.FederationWScript(config, 0)
+       genesisBlock, err := chain.GetBlockByHeight(0)
+       if err != nil {
+               return err
        }
-       cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
-       file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
-       if err == nil {
-               log.SetOutput(file)
-       } else {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default")
+       typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput
+       if v, ok := typedInput.(*types.CoinbaseInput); ok {
+               if !reflect.DeepEqual(fedpegScript, v.Arbitrary) {
+                       return errors.New("config xpubs don't equal genesis block xpubs")
+               }
        }
+       return nil
+}
 
+// Lock data directory after daemonization
+func lockDataDirectory(config *cfg.Config) error {
+       _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
+       if err != nil {
+               return errors.New("datadir already used by another process")
+       }
+       return nil
 }
 
 func initCommonConfig(config *cfg.Config) {
@@ -214,13 +266,14 @@ func launchWebBrowser(port string) {
 }
 
 func (n *Node) initAndstartAPIServer() {
-       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
        env.Parse()
        n.api.StartServer(*listenAddr)
 }
 
+// OnStart implements BaseService
 func (n *Node) OnStart() error {
        if n.miningEnable {
                if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
@@ -252,6 +305,7 @@ func (n *Node) OnStart() error {
        return nil
 }
 
+// OnStop implements BaseService
 func (n *Node) OnStop() {
        n.notificationMgr.Shutdown()
        n.notificationMgr.WaitForShutdown()
@@ -265,9 +319,15 @@ func (n *Node) OnStop() {
        n.eventDispatcher.Stop()
 }
 
+// RunForever listen to the stop signal
 func (n *Node) RunForever() {
        // Sleep forever and then...
        cmn.TrapSignal(func() {
                n.Stop()
        })
 }
+
+// GetChain return the chain
+func (n *Node) GetChain() *protocol.Chain {
+       return n.chain
+}