X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=node%2Fnode.go;h=f439669fefdc749ad7c4bb1253ac079b6f90c7b1;hb=33700f5679bfa78beb9e14abc844d0df0f2e9c4b;hp=904fde9c522b872d1fb7a0f9d4f0e3f0ff9990b8;hpb=4449c8f1b3d4a55d22004d09a735f518c7c0b862;p=bytom%2Fbytom.git diff --git a/node/node.go b/node/node.go old mode 100755 new mode 100644 index 904fde9c..f439669f --- a/node/node.go +++ b/node/node.go @@ -2,39 +2,41 @@ package node import ( "context" + "errors" "net/http" _ "net/http/pprof" + "os" + "path/filepath" "strings" - "time" + "github.com/prometheus/prometheus/util/flock" log "github.com/sirupsen/logrus" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" + browser "github.com/toqueteos/webbrowser" - "github.com/bytom/api" - "github.com/bytom/crypto/ed25519/chainkd" - bc "github.com/bytom/blockchain" "github.com/bytom/accesstoken" "github.com/bytom/account" + "github.com/bytom/api" "github.com/bytom/asset" "github.com/bytom/blockchain/pseudohsm" "github.com/bytom/blockchain/txfeed" cfg "github.com/bytom/config" + "github.com/bytom/consensus" "github.com/bytom/database/leveldb" "github.com/bytom/env" - "github.com/bytom/p2p" + "github.com/bytom/mining/cpuminer" + "github.com/bytom/mining/miningpool" + "github.com/bytom/mining/tensority" + "github.com/bytom/netsync" "github.com/bytom/protocol" - "github.com/bytom/types" - "github.com/bytom/util/browser" - "github.com/bytom/version" + "github.com/bytom/protocol/bc" w "github.com/bytom/wallet" ) const ( - webAddress = "http://127.0.0.1:9888" - expireReservationsPeriod = time.Second + webHost = "http://127.0.0.1" + maxNewBlockChSize = 1024 ) type Node struct { @@ -43,59 +45,39 @@ type Node struct { // config config *cfg.Config - // network - privKey crypto.PrivKeyEd25519 // local node's p2p key - sw *p2p.Switch // p2p connections - addrBook *p2p.AddrBook // known peers + syncManager *netsync.SyncManager - evsw types.EventSwitch // pub/sub for services - bcReactor *bc.BlockchainReactor + //bcReactor *bc.BlockchainReactor wallet *w.Wallet accessTokens *accesstoken.CredentialStore api *api.API chain *protocol.Chain + txfeed *txfeed.Tracker + cpuMiner *cpuminer.CPUMiner + miningPool *miningpool.MiningPool + miningEnable bool } func NewNode(config *cfg.Config) *Node { ctx := context.Background() - + if err := lockDataDirectory(config); err != nil { + cmn.Exit("Error: " + err.Error()) + } + initLogFile(config) + initActiveNetParams(config) // Get store - txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir()) - store := leveldb.NewStore(txDB) + coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir()) + store := leveldb.NewStore(coreDB) tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir()) accessTokens := accesstoken.NewStore(tokenDB) - privKey := crypto.GenPrivKeyEd25519() - - // Make event switch - eventSwitch := types.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - cmn.Exit(cmn.Fmt("Failed to start switch: %v", err)) - } - - trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) - - sw := p2p.NewSwitch(config.P2P, trustHistoryDB) - - genesisBlock := cfg.GenerateGenesisBlock() - - txPool := protocol.NewTxPool() - chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool) + txPool := protocol.NewTxPool(store) + chain, err := protocol.NewChain(store, txPool) if err != nil { cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err)) } - if chain.BestBlockHash() == nil { - if err := chain.SaveBlock(genesisBlock); err != nil { - cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err)) - } - if err := chain.ConnectBlock(genesisBlock); err != nil { - cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err)) - } - } - var accounts *account.Manager = nil var assets *asset.Registry = nil var wallet *w.Wallet = nil @@ -118,30 +100,22 @@ func NewNode(config *cfg.Config) *Node { walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir()) accounts = account.NewManager(walletDB, chain) assets = asset.NewRegistry(walletDB, chain) - wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, accessTokens, chain) + wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain) if err != nil { log.WithField("error", err).Error("init NewWallet") } - if err := initOrRecoverAccount(hsm, wallet); err != nil { - log.WithField("error", err).Error("initialize or recover account") + // trigger rescan wallet + if config.Wallet.Rescan { + wallet.RescanBlocks() } - - // Clean up expired UTXO reservations periodically. - go accounts.ExpireReservations(ctx, expireReservationsPeriod) } + newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) - bcReactor := bc.NewBlockchainReactor(chain, txPool, sw, wallet, txFeed, config.Mining) - - sw.AddReactor("BLOCKCHAIN", bcReactor) + syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh) - // Optionally, start the pex reactor - var addrBook *p2p.AddrBook - if config.P2P.PexReactor { - addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) - pexReactor := p2p.NewPEXReactor(addrBook) - sw.AddReactor("PEX", pexReactor) - } + // get transaction from txPool and send it to syncManager and wallet + go newPoolTxListener(txPool, syncManager, wallet) // run the profile server profileHost := config.ProfListenAddress @@ -154,107 +128,130 @@ func NewNode(config *cfg.Config) *Node { } node := &Node{ - config: config, - - privKey: privKey, - sw: sw, - addrBook: addrBook, - - evsw: eventSwitch, - bcReactor: bcReactor, + config: config, + syncManager: syncManager, accessTokens: accessTokens, wallet: wallet, chain: chain, + txfeed: txFeed, + miningEnable: config.Mining, } + + node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh) + node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh) + node.BaseService = *cmn.NewBaseService(nil, "Node", node) + if config.Simd.Enable { + tensority.UseSIMD = true + } + return node } -func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error { - xpubs := hsm.ListKeys() - - if len(xpubs) == 0 { - xpub, err := hsm.XCreate("default", "123456") - if err != nil { - return err +// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet +func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) { + txMsgCh := txPool.GetMsgCh() + syncManagerTxCh := syncManager.GetNewTxCh() + + for { + msg := <-txMsgCh + switch msg.MsgType { + case protocol.MsgNewTx: + syncManagerTxCh <- msg.Tx + if wallet != nil { + wallet.AddUnconfirmedTx(msg.TxDesc) + } + case protocol.MsgRemoveTx: + if wallet != nil { + wallet.RemoveUnconfirmedTx(msg.TxDesc) + } + default: + log.Warn("got unknow message type from the txPool channel") } - - wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default", nil) - return nil } +} - accounts, err := wallet.AccountMgr.ListAccounts("") +// Lock data directory after daemonization +func lockDataDirectory(config *cfg.Config) error { + _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK")) if err != nil { - return err + return errors.New("datadir already used by another process") } + return nil +} - if len(accounts) > 0 { - return nil +func initActiveNetParams(config *cfg.Config) { + var exist bool + consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID] + if !exist { + cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID)) } +} - for i, xPub := range xpubs { - if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil { - return err - } +func initLogFile(config *cfg.Config) { + if config.LogFile == "" { + return } - return nil + cmn.EnsureDir(filepath.Dir(config.LogFile), 0700) + file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err == nil { + log.SetOutput(file) + } else { + log.WithField("err", err).Info("using default") + } + } // Lanch web broser or not -func lanchWebBroser(lanch bool) { - if lanch { - log.Info("Launching System Browser with :", webAddress) - if err := browser.Open(webAddress); err != nil { - log.Error(err.Error()) - return - } +func launchWebBrowser(port string) { + webAddress := webHost + ":" + port + log.Info("Launching System Browser with :", webAddress) + if err := browser.Open(webAddress); err != nil { + log.Error(err.Error()) + return } } func (n *Node) initAndstartApiServer() { - n.api = api.NewAPI(n.bcReactor, n.wallet, n.chain, n.config) + n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens) listenAddr := env.String("LISTEN", n.config.ApiAddress) + env.Parse() n.api.StartServer(*listenAddr) } func (n *Node) OnStart() error { - // Create & add listener - p, address := ProtocolAndAddress(n.config.P2P.ListenAddress) - l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil) - n.sw.AddListener(l) - - // Start the switch - n.sw.SetNodeInfo(n.makeNodeInfo()) - n.sw.SetNodePrivKey(n.privKey) - _, err := n.sw.Start() - if err != nil { - return err - } - - // If seeds exist, add them to the address book and dial out - if n.config.P2P.Seeds != "" { - // dial out - seeds := strings.Split(n.config.P2P.Seeds, ",") - if err := n.DialSeeds(seeds); err != nil { - return err + if n.miningEnable { + if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil { + n.miningEnable = false + log.Error(err) + } else { + n.cpuMiner.Start() } } - + if !n.config.VaultMode { + n.syncManager.Start() + } n.initAndstartApiServer() - lanchWebBroser(!n.config.Web.Closed) - + if !n.config.Web.Closed { + s := strings.Split(n.config.ApiAddress, ":") + if len(s) != 2 { + log.Error("Invalid api address") + } + launchWebBrowser(s[1]) + } return nil } func (n *Node) OnStop() { n.BaseService.OnStop() - - log.Info("Stopping Node") - // TODO: gracefully disconnect from peers. - n.sw.Stop() - + if n.miningEnable { + n.cpuMiner.Stop() + } + if !n.config.VaultMode { + n.syncManager.Stop() + } } func (n *Node) RunForever() { @@ -264,68 +261,10 @@ func (n *Node) RunForever() { }) } -// Add a Listener to accept inbound peer connections. -// Add listeners before starting the Node. -// The first listener is the primary listener (in NodeInfo) -func (n *Node) AddListener(l p2p.Listener) { - n.sw.AddListener(l) +func (n *Node) SyncManager() *netsync.SyncManager { + return n.syncManager } -func (n *Node) Switch() *p2p.Switch { - return n.sw +func (n *Node) MiningPool() *miningpool.MiningPool { + return n.miningPool } - -func (n *Node) EventSwitch() types.EventSwitch { - return n.evsw -} - -func (n *Node) makeNodeInfo() *p2p.NodeInfo { - nodeInfo := &p2p.NodeInfo{ - PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519), - Moniker: n.config.Moniker, - Network: "bytom", - Version: version.Version, - Other: []string{ - cmn.Fmt("wire_version=%v", wire.Version), - cmn.Fmt("p2p_version=%v", p2p.Version), - }, - } - - if !n.sw.IsListening() { - return nodeInfo - } - - p2pListener := n.sw.Listeners()[0] - p2pHost := p2pListener.ExternalAddress().IP.String() - p2pPort := p2pListener.ExternalAddress().Port - //rpcListenAddr := n.config.RPC.ListenAddress - - // We assume that the rpcListener has the same ExternalAddress. - // This is probably true because both P2P and RPC listeners use UPnP, - // except of course if the rpc is only bound to localhost - nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort) - //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr)) - return nodeInfo -} - -//------------------------------------------------------------------------------ - -func (n *Node) NodeInfo() *p2p.NodeInfo { - return n.sw.NodeInfo() -} - -func (n *Node) DialSeeds(seeds []string) error { - return n.sw.DialSeeds(n.addrBook, seeds) -} - -// Defaults to tcp -func ProtocolAndAddress(listenAddr string) (string, string) { - p, address := "tcp", listenAddr - parts := strings.SplitN(address, "://", 2) - if len(parts) == 2 { - p, address = parts[0], parts[1] - } - return p, address -} - -//------------------------------------------------------------------------------