15 "github.com/prometheus/prometheus/util/flock"
16 log "github.com/sirupsen/logrus"
17 cmn "github.com/tendermint/tmlibs/common"
18 dbm "github.com/tendermint/tmlibs/db"
19 browser "github.com/toqueteos/webbrowser"
21 "github.com/vapor/accesstoken"
22 "github.com/vapor/account"
23 "github.com/vapor/api"
24 "github.com/vapor/asset"
25 "github.com/vapor/blockchain/pseudohsm"
26 "github.com/vapor/blockchain/txfeed"
27 "github.com/vapor/common"
28 cfg "github.com/vapor/config"
29 "github.com/vapor/consensus"
30 engine "github.com/vapor/consensus/consensus"
31 "github.com/vapor/consensus/consensus/dpos"
32 "github.com/vapor/crypto/ed25519/chainkd"
33 "github.com/vapor/database/leveldb"
34 "github.com/vapor/env"
35 "github.com/vapor/mining/miner"
36 "github.com/vapor/mining/miningpool"
37 "github.com/vapor/net/websocket"
38 "github.com/vapor/netsync"
39 "github.com/vapor/protocol"
40 "github.com/vapor/protocol/bc"
41 "github.com/vapor/util"
42 w "github.com/vapor/wallet"
// webHost is the scheme-and-host prefix that launchWebBrowser joins with the
// API port to form the address opened in the system browser.
46 webHost = "http://127.0.0.1"
// maxNewBlockChSize is the buffer capacity of the new-block channel created in NewNode.
47 maxNewBlockChSize = 1024
// consensusEngine is the package-level consensus engine. NewNode assigns it from
// createConsensusEngine and GetConsensusEngine returns it.
50 var consensusEngine engine.Engine
// NOTE(review): these lines are fields read as n.<field> by the Node methods;
// the struct header and some sibling fields are not visible in this excerpt.

// syncManager is started by OnStart unless config.VaultMode is set.
58 syncManager *netsync.SyncManager
60 //bcReactor *bc.BlockchainReactor
// accessTokens is the credential store passed to api.NewAPI.
62 accessTokens *accesstoken.CredentialStore
// notificationMgr is started by OnStart and shut down by OnStop.
63 notificationMgr *websocket.WSNotificationManager
// txfeed is the transaction-feed tracker passed to api.NewAPI.
66 txfeed *txfeed.Tracker
67 //cpuMiner *cpuminer.CPUMiner
// miningPool is built in NewNode and passed to api.NewAPI.
70 miningPool *miningpool.MiningPool
// newBlockCh is the channel of *bc.Hash values shared with the sync manager,
// miner, mining pool and API; NewNode creates it with capacity maxNewBlockChSize.
73 newBlockCh chan *bc.Hash
// NewNode wires up a Node from config and returns it.
//
// Visible steps, in order: lock the data directory, initialise the active
// network parameters, consensus config and shared config, open the core and
// access-token databases, build the tx pool and chain, prepare the tx feed,
// create the pseudo-HSM, optionally build the wallet, create the sync manager
// and websocket notification manager, start the tx-pool listener goroutine,
// optionally start the profiling HTTP server, and finally attach the consensus
// engine, miner, mining pool and base service.
//
// Most unrecoverable setup failures terminate the process through cmn.Exit
// rather than being returned to the caller.
//
// NOTE(review): several lines (closing braces, some `if err != nil` guards,
// the Node literal header) are not visible in this excerpt; the comments below
// describe only what is shown.
76 func NewNode(config *cfg.Config) *Node {
77 ctx := context.Background()
// Refuse to start when the data directory cannot be locked.
78 if err := lockDataDirectory(config); err != nil {
79 cmn.Exit("Error: " + err.Error())
// initActiveNetParams runs before initConsensusConfig because the latter
// decodes addresses against consensus.ActiveNetParams.
82 initActiveNetParams(config)
83 initConsensusConfig(config)
84 initCommonConfig(config)
// Publish the main-chain settings through package-level variables in util.
86 util.MainchainConfig = config.MainChain
87 util.ValidatePegin = config.ValidatePegin
// Only the "memdb" and "leveldb" backends are accepted.
89 if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
90 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
92 coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
93 store := leveldb.NewStore(coreDB)
95 tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
96 accessTokens := accesstoken.NewStore(tokenDB)
98 txPool := protocol.NewTxPool(store)
99 chain, err := protocol.NewChain(store, txPool)
101 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
// accounts, assets and wallet stay nil when the wallet is disabled below.
104 var accounts *account.Manager = nil
105 var assets *asset.Registry = nil
106 var wallet *w.Wallet = nil
107 var txFeed *txfeed.Tracker = nil
109 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
110 txFeed = txfeed.NewTracker(txFeedDB, chain)
// A failure to prepare the tx feed is logged here, not treated as fatal on this line.
112 if err = txFeed.Prepare(ctx); err != nil {
113 log.WithField("error", err).Error("start txfeed")
117 hsm, err := pseudohsm.New(config.KeysDir())
119 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
122 if !config.Wallet.Disable {
// NOTE(review): config.Consensus.Dpos is dereferenced without a nil check here,
// while createConsensusEngine treats a nil Dpos as possible — confirm it is always set.
123 address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams)
125 cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
// The account manager, asset registry and wallet share the same "wallet" database.
127 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
128 accounts = account.NewManager(walletDB, chain)
129 assets = asset.NewRegistry(walletDB, chain)
130 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
// A wallet construction error is logged (presumably under an err != nil guard
// that is not visible here).
132 log.WithField("error", err).Error("init NewWallet")
135 // trigger rescan wallet
136 if config.Wallet.Rescan {
137 wallet.RescanBlocks()
140 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
// NOTE(review): the error returned by netsync.NewSyncManager is discarded.
142 syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
144 notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
146 // get transaction from txPool and send it to syncManager and wallet
147 go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
149 // run the profile server
150 profileHost := config.ProfListenAddress
151 if profileHost != "" {
152 // Profiling bytomd programs. See https://blog.golang.org/profiling-go-programs
153 // go tool pprof http://profileHost/debug/pprof/heap
// Serves with a nil handler, i.e. http.DefaultServeMux.
155 if err = http.ListenAndServe(profileHost, nil); err != nil {
156 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
163 syncManager: syncManager,
164 accessTokens: accessTokens,
168 miningEnable: config.Mining,
170 newBlockCh: newBlockCh,
171 notificationMgr: notificationMgr,
174 //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
// The engine is stored in the package-level variable so GetConsensusEngine can return it.
175 consensusEngine = createConsensusEngine(config, store)
176 node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine)
177 node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
179 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
184 // newPoolTxListener listens for transaction messages from txPool and forwards them to syncManager, wallet and notificationMgr
//
// For protocol.MsgNewTx the transaction is sent on the sync manager's new-tx
// channel, added to the wallet's unconfirmed set and announced through
// notificationMgr. For protocol.MsgRemoveTx it is removed from the wallet's
// unconfirmed set. Any other message type is logged as a warning.
//
// NOTE(review): NewNode passes a nil wallet when the wallet is disabled; the
// lines guarding the wallet calls are not visible in this excerpt — confirm
// they exist.
185 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
186 txMsgCh := txPool.GetMsgCh()
187 syncManagerTxCh := syncManager.GetNewTxCh()
192 case protocol.MsgNewTx:
193 syncManagerTxCh <- msg.Tx
195 wallet.AddUnconfirmedTx(msg.TxDesc)
197 notificationMgr.NotifyMempoolTx(msg.Tx)
198 case protocol.MsgRemoveTx:
200 wallet.RemoveUnconfirmedTx(msg.TxDesc)
203 log.Warn("got unknow message type from the txPool channel")
208 // Lock data directory after daemonization
//
// lockDataDirectory calls flock.New on the file "LOCK" inside config.RootDir.
// The first two results of flock.New are discarded; only the error is kept.
// The visible return reports that the data directory is already used by
// another process (presumably when flock.New fails — the guard line is not
// visible in this excerpt).
209 func lockDataDirectory(config *cfg.Config) error {
210 _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
212 return errors.New("datadir already used by another process")
// initActiveNetParams selects consensus.ActiveNetParams from consensus.NetParams
// by config.ChainID and then overlays side-chain settings from config: the
// federation and block-signing extended public keys (comma-separated strings),
// the signer, the peg-in minimum depth and the parent genesis block hash.
// An unknown chain ID terminates the process through cmn.Exit.
217 func initActiveNetParams(config *cfg.Config) {
219 consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
221 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
// Federation peg keys: only parsed when the setting is non-empty.
223 if config.Side.FedpegXPubs != "" {
224 var federationRedeemXPubs []chainkd.XPub
225 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
226 for _, xpubStr := range fedpegXPubs {
227 var xpub chainkd.XPub
// NOTE(review): the result of UnmarshalText is not checked, so a malformed
// key string is appended as whatever value xpub holds afterwards.
228 xpub.UnmarshalText([]byte(xpubStr))
229 federationRedeemXPubs = append(federationRedeemXPubs, xpub)
231 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
// Block-signing keys: same parsing as above, same unchecked UnmarshalText.
234 if config.Side.SignBlockXPubs != "" {
235 var signBlockXPubs []chainkd.XPub
236 xPubs := strings.Split(config.Side.SignBlockXPubs, ",")
237 for _, xpubStr := range xPubs {
238 var xpub chainkd.XPub
239 xpub.UnmarshalText([]byte(xpubStr))
240 signBlockXPubs = append(signBlockXPubs, xpub)
242 consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
245 consensus.ActiveNetParams.Signer = config.Signer
246 consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
247 consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
// initLogFile prepares the file named by config.LogFile: it ensures the parent
// directory exists (mode 0700) and opens the file for appending, creating it
// if needed (mode 0666). An empty config.LogFile is special-cased first.
//
// NOTE(review): the lines that install the file as the log output and the
// condition under which "using default" is logged are not visible in this
// excerpt.
250 func initLogFile(config *cfg.Config) {
251 if config.LogFile == "" {
254 cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
255 file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
259 log.WithField("err", err).Info("using default")
// initCommonConfig stores config in the package-level cfg.CommonConfig variable.
264 func initCommonConfig(config *cfg.Config) {
265 cfg.CommonConfig = config
268 // launchWebBrowser opens the system web browser at webHost:port.
// A failure to open the browser is logged and otherwise ignored.
269 func launchWebBrowser(port string) {
270 webAddress := webHost + ":" + port
271 log.Info("Launching System Browser with :", webAddress)
272 if err := browser.Open(webAddress); err != nil {
273 log.Error(err.Error())
// initAndstartApiServer builds the node's API from its components and starts
// the API server. The listen address is read through env.String with the key
// "LISTEN" and n.config.ApiAddress as the fallback argument.
278 func (n *Node) initAndstartApiServer() {
279 n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
281 listenAddr := env.String("LISTEN", n.config.ApiAddress)
283 n.api.StartServer(*listenAddr)
// OnStart starts the node's services: the sync manager (skipped in vault
// mode), the API server and the notification manager. Unless the web UI is
// marked closed in config, it also opens the system browser on the API port.
//
// NOTE(review): the lines between the mining-address check and the vault-mode
// check, and the final return, are not visible in this excerpt.
286 func (n *Node) OnStart() error {
// Mining is switched off when no mining address can be obtained from the wallet.
288 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
289 n.miningEnable = false
296 if !n.config.VaultMode {
297 n.syncManager.Start()
299 n.initAndstartApiServer()
300 n.notificationMgr.Start()
301 if !n.config.Web.Closed {
// Only the port of the API address is needed to build the browser URL.
302 _, port, err := net.SplitHostPort(n.config.ApiAddress)
304 log.Error("Invalid api address")
307 launchWebBrowser(port)
// OnStop shuts down the notification manager, waits for that shutdown to
// finish, and then stops the embedded base service.
//
// NOTE(review): the body of the non-vault-mode branch is not visible in this
// excerpt.
313 func (n *Node) OnStop() {
314 n.notificationMgr.Shutdown()
315 n.notificationMgr.WaitForShutdown()
316 n.BaseService.OnStop()
320 if !n.config.VaultMode {
// RunForever registers a callback with cmn.TrapSignal.
// NOTE(review): the callback body is not visible in this excerpt.
325 func (n *Node) RunForever() {
326 // Sleep forever and then...
327 cmn.TrapSignal(func() {
// SyncManager returns a *netsync.SyncManager for the node — presumably the
// syncManager field; the body is not visible in this excerpt, confirm.
332 func (n *Node) SyncManager() *netsync.SyncManager {
// MiningPool returns a *miningpool.MiningPool for the node — presumably the
// miningPool field; the body is not visible in this excerpt, confirm.
336 func (n *Node) MiningPool() *miningpool.MiningPool {
340 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
// When util.ValidatePegin is set, the check calls the main chain over RPC,
// re-encodes the response as JSON into an api.GetBlockHeaderResp, and compares
// the hash of the returned block header with
// consensus.ActiveNetParams.ParentGenesisBlockHash. A mismatch is logged as an
// error.
//
// NOTE(review): the return statements and any retry loop are not visible in
// this excerpt.
341 func bytomdRPCCheck() bool {
343 BlockHeight uint64 `json:"block_height"`
345 if util.ValidatePegin {
// NOTE(review): the endpoint called is "/get-merkle-proof" but the log message
// below and the decoded type refer to get-block-header — confirm which is intended.
347 resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
349 log.Error("Call mainchain interface get-block-header failed")
// Pause for one second after logging the failed call.
350 time.Sleep(time.Millisecond * 1000)
// NOTE(review): errors from json.Marshal and json.Unmarshal are ignored, so a
// malformed response yields a zero-valued blockHeader.
353 tmp, _ := json.Marshal(resp)
354 var blockHeader api.GetBlockHeaderResp
355 json.Unmarshal(tmp, &blockHeader)
356 hash := blockHeader.BlockHeader.Hash()
357 if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
358 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String())
// initConsensusConfig loads consensus settings from the JSON file named by
// config.ConsensusConfigFile, decoding directly into config, and then appends
// the decoded form of every address in config.Consensus.Dpos.SelfVoteSigners
// to config.Consensus.Dpos.Signers. Failure to open or decode the file, or to
// decode an address, terminates the process through cmn.Exit.
//
// Addresses are decoded against consensus.ActiveNetParams, so the active
// network parameters must already be initialised when this runs.
//
// NOTE(review): the handling of an empty ConsensusConfigFile and the closing
// of the opened file are not visible in this excerpt.
368 func initConsensusConfig(config *cfg.Config) {
369 if config.ConsensusConfigFile == "" {
373 file, err := os.Open(config.ConsensusConfigFile)
375 cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
379 if err := json.NewDecoder(file).Decode(config); err != nil {
380 cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
382 for _, v := range config.Consensus.Dpos.SelfVoteSigners {
383 address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
385 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
387 config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address)
// createConsensusEngine returns a DPoS engine built from config.Consensus.Dpos
// and store when a DPoS configuration is present.
// NOTE(review): the result for a nil Dpos configuration is not visible in this
// excerpt.
392 func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine {
393 if config.Consensus.Dpos != nil {
394 return dpos.New(config.Consensus.Dpos, store)
// GetConsensusEngine returns the package-level consensus engine, which is
// assigned by NewNode.
400 func GetConsensusEngine() engine.Engine {
401 return consensusEngine