X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=node%2Fnode.go;h=92a45932464480c291b487854d402c1fbdc9e43d;hp=61176c2257dd3f4f152b5c53d5facb34ae3215aa;hb=2cf5801b2e693a45de9b51ec9aa9c1f787d57105;hpb=369882c62ed2ccfe13f9d0ff918076a52ed7b91b diff --git a/node/node.go b/node/node.go index 61176c22..92a45932 100644 --- a/node/node.go +++ b/node/node.go @@ -2,20 +2,16 @@ package node import ( "context" - "encoding/json" "errors" "net" "net/http" _ "net/http/pprof" "os" "path/filepath" - "strings" - "time" "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" browser "github.com/toqueteos/webbrowser" "github.com/vapor/accesstoken" @@ -24,53 +20,44 @@ import ( "github.com/vapor/asset" "github.com/vapor/blockchain/pseudohsm" "github.com/vapor/blockchain/txfeed" - "github.com/vapor/common" cfg "github.com/vapor/config" "github.com/vapor/consensus" - engine "github.com/vapor/consensus/consensus" - dpos "github.com/vapor/consensus/consensus/dpos" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/database/leveldb" + "github.com/vapor/database" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/env" - "github.com/vapor/mining/miner" + "github.com/vapor/event" + "github.com/vapor/mining/cpuminer" "github.com/vapor/net/websocket" "github.com/vapor/netsync" + "github.com/vapor/p2p" "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/util" w "github.com/vapor/wallet" ) const ( - webHost = "http://127.0.0.1" - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + logModule = "node" ) +// Node represent bytom node type Node struct { cmn.BaseService - // config - config *cfg.Config + config *cfg.Config + eventDispatcher *event.Dispatcher + syncManager *netsync.SyncManager - syncManager *netsync.SyncManager - - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore notificationMgr *websocket.WSNotificationManager api *api.API chain *protocol.Chain txfeed *txfeed.Tracker - //cpuMiner *cpuminer.CPUMiner - miner *miner.Miner - - miningEnable bool - - newBlockCh chan *bc.Hash - - engine engine.Engine + cpuMiner *cpuminer.CPUMiner + miningEnable bool } +// NewNode create bytom node func NewNode(config *cfg.Config) *Node { ctx := context.Background() if err := lockDataDirectory(config); err != nil { @@ -78,43 +65,35 @@ func NewNode(config *cfg.Config) *Node { } initLogFile(config) initActiveNetParams(config) - initConsensusConfig(config) initCommonConfig(config) - util.MainchainConfig = config.MainChain - util.ValidatePegin = config.ValidatePegin // Get store if config.DBBackend != "memdb" && config.DBBackend != "leveldb" { cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend)) } coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(coreDB) + store := database.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - var engine engine.Engine - switch config.Consensus.Type { - case "dpos": - engine = dpos.GDpos - } - - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool, engine) + dispatcher := event.NewDispatcher() + txPool := protocol.NewTxPool(store, dispatcher) + chain, err := protocol.NewChain(store, txPool) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - var accounts *account.Manager = nil - var assets *asset.Registry = nil - var wallet *w.Wallet = nil - var txFeed *txfeed.Tracker = nil + var accounts *account.Manager + var assets *asset.Registry + var wallet *w.Wallet + var txFeed *txfeed.Tracker txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir()) txFeed = txfeed.NewTracker(txFeedDB, chain) if err = txFeed.Prepare(ctx); err != nil { - log.WithField("error", err).Error("start txfeed") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed") return nil } @@ -124,16 +103,12 @@ func NewNode(config *cfg.Config) *Node { } if !config.Wallet.Disable { - address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("DecodeAddress: %v", err)) - } walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) accounts = account.NewManager(walletDB, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address) + wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) if err != nil { - log.WithField("error", err).Error("init NewWallet") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet") } // trigger rescan wallet @@ -141,14 +116,13 @@ func NewNode(config *cfg.Config) *Node { wallet.RescanBlocks() } } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain) + syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher) + if err != nil { + cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err)) + } - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet, notificationMgr) + notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher) // run the profile server profileHost := config.ProfListenAddress @@ -162,55 +136,24 @@ func NewNode(config *cfg.Config) *Node { }() } - switch config.Consensus.Type { - case "dpos": - initDpos(chain, config) - } - node := &Node{ - config: config, - syncManager: syncManager, - accessTokens: accessTokens, - wallet: wallet, - chain: chain, - txfeed: txFeed, - miningEnable: config.Mining, - - newBlockCh: newBlockCh, + eventDispatcher: dispatcher, + config: config, + syncManager: syncManager, + accessTokens: accessTokens, + wallet: wallet, + chain: chain, + txfeed: txFeed, + miningEnable: config.Mining, + notificationMgr: notificationMgr, - engine: engine, } - node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, engine) + node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) - return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - notificationMgr.NotifyMempoolTx(msg.Tx) - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") - } - } -} - // Lock data directory after daemonization func lockDataDirectory(config *cfg.Config) error { _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) @@ -226,20 +169,6 @@ func initActiveNetParams(config *cfg.Config) { if !exist { cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) } - if config.Side.FedpegXPubs != "" { - var federationRedeemXPubs []chainkd.XPub - fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",") - for _, xpubStr := range fedpegXPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - federationRedeemXPubs = append(federationRedeemXPubs, xpub) - } - consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs - } - - consensus.ActiveNetParams.Signer = config.Signer - consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth - consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash } func initLogFile(config *cfg.Config) { @@ -251,7 +180,7 @@ func initLogFile(config *cfg.Config) { if err == nil { log.SetOutput(file) } else { - log.WithField("err", err).Info("using default") + log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default") } } @@ -270,8 +199,8 @@ func launchWebBrowser(port string) { } } -func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr) +func (n *Node) initAndstartAPIServer() { + n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() @@ -284,15 +213,20 @@ func (n *Node) OnStart() error { n.miningEnable = false log.Error(err) } else { - //n.cpuMiner.Start() - n.miner.Start() + n.cpuMiner.Start() } } if !n.config.VaultMode { - n.syncManager.Start() + if err := n.syncManager.Start(); err != nil { + return err + } + } + + n.initAndstartAPIServer() + if err := n.notificationMgr.Start(); err != nil { + return err } - n.initAndstartApiServer() - n.notificationMgr.Start() + if !n.config.Web.Closed { _, port, err := net.SplitHostPort(n.config.ApiAddress) if err != nil { @@ -301,24 +235,20 @@ func (n *Node) OnStart() error { } launchWebBrowser(port) } - go bytomdRPCCheck() return nil } func (n *Node) OnStop() { - if err := n.engine.Finish(); err != nil { - log.Errorf("OnStop: %v", err) - } - n.notificationMgr.Shutdown() n.notificationMgr.WaitForShutdown() n.BaseService.OnStop() if n.miningEnable { - n.miner.Stop() + n.cpuMiner.Stop() } if !n.config.VaultMode { n.syncManager.Stop() } + n.eventDispatcher.Stop() } func (n *Node) RunForever() { @@ -328,78 +258,6 @@ func (n *Node) RunForever() { }) } -func (n *Node) SyncManager() *netsync.SyncManager { - return n.syncManager -} - -/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/ -func bytomdRPCCheck() bool { - type Req struct { - BlockHeight uint64 `json:"block_height"` - } - if util.ValidatePegin { - for { - resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0}) - if err != nil { - log.Error("Call mainchain interface get-block-header failed") - time.Sleep(time.Millisecond * 1000) - continue - } - tmp, _ := json.Marshal(resp) - var blockHeader api.GetBlockHeaderResp - json.Unmarshal(tmp, &blockHeader) - hash := blockHeader.BlockHeader.Hash() - if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 { - log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String()) - return false - } - break - } - } - - return true -} - -func initConsensusConfig(config *cfg.Config) { - if config.ConsensusConfigFile == "" { - // poa - } else { - // - file, err := os.Open(config.ConsensusConfigFile) - if err != nil { - cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err)) - } - defer file.Close() - - if err := json.NewDecoder(file).Decode(config); err != nil { - cmn.Exit(cmn.Fmt("invalid consensus file: %v", err)) - } - - for _, v := range config.Consensus.SelfVoteSigners { - address, err := common.DecodeAddress(v, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("Address resolution failed: %v", err)) - } - config.Consensus.Signers = append(config.Consensus.Signers, address) - } - } -} - -func initDpos(chain *protocol.Chain, config *cfg.Config) { - header := chain.BestBlockHeader() - height := header.Height - hash := header.Hash() - maxSignerCount := config.Consensus.MaxSignerCount - period := config.Consensus.Period - if err := dpos.GDpos.Init(chain, maxSignerCount, period, height, hash); err != nil { - cmn.Exit(cmn.Fmt("initVote: Dpos new: %v", err)) - } - - if height > 0 { - oldBlockHeight := dpos.GDpos.GetOldBlockHeight() - oldBlockHash := dpos.GDpos.GetOldBlockHash() - if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil { - cmn.Exit(cmn.Fmt("initVote failed: %v", err)) - } - } +func (n *Node) NodeInfo() *p2p.NodeInfo { + return n.syncManager.NodeInfo() }