X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=node%2Fnode.go;h=8f05938c6f5c9d86afed9e13c3a819602e125df6;hp=d46f06ddccfde445881a5b7610e980e7c99823da;hb=6f7fe6fd7442ddcec8ee959c09b6d45639ef045f;hpb=9136364633fe4117ebc87f0ea601fc36b5eff135 diff --git a/node/node.go b/node/node.go index d46f06dd..8f05938c 100644 --- a/node/node.go +++ b/node/node.go @@ -2,20 +2,17 @@ package node import ( "context" - "encoding/json" + "encoding/hex" "errors" "net" "net/http" _ "net/http/pprof" "os" "path/filepath" - "strings" - "time" "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" browser "github.com/toqueteos/webbrowser" "github.com/vapor/accesstoken" @@ -24,93 +21,92 @@ import ( "github.com/vapor/asset" "github.com/vapor/blockchain/pseudohsm" "github.com/vapor/blockchain/txfeed" - "github.com/vapor/common" cfg "github.com/vapor/config" "github.com/vapor/consensus" - engine "github.com/vapor/consensus/consensus" - "github.com/vapor/consensus/consensus/dpos" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/database/leveldb" + "github.com/vapor/database" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/env" - "github.com/vapor/mining/miner" - "github.com/vapor/mining/miningpool" + "github.com/vapor/event" "github.com/vapor/net/websocket" "github.com/vapor/netsync" + "github.com/vapor/proposal/blockproposer" "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/util" w "github.com/vapor/wallet" ) const ( - webHost = "http://127.0.0.1" - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + logModule = "node" ) -var consensusEngine engine.Engine - +// Node represent bytom node type Node struct { cmn.BaseService - // config - config *cfg.Config - - syncManager *netsync.SyncManager + config *cfg.Config + eventDispatcher *event.Dispatcher + syncManager *netsync.SyncManager - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore notificationMgr *websocket.WSNotificationManager api *api.API chain *protocol.Chain txfeed *txfeed.Tracker - //cpuMiner *cpuminer.CPUMiner - miner *miner.Miner - - miningPool *miningpool.MiningPool - miningEnable bool - - newBlockCh chan *bc.Hash + cpuMiner *blockproposer.BlockProposer + miningEnable bool } +// NewNode create bytom node func NewNode(config *cfg.Config) *Node { ctx := context.Background() + if err := lockDataDirectory(config); err != nil { cmn.Exit("Error: " + err.Error()) } + + if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil { + cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error())) + } + + log.WithFields(log.Fields{ + "module": logModule, + "fed_xpubs": config.Federation.Xpubs, + "fed_quorum": config.Federation.Quorum, + "fed_controlprogram": hex.EncodeToString(cfg.FederationProgrom(config)), + }).Info() + initLogFile(config) initActiveNetParams(config) - initConsensusConfig(config) initCommonConfig(config) - util.MainchainConfig = config.MainChain - util.ValidatePegin = config.ValidatePegin // Get store if config.DBBackend != "memdb" && config.DBBackend != "leveldb" { cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend)) } coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(coreDB) + store := database.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool) + dispatcher := event.NewDispatcher() + txPool := protocol.NewTxPool(store, dispatcher) + chain, err := protocol.NewChain(store, txPool, dispatcher) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - var accounts *account.Manager = nil - var assets *asset.Registry = nil - var wallet *w.Wallet = nil - var txFeed *txfeed.Tracker = nil + var accounts *account.Manager + var assets *asset.Registry + var wallet *w.Wallet + var txFeed *txfeed.Tracker txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir()) txFeed = txfeed.NewTracker(txFeedDB, chain) if err = txFeed.Prepare(ctx); err != nil { - log.WithField("error", err).Error("start txfeed") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed") return nil } @@ -120,16 +116,12 @@ func NewNode(config *cfg.Config) *Node { } if !config.Wallet.Disable { - address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("DecodeAddress: %v", err)) - } walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) accounts = account.NewManager(walletDB, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address) + wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) if err != nil { - log.WithField("error", err).Error("init NewWallet") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet") } // trigger rescan wallet @@ -137,14 +129,13 @@ func NewNode(config *cfg.Config) *Node { wallet.RescanBlocks() } } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain) + syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher) + if err != nil { + cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err)) + } - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet, notificationMgr) + notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher) // run the profile server profileHost := config.ProfListenAddress @@ -159,52 +150,23 @@ func NewNode(config *cfg.Config) *Node { } node := &Node{ - config: config, - syncManager: syncManager, - accessTokens: accessTokens, - wallet: wallet, - chain: chain, - txfeed: txFeed, - miningEnable: config.Mining, - - newBlockCh: newBlockCh, + eventDispatcher: dispatcher, + config: config, + syncManager: syncManager, + accessTokens: accessTokens, + wallet: wallet, + chain: chain, + txfeed: txFeed, + miningEnable: config.Mining, + notificationMgr: notificationMgr, } - //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh) - consensusEngine = createConsensusEngine(config, store) - node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine) - node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh) - + node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) - return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - notificationMgr.NotifyMempoolTx(msg.Tx) - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") - } - } -} - // Lock data directory after daemonization func lockDataDirectory(config *cfg.Config) error { _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) @@ -220,20 +182,6 @@ func initActiveNetParams(config *cfg.Config) { if !exist { cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) } - if config.Side.FedpegXPubs != "" { - var federationRedeemXPubs []chainkd.XPub - fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",") - for _, xpubStr := range fedpegXPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - federationRedeemXPubs = append(federationRedeemXPubs, xpub) - } - consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs - } - - consensus.ActiveNetParams.Signer = config.Signer - consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth - consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash } func initLogFile(config *cfg.Config) { @@ -245,7 +193,7 @@ func initLogFile(config *cfg.Config) { if err == nil { log.SetOutput(file) } else { - log.WithField("err", err).Info("using default") + log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default") } } @@ -264,8 +212,8 @@ func launchWebBrowser(port string) { } } -func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr) +func (n *Node) initAndstartAPIServer() { + n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() @@ -278,15 +226,20 @@ func (n *Node) OnStart() error { n.miningEnable = false log.Error(err) } else { - //n.cpuMiner.Start() - n.miner.Start() + n.cpuMiner.Start() } } if !n.config.VaultMode { - n.syncManager.Start() + if err := n.syncManager.Start(); err != nil { + return err + } } - n.initAndstartApiServer() - n.notificationMgr.Start() + + n.initAndstartAPIServer() + if err := n.notificationMgr.Start(); err != nil { + return err + } + if !n.config.Web.Closed { _, port, err := net.SplitHostPort(n.config.ApiAddress) if err != nil { @@ -295,7 +248,6 @@ func (n *Node) OnStart() error { } launchWebBrowser(port) } - go bytomdRPCCheck() return nil } @@ -304,11 +256,12 @@ func (n *Node) OnStop() { n.notificationMgr.WaitForShutdown() n.BaseService.OnStop() if n.miningEnable { - n.miner.Stop() + n.cpuMiner.Stop() } if !n.config.VaultMode { n.syncManager.Stop() } + n.eventDispatcher.Stop() } func (n *Node) RunForever() { @@ -317,76 +270,3 @@ func (n *Node) RunForever() { n.Stop() }) } - -func (n *Node) SyncManager() *netsync.SyncManager { - return n.syncManager -} - -func (n *Node) MiningPool() *miningpool.MiningPool { - return n.miningPool -} - -/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/ -func bytomdRPCCheck() bool { - type Req struct { - BlockHeight uint64 `json:"block_height"` - } - if util.ValidatePegin { - for { - resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0}) - if err != nil { - log.Error("Call mainchain interface get-block-header failed") - time.Sleep(time.Millisecond * 1000) - continue - } - tmp, _ := json.Marshal(resp) - var blockHeader api.GetBlockHeaderResp - json.Unmarshal(tmp, &blockHeader) - hash := blockHeader.BlockHeader.Hash() - if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 { - log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String()) - return false - } - break - } - } - - return true -} - -func initConsensusConfig(config *cfg.Config) { - if config.ConsensusConfigFile == "" { - // poa - } else { - // - file, err := os.Open(config.ConsensusConfigFile) - if err != nil { - cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err)) - } - defer file.Close() - - if err := json.NewDecoder(file).Decode(config); err != nil { - cmn.Exit(cmn.Fmt("invalid consensus file: %v", err)) - } - - for _, v := range config.Consensus.Dpos.SelfVoteSigners { - address, err := common.DecodeAddress(v, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("Address resolution failed: %v", err)) - } - config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address) - } - } -} - -func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine { - if config.Consensus.Dpos != nil { - return dpos.New(config.Consensus.Dpos, store) - } else { - return nil - } -} - -func GetConsensusEngine() engine.Engine { - return consensusEngine -}