X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=node%2Fnode.go;h=d82c1558c9ff1cc10d4cbd13dbe3e6f75eac687d;hb=f7f12ea310fc0a1e49f4ccbd3d0d79f1664db045;hp=61c51aeeb76b1e76c996677c513ef2ee7036fb28;hpb=08281341c2cb02ba11d4218576256688854790fc;p=bytom%2Fvapor.git diff --git a/node/node.go b/node/node.go index 61c51aee..d82c1558 100644 --- a/node/node.go +++ b/node/node.go @@ -1,21 +1,18 @@ package node import ( - "context" - "encoding/json" + "encoding/hex" "errors" "net" "net/http" _ "net/http/pprof" "os" "path/filepath" - "strings" - "time" + "reflect" "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" browser "github.com/toqueteos/webbrowser" "github.com/vapor/accesstoken" @@ -23,88 +20,89 @@ import ( "github.com/vapor/api" "github.com/vapor/asset" "github.com/vapor/blockchain/pseudohsm" - "github.com/vapor/blockchain/txfeed" cfg "github.com/vapor/config" "github.com/vapor/consensus" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/database/leveldb" + "github.com/vapor/database" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/env" - "github.com/vapor/mining/cpuminer" - "github.com/vapor/mining/miningpool" + "github.com/vapor/event" "github.com/vapor/net/websocket" "github.com/vapor/netsync" + "github.com/vapor/proposal/blockproposer" "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/util" + "github.com/vapor/protocol/bc/types" w "github.com/vapor/wallet" ) const ( - webHost = "http://127.0.0.1" - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + logModule = "node" ) +// Node represent bytom node type Node struct { cmn.BaseService - // config - config *cfg.Config + config *cfg.Config + eventDispatcher *event.Dispatcher + syncManager *netsync.SyncManager - syncManager *netsync.SyncManager - - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore notificationMgr *websocket.WSNotificationManager api *api.API chain *protocol.Chain - txfeed *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner - miningPool *miningpool.MiningPool + cpuMiner *blockproposer.BlockProposer miningEnable bool - - newBlockCh chan *bc.Hash } +// NewNode create bytom node func NewNode(config *cfg.Config) *Node { - ctx := context.Background() if err := lockDataDirectory(config); err != nil { cmn.Exit("Error: " + err.Error()) } + + if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil { + cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error())) + } + + log.WithFields(log.Fields{ + "module": logModule, + "pubkey": config.PrivateKey().XPub(), + "fed_xpubs": config.Federation.Xpubs, + "fed_quorum": config.Federation.Quorum, + "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)), + }).Info() + initLogFile(config) initActiveNetParams(config) initCommonConfig(config) - util.MainchainConfig = config.MainChain - util.ValidatePegin = config.ValidatePegin + // Get store if config.DBBackend != "memdb" && config.DBBackend != "leveldb" { cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend)) } coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(coreDB) + store := database.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool) + dispatcher := event.NewDispatcher() + txPool := protocol.NewTxPool(store, dispatcher) + chain, err := protocol.NewChain(store, txPool, dispatcher) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - var accounts *account.Manager = nil - var assets *asset.Registry = nil - var wallet *w.Wallet = nil - var txFeed *txfeed.Tracker = nil - - txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir()) - txFeed = txfeed.NewTracker(txFeedDB, chain) - - if err = txFeed.Prepare(ctx); err != nil { - log.WithField("error", err).Error("start txfeed") - return nil + if err := checkConfig(chain, config); err != nil { + panic(err) } + var accounts *account.Manager + var assets *asset.Registry + var wallet *w.Wallet + hsm, err := pseudohsm.New(config.KeysDir()) if err != nil { cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err)) @@ -114,9 +112,9 @@ func NewNode(config *cfg.Config) *Node { walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) accounts = account.NewManager(walletDB, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain) + wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) if err != nil { - log.WithField("error", err).Error("init NewWallet") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet") } // trigger rescan wallet @@ -124,14 +122,13 @@ func NewNode(config *cfg.Config) *Node { wallet.RescanBlocks() } } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain) + syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher) + if err != nil { + cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err)) + } - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet, notificationMgr) + notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher) // run the profile server profileHost := config.ProfListenAddress @@ -146,48 +143,36 @@ func NewNode(config *cfg.Config) *Node { } node := &Node{ - config: config, - syncManager: syncManager, - accessTokens: accessTokens, - wallet: wallet, - chain: chain, - txfeed: txFeed, - miningEnable: config.Mining, - - newBlockCh: newBlockCh, + eventDispatcher: dispatcher, + config: config, + syncManager: syncManager, + accessTokens: accessTokens, + wallet: wallet, + chain: chain, + miningEnable: config.Mining, + notificationMgr: notificationMgr, } - node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh) - node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh) - + node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) - return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - notificationMgr.NotifyMempoolTx(msg.Tx) - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") +// find whether config xpubs equal genesis block xpubs +func checkConfig(chain *protocol.Chain, config *cfg.Config) error { + fedpegScript := cfg.FederationWScript(config) + genesisBlock, err := chain.GetBlockByHeight(0) + if err != nil { + return err + } + typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput + if v, ok := typedInput.(*types.CoinbaseInput); ok { + if !reflect.DeepEqual(fedpegScript, v.Arbitrary) { + return errors.New("config xpubs don't equal genesis block xpubs.") } } + return nil } // Lock data directory after daemonization @@ -205,29 +190,6 @@ func initActiveNetParams(config *cfg.Config) { if !exist { cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) } - var federationRedeemXPubs []chainkd.XPub - if fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ","); len(fedpegXPubs) > 0 { - for _, xpubStr := range fedpegXPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - federationRedeemXPubs = append(federationRedeemXPubs, xpub) - } - } - - var signBlockXPubs []chainkd.XPub - if xPubs := strings.Split(config.Side.SignBlockXPubs, ","); len(xPubs) > 0 { - for _, xpubStr := range xPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - signBlockXPubs = append(signBlockXPubs, xpub) - } - } - - consensus.ActiveNetParams.Signer = config.Signer - consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs - consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs - consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth - consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash } func initLogFile(config *cfg.Config) { @@ -239,7 +201,7 @@ func initLogFile(config *cfg.Config) { if err == nil { log.SetOutput(file) } else { - log.WithField("err", err).Info("using default") + log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default") } } @@ -258,8 +220,8 @@ func launchWebBrowser(port string) { } } -func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr) +func (n *Node) initAndstartAPIServer() { + n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() @@ -276,10 +238,16 @@ func (n *Node) OnStart() error { } } if !n.config.VaultMode { - n.syncManager.Start() + if err := n.syncManager.Start(); err != nil { + return err + } + } + + n.initAndstartAPIServer() + if err := n.notificationMgr.Start(); err != nil { + return err } - n.initAndstartApiServer() - n.notificationMgr.Start() + if !n.config.Web.Closed { _, port, err := net.SplitHostPort(n.config.ApiAddress) if err != nil { @@ -288,7 +256,6 @@ func (n *Node) OnStart() error { } launchWebBrowser(port) } - go bytomdRPCCheck() return nil } @@ -302,6 +269,7 @@ func (n *Node) OnStop() { if !n.config.VaultMode { n.syncManager.Stop() } + n.eventDispatcher.Stop() } func (n *Node) RunForever() { @@ -310,39 +278,3 @@ func (n *Node) RunForever() { n.Stop() }) } - -func (n *Node) SyncManager() *netsync.SyncManager { - return n.syncManager -} - -func (n *Node) MiningPool() *miningpool.MiningPool { - return n.miningPool -} - -/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/ -func bytomdRPCCheck() bool { - type Req struct { - BlockHeight uint64 `json:"block_height"` - } - if util.ValidatePegin { - for { - resp, err := util.CallRPC("/get-block-header", &Req{BlockHeight: 0}) - if err != nil { - log.Error("Call mainchain interface get-block-header failed") - time.Sleep(time.Millisecond * 1000) - continue - } - tmp, _ := json.Marshal(resp) - var blockHeader api.GetBlockHeaderResp - json.Unmarshal(tmp, &blockHeader) - hash := blockHeader.BlockHeader.Hash() - if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 { - log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?") - return false - } - break - } - } - - return true -}