16 "github.com/prometheus/prometheus/util/flock"
17 log "github.com/sirupsen/logrus"
18 cmn "github.com/tendermint/tmlibs/common"
19 dbm "github.com/tendermint/tmlibs/db"
20 browser "github.com/toqueteos/webbrowser"
22 "github.com/vapor/accesstoken"
23 "github.com/vapor/account"
24 "github.com/vapor/api"
25 "github.com/vapor/asset"
26 "github.com/vapor/blockchain/pseudohsm"
27 "github.com/vapor/blockchain/txfeed"
28 cfg "github.com/vapor/config"
29 "github.com/vapor/consensus"
30 "github.com/vapor/crypto/ed25519/chainkd"
31 "github.com/vapor/database/leveldb"
32 "github.com/vapor/env"
33 "github.com/vapor/mining/cpuminer"
34 "github.com/vapor/mining/miningpool"
35 "github.com/vapor/net/websocket"
36 "github.com/vapor/netsync"
37 "github.com/vapor/protocol"
38 "github.com/vapor/protocol/bc"
39 "github.com/vapor/util"
40 w "github.com/vapor/wallet"
// webHost is the scheme-and-host prefix that launchWebBrowser joins with the API port.
44 webHost = "http://127.0.0.1"
// maxNewBlockChSize is the buffer capacity NewNode uses for the new-block hash channel.
45 maxNewBlockChSize = 1024
// syncManager is created in NewNode and started in OnStart unless config.VaultMode is set.
54 syncManager *netsync.SyncManager
56 //bcReactor *bc.BlockchainReactor
// accessTokens is the credential store NewNode opens on the "accesstoken" database.
58 accessTokens *accesstoken.CredentialStore
// notificationMgr is started in OnStart and shut down in OnStop.
59 notificationMgr *websocket.WSNotificationManager
// txfeed is passed to api.NewAPI when the API server is created.
62 txfeed *txfeed.Tracker
// cpuMiner and miningPool are both constructed in NewNode from the chain,
// the account manager, the tx pool and newBlockCh.
63 cpuMiner *cpuminer.CPUMiner
64 miningPool *miningpool.MiningPool
// newBlockCh carries *bc.Hash values; NewNode creates it with capacity
// maxNewBlockChSize and hands it to the sync manager, both miners and the API.
67 newBlockCh chan *bc.Hash
// NewNode assembles a Node from config. In order, it locks the data directory,
// publishes config into package-level settings, opens the core/accesstoken/
// txfeeds (and optionally wallet) databases, builds the tx pool, chain, HSM,
// sync manager and websocket notification manager, starts the tx-pool listener
// goroutine, optionally serves the profiling endpoint, and finally creates the
// CPU miner and mining pool. Setup failures are reported through cmn.Exit
// (presumably terminating the process) or logged, as noted inline.
70 func NewNode(config *cfg.Config) *Node {
71 ctx := context.Background()
// Refuse to continue when the data directory lock cannot be taken.
72 if err := lockDataDirectory(config); err != nil {
73 cmn.Exit("Error: " + err.Error())
// Copy config into package-level state: consensus.ActiveNetParams,
// cfg.CommonConfig, util.MainchainConfig and util.ValidatePegin.
76 initActiveNetParams(config)
77 initCommonConfig(config)
78 util.MainchainConfig = config.MainChain
79 util.ValidatePegin = config.ValidatePegin
// Only the "memdb" and "leveldb" backends are accepted.
81 if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
82 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
84 coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
85 store := leveldb.NewStore(coreDB)
87 tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
88 accessTokens := accesstoken.NewStore(tokenDB)
// The tx pool and the chain share the same core store.
90 txPool := protocol.NewTxPool(store)
91 chain, err := protocol.NewChain(store, txPool)
93 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
// accounts, assets and wallet are only assigned inside the
// !config.Wallet.Disable branch below; txFeed is assigned right away.
96 var accounts *account.Manager = nil
97 var assets *asset.Registry = nil
98 var wallet *w.Wallet = nil
99 var txFeed *txfeed.Tracker = nil
101 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
102 txFeed = txfeed.NewTracker(txFeedDB, chain)
// A txfeed Prepare failure is logged under "start txfeed".
104 if err = txFeed.Prepare(ctx); err != nil {
105 log.WithField("error", err).Error("start txfeed")
109 hsm, err := pseudohsm.New(config.KeysDir())
111 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
// The account manager, asset registry and wallet all share one "wallet" database.
114 if !config.Wallet.Disable {
115 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
116 accounts = account.NewManager(walletDB, chain)
117 assets = asset.NewRegistry(walletDB, chain)
118 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
// NOTE(review): a NewWallet failure is only logged here; confirm that wallet
// is not used afterwards (e.g. by RescanBlocks below) when it failed.
120 log.WithField("error", err).Error("init NewWallet")
123 // trigger rescan wallet
124 if config.Wallet.Rescan {
125 wallet.RescanBlocks()
// Buffered channel of block hashes shared by the sync manager, miners and API.
128 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
// NOTE(review): the second result of NewSyncManager (presumably an error) is
// discarded — confirm that is intentional.
130 syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
132 notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
134 // get transaction from txPool and send it to syncManager and wallet
135 go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
137 // run the profile server
138 profileHost := config.ProfListenAddress
139 if profileHost != "" {
140 // Profiling bytomd programs, see https://blog.golang.org/profiling-go-programs
141 // go tool pprof http://<profileHost>/debug/pprof/heap
// A nil handler makes ListenAndServe use http.DefaultServeMux; the pprof
// handlers are presumably registered there by an import elsewhere — verify.
143 if err = http.ListenAndServe(profileHost, nil); err != nil {
144 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
151 syncManager: syncManager,
152 accessTokens: accessTokens,
156 miningEnable: config.Mining,
158 newBlockCh: newBlockCh,
159 notificationMgr: notificationMgr,
// Both miners receive accounts, which is still nil when the wallet is disabled.
162 node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
163 node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
165 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
170 // newPoolTxListener listens for messages from txPool and dispatches them.
// For protocol.MsgNewTx the transaction is sent to the sync manager's new-tx
// channel, added to the wallet as unconfirmed, and announced through the
// notification manager. For protocol.MsgRemoveTx the wallet is told to drop
// the unconfirmed transaction. Any other message type is logged as a warning.
171 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
172 txMsgCh := txPool.GetMsgCh()
173 syncManagerTxCh := syncManager.GetNewTxCh()
178 case protocol.MsgNewTx:
179 syncManagerTxCh <- msg.Tx
// NOTE(review): NewNode passes a nil wallet when the wallet is disabled;
// confirm the hidden line above guards this call.
181 wallet.AddUnconfirmedTx(msg.TxDesc)
183 notificationMgr.NotifyMempoolTx(msg.Tx)
184 case protocol.MsgRemoveTx:
186 wallet.RemoveUnconfirmedTx(msg.TxDesc)
189 log.Warn("got unknow message type from the txPool channel")
194 // lockDataDirectory takes a file lock on "LOCK" inside config.RootDir via
// flock.New and returns a "datadir already used by another process" error
// (presumably when flock.New fails; the guarding condition is not shown here).
195 func lockDataDirectory(config *cfg.Config) error {
// NOTE(review): the first two results of flock.New are discarded, so this
// function keeps no handle with which to release the lock explicitly.
196 _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
198 return errors.New("datadir already used by another process")
// initActiveNetParams looks up consensus.NetParams by config.ChainID, stores
// the result in consensus.ActiveNetParams, and reports an unknown chain ID
// through cmn.Exit. It then overrides the side-chain fields from config:
// FedpegXPubs and SignBlockXPubs (each parsed from a comma-separated string,
// only when non-empty), Signer, PeginMinDepth and ParentGenesisBlockHash.
203 func initActiveNetParams(config *cfg.Config) {
205 consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
207 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
209 if config.Side.FedpegXPubs != "" {
210 var federationRedeemXPubs []chainkd.XPub
211 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
212 for _, xpubStr := range fedpegXPubs {
213 var xpub chainkd.XPub
// NOTE(review): the result of UnmarshalText (presumably an error) is ignored,
// so a malformed entry would still be appended — confirm this is acceptable.
214 xpub.UnmarshalText([]byte(xpubStr))
215 federationRedeemXPubs = append(federationRedeemXPubs, xpub)
217 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
220 if config.Side.SignBlockXPubs != "" {
221 var signBlockXPubs []chainkd.XPub
// NOTE(review): this prints the slice before anything has been appended to it;
// it looks like leftover debug output.
222 fmt.Println(signBlockXPubs)
223 xPubs := strings.Split(config.Side.SignBlockXPubs, ",")
224 for _, xpubStr := range xPubs {
225 var xpub chainkd.XPub
// NOTE(review): same ignored UnmarshalText result as in the loop above.
226 xpub.UnmarshalText([]byte(xpubStr))
227 signBlockXPubs = append(signBlockXPubs, xpub)
229 consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
232 consensus.ActiveNetParams.Signer = config.Signer
233 consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
234 consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
// initLogFile prepares the file named by config.LogFile. An empty path is
// special-cased (the branch body is not shown here; presumably it returns).
// Otherwise the parent directory is ensured with mode 0700 and the file is
// opened write-only in append mode, created with mode 0666 if absent.
237 func initLogFile(config *cfg.Config) {
238 if config.LogFile == "" {
// NOTE(review): the result of cmn.EnsureDir is not checked.
241 cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
242 file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
// Logged with the error attached; presumably the branch taken when opening fails.
246 log.WithField("err", err).Info("using default")
// initCommonConfig stores config in the package-level cfg.CommonConfig.
251 func initCommonConfig(config *cfg.Config) {
252 cfg.CommonConfig = config
255 // launchWebBrowser opens the system browser at webHost + ":" + port and
// logs the error if browser.Open fails.
256 func launchWebBrowser(port string) {
257 webAddress := webHost + ":" + port
258 log.Info("Launching System Browser with :", webAddress)
259 if err := browser.Open(webAddress); err != nil {
260 log.Error(err.Error())
// initAndstartApiServer builds n.api from the node's components and starts
// its server on the resolved listen address.
265 func (n *Node) initAndstartApiServer() {
266 n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
// env.String yields a *string (dereferenced below); presumably the LISTEN
// environment variable overrides n.config.ApiAddress — verify against env.
268 listenAddr := env.String("LISTEN", n.config.ApiAddress)
270 n.api.StartServer(*listenAddr)
// OnStart brings the node's services up: it disables mining when no mining
// address can be obtained, starts the sync manager unless VaultMode is set,
// starts the API server and the notification manager, and opens the web
// browser on the API port unless config.Web.Closed is set.
273 func (n *Node) OnStart() error {
// NOTE(review): n.wallet is dereferenced here, while NewNode leaves its wallet
// nil when the wallet is disabled — confirm the hidden line above guards this.
275 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
276 n.miningEnable = false
282 if !n.config.VaultMode {
283 n.syncManager.Start()
285 n.initAndstartApiServer()
286 n.notificationMgr.Start()
287 if !n.config.Web.Closed {
// Only the port part of the API address is needed to build the browser URL.
288 _, port, err := net.SplitHostPort(n.config.ApiAddress)
290 log.Error("Invalid api address")
293 launchWebBrowser(port)
// OnStop shuts the notification manager down, waits for that shutdown to
// finish, and then calls BaseService.OnStop. Further teardown is conditional
// on the node not being in VaultMode (that branch body is not shown here).
299 func (n *Node) OnStop() {
300 n.notificationMgr.Shutdown()
301 n.notificationMgr.WaitForShutdown()
302 n.BaseService.OnStop()
306 if !n.config.VaultMode {
// RunForever registers a callback with cmn.TrapSignal; the callback body is
// not shown here.
311 func (n *Node) RunForever() {
312 // Sleep forever and then...
313 cmn.TrapSignal(func() {
// SyncManager returns a *netsync.SyncManager for this node (presumably the
// syncManager field; the body is not shown here).
318 func (n *Node) SyncManager() *netsync.SyncManager {
// MiningPool returns a *miningpool.MiningPool for this node (presumably the
// miningPool field; the body is not shown here).
322 func (n *Node) MiningPool() *miningpool.MiningPool {
326 // bytomdRPCCheck checks whether the RPC connection to bytomd is working correctly.
327 func bytomdRPCCheck() bool {
329 BlockHeight uint64 `json:"block_height"`
331 if util.ValidatePegin {
333 resp, err := util.CallRPC("/get-block-header", &Req{BlockHeight: 0})
335 log.Error("Call mainchain interface get-block-header failed")
336 time.Sleep(time.Millisecond * 1000)
339 tmp, _ := json.Marshal(resp)
340 var blockHeader api.GetBlockHeaderResp
341 json.Unmarshal(tmp, &blockHeader)
342 hash := blockHeader.BlockHeader.Hash()
343 if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
344 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?")