OSDN Git Service

0c7cfa4917bea233a183877beb3ce09bddebda6d
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         engine "github.com/vapor/consensus/consensus"
31         "github.com/vapor/consensus/consensus/dpos"
32         "github.com/vapor/crypto/ed25519/chainkd"
33         "github.com/vapor/database/leveldb"
34         "github.com/vapor/env"
35         "github.com/vapor/mining/miner"
36         "github.com/vapor/mining/miningpool"
37         "github.com/vapor/net/websocket"
38         "github.com/vapor/netsync"
39         "github.com/vapor/protocol"
40         "github.com/vapor/protocol/bc"
41         "github.com/vapor/util"
42         w "github.com/vapor/wallet"
43 )
44
45 const (
46         webHost           = "http://127.0.0.1"
47         maxNewBlockChSize = 1024
48 )
49
50 var consensusEngine engine.Engine
51
52 type Node struct {
53         cmn.BaseService
54
55         // config
56         config *cfg.Config
57
58         syncManager *netsync.SyncManager
59
60         //bcReactor    *bc.BlockchainReactor
61         wallet          *w.Wallet
62         accessTokens    *accesstoken.CredentialStore
63         notificationMgr *websocket.WSNotificationManager
64         api             *api.API
65         chain           *protocol.Chain
66         txfeed          *txfeed.Tracker
67         //cpuMiner        *cpuminer.CPUMiner
68         miner *miner.Miner
69
70         miningPool   *miningpool.MiningPool
71         miningEnable bool
72
73         newBlockCh chan *bc.Hash
74 }
75
76 func NewNode(config *cfg.Config) *Node {
77         ctx := context.Background()
78         if err := lockDataDirectory(config); err != nil {
79                 cmn.Exit("Error: " + err.Error())
80         }
81         initLogFile(config)
82         initActiveNetParams(config)
83         initConsensusConfig(config)
84         initCommonConfig(config)
85
86         util.MainchainConfig = config.MainChain
87         util.ValidatePegin = config.ValidatePegin
88         // Get store
89         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
90                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
91         }
92         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
93         store := leveldb.NewStore(coreDB)
94
95         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
96         accessTokens := accesstoken.NewStore(tokenDB)
97
98         txPool := protocol.NewTxPool(store)
99         chain, err := protocol.NewChain(store, txPool)
100         if err != nil {
101                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
102         }
103
104         var accounts *account.Manager = nil
105         var assets *asset.Registry = nil
106         var wallet *w.Wallet = nil
107         var txFeed *txfeed.Tracker = nil
108
109         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
110         txFeed = txfeed.NewTracker(txFeedDB, chain)
111
112         if err = txFeed.Prepare(ctx); err != nil {
113                 log.WithField("error", err).Error("start txfeed")
114                 return nil
115         }
116
117         hsm, err := pseudohsm.New(config.KeysDir())
118         if err != nil {
119                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
120         }
121
122         if !config.Wallet.Disable {
123                 address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams)
124                 if err != nil {
125                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
126                 }
127                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
128                 accounts = account.NewManager(walletDB, chain)
129                 assets = asset.NewRegistry(walletDB, chain)
130                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
131                 if err != nil {
132                         log.WithField("error", err).Error("init NewWallet")
133                 }
134
135                 // trigger rescan wallet
136                 if config.Wallet.Rescan {
137                         wallet.RescanBlocks()
138                 }
139         }
140         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
141
142         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
143
144         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
145
146         // get transaction from txPool and send it to syncManager and wallet
147         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
148
149         // run the profile server
150         profileHost := config.ProfListenAddress
151         if profileHost != "" {
152                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
153                 // go tool pprof http://profileHose/debug/pprof/heap
154                 go func() {
155                         if err = http.ListenAndServe(profileHost, nil); err != nil {
156                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
157                         }
158                 }()
159         }
160
161         node := &Node{
162                 config:       config,
163                 syncManager:  syncManager,
164                 accessTokens: accessTokens,
165                 wallet:       wallet,
166                 chain:        chain,
167                 txfeed:       txFeed,
168                 miningEnable: config.Mining,
169
170                 newBlockCh:      newBlockCh,
171                 notificationMgr: notificationMgr,
172         }
173
174         //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
175         consensusEngine = createConsensusEngine(config, store)
176         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine)
177         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
178
179         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
180
181         return node
182 }
183
184 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
185 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
186         txMsgCh := txPool.GetMsgCh()
187         syncManagerTxCh := syncManager.GetNewTxCh()
188
189         for {
190                 msg := <-txMsgCh
191                 switch msg.MsgType {
192                 case protocol.MsgNewTx:
193                         syncManagerTxCh <- msg.Tx
194                         if wallet != nil {
195                                 wallet.AddUnconfirmedTx(msg.TxDesc)
196                         }
197                         notificationMgr.NotifyMempoolTx(msg.Tx)
198                 case protocol.MsgRemoveTx:
199                         if wallet != nil {
200                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
201                         }
202                 default:
203                         log.Warn("got unknow message type from the txPool channel")
204                 }
205         }
206 }
207
208 // Lock data directory after daemonization
209 func lockDataDirectory(config *cfg.Config) error {
210         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
211         if err != nil {
212                 return errors.New("datadir already used by another process")
213         }
214         return nil
215 }
216
217 func initActiveNetParams(config *cfg.Config) {
218         var exist bool
219         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
220         if !exist {
221                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
222         }
223         if config.Side.FedpegXPubs != "" {
224                 var federationRedeemXPubs []chainkd.XPub
225                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
226                 for _, xpubStr := range fedpegXPubs {
227                         var xpub chainkd.XPub
228                         xpub.UnmarshalText([]byte(xpubStr))
229                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
230                 }
231                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
232         }
233
234         if config.Side.SignBlockXPubs != "" {
235                 var signBlockXPubs []chainkd.XPub
236                 xPubs := strings.Split(config.Side.SignBlockXPubs, ",")
237                 for _, xpubStr := range xPubs {
238                         var xpub chainkd.XPub
239                         xpub.UnmarshalText([]byte(xpubStr))
240                         signBlockXPubs = append(signBlockXPubs, xpub)
241                 }
242                 consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
243         }
244
245         consensus.ActiveNetParams.Signer = config.Signer
246         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
247         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
248 }
249
250 func initLogFile(config *cfg.Config) {
251         if config.LogFile == "" {
252                 return
253         }
254         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
255         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
256         if err == nil {
257                 log.SetOutput(file)
258         } else {
259                 log.WithField("err", err).Info("using default")
260         }
261
262 }
263
264 func initCommonConfig(config *cfg.Config) {
265         cfg.CommonConfig = config
266 }
267
268 // Lanch web broser or not
269 func launchWebBrowser(port string) {
270         webAddress := webHost + ":" + port
271         log.Info("Launching System Browser with :", webAddress)
272         if err := browser.Open(webAddress); err != nil {
273                 log.Error(err.Error())
274                 return
275         }
276 }
277
278 func (n *Node) initAndstartApiServer() {
279         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
280
281         listenAddr := env.String("LISTEN", n.config.ApiAddress)
282         env.Parse()
283         n.api.StartServer(*listenAddr)
284 }
285
286 func (n *Node) OnStart() error {
287         if n.miningEnable {
288                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
289                         n.miningEnable = false
290                         log.Error(err)
291                 } else {
292                         //n.cpuMiner.Start()
293                         n.miner.Start()
294                 }
295         }
296         if !n.config.VaultMode {
297                 n.syncManager.Start()
298         }
299         n.initAndstartApiServer()
300         n.notificationMgr.Start()
301         if !n.config.Web.Closed {
302                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
303                 if err != nil {
304                         log.Error("Invalid api address")
305                         return err
306                 }
307                 launchWebBrowser(port)
308         }
309         go bytomdRPCCheck()
310         return nil
311 }
312
313 func (n *Node) OnStop() {
314         n.notificationMgr.Shutdown()
315         n.notificationMgr.WaitForShutdown()
316         n.BaseService.OnStop()
317         if n.miningEnable {
318                 n.miner.Stop()
319         }
320         if !n.config.VaultMode {
321                 n.syncManager.Stop()
322         }
323 }
324
325 func (n *Node) RunForever() {
326         // Sleep forever and then...
327         cmn.TrapSignal(func() {
328                 n.Stop()
329         })
330 }
331
332 func (n *Node) SyncManager() *netsync.SyncManager {
333         return n.syncManager
334 }
335
336 func (n *Node) MiningPool() *miningpool.MiningPool {
337         return n.miningPool
338 }
339
340 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
341 func bytomdRPCCheck() bool {
342         type Req struct {
343                 BlockHeight uint64 `json:"block_height"`
344         }
345         if util.ValidatePegin {
346                 for {
347                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
348                         if err != nil {
349                                 log.Error("Call mainchain interface get-block-header failed")
350                                 time.Sleep(time.Millisecond * 1000)
351                                 continue
352                         }
353                         tmp, _ := json.Marshal(resp)
354                         var blockHeader api.GetBlockHeaderResp
355                         json.Unmarshal(tmp, &blockHeader)
356                         hash := blockHeader.BlockHeader.Hash()
357                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
358                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String())
359                                 return false
360                         }
361                         break
362                 }
363         }
364
365         return true
366 }
367
368 func initConsensusConfig(config *cfg.Config) {
369         if config.ConsensusConfigFile == "" {
370                 // poa
371         } else {
372                 //
373                 file, err := os.Open(config.ConsensusConfigFile)
374                 if err != nil {
375                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
376                 }
377                 defer file.Close()
378
379                 if err := json.NewDecoder(file).Decode(config); err != nil {
380                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
381                 }
382                 for _, v := range config.Consensus.Dpos.SelfVoteSigners {
383                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
384                         if err != nil {
385                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
386                         }
387                         config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address)
388                 }
389         }
390 }
391
392 func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine {
393         if config.Consensus.Dpos != nil {
394                 return dpos.New(config.Consensus.Dpos, store)
395         } else {
396                 return nil
397         }
398 }
399
400 func GetConsensusEngine() engine.Engine {
401         return consensusEngine
402 }