OSDN Git Service

add check point
[bytom/bytom.git] / node / node.go
index 3d0dc79..f439669 100644 (file)
@@ -2,10 +2,14 @@ package node
 
 import (
        "context"
+       "errors"
        "net/http"
        _ "net/http/pprof"
-       "time"
+       "os"
+       "path/filepath"
+       "strings"
 
+       "github.com/prometheus/prometheus/util/flock"
        log "github.com/sirupsen/logrus"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
@@ -18,22 +22,21 @@ import (
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
-       "github.com/bytom/crypto/ed25519/chainkd"
+       "github.com/bytom/consensus"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
        "github.com/bytom/mining/cpuminer"
        "github.com/bytom/mining/miningpool"
+       "github.com/bytom/mining/tensority"
        "github.com/bytom/netsync"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
-       "github.com/bytom/types"
        w "github.com/bytom/wallet"
 )
 
 const (
-       webAddress               = "http://127.0.0.1:9888"
-       expireReservationsPeriod = time.Second
-       maxNewBlockChSize        = 1024
+       webHost           = "http://127.0.0.1"
+       maxNewBlockChSize = 1024
 )
 
 type Node struct {
@@ -44,7 +47,6 @@ type Node struct {
 
        syncManager *netsync.SyncManager
 
-       evsw types.EventSwitch // pub/sub for services
        //bcReactor    *bc.BlockchainReactor
        wallet       *w.Wallet
        accessTokens *accesstoken.CredentialStore
@@ -58,22 +60,19 @@ type Node struct {
 
 func NewNode(config *cfg.Config) *Node {
        ctx := context.Background()
-
+       if err := lockDataDirectory(config); err != nil {
+               cmn.Exit("Error: " + err.Error())
+       }
+       initLogFile(config)
+       initActiveNetParams(config)
        // Get store
-       txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
-       store := leveldb.NewStore(txDB)
+       coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+       store := leveldb.NewStore(coreDB)
 
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       // Make event switch
-       eventSwitch := types.NewEventSwitch()
-       _, err := eventSwitch.Start()
-       if err != nil {
-               cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
-       }
-
-       txPool := protocol.NewTxPool()
+       txPool := protocol.NewTxPool(store)
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
@@ -106,17 +105,18 @@ func NewNode(config *cfg.Config) *Node {
                        log.WithField("error", err).Error("init NewWallet")
                }
 
-               if err := initOrRecoverAccount(hsm, wallet); err != nil {
-                       log.WithField("error", err).Error("initialize or recover account")
+               // trigger rescan wallet
+               if config.Wallet.Rescan {
+                       wallet.RescanBlocks()
                }
-
-               // Clean up expired UTXO reservations periodically.
-               go accounts.ExpireReservations(ctx, expireReservationsPeriod)
        }
        newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
        syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
+       // get transaction from txPool and send it to syncManager and wallet
+       go newPoolTxListener(txPool, syncManager, wallet)
+
        // run the profile server
        profileHost := config.ProfListenAddress
        if profileHost != "" {
@@ -130,7 +130,6 @@ func NewNode(config *cfg.Config) *Node {
        node := &Node{
                config:       config,
                syncManager:  syncManager,
-               evsw:         eventSwitch,
                accessTokens: accessTokens,
                wallet:       wallet,
                chain:        chain,
@@ -143,41 +142,70 @@ func NewNode(config *cfg.Config) *Node {
 
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
 
+       if config.Simd.Enable {
+               tensority.UseSIMD = true
+       }
+
        return node
 }
 
-func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error {
-       xpubs := hsm.ListKeys()
-
-       if len(xpubs) == 0 {
-               xpub, err := hsm.XCreate("default", "123456")
-               if err != nil {
-                       return err
+// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
+func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+       txMsgCh := txPool.GetMsgCh()
+       syncManagerTxCh := syncManager.GetNewTxCh()
+
+       for {
+               msg := <-txMsgCh
+               switch msg.MsgType {
+               case protocol.MsgNewTx:
+                       syncManagerTxCh <- msg.Tx
+                       if wallet != nil {
+                               wallet.AddUnconfirmedTx(msg.TxDesc)
+                       }
+               case protocol.MsgRemoveTx:
+                       if wallet != nil {
+                               wallet.RemoveUnconfirmedTx(msg.TxDesc)
+                       }
+               default:
+                       log.Warn("got unknow message type from the txPool channel")
                }
-
-               wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default")
-               return nil
        }
+}
 
-       accounts, err := wallet.AccountMgr.ListAccounts("")
+// Lock data directory after daemonization
+func lockDataDirectory(config *cfg.Config) error {
+       _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
        if err != nil {
-               return err
+               return errors.New("datadir already used by another process")
        }
+       return nil
+}
 
-       if len(accounts) > 0 {
-               return nil
+func initActiveNetParams(config *cfg.Config) {
+       var exist bool
+       consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
+       if !exist {
+               cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
        }
+}
 
-       for i, xPub := range xpubs {
-               if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil {
-                       return err
-               }
+func initLogFile(config *cfg.Config) {
+       if config.LogFile == "" {
+               return
        }
-       return nil
+       cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
+       file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+       if err == nil {
+               log.SetOutput(file)
+       } else {
+               log.WithField("err", err).Info("using default")
+       }
+
 }
 
 // Lanch web broser or not
-func lanchWebBroser() {
+func launchWebBrowser(port string) {
+       webAddress := webHost + ":" + port
        log.Info("Launching System Browser with :", webAddress)
        if err := browser.Open(webAddress); err != nil {
                log.Error(err.Error())
@@ -189,19 +217,30 @@ func (n *Node) initAndstartApiServer() {
        n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
+       env.Parse()
        n.api.StartServer(*listenAddr)
 }
 
 func (n *Node) OnStart() error {
        if n.miningEnable {
-               n.cpuMiner.Start()
+               if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
+                       n.miningEnable = false
+                       log.Error(err)
+               } else {
+                       n.cpuMiner.Start()
+               }
+       }
+       if !n.config.VaultMode {
+               n.syncManager.Start()
        }
-       n.syncManager.Start()
        n.initAndstartApiServer()
        if !n.config.Web.Closed {
-               lanchWebBroser()
+               s := strings.Split(n.config.ApiAddress, ":")
+               if len(s) != 2 {
+                       log.Error("Invalid api address")
+               }
+               launchWebBrowser(s[1])
        }
-
        return nil
 }
 
@@ -210,9 +249,9 @@ func (n *Node) OnStop() {
        if n.miningEnable {
                n.cpuMiner.Stop()
        }
-       n.syncManager.Stop()
-       log.Info("Stopping Node")
-       // TODO: gracefully disconnect from peers.
+       if !n.config.VaultMode {
+               n.syncManager.Stop()
+       }
 }
 
 func (n *Node) RunForever() {
@@ -222,10 +261,6 @@ func (n *Node) RunForever() {
        })
 }
 
-func (n *Node) EventSwitch() types.EventSwitch {
-       return n.evsw
-}
-
 func (n *Node) SyncManager() *netsync.SyncManager {
        return n.syncManager
 }
@@ -233,5 +268,3 @@ func (n *Node) SyncManager() *netsync.SyncManager {
 func (n *Node) MiningPool() *miningpool.MiningPool {
        return n.miningPool
 }
-
-//------------------------------------------------------------------------------