package node
import (
- "context"
- "encoding/json"
+ "encoding/hex"
"errors"
"net"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
- "strings"
- "time"
+ "reflect"
"github.com/prometheus/prometheus/util/flock"
log "github.com/sirupsen/logrus"
cmn "github.com/tendermint/tmlibs/common"
- dbm "github.com/tendermint/tmlibs/db"
browser "github.com/toqueteos/webbrowser"
"github.com/vapor/accesstoken"
"github.com/vapor/api"
"github.com/vapor/asset"
"github.com/vapor/blockchain/pseudohsm"
- "github.com/vapor/blockchain/txfeed"
- "github.com/vapor/common"
cfg "github.com/vapor/config"
"github.com/vapor/consensus"
- engine "github.com/vapor/consensus/consensus"
- "github.com/vapor/consensus/consensus/dpos"
- "github.com/vapor/crypto/ed25519/chainkd"
- "github.com/vapor/database/leveldb"
+ "github.com/vapor/database"
+ dbm "github.com/vapor/database/leveldb"
"github.com/vapor/env"
- "github.com/vapor/mining/miner"
- "github.com/vapor/mining/miningpool"
+ "github.com/vapor/event"
"github.com/vapor/net/websocket"
"github.com/vapor/netsync"
+ "github.com/vapor/proposal/blockproposer"
"github.com/vapor/protocol"
- "github.com/vapor/protocol/bc"
- "github.com/vapor/util"
+ "github.com/vapor/protocol/bc/types"
w "github.com/vapor/wallet"
)
const (
- webHost = "http://127.0.0.1"
- maxNewBlockChSize = 1024
+ webHost = "http://127.0.0.1"
+ logModule = "node"
)
-var consensusEngine engine.Engine
-
+// Node represent bytom node
type Node struct {
cmn.BaseService
- // config
- config *cfg.Config
-
- syncManager *netsync.SyncManager
+ config *cfg.Config
+ eventDispatcher *event.Dispatcher
+ syncManager *netsync.SyncManager
- //bcReactor *bc.BlockchainReactor
wallet *w.Wallet
accessTokens *accesstoken.CredentialStore
notificationMgr *websocket.WSNotificationManager
api *api.API
chain *protocol.Chain
- txfeed *txfeed.Tracker
- //cpuMiner *cpuminer.CPUMiner
- miner *miner.Miner
-
- miningPool *miningpool.MiningPool
- miningEnable bool
-
- newBlockCh chan *bc.Hash
+ cpuMiner *blockproposer.BlockProposer
+ miningEnable bool
}
+// NewNode create bytom node
func NewNode(config *cfg.Config) *Node {
- ctx := context.Background()
if err := lockDataDirectory(config); err != nil {
cmn.Exit("Error: " + err.Error())
}
+
+ if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
+ cmn.Exit(cmn.Fmt("Failed to load federated information:[%s]", err.Error()))
+ }
+
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "pubkey": config.PrivateKey().XPub(),
+ "fed_xpubs": config.Federation.Xpubs,
+ "fed_quorum": config.Federation.Quorum,
+ "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)),
+ }).Info()
+
initLogFile(config)
initActiveNetParams(config)
- initConsensusConfig(config)
initCommonConfig(config)
- util.MainchainConfig = config.MainChain
- util.ValidatePegin = config.ValidatePegin
// Get store
if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
}
coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
- store := leveldb.NewStore(coreDB)
+ store := database.NewStore(coreDB)
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)
- txPool := protocol.NewTxPool(store)
- chain, err := protocol.NewChain(store, txPool)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
+ chain, err := protocol.NewChain(store, txPool, dispatcher)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}
- var accounts *account.Manager = nil
- var assets *asset.Registry = nil
- var wallet *w.Wallet = nil
- var txFeed *txfeed.Tracker = nil
-
- txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
- txFeed = txfeed.NewTracker(txFeedDB, chain)
-
- if err = txFeed.Prepare(ctx); err != nil {
- log.WithField("error", err).Error("start txfeed")
- return nil
+ if err := checkConfig(chain, config); err != nil {
+ panic(err)
}
+ var accounts *account.Manager
+ var assets *asset.Registry
+ var wallet *w.Wallet
+
hsm, err := pseudohsm.New(config.KeysDir())
if err != nil {
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}
if !config.Wallet.Disable {
- address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams)
- if err != nil {
- cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
- }
walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
- accounts = account.NewManager(walletDB, chain)
+ walletStore := database.NewWalletStore(walletDB)
+ accountStore := database.NewAccountStore(walletDB)
+ accounts = account.NewManager(accountStore, chain)
assets = asset.NewRegistry(walletDB, chain)
- wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
+ wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
if err != nil {
- log.WithField("error", err).Error("init NewWallet")
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
}
// trigger rescan wallet
wallet.RescanBlocks()
}
}
- newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
-
- syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
-
- notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
+ fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
+ syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher,fastSyncDB)
+ if err != nil {
+ cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
+ }
- // get transaction from txPool and send it to syncManager and wallet
- go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
+ notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
// run the profile server
profileHost := config.ProfListenAddress
}
node := &Node{
- config: config,
- syncManager: syncManager,
- accessTokens: accessTokens,
- wallet: wallet,
- chain: chain,
- txfeed: txFeed,
- miningEnable: config.Mining,
-
- newBlockCh: newBlockCh,
+ eventDispatcher: dispatcher,
+ config: config,
+ syncManager: syncManager,
+ accessTokens: accessTokens,
+ wallet: wallet,
+ chain: chain,
+ miningEnable: config.Mining,
+
notificationMgr: notificationMgr,
}
- //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
- consensusEngine = createConsensusEngine(config, store)
- node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine)
- node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
-
+ node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
node.BaseService = *cmn.NewBaseService(nil, "Node", node)
-
return node
}
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
- txMsgCh := txPool.GetMsgCh()
- syncManagerTxCh := syncManager.GetNewTxCh()
-
- for {
- msg := <-txMsgCh
- switch msg.MsgType {
- case protocol.MsgNewTx:
- syncManagerTxCh <- msg.Tx
- if wallet != nil {
- wallet.AddUnconfirmedTx(msg.TxDesc)
- }
- notificationMgr.NotifyMempoolTx(msg.Tx)
- case protocol.MsgRemoveTx:
- if wallet != nil {
- wallet.RemoveUnconfirmedTx(msg.TxDesc)
- }
- default:
- log.Warn("got unknow message type from the txPool channel")
+// find whether config xpubs equal genesis block xpubs
+func checkConfig(chain *protocol.Chain, config *cfg.Config) error {
+ fedpegScript := cfg.FederationWScript(config)
+ genesisBlock, err := chain.GetBlockByHeight(0)
+ if err != nil {
+ return err
+ }
+ typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput
+ if v, ok := typedInput.(*types.CoinbaseInput); ok {
+ if !reflect.DeepEqual(fedpegScript, v.Arbitrary) {
+ return errors.New("config xpubs don't equal genesis block xpubs.")
}
}
+ return nil
}
// Lock data directory after daemonization
if !exist {
cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
}
- if config.Side.FedpegXPubs != "" {
- var federationRedeemXPubs []chainkd.XPub
- fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
- for _, xpubStr := range fedpegXPubs {
- var xpub chainkd.XPub
- xpub.UnmarshalText([]byte(xpubStr))
- federationRedeemXPubs = append(federationRedeemXPubs, xpub)
- }
- consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
- }
-
- consensus.ActiveNetParams.Signer = config.Signer
- consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
- consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
}
func initLogFile(config *cfg.Config) {
if err == nil {
log.SetOutput(file)
} else {
- log.WithField("err", err).Info("using default")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default")
}
}
}
}
-func (n *Node) initAndstartApiServer() {
- n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
+func (n *Node) initAndstartAPIServer() {
+ n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
listenAddr := env.String("LISTEN", n.config.ApiAddress)
env.Parse()
n.miningEnable = false
log.Error(err)
} else {
- //n.cpuMiner.Start()
- n.miner.Start()
+ n.cpuMiner.Start()
}
}
if !n.config.VaultMode {
- n.syncManager.Start()
+ if err := n.syncManager.Start(); err != nil {
+ return err
+ }
+ }
+
+ n.initAndstartAPIServer()
+ if err := n.notificationMgr.Start(); err != nil {
+ return err
}
- n.initAndstartApiServer()
- n.notificationMgr.Start()
+
if !n.config.Web.Closed {
_, port, err := net.SplitHostPort(n.config.ApiAddress)
if err != nil {
}
launchWebBrowser(port)
}
- go bytomdRPCCheck()
return nil
}
n.notificationMgr.WaitForShutdown()
n.BaseService.OnStop()
if n.miningEnable {
- n.miner.Stop()
+ n.cpuMiner.Stop()
}
if !n.config.VaultMode {
n.syncManager.Stop()
}
+ n.eventDispatcher.Stop()
}
func (n *Node) RunForever() {
n.Stop()
})
}
-
-func (n *Node) SyncManager() *netsync.SyncManager {
- return n.syncManager
-}
-
-func (n *Node) MiningPool() *miningpool.MiningPool {
- return n.miningPool
-}
-
-/**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
-func bytomdRPCCheck() bool {
- type Req struct {
- BlockHeight uint64 `json:"block_height"`
- }
- if util.ValidatePegin {
- for {
- resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
- if err != nil {
- log.Error("Call mainchain interface get-block-header failed")
- time.Sleep(time.Millisecond * 1000)
- continue
- }
- tmp, _ := json.Marshal(resp)
- var blockHeader api.GetBlockHeaderResp
- json.Unmarshal(tmp, &blockHeader)
- hash := blockHeader.BlockHeader.Hash()
- if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
- log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String())
- return false
- }
- break
- }
- }
-
- return true
-}
-
-func initConsensusConfig(config *cfg.Config) {
- if config.ConsensusConfigFile == "" {
- // poa
- } else {
- //
- file, err := os.Open(config.ConsensusConfigFile)
- if err != nil {
- cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
- }
- defer file.Close()
-
- if err := json.NewDecoder(file).Decode(config); err != nil {
- cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
- }
-
- for _, v := range config.Consensus.Dpos.SelfVoteSigners {
- address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
- if err != nil {
- cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
- }
- config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address)
- }
- }
-}
-
-func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine {
- if config.Consensus.Dpos != nil {
- return dpos.New(config.Consensus.Dpos, store)
- } else {
- return nil
- }
-}
-
-func GetConsensusEngine() engine.Engine {
- return consensusEngine
-}