15 "github.com/prometheus/prometheus/util/flock"
16 log "github.com/sirupsen/logrus"
17 cmn "github.com/tendermint/tmlibs/common"
18 browser "github.com/toqueteos/webbrowser"
20 "github.com/vapor/accesstoken"
21 "github.com/vapor/account"
22 "github.com/vapor/api"
23 "github.com/vapor/asset"
24 "github.com/vapor/blockchain/pseudohsm"
25 "github.com/vapor/blockchain/txfeed"
26 "github.com/vapor/common"
27 cfg "github.com/vapor/config"
28 "github.com/vapor/consensus"
29 "github.com/vapor/crypto/ed25519/chainkd"
30 "github.com/vapor/database"
31 dbm "github.com/vapor/database/db"
32 _ "github.com/vapor/database/leveldb"
33 "github.com/vapor/database/orm"
34 _ "github.com/vapor/database/sqlite"
35 "github.com/vapor/env"
36 "github.com/vapor/mining/miner"
37 "github.com/vapor/net/websocket"
38 "github.com/vapor/netsync"
39 "github.com/vapor/protocol"
40 "github.com/vapor/protocol/bc"
41 "github.com/vapor/util"
42 w "github.com/vapor/wallet"
46 webHost = "http://127.0.0.1"
47 maxNewBlockChSize = 1024
57 syncManager *netsync.SyncManager
59 //bcReactor *bc.BlockchainReactor
61 accessTokens *accesstoken.CredentialStore
62 notificationMgr *websocket.WSNotificationManager
65 txfeed *txfeed.Tracker
66 //cpuMiner *cpuminer.CPUMiner
71 newBlockCh chan *bc.Hash
74 func NewNode(config *cfg.Config) *Node {
75 ctx := context.Background()
76 if err := lockDataDirectory(config); err != nil {
77 cmn.Exit("Error: " + err.Error())
80 initActiveNetParams(config)
81 initConsensusConfig(config)
82 initCommonConfig(config)
84 util.MainchainConfig = config.MainChain
85 util.ValidatePegin = config.ValidatePegin
87 if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
88 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
91 sqlDB := dbm.NewSqlDB("sql", "sqlitedb", config.DBDir())
92 initDatabaseTable(sqlDB)
93 sqlStore := database.NewSQLStore(sqlDB)
95 tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
96 accessTokens := accesstoken.NewStore(tokenDB)
98 txPool := protocol.NewTxPool(sqlStore)
99 chain, err := protocol.NewChain(sqlStore, txPool)
101 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
104 switch config.Consensus.Type {
106 initDpos(chain, config)
109 var accounts *account.Manager = nil
110 var assets *asset.Registry = nil
111 var wallet *w.Wallet = nil
112 var txFeed *txfeed.Tracker = nil
114 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
115 txFeed = txfeed.NewTracker(txFeedDB, chain)
117 if err = txFeed.Prepare(ctx); err != nil {
118 log.WithField("error", err).Error("start txfeed")
122 hsm, err := pseudohsm.New(config.KeysDir())
124 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
127 if !config.Wallet.Disable {
128 address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
130 cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
132 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
133 accounts = account.NewManager(walletDB, chain)
134 assets = asset.NewRegistry(walletDB, chain)
135 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
137 log.WithField("error", err).Error("init NewWallet")
140 // trigger rescan wallet
141 if config.Wallet.Rescan {
142 wallet.RescanBlocks()
145 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
147 syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
149 notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
151 // get transaction from txPool and send it to syncManager and wallet
152 go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
154 // run the profile server
155 profileHost := config.ProfListenAddress
156 if profileHost != "" {
157 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
158 // go tool pprof http://profileHose/debug/pprof/heap
160 if err = http.ListenAndServe(profileHost, nil); err != nil {
161 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
168 syncManager: syncManager,
169 accessTokens: accessTokens,
173 miningEnable: config.Mining,
175 newBlockCh: newBlockCh,
176 notificationMgr: notificationMgr,
179 node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh)
180 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
185 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
186 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
187 txMsgCh := txPool.GetMsgCh()
188 syncManagerTxCh := syncManager.GetNewTxCh()
193 case protocol.MsgNewTx:
194 syncManagerTxCh <- msg.Tx
196 wallet.AddUnconfirmedTx(msg.TxDesc)
198 notificationMgr.NotifyMempoolTx(msg.Tx)
199 case protocol.MsgRemoveTx:
201 wallet.RemoveUnconfirmedTx(msg.TxDesc)
204 log.Warn("got unknow message type from the txPool channel")
209 // Lock data directory after daemonization
210 func lockDataDirectory(config *cfg.Config) error {
211 _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
213 return errors.New("datadir already used by another process")
218 func initActiveNetParams(config *cfg.Config) {
220 consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
222 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
224 if config.Side.FedpegXPubs != "" {
225 var federationRedeemXPubs []chainkd.XPub
226 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
227 for _, xpubStr := range fedpegXPubs {
228 var xpub chainkd.XPub
229 xpub.UnmarshalText([]byte(xpubStr))
230 federationRedeemXPubs = append(federationRedeemXPubs, xpub)
232 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
235 consensus.ActiveNetParams.Signer = config.Signer
236 consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
237 consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
240 func initLogFile(config *cfg.Config) {
241 if config.LogFile == "" {
244 cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
245 file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
249 log.WithField("err", err).Info("using default")
254 func initCommonConfig(config *cfg.Config) {
255 cfg.CommonConfig = config
258 // Lanch web broser or not
259 func launchWebBrowser(port string) {
260 webAddress := webHost + ":" + port
261 log.Info("Launching System Browser with :", webAddress)
262 if err := browser.Open(webAddress); err != nil {
263 log.Error(err.Error())
268 func (n *Node) initAndstartApiServer() {
269 n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
271 listenAddr := env.String("LISTEN", n.config.ApiAddress)
273 n.api.StartServer(*listenAddr)
276 func (n *Node) OnStart() error {
278 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
279 n.miningEnable = false
286 if !n.config.VaultMode {
287 n.syncManager.Start()
289 n.initAndstartApiServer()
290 n.notificationMgr.Start()
291 if !n.config.Web.Closed {
292 _, port, err := net.SplitHostPort(n.config.ApiAddress)
294 log.Error("Invalid api address")
297 launchWebBrowser(port)
303 func (n *Node) OnStop() {
304 if err := n.chain.Engine.Finish(); err != nil {
305 log.Errorf("OnStop: %v", err)
308 n.notificationMgr.Shutdown()
309 n.notificationMgr.WaitForShutdown()
310 n.BaseService.OnStop()
314 if !n.config.VaultMode {
321 func (n *Node) RunForever() {
322 // Sleep forever and then...
323 cmn.TrapSignal(func() {
328 func (n *Node) SyncManager() *netsync.SyncManager {
332 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
333 func bytomdRPCCheck() bool {
335 BlockHeight uint64 `json:"block_height"`
337 if util.ValidatePegin {
339 resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
341 log.Error("Call mainchain interface get-block-header failed")
342 time.Sleep(time.Millisecond * 1000)
345 tmp, _ := json.Marshal(resp)
346 var blockHeader api.GetBlockHeaderResp
347 json.Unmarshal(tmp, &blockHeader)
348 hash := blockHeader.BlockHeader.Hash()
349 if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
350 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
360 func initConsensusConfig(config *cfg.Config) {
361 if config.ConsensusConfigFile == "" {
365 file, err := os.Open(config.ConsensusConfigFile)
367 cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
371 if err := json.NewDecoder(file).Decode(config); err != nil {
372 cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
375 for _, v := range config.Consensus.SelfVoteSigners {
376 address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
378 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
380 config.Consensus.Signers = append(config.Consensus.Signers, address)
385 func initDpos(chain *protocol.Chain, config *cfg.Config) {
386 header := chain.BestBlockHeader()
387 height := header.Height
388 hash := header.Hash()
389 maxSignerCount := config.Consensus.MaxSignerCount
390 period := config.Consensus.Period
391 err := chain.Engine.Init(chain, maxSignerCount, period, height, hash)
394 oldBlockHeight := chain.Engine.GetOldBlockHeight()
395 oldBlockHash := chain.Engine.GetOldBlockHash()
398 header, _ = chain.GetHeaderByHeight(oldBlockHeight)
399 oldBlockHash = header.Hash()
402 if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
403 cmn.Exit(cmn.Fmt("initVote failed: %v", err))
408 func initDatabaseTable(db dbm.SQLDB) {
409 db.Db().AutoMigrate(&orm.BlockHeader{}, &orm.Transaction{}, &orm.BlockStoreState{}, &orm.ClaimTxState{}, &orm.Utxo{})