OSDN Git Service

try to fix ban peer bug (#273)
[bytom/vapor.git] / node / node.go
index 5774488..d4c7fd5 100644 (file)
@@ -1,21 +1,18 @@
 package node
 
 import (
-       "context"
-       "encoding/json"
+       "encoding/hex"
        "errors"
        "net"
        "net/http"
        _ "net/http/pprof"
        "os"
        "path/filepath"
-       "strings"
-       "time"
+       "reflect"
 
        "github.com/prometheus/prometheus/util/flock"
        log "github.com/sirupsen/logrus"
        cmn "github.com/tendermint/tmlibs/common"
-       dbm "github.com/tendermint/tmlibs/db"
        browser "github.com/toqueteos/webbrowser"
 
        "github.com/vapor/accesstoken"
@@ -23,105 +20,88 @@ import (
        "github.com/vapor/api"
        "github.com/vapor/asset"
        "github.com/vapor/blockchain/pseudohsm"
-       "github.com/vapor/blockchain/txfeed"
-       "github.com/vapor/common"
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
-       engine "github.com/vapor/consensus/consensus"
-       dpos "github.com/vapor/consensus/consensus/dpos"
-       "github.com/vapor/crypto/ed25519/chainkd"
-       "github.com/vapor/database/leveldb"
+       "github.com/vapor/database"
+       dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/env"
-       "github.com/vapor/mining/miner"
+       "github.com/vapor/event"
        "github.com/vapor/net/websocket"
        "github.com/vapor/netsync"
+       "github.com/vapor/proposal/blockproposer"
        "github.com/vapor/protocol"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/util"
+       "github.com/vapor/protocol/bc/types"
        w "github.com/vapor/wallet"
 )
 
 const (
-       webHost           = "http://127.0.0.1"
-       maxNewBlockChSize = 1024
+       webHost   = "http://127.0.0.1"
+       logModule = "node"
 )
 
+// Node represent bytom node
 type Node struct {
        cmn.BaseService
 
-       // config
-       config *cfg.Config
+       config          *cfg.Config
+       eventDispatcher *event.Dispatcher
+       syncManager     *netsync.SyncManager
 
-       syncManager *netsync.SyncManager
-
-       //bcReactor    *bc.BlockchainReactor
        wallet          *w.Wallet
        accessTokens    *accesstoken.CredentialStore
        notificationMgr *websocket.WSNotificationManager
        api             *api.API
        chain           *protocol.Chain
-       txfeed          *txfeed.Tracker
-       //cpuMiner        *cpuminer.CPUMiner
-       miner *miner.Miner
-
-       miningEnable bool
-
-       newBlockCh chan *bc.Hash
-
-       engine engine.Engine
+       cpuMiner        *blockproposer.BlockProposer
+       miningEnable    bool
 }
 
+// NewNode create bytom node
 func NewNode(config *cfg.Config) *Node {
-       ctx := context.Background()
        if err := lockDataDirectory(config); err != nil {
                cmn.Exit("Error: " + err.Error())
        }
+
+       if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
+               cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error()))
+       }
+
+       log.WithFields(log.Fields{
+               "module":             logModule,
+               "pubkey":             config.PrivateKey().XPub(),
+               "fed_xpubs":          config.Federation.Xpubs,
+               "fed_quorum":         config.Federation.Quorum,
+               "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)),
+       }).Info()
+
        initLogFile(config)
        initActiveNetParams(config)
-       initConsensusConfig(config)
        initCommonConfig(config)
 
-       util.MainchainConfig = config.MainChain
-       util.ValidatePegin = config.ValidatePegin
        // Get store
        if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
                cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
        }
        coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
-       store := leveldb.NewStore(coreDB)
+       store := database.NewStore(coreDB)
 
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       var engine engine.Engine
-       switch config.Consensus.Type {
-       case "dpos":
-               engine = dpos.GDpos
-       }
-
-       txPool := protocol.NewTxPool(store)
-       chain, err := protocol.NewChain(store, txPool, engine)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
+       chain, err := protocol.NewChain(store, txPool, dispatcher)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
 
-       switch config.Consensus.Type {
-       case "dpos":
-               initDpos(chain, config)
+       if err := checkConfig(chain, config); err != nil {
+               panic(err)
        }
 
-       var accounts *account.Manager = nil
-       var assets *asset.Registry = nil
-       var wallet *w.Wallet = nil
-       var txFeed *txfeed.Tracker = nil
-
-       txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
-       txFeed = txfeed.NewTracker(txFeedDB, chain)
-
-       if err = txFeed.Prepare(ctx); err != nil {
-               log.WithField("error", err).Error("start txfeed")
-               return nil
-       }
+       var accounts *account.Manager
+       var assets *asset.Registry
+       var wallet *w.Wallet
 
        hsm, err := pseudohsm.New(config.KeysDir())
        if err != nil {
@@ -129,16 +109,14 @@ func NewNode(config *cfg.Config) *Node {
        }
 
        if !config.Wallet.Disable {
-               address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
-               if err != nil {
-                       cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
-               }
                walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
-               accounts = account.NewManager(walletDB, chain)
+               walletStore := database.NewWalletStore(walletDB)
+               accountStore := database.NewAccountStore(walletDB)
+               accounts = account.NewManager(accountStore, chain)
                assets = asset.NewRegistry(walletDB, chain)
-               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
+               wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
                if err != nil {
-                       log.WithField("error", err).Error("init NewWallet")
+                       log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
                }
 
                // trigger rescan wallet
@@ -146,14 +124,13 @@ func NewNode(config *cfg.Config) *Node {
                        wallet.RescanBlocks()
                }
        }
-       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
-
-       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
-
-       notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
+       fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
+       syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher,fastSyncDB)
+       if err != nil {
+               cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
+       }
 
-       // get transaction from txPool and send it to syncManager and wallet
-       go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
+       notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -166,48 +143,38 @@ func NewNode(config *cfg.Config) *Node {
                        }
                }()
        }
+
        node := &Node{
-               config:       config,
-               syncManager:  syncManager,
-               accessTokens: accessTokens,
-               wallet:       wallet,
-               chain:        chain,
-               txfeed:       txFeed,
-               miningEnable: config.Mining,
-
-               newBlockCh:      newBlockCh,
+               eventDispatcher: dispatcher,
+               config:          config,
+               syncManager:     syncManager,
+               accessTokens:    accessTokens,
+               wallet:          wallet,
+               chain:           chain,
+               miningEnable:    config.Mining,
+
                notificationMgr: notificationMgr,
-               engine:          engine,
        }
 
-       node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, engine)
+       node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
-
        return node
 }
 
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
-       txMsgCh := txPool.GetMsgCh()
-       syncManagerTxCh := syncManager.GetNewTxCh()
-
-       for {
-               msg := <-txMsgCh
-               switch msg.MsgType {
-               case protocol.MsgNewTx:
-                       syncManagerTxCh <- msg.Tx
-                       if wallet != nil {
-                               wallet.AddUnconfirmedTx(msg.TxDesc)
-                       }
-                       notificationMgr.NotifyMempoolTx(msg.Tx)
-               case protocol.MsgRemoveTx:
-                       if wallet != nil {
-                               wallet.RemoveUnconfirmedTx(msg.TxDesc)
-                       }
-               default:
-                       log.Warn("got unknow message type from the txPool channel")
+// find whether config xpubs equal genesis block xpubs
+func checkConfig(chain *protocol.Chain, config *cfg.Config) error {
+       fedpegScript := cfg.FederationWScript(config)
+       genesisBlock, err := chain.GetBlockByHeight(0)
+       if err != nil {
+               return err
+       }
+       typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput
+       if v, ok := typedInput.(*types.CoinbaseInput); ok {
+               if !reflect.DeepEqual(fedpegScript, v.Arbitrary) {
+                       return errors.New("config xpubs don't equal genesis block xpubs.")
                }
        }
+       return nil
 }
 
 // Lock data directory after daemonization
@@ -225,20 +192,6 @@ func initActiveNetParams(config *cfg.Config) {
        if !exist {
                cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
        }
-       if config.Side.FedpegXPubs != "" {
-               var federationRedeemXPubs []chainkd.XPub
-               fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
-               for _, xpubStr := range fedpegXPubs {
-                       var xpub chainkd.XPub
-                       xpub.UnmarshalText([]byte(xpubStr))
-                       federationRedeemXPubs = append(federationRedeemXPubs, xpub)
-               }
-               consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
-       }
-
-       consensus.ActiveNetParams.Signer = config.Signer
-       consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
-       consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
 }
 
 func initLogFile(config *cfg.Config) {
@@ -250,7 +203,7 @@ func initLogFile(config *cfg.Config) {
        if err == nil {
                log.SetOutput(file)
        } else {
-               log.WithField("err", err).Info("using default")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default")
        }
 
 }
@@ -269,8 +222,8 @@ func launchWebBrowser(port string) {
        }
 }
 
-func (n *Node) initAndstartApiServer() {
-       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
+func (n *Node) initAndstartAPIServer() {
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
        env.Parse()
@@ -283,15 +236,20 @@ func (n *Node) OnStart() error {
                        n.miningEnable = false
                        log.Error(err)
                } else {
-                       //n.cpuMiner.Start()
-                       n.miner.Start()
+                       n.cpuMiner.Start()
                }
        }
        if !n.config.VaultMode {
-               n.syncManager.Start()
+               if err := n.syncManager.Start(); err != nil {
+                       return err
+               }
        }
-       n.initAndstartApiServer()
-       n.notificationMgr.Start()
+
+       n.initAndstartAPIServer()
+       if err := n.notificationMgr.Start(); err != nil {
+               return err
+       }
+
        if !n.config.Web.Closed {
                _, port, err := net.SplitHostPort(n.config.ApiAddress)
                if err != nil {
@@ -300,24 +258,20 @@ func (n *Node) OnStart() error {
                }
                launchWebBrowser(port)
        }
-       go bytomdRPCCheck()
        return nil
 }
 
 func (n *Node) OnStop() {
-       if err := n.engine.Finish(); err != nil {
-               log.Errorf("OnStop: %v", err)
-       }
-
        n.notificationMgr.Shutdown()
        n.notificationMgr.WaitForShutdown()
        n.BaseService.OnStop()
        if n.miningEnable {
-               n.miner.Stop()
+               n.cpuMiner.Stop()
        }
        if !n.config.VaultMode {
                n.syncManager.Stop()
        }
+       n.eventDispatcher.Stop()
 }
 
 func (n *Node) RunForever() {
@@ -326,79 +280,3 @@ func (n *Node) RunForever() {
                n.Stop()
        })
 }
-
-func (n *Node) SyncManager() *netsync.SyncManager {
-       return n.syncManager
-}
-
-/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
-func bytomdRPCCheck() bool {
-       type Req struct {
-               BlockHeight uint64 `json:"block_height"`
-       }
-       if util.ValidatePegin {
-               for {
-                       resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
-                       if err != nil {
-                               log.Error("Call mainchain interface get-block-header failed")
-                               time.Sleep(time.Millisecond * 1000)
-                               continue
-                       }
-                       tmp, _ := json.Marshal(resp)
-                       var blockHeader api.GetBlockHeaderResp
-                       json.Unmarshal(tmp, &blockHeader)
-                       hash := blockHeader.BlockHeader.Hash()
-                       if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
-                               log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
-                               return false
-                       }
-                       break
-               }
-       }
-
-       return true
-}
-
-func initConsensusConfig(config *cfg.Config) {
-       if config.ConsensusConfigFile == "" {
-               // poa
-       } else {
-               //
-               file, err := os.Open(config.ConsensusConfigFile)
-               if err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
-               }
-               defer file.Close()
-
-               if err := json.NewDecoder(file).Decode(config); err != nil {
-                       cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
-               }
-
-               for _, v := range config.Consensus.SelfVoteSigners {
-                       address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
-                       if err != nil {
-                               cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
-                       }
-                       config.Consensus.Signers = append(config.Consensus.Signers, address)
-               }
-       }
-}
-
-func initDpos(chain *protocol.Chain, config *cfg.Config) {
-       header := chain.BestBlockHeader()
-       height := header.Height
-       hash := header.Hash()
-       maxSignerCount := config.Consensus.MaxSignerCount
-       period := config.Consensus.Period
-       if err := dpos.GDpos.Init(chain, maxSignerCount, period, height, hash); err != nil {
-               cmn.Exit(cmn.Fmt("initVote: Dpos new: %v", err))
-       }
-
-       if height > 0 {
-               oldBlockHeight := dpos.GDpos.GetOldBlockHeight()
-               oldBlockHash := dpos.GDpos.GetOldBlockHash()
-               if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
-                       cmn.Exit(cmn.Fmt("initVote failed: %v", err))
-               }
-       }
-}