OSDN Git Service

Modify the dependency on path
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         cfg "github.com/vapor/config"
28         "github.com/vapor/consensus"
29         "github.com/vapor/crypto/ed25519/chainkd"
30         "github.com/vapor/database/leveldb"
31         "github.com/vapor/env"
32         "github.com/vapor/mining/cpuminer"
33         "github.com/vapor/mining/miningpool"
34         "github.com/vapor/net/websocket"
35         "github.com/vapor/netsync"
36         "github.com/vapor/protocol"
37         "github.com/vapor/protocol/bc"
38         "github.com/vapor/util"
39         w "github.com/vapor/wallet"
40 )
41
// webHost is the scheme and loopback host that launchWebBrowser combines
// with the API port to build the address it opens.
const webHost = "http://127.0.0.1"

// maxNewBlockChSize is the buffer capacity of the new-block-hash channel
// created in NewNode.
const maxNewBlockChSize = 1024
46
47 type Node struct {
48         cmn.BaseService
49
50         // config
51         config *cfg.Config
52
53         syncManager *netsync.SyncManager
54
55         //bcReactor    *bc.BlockchainReactor
56         wallet          *w.Wallet
57         accessTokens    *accesstoken.CredentialStore
58         notificationMgr *websocket.WSNotificationManager
59         api             *api.API
60         chain           *protocol.Chain
61         txfeed          *txfeed.Tracker
62         cpuMiner        *cpuminer.CPUMiner
63         miningPool      *miningpool.MiningPool
64         miningEnable    bool
65
66         newBlockCh chan *bc.Hash
67 }
68
69 func NewNode(config *cfg.Config) *Node {
70         ctx := context.Background()
71         if err := lockDataDirectory(config); err != nil {
72                 cmn.Exit("Error: " + err.Error())
73         }
74         initLogFile(config)
75         initActiveNetParams(config)
76         initCommonConfig(config)
77         util.MainchainConfig = config.MainChain
78         util.ValidatePegin = config.ValidatePegin
79         // Get store
80         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
81                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
82         }
83         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
84         store := leveldb.NewStore(coreDB)
85
86         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
87         accessTokens := accesstoken.NewStore(tokenDB)
88
89         txPool := protocol.NewTxPool(store)
90         chain, err := protocol.NewChain(store, txPool)
91         if err != nil {
92                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
93         }
94
95         var accounts *account.Manager = nil
96         var assets *asset.Registry = nil
97         var wallet *w.Wallet = nil
98         var txFeed *txfeed.Tracker = nil
99
100         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
101         txFeed = txfeed.NewTracker(txFeedDB, chain)
102
103         if err = txFeed.Prepare(ctx); err != nil {
104                 log.WithField("error", err).Error("start txfeed")
105                 return nil
106         }
107
108         hsm, err := pseudohsm.New(config.KeysDir())
109         if err != nil {
110                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
111         }
112
113         if !config.Wallet.Disable {
114                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
115                 accounts = account.NewManager(walletDB, chain)
116                 assets = asset.NewRegistry(walletDB, chain)
117                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
118                 if err != nil {
119                         log.WithField("error", err).Error("init NewWallet")
120                 }
121
122                 // trigger rescan wallet
123                 if config.Wallet.Rescan {
124                         wallet.RescanBlocks()
125                 }
126         }
127         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
128
129         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
130
131         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
132
133         // get transaction from txPool and send it to syncManager and wallet
134         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
135
136         // run the profile server
137         profileHost := config.ProfListenAddress
138         if profileHost != "" {
139                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
140                 // go tool pprof http://profileHose/debug/pprof/heap
141                 go func() {
142                         if err = http.ListenAndServe(profileHost, nil); err != nil {
143                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
144                         }
145                 }()
146         }
147
148         node := &Node{
149                 config:       config,
150                 syncManager:  syncManager,
151                 accessTokens: accessTokens,
152                 wallet:       wallet,
153                 chain:        chain,
154                 txfeed:       txFeed,
155                 miningEnable: config.Mining,
156
157                 newBlockCh:      newBlockCh,
158                 notificationMgr: notificationMgr,
159         }
160
161         node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
162         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
163
164         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
165
166         return node
167 }
168
169 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
170 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
171         txMsgCh := txPool.GetMsgCh()
172         syncManagerTxCh := syncManager.GetNewTxCh()
173
174         for {
175                 msg := <-txMsgCh
176                 switch msg.MsgType {
177                 case protocol.MsgNewTx:
178                         syncManagerTxCh <- msg.Tx
179                         if wallet != nil {
180                                 wallet.AddUnconfirmedTx(msg.TxDesc)
181                         }
182                         notificationMgr.NotifyMempoolTx(msg.Tx)
183                 case protocol.MsgRemoveTx:
184                         if wallet != nil {
185                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
186                         }
187                 default:
188                         log.Warn("got unknow message type from the txPool channel")
189                 }
190         }
191 }
192
193 // Lock data directory after daemonization
194 func lockDataDirectory(config *cfg.Config) error {
195         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
196         if err != nil {
197                 return errors.New("datadir already used by another process")
198         }
199         return nil
200 }
201
202 func initActiveNetParams(config *cfg.Config) {
203         var exist bool
204         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
205         if !exist {
206                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
207         }
208         if config.Side.FedpegXPubs != "" {
209                 var federationRedeemXPubs []chainkd.XPub
210                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
211                 for _, xpubStr := range fedpegXPubs {
212                         var xpub chainkd.XPub
213                         xpub.UnmarshalText([]byte(xpubStr))
214                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
215                 }
216                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
217         }
218
219         if config.Side.SignBlockXPubs != "" {
220                 var signBlockXPubs []chainkd.XPub
221                 xPubs := strings.Split(config.Side.SignBlockXPubs, ",")
222                 for _, xpubStr := range xPubs {
223                         var xpub chainkd.XPub
224                         xpub.UnmarshalText([]byte(xpubStr))
225                         signBlockXPubs = append(signBlockXPubs, xpub)
226                 }
227                 consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
228         }
229
230         consensus.ActiveNetParams.Signer = config.Signer
231         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
232         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
233 }
234
235 func initLogFile(config *cfg.Config) {
236         if config.LogFile == "" {
237                 return
238         }
239         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
240         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
241         if err == nil {
242                 log.SetOutput(file)
243         } else {
244                 log.WithField("err", err).Info("using default")
245         }
246
247 }
248
249 func initCommonConfig(config *cfg.Config) {
250         cfg.CommonConfig = config
251 }
252
253 // Lanch web broser or not
254 func launchWebBrowser(port string) {
255         webAddress := webHost + ":" + port
256         log.Info("Launching System Browser with :", webAddress)
257         if err := browser.Open(webAddress); err != nil {
258                 log.Error(err.Error())
259                 return
260         }
261 }
262
263 func (n *Node) initAndstartApiServer() {
264         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
265
266         listenAddr := env.String("LISTEN", n.config.ApiAddress)
267         env.Parse()
268         n.api.StartServer(*listenAddr)
269 }
270
271 func (n *Node) OnStart() error {
272         if n.miningEnable {
273                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
274                         n.miningEnable = false
275                         log.Error(err)
276                 } else {
277                         n.cpuMiner.Start()
278                 }
279         }
280         if !n.config.VaultMode {
281                 n.syncManager.Start()
282         }
283         n.initAndstartApiServer()
284         n.notificationMgr.Start()
285         if !n.config.Web.Closed {
286                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
287                 if err != nil {
288                         log.Error("Invalid api address")
289                         return err
290                 }
291                 launchWebBrowser(port)
292         }
293         go bytomdRPCCheck()
294         return nil
295 }
296
297 func (n *Node) OnStop() {
298         n.notificationMgr.Shutdown()
299         n.notificationMgr.WaitForShutdown()
300         n.BaseService.OnStop()
301         if n.miningEnable {
302                 n.cpuMiner.Stop()
303         }
304         if !n.config.VaultMode {
305                 n.syncManager.Stop()
306         }
307 }
308
309 func (n *Node) RunForever() {
310         // Sleep forever and then...
311         cmn.TrapSignal(func() {
312                 n.Stop()
313         })
314 }
315
316 func (n *Node) SyncManager() *netsync.SyncManager {
317         return n.syncManager
318 }
319
320 func (n *Node) MiningPool() *miningpool.MiningPool {
321         return n.miningPool
322 }
323
324 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
325 func bytomdRPCCheck() bool {
326         type Req struct {
327                 BlockHeight uint64 `json:"block_height"`
328         }
329         if util.ValidatePegin {
330                 for {
331                         resp, err := util.CallRPC("/get-block-header", &Req{BlockHeight: 0})
332                         if err != nil {
333                                 log.Error("Call mainchain interface get-block-header failed")
334                                 time.Sleep(time.Millisecond * 1000)
335                                 continue
336                         }
337                         tmp, _ := json.Marshal(resp)
338                         var blockHeader api.GetBlockHeaderResp
339                         json.Unmarshal(tmp, &blockHeader)
340                         hash := blockHeader.BlockHeader.Hash()
341                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
342                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?")
343                                 return false
344                         }
345                         break
346                 }
347         }
348
349         return true
350 }