X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=node%2Fnode.go;h=c160f17c930e8244367ae5ff2fdeccfbd25c42b3;hb=refs%2Fheads%2Fmaster;hp=45c335e379e273905a15cfd0a262f2845539838d;hpb=1accb05ca43992587ace7559732900098953980f;p=bytom%2Fvapor.git diff --git a/node/node.go b/node/node.go index 45c335e3..87d7456f 100644 --- a/node/node.go +++ b/node/node.go @@ -1,110 +1,106 @@ package node import ( - "context" - "encoding/json" + "encoding/hex" "errors" "net" "net/http" + // debug tool _ "net/http/pprof" - "os" "path/filepath" - "strings" - "time" + "reflect" "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - dbm "github.com/tendermint/tmlibs/db" browser "github.com/toqueteos/webbrowser" - "github.com/vapor/accesstoken" - "github.com/vapor/account" - "github.com/vapor/api" - "github.com/vapor/asset" - "github.com/vapor/blockchain/pseudohsm" - "github.com/vapor/blockchain/txfeed" - cfg "github.com/vapor/config" - "github.com/vapor/consensus" - "github.com/vapor/crypto/ed25519/chainkd" - "github.com/vapor/database/leveldb" - "github.com/vapor/env" - "github.com/vapor/mining/cpuminer" - "github.com/vapor/mining/miningpool" - "github.com/vapor/net/websocket" - "github.com/vapor/netsync" - "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/util" - w "github.com/vapor/wallet" + "github.com/bytom/vapor/accesstoken" + "github.com/bytom/vapor/account" + "github.com/bytom/vapor/api" + "github.com/bytom/vapor/application/mov" + "github.com/bytom/vapor/asset" + "github.com/bytom/vapor/blockchain/pseudohsm" + cfg "github.com/bytom/vapor/config" + "github.com/bytom/vapor/consensus" + "github.com/bytom/vapor/database" + dbm "github.com/bytom/vapor/database/leveldb" + "github.com/bytom/vapor/env" + "github.com/bytom/vapor/event" + vaporLog "github.com/bytom/vapor/log" + "github.com/bytom/vapor/net/websocket" + "github.com/bytom/vapor/netsync" + "github.com/bytom/vapor/proposal/blockproposer" + "github.com/bytom/vapor/protocol" + "github.com/bytom/vapor/protocol/bc/types" + w "github.com/bytom/vapor/wallet" ) const ( - webHost = "http://127.0.0.1" - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + logModule = "node" ) +// Node represent bytom node type Node struct { cmn.BaseService - // config - config *cfg.Config + config *cfg.Config + eventDispatcher *event.Dispatcher + syncManager *netsync.SyncManager - syncManager *netsync.SyncManager - - //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore notificationMgr *websocket.WSNotificationManager api *api.API chain *protocol.Chain - txfeed *txfeed.Tracker - cpuMiner *cpuminer.CPUMiner - miningPool *miningpool.MiningPool + cpuMiner *blockproposer.BlockProposer miningEnable bool - - newBlockCh chan *bc.Hash } +// NewNode create bytom node func NewNode(config *cfg.Config) *Node { - ctx := context.Background() - if err := lockDataDirectory(config); err != nil { - cmn.Exit("Error: " + err.Error()) + initNodeConfig(config) + + if err := vaporLog.InitLogFile(config); err != nil { + log.WithField("err", err).Fatalln("InitLogFile failed") } - initLogFile(config) - initActiveNetParams(config) - initCommonConfig(config) - util.MainchainConfig = config.MainChain - util.ValidatePegin = config.ValidatePegin + + log.WithFields(log.Fields{ + "module": logModule, + "pubkey": config.PrivateKey().XPub(), + "fed_xpubs": config.Federation.Xpubs, + "fed_quorum": config.Federation.Quorum, + "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config, 0)), + }).Info() + // Get store if config.DBBackend != "memdb" && config.DBBackend != "leveldb" { cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend)) } coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(coreDB) + store := database.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - txPool := protocol.NewTxPool(store) - chain, err := protocol.NewChain(store, txPool) + dispatcher := event.NewDispatcher() + movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight) + assetFilter := protocol.NewAssetFilter(config.CrossChain.AssetWhitelist) + txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore, assetFilter}, dispatcher) + chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - var accounts *account.Manager = nil - var assets *asset.Registry = nil - var wallet *w.Wallet = nil - var txFeed *txfeed.Tracker = nil - - txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir()) - txFeed = txfeed.NewTracker(txFeedDB, chain) - - if err = txFeed.Prepare(ctx); err != nil { - log.WithField("error", err).Error("start txfeed") - return nil + if err := checkConfig(chain, config); err != nil { + panic(err) } + var accounts *account.Manager + var assets *asset.Registry + var wallet *w.Wallet + hsm, err := pseudohsm.New(config.KeysDir()) if err != nil { cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err)) @@ -112,11 +108,17 @@ func NewNode(config *cfg.Config) *Node { if !config.Wallet.Disable { walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) - accounts = account.NewManager(walletDB, chain) + walletStore := database.NewWalletStore(walletDB) + accountStore := database.NewAccountStore(walletDB) + accounts = account.NewManager(accountStore, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain) + wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) if err != nil { - log.WithField("error", err).Error("init NewWallet") + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet") + } + + if err = wallet.Run(); err != nil { + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet work running thread") } // trigger rescan wallet @@ -124,19 +126,18 @@ func NewNode(config *cfg.Config) *Node { wallet.RescanBlocks() } } - newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - - syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - - notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain) + fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir()) + syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB) + if err != nil { + cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err)) + } - // get transaction from txPool and send it to syncManager and wallet - go newPoolTxListener(txPool, syncManager, wallet, notificationMgr) + notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher) // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { - // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs) + // Profiling vapord programs.see (https://blog.golang.org/profiling-go-programs) // go tool pprof http://profileHose/debug/pprof/heap go func() { if err = http.ListenAndServe(profileHost, nil); err != nil { @@ -146,104 +147,108 @@ func NewNode(config *cfg.Config) *Node { } node := &Node{ - config: config, - syncManager: syncManager, - accessTokens: accessTokens, - wallet: wallet, - chain: chain, - txfeed: txFeed, - miningEnable: config.Mining, - - newBlockCh: newBlockCh, + eventDispatcher: dispatcher, + config: config, + syncManager: syncManager, + accessTokens: accessTokens, + wallet: wallet, + chain: chain, + miningEnable: config.Mining, + notificationMgr: notificationMgr, } - node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh) - node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh) - + node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher) node.BaseService = *cmn.NewBaseService(nil, "Node", node) - return node } -// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet -func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) { - txMsgCh := txPool.GetMsgCh() - syncManagerTxCh := syncManager.GetNewTxCh() - - for { - msg := <-txMsgCh - switch msg.MsgType { - case protocol.MsgNewTx: - syncManagerTxCh <- msg.Tx - if wallet != nil { - wallet.AddUnconfirmedTx(msg.TxDesc) - } - notificationMgr.NotifyMempoolTx(msg.Tx) - case protocol.MsgRemoveTx: - if wallet != nil { - wallet.RemoveUnconfirmedTx(msg.TxDesc) - } - default: - log.Warn("got unknow message type from the txPool channel") - } +// Rollback rollback chain from one height to targetHeight +func Rollback(config *cfg.Config, targetHeight uint64) error { + if err := initNodeConfig(config); err != nil { + return err } -} -// Lock data directory after daemonization -func lockDataDirectory(config *cfg.Config) error { - _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) + // Get store + if config.DBBackend != "leveldb" { + return errors.New("Param db_backend is invalid, use leveldb") + } + + coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) + store := database.NewStore(coreDB) + + dispatcher := event.NewDispatcher() + movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight) + txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore}, dispatcher) + chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher) if err != nil { - return errors.New("datadir already used by another process") + return err } - return nil + + hsm, err := pseudohsm.New(config.KeysDir()) + if err != nil { + return err + } + + walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) + walletStore := database.NewWalletStore(walletDB) + accountStore := database.NewAccountStore(walletDB) + accounts := account.NewManager(accountStore, chain) + assets := asset.NewRegistry(walletDB, chain) + wallet, err := w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex) + if err != nil { + return err + } + + if err := wallet.Rollback(targetHeight); err != nil { + return err + } + + return chain.Rollback(targetHeight) } -func initActiveNetParams(config *cfg.Config) { - var exist bool - consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID] - if !exist { - cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) +func initNodeConfig(config *cfg.Config) error { + if err := lockDataDirectory(config); err != nil { + log.WithField("err", err).Info("Error: " + err.Error()) + return err } - if config.Side.FedpegXPubs != "" { - var federationRedeemXPubs []chainkd.XPub - fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",") - for _, xpubStr := range fedpegXPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - federationRedeemXPubs = append(federationRedeemXPubs, xpub) - } - consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs + + if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil { + log.WithField("err", err).Info("Failed to load federated information") + return err } - if config.Side.SignBlockXPubs != "" { - var signBlockXPubs []chainkd.XPub - xPubs := strings.Split(config.Side.SignBlockXPubs, ",") - for _, xpubStr := range xPubs { - var xpub chainkd.XPub - xpub.UnmarshalText([]byte(xpubStr)) - signBlockXPubs = append(signBlockXPubs, xpub) - } - consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs + if err := consensus.InitActiveNetParams(config.ChainID); err != nil { + log.Fatalf("Failed to init ActiveNetParams:[%s]", err.Error()) } - consensus.ActiveNetParams.Signer = config.Signer - consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth - consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash + cfg.CommonConfig = config + return nil } -func initLogFile(config *cfg.Config) { - if config.LogFile == "" { - return +// find whether config xpubs equal genesis block xpubs +func checkConfig(chain *protocol.Chain, config *cfg.Config) error { + fedpegScript := cfg.FederationWScript(config, 0) + genesisBlock, err := chain.GetBlockByHeight(0) + if err != nil { + return err } - cmn.EnsureDir(filepath.Dir(config.LogFile), 0700) - file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err == nil { - log.SetOutput(file) - } else { - log.WithField("err", err).Info("using default") + typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput + if v, ok := typedInput.(*types.CoinbaseInput); ok { + if !reflect.DeepEqual(fedpegScript, v.Arbitrary) { + return errors.New("config xpubs don't equal genesis block xpubs") + } } + return nil +} +// Lock data directory after daemonization +func lockDataDirectory(config *cfg.Config) error { + _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) + if err != nil { + return errors.New("datadir already used by another process") + } + return nil } func initCommonConfig(config *cfg.Config) { @@ -260,14 +265,15 @@ func launchWebBrowser(port string) { } } -func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr) +func (n *Node) initAndstartAPIServer() { + n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr) listenAddr := env.String("LISTEN", n.config.ApiAddress) env.Parse() n.api.StartServer(*listenAddr) } +// OnStart implements BaseService func (n *Node) OnStart() error { if n.miningEnable { if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil { @@ -278,10 +284,16 @@ func (n *Node) OnStart() error { } } if !n.config.VaultMode { - n.syncManager.Start() + if err := n.syncManager.Start(); err != nil { + return err + } } - n.initAndstartApiServer() - n.notificationMgr.Start() + + n.initAndstartAPIServer() + if err := n.notificationMgr.Start(); err != nil { + return err + } + if !n.config.Web.Closed { _, port, err := net.SplitHostPort(n.config.ApiAddress) if err != nil { @@ -290,10 +302,10 @@ func (n *Node) OnStart() error { } launchWebBrowser(port) } - go bytomdRPCCheck() return nil } +// OnStop implements BaseService func (n *Node) OnStop() { n.notificationMgr.Shutdown() n.notificationMgr.WaitForShutdown() @@ -304,8 +316,10 @@ func (n *Node) OnStop() { if !n.config.VaultMode { n.syncManager.Stop() } + n.eventDispatcher.Stop() } +// RunForever listen to the stop signal func (n *Node) RunForever() { // Sleep forever and then... cmn.TrapSignal(func() { @@ -313,38 +327,7 @@ func (n *Node) RunForever() { }) } -func (n *Node) SyncManager() *netsync.SyncManager { - return n.syncManager -} - -func (n *Node) MiningPool() *miningpool.MiningPool { - return n.miningPool -} - -/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/ -func bytomdRPCCheck() bool { - type Req struct { - BlockHeight uint64 `json:"block_height"` - } - if util.ValidatePegin { - for { - resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0}) - if err != nil { - log.Error("Call mainchain interface get-block-header failed") - time.Sleep(time.Millisecond * 1000) - continue - } - tmp, _ := json.Marshal(resp) - var blockHeader api.GetBlockHeaderResp - json.Unmarshal(tmp, &blockHeader) - hash := blockHeader.BlockHeader.Hash() - if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 { - log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?") - return false - } - break - } - } - - return true +// GetChain return the chain +func (n *Node) GetChain() *protocol.Chain { + return n.chain }