OSDN Git Service

add check point
[bytom/bytom.git] / node / node.go
old mode 100755 (executable)
new mode 100644 (file)
index 904fde9..f439669
@@ -2,39 +2,41 @@ package node
 
 import (
        "context"
+       "errors"
        "net/http"
        _ "net/http/pprof"
+       "os"
+       "path/filepath"
        "strings"
-       "time"
 
+       "github.com/prometheus/prometheus/util/flock"
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
+       browser "github.com/toqueteos/webbrowser"
 
-       "github.com/bytom/api"
-       "github.com/bytom/crypto/ed25519/chainkd"
-       bc "github.com/bytom/blockchain"
        "github.com/bytom/accesstoken"
        "github.com/bytom/account"
+       "github.com/bytom/api"
        "github.com/bytom/asset"
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
-       "github.com/bytom/p2p"
+       "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
+       "github.com/bytom/mining/tensority"
+       "github.com/bytom/netsync"
        "github.com/bytom/protocol"
-       "github.com/bytom/types"
-       "github.com/bytom/util/browser"
-       "github.com/bytom/version"
+       "github.com/bytom/protocol/bc"
        w "github.com/bytom/wallet"
 )
 
 const (
-       webAddress               = "http://127.0.0.1:9888"
-       expireReservationsPeriod = time.Second
+       webHost           = "http://127.0.0.1"
+       maxNewBlockChSize = 1024
 )
 
 type Node struct {
@@ -43,59 +45,39 @@ type Node struct {
        // config
        config *cfg.Config
 
-       // network
-       privKey  crypto.PrivKeyEd25519 // local node's p2p key
-       sw       *p2p.Switch           // p2p connections
-       addrBook *p2p.AddrBook         // known peers
+       syncManager *netsync.SyncManager
 
-       evsw         types.EventSwitch // pub/sub for services
-       bcReactor    *bc.BlockchainReactor
+       //bcReactor    *bc.BlockchainReactor
        wallet       *w.Wallet
        accessTokens *accesstoken.CredentialStore
        api          *api.API
        chain        *protocol.Chain
+       txfeed       *txfeed.Tracker
+       cpuMiner     *cpuminer.CPUMiner
+       miningPool   *miningpool.MiningPool
+       miningEnable bool
 }
 
 func NewNode(config *cfg.Config) *Node {
        ctx := context.Background()
-
+       if err := lockDataDirectory(config); err != nil {
+               cmn.Exit("Error: " + err.Error())
+       }
+       initLogFile(config)
+       initActiveNetParams(config)
        // Get store
-       txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
-       store := leveldb.NewStore(txDB)
+       coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+       store := leveldb.NewStore(coreDB)
 
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       privKey := crypto.GenPrivKeyEd25519()
-
-       // Make event switch
-       eventSwitch := types.NewEventSwitch()
-       _, err := eventSwitch.Start()
-       if err != nil {
-               cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
-       }
-
-       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-
-       sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
-
-       genesisBlock := cfg.GenerateGenesisBlock()
-
-       txPool := protocol.NewTxPool()
-       chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
+       txPool := protocol.NewTxPool(store)
+       chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
 
-       if chain.BestBlockHash() == nil {
-               if err := chain.SaveBlock(genesisBlock); err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
-               }
-               if err := chain.ConnectBlock(genesisBlock); err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
-               }
-       }
-
        var accounts *account.Manager = nil
        var assets *asset.Registry = nil
        var wallet *w.Wallet = nil
@@ -118,30 +100,22 @@ func NewNode(config *cfg.Config) *Node {
                walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
                accounts = account.NewManager(walletDB, chain)
                assets = asset.NewRegistry(walletDB, chain)
-               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, accessTokens, chain)
+               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
                if err != nil {
                        log.WithField("error", err).Error("init NewWallet")
                }
 
-               if err := initOrRecoverAccount(hsm, wallet); err != nil {
-                       log.WithField("error", err).Error("initialize or recover account")
+               // trigger rescan wallet
+               if config.Wallet.Rescan {
+                       wallet.RescanBlocks()
                }
-
-               // Clean up expired UTXO reservations periodically.
-               go accounts.ExpireReservations(ctx, expireReservationsPeriod)
        }
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       bcReactor := bc.NewBlockchainReactor(chain, txPool, sw, wallet, txFeed, config.Mining)
-
-       sw.AddReactor("BLOCKCHAIN", bcReactor)
+       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
-       // Optionally, start the pex reactor
-       var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(addrBook)
-               sw.AddReactor("PEX", pexReactor)
-       }
+       // get transaction from txPool and send it to syncManager and wallet
+       go newPoolTxListener(txPool, syncManager, wallet)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -154,107 +128,130 @@ func NewNode(config *cfg.Config) *Node {
        }
 
        node := &Node{
-               config: config,
-
-               privKey:  privKey,
-               sw:       sw,
-               addrBook: addrBook,
-
-               evsw:         eventSwitch,
-               bcReactor:    bcReactor,
+               config:       config,
+               syncManager:  syncManager,
                accessTokens: accessTokens,
                wallet:       wallet,
                chain:        chain,
+               txfeed:       txFeed,
+               miningEnable: config.Mining,
        }
+
+       node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+       node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
+
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
 
+       if config.Simd.Enable {
+               tensority.UseSIMD = true
+       }
+
        return node
 }
 
-func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error {
-       xpubs := hsm.ListKeys()
-
-       if len(xpubs) == 0 {
-               xpub, err := hsm.XCreate("default", "123456")
-               if err != nil {
-                       return err
+// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
+func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+       txMsgCh := txPool.GetMsgCh()
+       syncManagerTxCh := syncManager.GetNewTxCh()
+
+       for {
+               msg := <-txMsgCh
+               switch msg.MsgType {
+               case protocol.MsgNewTx:
+                       syncManagerTxCh <- msg.Tx
+                       if wallet != nil {
+                               wallet.AddUnconfirmedTx(msg.TxDesc)
+                       }
+               case protocol.MsgRemoveTx:
+                       if wallet != nil {
+                               wallet.RemoveUnconfirmedTx(msg.TxDesc)
+                       }
+               default:
+                       log.Warn("got unknow message type from the txPool channel")
                }
-
-               wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default", nil)
-               return nil
        }
+}
 
-       accounts, err := wallet.AccountMgr.ListAccounts("")
+// Lock data directory after daemonization
+func lockDataDirectory(config *cfg.Config) error {
+       _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
        if err != nil {
-               return err
+               return errors.New("datadir already used by another process")
        }
+       return nil
+}
 
-       if len(accounts) > 0 {
-               return nil
+func initActiveNetParams(config *cfg.Config) {
+       var exist bool
+       consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
+       if !exist {
+               cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
        }
+}
 
-       for i, xPub := range xpubs {
-               if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil {
-                       return err
-               }
+func initLogFile(config *cfg.Config) {
+       if config.LogFile == "" {
+               return
        }
-       return nil
+       cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
+       file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+       if err == nil {
+               log.SetOutput(file)
+       } else {
+               log.WithField("err", err).Info("using default")
+       }
+
 }
 
 // Lanch web broser or not
-func lanchWebBroser(lanch bool) {
-       if lanch {
-               log.Info("Launching System Browser with :", webAddress)
-               if err := browser.Open(webAddress); err != nil {
-                       log.Error(err.Error())
-                       return
-               }
+func launchWebBrowser(port string) {
+       webAddress := webHost + ":" + port
+       log.Info("Launching System Browser with :", webAddress)
+       if err := browser.Open(webAddress); err != nil {
+               log.Error(err.Error())
+               return
        }
 }
 
 func (n *Node) initAndstartApiServer() {
-       n.api = api.NewAPI(n.bcReactor, n.wallet, n.chain, n.config)
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
+       env.Parse()
        n.api.StartServer(*listenAddr)
 }
 
 func (n *Node) OnStart() error {
-       // Create & add listener
-       p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
-       l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
-       n.sw.AddListener(l)
-
-       // Start the switch
-       n.sw.SetNodeInfo(n.makeNodeInfo())
-       n.sw.SetNodePrivKey(n.privKey)
-       _, err := n.sw.Start()
-       if err != nil {
-               return err
-       }
-
-       // If seeds exist, add them to the address book and dial out
-       if n.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(n.config.P2P.Seeds, ",")
-               if err := n.DialSeeds(seeds); err != nil {
-                       return err
+       if n.miningEnable {
+               if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
+                       n.miningEnable = false
+                       log.Error(err)
+               } else {
+                       n.cpuMiner.Start()
                }
        }
-
+       if !n.config.VaultMode {
+               n.syncManager.Start()
+       }
        n.initAndstartApiServer()
-       lanchWebBroser(!n.config.Web.Closed)
-
+       if !n.config.Web.Closed {
+               s := strings.Split(n.config.ApiAddress, ":")
+               if len(s) != 2 {
+                       log.Error("Invalid api address")
+               }
+               launchWebBrowser(s[1])
+       }
        return nil
 }
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
-
-       log.Info("Stopping Node")
-       // TODO: gracefully disconnect from peers.
-       n.sw.Stop()
-
+       if n.miningEnable {
+               n.cpuMiner.Stop()
+       }
+       if !n.config.VaultMode {
+               n.syncManager.Stop()
+       }
 }
 
 func (n *Node) RunForever() {
@@ -264,68 +261,10 @@ func (n *Node) RunForever() {
        })
 }
 
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
-       n.sw.AddListener(l)
+func (n *Node) SyncManager() *netsync.SyncManager {
+       return n.syncManager
 }
 
-func (n *Node) Switch() *p2p.Switch {
-       return n.sw
+func (n *Node) MiningPool() *miningpool.MiningPool {
+       return n.miningPool
 }
-
-func (n *Node) EventSwitch() types.EventSwitch {
-       return n.evsw
-}
-
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: n.config.Moniker,
-               Network: "bytom",
-               Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
-       }
-
-       if !n.sw.IsListening() {
-               return nodeInfo
-       }
-
-       p2pListener := n.sw.Listeners()[0]
-       p2pHost := p2pListener.ExternalAddress().IP.String()
-       p2pPort := p2pListener.ExternalAddress().Port
-       //rpcListenAddr := n.config.RPC.ListenAddress
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
-       return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
-       return n.sw.NodeInfo()
-}
-
-func (n *Node) DialSeeds(seeds []string) error {
-       return n.sw.DialSeeds(n.addrBook, seeds)
-}
-
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
-       }
-       return p, address
-}
-
-//------------------------------------------------------------------------------