OSDN Git Service

61176c2257dd3f4f152b5c53d5facb34ae3215aa
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         engine "github.com/vapor/consensus/consensus"
31         dpos "github.com/vapor/consensus/consensus/dpos"
32         "github.com/vapor/crypto/ed25519/chainkd"
33         "github.com/vapor/database/leveldb"
34         "github.com/vapor/env"
35         "github.com/vapor/mining/miner"
36         "github.com/vapor/net/websocket"
37         "github.com/vapor/netsync"
38         "github.com/vapor/protocol"
39         "github.com/vapor/protocol/bc"
40         "github.com/vapor/util"
41         w "github.com/vapor/wallet"
42 )
43
// webHost is the scheme and host prefix used to build the address that is
// opened in the system browser.
const webHost = "http://127.0.0.1"

// maxNewBlockChSize is the buffer capacity of the channel that carries the
// hashes of newly produced blocks.
const maxNewBlockChSize = 1024
48
49 type Node struct {
50         cmn.BaseService
51
52         // config
53         config *cfg.Config
54
55         syncManager *netsync.SyncManager
56
57         //bcReactor    *bc.BlockchainReactor
58         wallet          *w.Wallet
59         accessTokens    *accesstoken.CredentialStore
60         notificationMgr *websocket.WSNotificationManager
61         api             *api.API
62         chain           *protocol.Chain
63         txfeed          *txfeed.Tracker
64         //cpuMiner        *cpuminer.CPUMiner
65         miner *miner.Miner
66
67         miningEnable bool
68
69         newBlockCh chan *bc.Hash
70
71         engine engine.Engine
72 }
73
74 func NewNode(config *cfg.Config) *Node {
75         ctx := context.Background()
76         if err := lockDataDirectory(config); err != nil {
77                 cmn.Exit("Error: " + err.Error())
78         }
79         initLogFile(config)
80         initActiveNetParams(config)
81         initConsensusConfig(config)
82         initCommonConfig(config)
83
84         util.MainchainConfig = config.MainChain
85         util.ValidatePegin = config.ValidatePegin
86         // Get store
87         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
88                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
89         }
90         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
91         store := leveldb.NewStore(coreDB)
92
93         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
94         accessTokens := accesstoken.NewStore(tokenDB)
95
96         var engine engine.Engine
97         switch config.Consensus.Type {
98         case "dpos":
99                 engine = dpos.GDpos
100         }
101
102         txPool := protocol.NewTxPool(store)
103         chain, err := protocol.NewChain(store, txPool, engine)
104         if err != nil {
105                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
106         }
107
108         var accounts *account.Manager = nil
109         var assets *asset.Registry = nil
110         var wallet *w.Wallet = nil
111         var txFeed *txfeed.Tracker = nil
112
113         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
114         txFeed = txfeed.NewTracker(txFeedDB, chain)
115
116         if err = txFeed.Prepare(ctx); err != nil {
117                 log.WithField("error", err).Error("start txfeed")
118                 return nil
119         }
120
121         hsm, err := pseudohsm.New(config.KeysDir())
122         if err != nil {
123                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
124         }
125
126         if !config.Wallet.Disable {
127                 address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
128                 if err != nil {
129                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
130                 }
131                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
132                 accounts = account.NewManager(walletDB, chain)
133                 assets = asset.NewRegistry(walletDB, chain)
134                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
135                 if err != nil {
136                         log.WithField("error", err).Error("init NewWallet")
137                 }
138
139                 // trigger rescan wallet
140                 if config.Wallet.Rescan {
141                         wallet.RescanBlocks()
142                 }
143         }
144         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
145
146         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
147
148         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
149
150         // get transaction from txPool and send it to syncManager and wallet
151         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
152
153         // run the profile server
154         profileHost := config.ProfListenAddress
155         if profileHost != "" {
156                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
157                 // go tool pprof http://profileHose/debug/pprof/heap
158                 go func() {
159                         if err = http.ListenAndServe(profileHost, nil); err != nil {
160                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
161                         }
162                 }()
163         }
164
165         switch config.Consensus.Type {
166         case "dpos":
167                 initDpos(chain, config)
168         }
169
170         node := &Node{
171                 config:       config,
172                 syncManager:  syncManager,
173                 accessTokens: accessTokens,
174                 wallet:       wallet,
175                 chain:        chain,
176                 txfeed:       txFeed,
177                 miningEnable: config.Mining,
178
179                 newBlockCh:      newBlockCh,
180                 notificationMgr: notificationMgr,
181                 engine:          engine,
182         }
183
184         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, engine)
185         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
186
187         return node
188 }
189
190 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
191 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
192         txMsgCh := txPool.GetMsgCh()
193         syncManagerTxCh := syncManager.GetNewTxCh()
194
195         for {
196                 msg := <-txMsgCh
197                 switch msg.MsgType {
198                 case protocol.MsgNewTx:
199                         syncManagerTxCh <- msg.Tx
200                         if wallet != nil {
201                                 wallet.AddUnconfirmedTx(msg.TxDesc)
202                         }
203                         notificationMgr.NotifyMempoolTx(msg.Tx)
204                 case protocol.MsgRemoveTx:
205                         if wallet != nil {
206                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
207                         }
208                 default:
209                         log.Warn("got unknow message type from the txPool channel")
210                 }
211         }
212 }
213
214 // Lock data directory after daemonization
215 func lockDataDirectory(config *cfg.Config) error {
216         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
217         if err != nil {
218                 return errors.New("datadir already used by another process")
219         }
220         return nil
221 }
222
223 func initActiveNetParams(config *cfg.Config) {
224         var exist bool
225         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
226         if !exist {
227                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
228         }
229         if config.Side.FedpegXPubs != "" {
230                 var federationRedeemXPubs []chainkd.XPub
231                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
232                 for _, xpubStr := range fedpegXPubs {
233                         var xpub chainkd.XPub
234                         xpub.UnmarshalText([]byte(xpubStr))
235                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
236                 }
237                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
238         }
239
240         consensus.ActiveNetParams.Signer = config.Signer
241         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
242         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
243 }
244
245 func initLogFile(config *cfg.Config) {
246         if config.LogFile == "" {
247                 return
248         }
249         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
250         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
251         if err == nil {
252                 log.SetOutput(file)
253         } else {
254                 log.WithField("err", err).Info("using default")
255         }
256
257 }
258
259 func initCommonConfig(config *cfg.Config) {
260         cfg.CommonConfig = config
261 }
262
263 // Lanch web broser or not
264 func launchWebBrowser(port string) {
265         webAddress := webHost + ":" + port
266         log.Info("Launching System Browser with :", webAddress)
267         if err := browser.Open(webAddress); err != nil {
268                 log.Error(err.Error())
269                 return
270         }
271 }
272
273 func (n *Node) initAndstartApiServer() {
274         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
275
276         listenAddr := env.String("LISTEN", n.config.ApiAddress)
277         env.Parse()
278         n.api.StartServer(*listenAddr)
279 }
280
281 func (n *Node) OnStart() error {
282         if n.miningEnable {
283                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
284                         n.miningEnable = false
285                         log.Error(err)
286                 } else {
287                         //n.cpuMiner.Start()
288                         n.miner.Start()
289                 }
290         }
291         if !n.config.VaultMode {
292                 n.syncManager.Start()
293         }
294         n.initAndstartApiServer()
295         n.notificationMgr.Start()
296         if !n.config.Web.Closed {
297                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
298                 if err != nil {
299                         log.Error("Invalid api address")
300                         return err
301                 }
302                 launchWebBrowser(port)
303         }
304         go bytomdRPCCheck()
305         return nil
306 }
307
308 func (n *Node) OnStop() {
309         if err := n.engine.Finish(); err != nil {
310                 log.Errorf("OnStop: %v", err)
311         }
312
313         n.notificationMgr.Shutdown()
314         n.notificationMgr.WaitForShutdown()
315         n.BaseService.OnStop()
316         if n.miningEnable {
317                 n.miner.Stop()
318         }
319         if !n.config.VaultMode {
320                 n.syncManager.Stop()
321         }
322 }
323
324 func (n *Node) RunForever() {
325         // Sleep forever and then...
326         cmn.TrapSignal(func() {
327                 n.Stop()
328         })
329 }
330
331 func (n *Node) SyncManager() *netsync.SyncManager {
332         return n.syncManager
333 }
334
335 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
336 func bytomdRPCCheck() bool {
337         type Req struct {
338                 BlockHeight uint64 `json:"block_height"`
339         }
340         if util.ValidatePegin {
341                 for {
342                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
343                         if err != nil {
344                                 log.Error("Call mainchain interface get-block-header failed")
345                                 time.Sleep(time.Millisecond * 1000)
346                                 continue
347                         }
348                         tmp, _ := json.Marshal(resp)
349                         var blockHeader api.GetBlockHeaderResp
350                         json.Unmarshal(tmp, &blockHeader)
351                         hash := blockHeader.BlockHeader.Hash()
352                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
353                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
354                                 return false
355                         }
356                         break
357                 }
358         }
359
360         return true
361 }
362
363 func initConsensusConfig(config *cfg.Config) {
364         if config.ConsensusConfigFile == "" {
365                 // poa
366         } else {
367                 //
368                 file, err := os.Open(config.ConsensusConfigFile)
369                 if err != nil {
370                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
371                 }
372                 defer file.Close()
373
374                 if err := json.NewDecoder(file).Decode(config); err != nil {
375                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
376                 }
377
378                 for _, v := range config.Consensus.SelfVoteSigners {
379                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
380                         if err != nil {
381                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
382                         }
383                         config.Consensus.Signers = append(config.Consensus.Signers, address)
384                 }
385         }
386 }
387
388 func initDpos(chain *protocol.Chain, config *cfg.Config) {
389         header := chain.BestBlockHeader()
390         height := header.Height
391         hash := header.Hash()
392         maxSignerCount := config.Consensus.MaxSignerCount
393         period := config.Consensus.Period
394         if err := dpos.GDpos.Init(chain, maxSignerCount, period, height, hash); err != nil {
395                 cmn.Exit(cmn.Fmt("initVote: Dpos new: %v", err))
396         }
397
398         if height > 0 {
399                 oldBlockHeight := dpos.GDpos.GetOldBlockHeight()
400                 oldBlockHash := dpos.GDpos.GetOldBlockHash()
401                 if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
402                         cmn.Exit(cmn.Fmt("initVote failed: %v", err))
403                 }
404         }
405 }