OSDN Git Service

577448807422a8179d3061373a6c1b3a925f7634
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         engine "github.com/vapor/consensus/consensus"
31         dpos "github.com/vapor/consensus/consensus/dpos"
32         "github.com/vapor/crypto/ed25519/chainkd"
33         "github.com/vapor/database/leveldb"
34         "github.com/vapor/env"
35         "github.com/vapor/mining/miner"
36         "github.com/vapor/net/websocket"
37         "github.com/vapor/netsync"
38         "github.com/vapor/protocol"
39         "github.com/vapor/protocol/bc"
40         "github.com/vapor/util"
41         w "github.com/vapor/wallet"
42 )
43
44 const (
45         webHost           = "http://127.0.0.1"
46         maxNewBlockChSize = 1024
47 )
48
49 type Node struct {
50         cmn.BaseService
51
52         // config
53         config *cfg.Config
54
55         syncManager *netsync.SyncManager
56
57         //bcReactor    *bc.BlockchainReactor
58         wallet          *w.Wallet
59         accessTokens    *accesstoken.CredentialStore
60         notificationMgr *websocket.WSNotificationManager
61         api             *api.API
62         chain           *protocol.Chain
63         txfeed          *txfeed.Tracker
64         //cpuMiner        *cpuminer.CPUMiner
65         miner *miner.Miner
66
67         miningEnable bool
68
69         newBlockCh chan *bc.Hash
70
71         engine engine.Engine
72 }
73
74 func NewNode(config *cfg.Config) *Node {
75         ctx := context.Background()
76         if err := lockDataDirectory(config); err != nil {
77                 cmn.Exit("Error: " + err.Error())
78         }
79         initLogFile(config)
80         initActiveNetParams(config)
81         initConsensusConfig(config)
82         initCommonConfig(config)
83
84         util.MainchainConfig = config.MainChain
85         util.ValidatePegin = config.ValidatePegin
86         // Get store
87         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
88                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
89         }
90         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
91         store := leveldb.NewStore(coreDB)
92
93         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
94         accessTokens := accesstoken.NewStore(tokenDB)
95
96         var engine engine.Engine
97         switch config.Consensus.Type {
98         case "dpos":
99                 engine = dpos.GDpos
100         }
101
102         txPool := protocol.NewTxPool(store)
103         chain, err := protocol.NewChain(store, txPool, engine)
104         if err != nil {
105                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
106         }
107
108         switch config.Consensus.Type {
109         case "dpos":
110                 initDpos(chain, config)
111         }
112
113         var accounts *account.Manager = nil
114         var assets *asset.Registry = nil
115         var wallet *w.Wallet = nil
116         var txFeed *txfeed.Tracker = nil
117
118         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
119         txFeed = txfeed.NewTracker(txFeedDB, chain)
120
121         if err = txFeed.Prepare(ctx); err != nil {
122                 log.WithField("error", err).Error("start txfeed")
123                 return nil
124         }
125
126         hsm, err := pseudohsm.New(config.KeysDir())
127         if err != nil {
128                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
129         }
130
131         if !config.Wallet.Disable {
132                 address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
133                 if err != nil {
134                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
135                 }
136                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
137                 accounts = account.NewManager(walletDB, chain)
138                 assets = asset.NewRegistry(walletDB, chain)
139                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
140                 if err != nil {
141                         log.WithField("error", err).Error("init NewWallet")
142                 }
143
144                 // trigger rescan wallet
145                 if config.Wallet.Rescan {
146                         wallet.RescanBlocks()
147                 }
148         }
149         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
150
151         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
152
153         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
154
155         // get transaction from txPool and send it to syncManager and wallet
156         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
157
158         // run the profile server
159         profileHost := config.ProfListenAddress
160         if profileHost != "" {
161                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
162                 // go tool pprof http://profileHose/debug/pprof/heap
163                 go func() {
164                         if err = http.ListenAndServe(profileHost, nil); err != nil {
165                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
166                         }
167                 }()
168         }
169         node := &Node{
170                 config:       config,
171                 syncManager:  syncManager,
172                 accessTokens: accessTokens,
173                 wallet:       wallet,
174                 chain:        chain,
175                 txfeed:       txFeed,
176                 miningEnable: config.Mining,
177
178                 newBlockCh:      newBlockCh,
179                 notificationMgr: notificationMgr,
180                 engine:          engine,
181         }
182
183         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, engine)
184         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
185
186         return node
187 }
188
189 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
190 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
191         txMsgCh := txPool.GetMsgCh()
192         syncManagerTxCh := syncManager.GetNewTxCh()
193
194         for {
195                 msg := <-txMsgCh
196                 switch msg.MsgType {
197                 case protocol.MsgNewTx:
198                         syncManagerTxCh <- msg.Tx
199                         if wallet != nil {
200                                 wallet.AddUnconfirmedTx(msg.TxDesc)
201                         }
202                         notificationMgr.NotifyMempoolTx(msg.Tx)
203                 case protocol.MsgRemoveTx:
204                         if wallet != nil {
205                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
206                         }
207                 default:
208                         log.Warn("got unknow message type from the txPool channel")
209                 }
210         }
211 }
212
213 // Lock data directory after daemonization
214 func lockDataDirectory(config *cfg.Config) error {
215         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
216         if err != nil {
217                 return errors.New("datadir already used by another process")
218         }
219         return nil
220 }
221
222 func initActiveNetParams(config *cfg.Config) {
223         var exist bool
224         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
225         if !exist {
226                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
227         }
228         if config.Side.FedpegXPubs != "" {
229                 var federationRedeemXPubs []chainkd.XPub
230                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
231                 for _, xpubStr := range fedpegXPubs {
232                         var xpub chainkd.XPub
233                         xpub.UnmarshalText([]byte(xpubStr))
234                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
235                 }
236                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
237         }
238
239         consensus.ActiveNetParams.Signer = config.Signer
240         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
241         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
242 }
243
244 func initLogFile(config *cfg.Config) {
245         if config.LogFile == "" {
246                 return
247         }
248         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
249         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
250         if err == nil {
251                 log.SetOutput(file)
252         } else {
253                 log.WithField("err", err).Info("using default")
254         }
255
256 }
257
258 func initCommonConfig(config *cfg.Config) {
259         cfg.CommonConfig = config
260 }
261
262 // Lanch web broser or not
263 func launchWebBrowser(port string) {
264         webAddress := webHost + ":" + port
265         log.Info("Launching System Browser with :", webAddress)
266         if err := browser.Open(webAddress); err != nil {
267                 log.Error(err.Error())
268                 return
269         }
270 }
271
272 func (n *Node) initAndstartApiServer() {
273         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
274
275         listenAddr := env.String("LISTEN", n.config.ApiAddress)
276         env.Parse()
277         n.api.StartServer(*listenAddr)
278 }
279
280 func (n *Node) OnStart() error {
281         if n.miningEnable {
282                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
283                         n.miningEnable = false
284                         log.Error(err)
285                 } else {
286                         //n.cpuMiner.Start()
287                         n.miner.Start()
288                 }
289         }
290         if !n.config.VaultMode {
291                 n.syncManager.Start()
292         }
293         n.initAndstartApiServer()
294         n.notificationMgr.Start()
295         if !n.config.Web.Closed {
296                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
297                 if err != nil {
298                         log.Error("Invalid api address")
299                         return err
300                 }
301                 launchWebBrowser(port)
302         }
303         go bytomdRPCCheck()
304         return nil
305 }
306
307 func (n *Node) OnStop() {
308         if err := n.engine.Finish(); err != nil {
309                 log.Errorf("OnStop: %v", err)
310         }
311
312         n.notificationMgr.Shutdown()
313         n.notificationMgr.WaitForShutdown()
314         n.BaseService.OnStop()
315         if n.miningEnable {
316                 n.miner.Stop()
317         }
318         if !n.config.VaultMode {
319                 n.syncManager.Stop()
320         }
321 }
322
323 func (n *Node) RunForever() {
324         // Sleep forever and then...
325         cmn.TrapSignal(func() {
326                 n.Stop()
327         })
328 }
329
330 func (n *Node) SyncManager() *netsync.SyncManager {
331         return n.syncManager
332 }
333
334 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
335 func bytomdRPCCheck() bool {
336         type Req struct {
337                 BlockHeight uint64 `json:"block_height"`
338         }
339         if util.ValidatePegin {
340                 for {
341                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
342                         if err != nil {
343                                 log.Error("Call mainchain interface get-block-header failed")
344                                 time.Sleep(time.Millisecond * 1000)
345                                 continue
346                         }
347                         tmp, _ := json.Marshal(resp)
348                         var blockHeader api.GetBlockHeaderResp
349                         json.Unmarshal(tmp, &blockHeader)
350                         hash := blockHeader.BlockHeader.Hash()
351                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
352                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
353                                 return false
354                         }
355                         break
356                 }
357         }
358
359         return true
360 }
361
362 func initConsensusConfig(config *cfg.Config) {
363         if config.ConsensusConfigFile == "" {
364                 // poa
365         } else {
366                 //
367                 file, err := os.Open(config.ConsensusConfigFile)
368                 if err != nil {
369                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
370                 }
371                 defer file.Close()
372
373                 if err := json.NewDecoder(file).Decode(config); err != nil {
374                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
375                 }
376
377                 for _, v := range config.Consensus.SelfVoteSigners {
378                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
379                         if err != nil {
380                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
381                         }
382                         config.Consensus.Signers = append(config.Consensus.Signers, address)
383                 }
384         }
385 }
386
387 func initDpos(chain *protocol.Chain, config *cfg.Config) {
388         header := chain.BestBlockHeader()
389         height := header.Height
390         hash := header.Hash()
391         maxSignerCount := config.Consensus.MaxSignerCount
392         period := config.Consensus.Period
393         if err := dpos.GDpos.Init(chain, maxSignerCount, period, height, hash); err != nil {
394                 cmn.Exit(cmn.Fmt("initVote: Dpos new: %v", err))
395         }
396
397         if height > 0 {
398                 oldBlockHeight := dpos.GDpos.GetOldBlockHeight()
399                 oldBlockHash := dpos.GDpos.GetOldBlockHash()
400                 if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
401                         cmn.Exit(cmn.Fmt("initVote failed: %v", err))
402                 }
403         }
404 }