X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=node%2Fnode.go;h=c160f17c930e8244367ae5ff2fdeccfbd25c42b3;hb=9642338b3b27c3a8f748e7a746bef04821d72fe6;hp=c30ce928d27ecdf573501c5f82ae76b76cc2051e;hpb=6375289bdf8d263dbfa00833ef982714c9c8c87f;p=bytom%2Fvapor.git diff --git a/node/node.go b/node/node.go index c30ce928..c160f17c 100644 --- a/node/node.go +++ b/node/node.go @@ -1,21 +1,17 @@ package node import ( - "context" - "encoding/json" + "encoding/hex" "errors" "net" "net/http" _ "net/http/pprof" - "os" "path/filepath" - "strings" - "time" + "reflect" "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" browser "github.com/toqueteos/webbrowser" "github.com/vapor/accesstoken" @@ -23,95 +19,95 @@ import ( "github.com/vapor/api" "github.com/vapor/asset" "github.com/vapor/blockchain/pseudohsm" - "github.com/vapor/blockchain/txfeed" - "github.com/vapor/common" cfg "github.com/vapor/config" "github.com/vapor/consensus" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/database/leveldb" + "github.com/vapor/database" + dbm "github.com/vapor/database/leveldb" "github.com/vapor/env" - "github.com/vapor/mining/miner" + "github.com/vapor/event" + vaporLog "github.com/vapor/log" "github.com/vapor/net/websocket" "github.com/vapor/netsync" + "github.com/vapor/proposal/blockproposer" "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/util" + "github.com/vapor/protocol/bc/types" w "github.com/vapor/wallet" ) const ( - webHost = "http://127.0.0.1" - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + logModule = "node" ) +// Node represent bytom node type Node struct { cmn.BaseService - // config - config *cfg.Config + config *cfg.Config + eventDispatcher *event.Dispatcher + syncManager *netsync.SyncManager - syncManager *netsync.SyncManager - - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore notificationMgr *websocket.WSNotificationManager api *api.API chain *protocol.Chain - txfeed *txfeed.Tracker - //cpuMiner *cpuminer.CPUMiner - miner *miner.Miner - - miningEnable bool - - newBlockCh chan *bc.Hash + cpuMiner *blockproposer.BlockProposer + miningEnable bool } +// NewNode create bytom node func NewNode(config *cfg.Config) *Node { - ctx := context.Background() if err := lockDataDirectory(config); err != nil { cmn.Exit("Error: " + err.Error()) } - initLogFile(config) - initActiveNetParams(config) - initConsensusConfig(config) + + if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil { + cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error())) + } + + if err:=vaporLog.InitLogFile(config);err!=nil{ + log.WithField("err",err).Fatalln("InitLogFile failed") + } + + log.WithFields(log.Fields{ + "module": logModule, + "pubkey": config.PrivateKey().XPub(), + "fed_xpubs": config.Federation.Xpubs, + "fed_quorum": config.Federation.Quorum, + "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)), + }).Info() + + if err := consensus.InitActiveNetParams(config.ChainID); err != nil { + log.Fatalf("Failed to init ActiveNetParams:[%s]", err.Error()) + } + initCommonConfig(config) - util.MainchainConfig = config.MainChain - util.ValidatePegin = config.ValidatePegin // Get store if config.DBBackend != "memdb" && config.DBBackend != "leveldb" { cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend)) } coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(coreDB) + store := database.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool) + dispatcher := event.NewDispatcher() + txPool := protocol.NewTxPool(store, dispatcher) + chain, err := protocol.NewChain(store, txPool, dispatcher) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - switch config.Consensus.Type { - case "dpos": - initDpos(chain, config) + if err := checkConfig(chain, config); err != nil { + panic(err) } - var accounts *account.Manager = nil - var assets *asset.Registry = nil - var wallet *w.Wallet = nil - var txFeed *txfeed.Tracker = nil - - txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir()) - txFeed = txfeed.NewTracker(txFeedDB, chain) - - if err = txFeed.Prepare(ctx); err != nil { - log.WithField("error", err).Error("start txfeed") - return nil - } + var accounts *account.Manager + var assets *asset.Registry + var wallet *w.Wallet hsm, err := pseudohsm.New(config.KeysDir()) if err != nil { @@ -119,16 +115,14 @@ func NewNode(config *cfg.Config) *Node { } if !config.Wallet.Disable { - address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("DecodeAddress: %v", err)) - } walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) - accounts = account.NewManager(walletDB, chain) + walletStore := database.NewWalletStore(walletDB) + accountStore := database.NewAccountStore(walletDB) + accounts = account.NewManager(accountStore, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address) + wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) if err != nil { - log.WithField("error", err).Error("init NewWallet") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet") } // trigger rescan wallet @@ -136,19 +130,18 @@ func NewNode(config *cfg.Config) *Node { wallet.RescanBlocks() } } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - - notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain) + fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir()) + syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB) + if err != nil { + cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err)) + } - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet, notificationMgr) + notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher) // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { - // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs) + // Profiling vapord programs.see (https://blog.golang.org/profiling-go-programs) // go tool pprof http://profileHose/debug/pprof/heap go func() { if err = http.ListenAndServe(profileHost, nil); err != nil { @@ -156,47 +149,38 @@ func NewNode(config *cfg.Config) *Node { } }() } + node := &Node{ - config: config, - syncManager: syncManager, - accessTokens: accessTokens, - wallet: wallet, - chain: chain, - txfeed: txFeed, - miningEnable: config.Mining, - - newBlockCh: newBlockCh, + eventDispatcher: dispatcher, + config: config, + syncManager: syncManager, + accessTokens: accessTokens, + wallet: wallet, + chain: chain, + miningEnable: config.Mining, + notificationMgr: notificationMgr, } - node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh) + node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) - return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - notificationMgr.NotifyMempoolTx(msg.Tx) - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") +// find whether config xpubs equal genesis block xpubs +func checkConfig(chain *protocol.Chain, config *cfg.Config) error { + fedpegScript := cfg.FederationWScript(config) + genesisBlock, err := chain.GetBlockByHeight(0) + if err != nil { + return err + } + typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput + if v, ok := typedInput.(*types.CoinbaseInput); ok { + if !reflect.DeepEqual(fedpegScript, v.Arbitrary) { + return errors.New("config xpubs don't equal genesis block xpubs.") } } + return nil } // Lock data directory after daemonization @@ -208,42 +192,6 @@ func lockDataDirectory(config *cfg.Config) error { return nil } -func initActiveNetParams(config *cfg.Config) { - var exist bool - consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID] - if !exist { - cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) - } - if config.Side.FedpegXPubs != "" { - var federationRedeemXPubs []chainkd.XPub - fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",") - for _, xpubStr := range fedpegXPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - federationRedeemXPubs = append(federationRedeemXPubs, xpub) - } - consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs - } - - consensus.ActiveNetParams.Signer = config.Signer - consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth - consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash -} - -func initLogFile(config *cfg.Config) { - if config.LogFile == "" { - return - } - cmn.EnsureDir(filepath.Dir(config.LogFile), 0700) - file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err == nil { - log.SetOutput(file) - } else { - log.WithField("err", err).Info("using default") - } - -} - func initCommonConfig(config *cfg.Config) { cfg.CommonConfig = config } @@ -258,8 +206,8 @@ func launchWebBrowser(port string) { } } -func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr) +func (n *Node) initAndstartAPIServer() { + n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() @@ -272,15 +220,20 @@ func (n *Node) OnStart() error { n.miningEnable = false log.Error(err) } else { - //n.cpuMiner.Start() - n.miner.Start() + n.cpuMiner.Start() } } if !n.config.VaultMode { - n.syncManager.Start() + if err := n.syncManager.Start(); err != nil { + return err + } } - n.initAndstartApiServer() - n.notificationMgr.Start() + + n.initAndstartAPIServer() + if err := n.notificationMgr.Start(); err != nil { + return err + } + if !n.config.Web.Closed { _, port, err := net.SplitHostPort(n.config.ApiAddress) if err != nil { @@ -289,24 +242,20 @@ func (n *Node) OnStart() error { } launchWebBrowser(port) } - go bytomdRPCCheck() return nil } func (n *Node) OnStop() { - if err := n.chain.Engine.Finish(); err != nil { - log.Errorf("OnStop: %v", err) - } - n.notificationMgr.Shutdown() n.notificationMgr.WaitForShutdown() n.BaseService.OnStop() if n.miningEnable { - n.miner.Stop() + n.cpuMiner.Stop() } if !n.config.VaultMode { n.syncManager.Stop() } + n.eventDispatcher.Stop() } func (n *Node) RunForever() { @@ -315,83 +264,3 @@ func (n *Node) RunForever() { n.Stop() }) } - -func (n *Node) SyncManager() *netsync.SyncManager { - return n.syncManager -} - -/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/ -func bytomdRPCCheck() bool { - type Req struct { - BlockHeight uint64 `json:"block_height"` - } - if util.ValidatePegin { - for { - resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0}) - if err != nil { - log.Error("Call mainchain interface get-block-header failed") - time.Sleep(time.Millisecond * 1000) - continue - } - tmp, _ := json.Marshal(resp) - var blockHeader api.GetBlockHeaderResp - json.Unmarshal(tmp, &blockHeader) - hash := blockHeader.BlockHeader.Hash() - if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 { - log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String()) - return false - } - break - } - } - - return true -} - -func initConsensusConfig(config *cfg.Config) { - if config.ConsensusConfigFile == "" { - // poa - } else { - // - file, err := os.Open(config.ConsensusConfigFile) - if err != nil { - cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err)) - } - defer file.Close() - - if err := json.NewDecoder(file).Decode(config); err != nil { - cmn.Exit(cmn.Fmt("invalid consensus file: %v", err)) - } - - for _, v := range config.Consensus.SelfVoteSigners { - address, err := common.DecodeAddress(v, &consensus.ActiveNetParams) - if err != nil { - cmn.Exit(cmn.Fmt("Address resolution failed: %v", err)) - } - config.Consensus.Signers = append(config.Consensus.Signers, address) - } - } -} - -func initDpos(chain *protocol.Chain, config *cfg.Config) { - header := chain.BestBlockHeader() - height := header.Height - hash := header.Hash() - maxSignerCount := config.Consensus.MaxSignerCount - period := config.Consensus.Period - err := chain.Engine.Init(chain, maxSignerCount, period, height, hash) - - if height > 0 { - oldBlockHeight := chain.Engine.GetOldBlockHeight() - oldBlockHash := chain.Engine.GetOldBlockHash() - if err != nil { - oldBlockHeight = 0 - header, _ = chain.GetHeaderByHeight(oldBlockHeight) - oldBlockHash = header.Hash() - } - - if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil { - cmn.Exit(cmn.Fmt("initVote failed: %v", err)) - } - } -}