X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=node%2Fnode.go;h=f439669fefdc749ad7c4bb1253ac079b6f90c7b1;hb=33700f5679bfa78beb9e14abc844d0df0f2e9c4b;hp=46909cffc1062944b7bde3729865e725229b4abd;hpb=2aa44bbadea99984dc5c43ba8f9162376c707c79;p=bytom%2Fbytom.git diff --git a/node/node.go b/node/node.go index 46909cff..f439669f 100644 --- a/node/node.go +++ b/node/node.go @@ -2,10 +2,14 @@ package node import ( "context" + "errors" "net/http" _ "net/http/pprof" - "time" + "os" + "path/filepath" + "strings" + "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" @@ -19,22 +23,20 @@ import ( "github.com/bytom/blockchain/txfeed" cfg "github.com/bytom/config" "github.com/bytom/consensus" - "github.com/bytom/crypto/ed25519/chainkd" "github.com/bytom/database/leveldb" "github.com/bytom/env" "github.com/bytom/mining/cpuminer" "github.com/bytom/mining/miningpool" + "github.com/bytom/mining/tensority" "github.com/bytom/netsync" "github.com/bytom/protocol" "github.com/bytom/protocol/bc" - "github.com/bytom/types" w "github.com/bytom/wallet" ) const ( - webAddress = "http://127.0.0.1:9888" - expireReservationsPeriod = time.Second - maxNewBlockChSize = 1024 + webHost = "http://127.0.0.1" + maxNewBlockChSize = 1024 ) type Node struct { @@ -45,7 +47,6 @@ type Node struct { syncManager *netsync.SyncManager - evsw types.EventSwitch // pub/sub for services //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore @@ -59,24 +60,19 @@ type Node struct { func NewNode(config *cfg.Config) *Node { ctx := context.Background() - if config.ChainID == "testnet" { - consensus.ActiveNetParams = &consensus.TestNetParams + if err := lockDataDirectory(config); err != nil { + cmn.Exit("Error: " + err.Error()) } + initLogFile(config) + initActiveNetParams(config) // Get store - txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(txDB) + coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) + store := leveldb.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - // Make event switch - eventSwitch := types.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - cmn.Exit(cmn.Fmt("Failed to start switch: %v", err)) - } - - txPool := protocol.NewTxPool() + txPool := protocol.NewTxPool(store) chain, err := protocol.NewChain(store, txPool) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) @@ -109,17 +105,18 @@ func NewNode(config *cfg.Config) *Node { log.WithField("error", err).Error("init NewWallet") } - if err := initOrRecoverAccount(hsm, wallet); err != nil { - log.WithField("error", err).Error("initialize or recover account") + // trigger rescan wallet + if config.Wallet.Rescan { + wallet.RescanBlocks() } - - // Clean up expired UTXO reservations periodically. - go accounts.ExpireReservations(ctx, expireReservationsPeriod) } newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) + // get transaction from txPool and send it to syncManager and wallet + go newPoolTxListener(txPool, syncManager, wallet) + // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { @@ -133,7 +130,6 @@ func NewNode(config *cfg.Config) *Node { node := &Node{ config: config, syncManager: syncManager, - evsw: eventSwitch, accessTokens: accessTokens, wallet: wallet, chain: chain, @@ -146,41 +142,70 @@ func NewNode(config *cfg.Config) *Node { node.BaseService = *cmn.NewBaseService(nil, "Node", node) + if config.Simd.Enable { + tensority.UseSIMD = true + } + return node } -func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error { - xpubs := hsm.ListKeys() - - if len(xpubs) == 0 { - xpub, err := hsm.XCreate("default", "123456") - if err != nil { - return err +// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet +func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) { + txMsgCh := txPool.GetMsgCh() + syncManagerTxCh := syncManager.GetNewTxCh() + + for { + msg := <-txMsgCh + switch msg.MsgType { + case protocol.MsgNewTx: + syncManagerTxCh <- msg.Tx + if wallet != nil { + wallet.AddUnconfirmedTx(msg.TxDesc) + } + case protocol.MsgRemoveTx: + if wallet != nil { + wallet.RemoveUnconfirmedTx(msg.TxDesc) + } + default: + log.Warn("got unknow message type from the txPool channel") } - - wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default") - return nil } +} - accounts, err := wallet.AccountMgr.ListAccounts("") +// Lock data directory after daemonization +func lockDataDirectory(config *cfg.Config) error { + _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) if err != nil { - return err + return errors.New("datadir already used by another process") } + return nil +} - if len(accounts) > 0 { - return nil +func initActiveNetParams(config *cfg.Config) { + var exist bool + consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID] + if !exist { + cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) } +} - for i, xPub := range xpubs { - if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil { - return err - } +func initLogFile(config *cfg.Config) { + if config.LogFile == "" { + return } - return nil + cmn.EnsureDir(filepath.Dir(config.LogFile), 0700) + file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + log.SetOutput(file) + } else { + log.WithField("err", err).Info("using default") + } + } // Lanch web broser or not -func lanchWebBroser() { +func launchWebBrowser(port string) { + webAddress := webHost + ":" + port log.Info("Launching System Browser with :", webAddress) if err := browser.Open(webAddress); err != nil { log.Error(err.Error()) @@ -198,14 +223,24 @@ func (n *Node) initAndstartApiServer() { func (n *Node) OnStart() error { if n.miningEnable { - n.cpuMiner.Start() + if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil { + n.miningEnable = false + log.Error(err) + } else { + n.cpuMiner.Start() + } + } + if !n.config.VaultMode { + n.syncManager.Start() } - n.syncManager.Start() n.initAndstartApiServer() if !n.config.Web.Closed { - lanchWebBroser() + s := strings.Split(n.config.ApiAddress, ":") + if len(s) != 2 { + log.Error("Invalid api address") + } + launchWebBrowser(s[1]) } - return nil } @@ -214,9 +249,9 @@ func (n *Node) OnStop() { if n.miningEnable { n.cpuMiner.Stop() } - n.syncManager.Stop() - log.Info("Stopping Node") - // TODO: gracefully disconnect from peers. + if !n.config.VaultMode { + n.syncManager.Stop() + } } func (n *Node) RunForever() { @@ -226,10 +261,6 @@ func (n *Node) RunForever() { }) } -func (n *Node) EventSwitch() types.EventSwitch { - return n.evsw -} - func (n *Node) SyncManager() *netsync.SyncManager { return n.syncManager } @@ -237,5 +268,3 @@ func (n *Node) SyncManager() *netsync.SyncManager { func (n *Node) MiningPool() *miningpool.MiningPool { return n.miningPool } - -//------------------------------------------------------------------------------