OSDN Git Service

7354485bd51267c1c1e1d168859b8a5f9817e299
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         browser "github.com/toqueteos/webbrowser"
19
20         "github.com/vapor/accesstoken"
21         "github.com/vapor/account"
22         "github.com/vapor/api"
23         "github.com/vapor/asset"
24         "github.com/vapor/blockchain/pseudohsm"
25         "github.com/vapor/blockchain/txfeed"
26         "github.com/vapor/common"
27         cfg "github.com/vapor/config"
28         "github.com/vapor/consensus"
29         "github.com/vapor/crypto/ed25519/chainkd"
30         "github.com/vapor/database"
31         dbm "github.com/vapor/database/db"
32         _ "github.com/vapor/database/leveldb"
33         "github.com/vapor/database/orm"
34         _ "github.com/vapor/database/sqlite"
35         "github.com/vapor/env"
36         "github.com/vapor/mining/miner"
37         "github.com/vapor/net/websocket"
38         "github.com/vapor/netsync"
39         "github.com/vapor/protocol"
40         "github.com/vapor/protocol/bc"
41         "github.com/vapor/util"
42         w "github.com/vapor/wallet"
43 )
44
// webHost is the scheme and loopback host used to build the URL that is
// opened in the system browser; the API port is appended by the caller.
const webHost = "http://127.0.0.1"

// maxNewBlockChSize is the buffer capacity of the new-block hash channel
// created in NewNode.
const maxNewBlockChSize = 1024
49
50 type Node struct {
51         cmn.BaseService
52
53         db dbm.SQLDB
54         // config
55         config *cfg.Config
56
57         syncManager *netsync.SyncManager
58
59         //bcReactor    *bc.BlockchainReactor
60         wallet          *w.Wallet
61         accessTokens    *accesstoken.CredentialStore
62         notificationMgr *websocket.WSNotificationManager
63         api             *api.API
64         chain           *protocol.Chain
65         txfeed          *txfeed.Tracker
66         //cpuMiner        *cpuminer.CPUMiner
67         miner *miner.Miner
68
69         miningEnable bool
70
71         newBlockCh chan *bc.Hash
72 }
73
74 func NewNode(config *cfg.Config) *Node {
75         ctx := context.Background()
76         if err := lockDataDirectory(config); err != nil {
77                 cmn.Exit("Error: " + err.Error())
78         }
79         initLogFile(config)
80         initActiveNetParams(config)
81         initConsensusConfig(config)
82         initCommonConfig(config)
83
84         util.MainchainConfig = config.MainChain
85         util.ValidatePegin = config.ValidatePegin
86         // Get store
87         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
88                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
89         }
90
91         sqlDB := dbm.NewSqlDB("sql", "sqlitedb", config.DBDir())
92         initDatabaseTable(sqlDB)
93         sqlStore := database.NewSQLStore(sqlDB)
94
95         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
96         accessTokens := accesstoken.NewStore(tokenDB)
97
98         txPool := protocol.NewTxPool(sqlStore)
99         chain, err := protocol.NewChain(sqlStore, txPool)
100         if err != nil {
101                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
102         }
103
104         switch config.Consensus.Type {
105         case "dpos":
106                 initDpos(chain, config)
107         }
108
109         var accounts *account.Manager = nil
110         var assets *asset.Registry = nil
111         var wallet *w.Wallet = nil
112         var txFeed *txfeed.Tracker = nil
113
114         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
115         txFeed = txfeed.NewTracker(txFeedDB, chain)
116
117         if err = txFeed.Prepare(ctx); err != nil {
118                 log.WithField("error", err).Error("start txfeed")
119                 return nil
120         }
121
122         hsm, err := pseudohsm.New(config.KeysDir())
123         if err != nil {
124                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
125         }
126
127         if !config.Wallet.Disable {
128                 address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
129                 if err != nil {
130                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
131                 }
132                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
133                 accounts = account.NewManager(walletDB, chain)
134                 assets = asset.NewRegistry(walletDB, chain)
135                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
136                 if err != nil {
137                         log.WithField("error", err).Error("init NewWallet")
138                 }
139
140                 // trigger rescan wallet
141                 if config.Wallet.Rescan {
142                         wallet.RescanBlocks()
143                 }
144         }
145         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
146
147         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
148
149         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
150
151         // get transaction from txPool and send it to syncManager and wallet
152         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
153
154         // run the profile server
155         profileHost := config.ProfListenAddress
156         if profileHost != "" {
157                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
158                 // go tool pprof http://profileHose/debug/pprof/heap
159                 go func() {
160                         if err = http.ListenAndServe(profileHost, nil); err != nil {
161                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
162                         }
163                 }()
164         }
165         node := &Node{
166                 db:           sqlDB,
167                 config:       config,
168                 syncManager:  syncManager,
169                 accessTokens: accessTokens,
170                 wallet:       wallet,
171                 chain:        chain,
172                 txfeed:       txFeed,
173                 miningEnable: config.Mining,
174
175                 newBlockCh:      newBlockCh,
176                 notificationMgr: notificationMgr,
177         }
178
179         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh)
180         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
181
182         return node
183 }
184
185 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
186 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
187         txMsgCh := txPool.GetMsgCh()
188         syncManagerTxCh := syncManager.GetNewTxCh()
189
190         for {
191                 msg := <-txMsgCh
192                 switch msg.MsgType {
193                 case protocol.MsgNewTx:
194                         syncManagerTxCh <- msg.Tx
195                         if wallet != nil {
196                                 wallet.AddUnconfirmedTx(msg.TxDesc)
197                         }
198                         notificationMgr.NotifyMempoolTx(msg.Tx)
199                 case protocol.MsgRemoveTx:
200                         if wallet != nil {
201                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
202                         }
203                 default:
204                         log.Warn("got unknow message type from the txPool channel")
205                 }
206         }
207 }
208
209 // Lock data directory after daemonization
210 func lockDataDirectory(config *cfg.Config) error {
211         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
212         if err != nil {
213                 return errors.New("datadir already used by another process")
214         }
215         return nil
216 }
217
218 func initActiveNetParams(config *cfg.Config) {
219         var exist bool
220         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
221         if !exist {
222                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
223         }
224         if config.Side.FedpegXPubs != "" {
225                 var federationRedeemXPubs []chainkd.XPub
226                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
227                 for _, xpubStr := range fedpegXPubs {
228                         var xpub chainkd.XPub
229                         xpub.UnmarshalText([]byte(xpubStr))
230                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
231                 }
232                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
233         }
234
235         consensus.ActiveNetParams.Signer = config.Signer
236         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
237         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
238 }
239
240 func initLogFile(config *cfg.Config) {
241         if config.LogFile == "" {
242                 return
243         }
244         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
245         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
246         if err == nil {
247                 log.SetOutput(file)
248         } else {
249                 log.WithField("err", err).Info("using default")
250         }
251
252 }
253
254 func initCommonConfig(config *cfg.Config) {
255         cfg.CommonConfig = config
256 }
257
258 // Lanch web broser or not
259 func launchWebBrowser(port string) {
260         webAddress := webHost + ":" + port
261         log.Info("Launching System Browser with :", webAddress)
262         if err := browser.Open(webAddress); err != nil {
263                 log.Error(err.Error())
264                 return
265         }
266 }
267
268 func (n *Node) initAndstartApiServer() {
269         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
270
271         listenAddr := env.String("LISTEN", n.config.ApiAddress)
272         env.Parse()
273         n.api.StartServer(*listenAddr)
274 }
275
276 func (n *Node) OnStart() error {
277         if n.miningEnable {
278                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
279                         n.miningEnable = false
280                         log.Error(err)
281                 } else {
282                         //n.cpuMiner.Start()
283                         n.miner.Start()
284                 }
285         }
286         if !n.config.VaultMode {
287                 n.syncManager.Start()
288         }
289         n.initAndstartApiServer()
290         n.notificationMgr.Start()
291         if !n.config.Web.Closed {
292                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
293                 if err != nil {
294                         log.Error("Invalid api address")
295                         return err
296                 }
297                 launchWebBrowser(port)
298         }
299         go bytomdRPCCheck()
300         return nil
301 }
302
303 func (n *Node) OnStop() {
304         if err := n.chain.Engine.Finish(); err != nil {
305                 log.Errorf("OnStop: %v", err)
306         }
307
308         n.notificationMgr.Shutdown()
309         n.notificationMgr.WaitForShutdown()
310         n.BaseService.OnStop()
311         if n.miningEnable {
312                 n.miner.Stop()
313         }
314         if !n.config.VaultMode {
315                 n.syncManager.Stop()
316         }
317
318         n.db.Db().Close()
319 }
320
321 func (n *Node) RunForever() {
322         // Sleep forever and then...
323         cmn.TrapSignal(func() {
324                 n.Stop()
325         })
326 }
327
328 func (n *Node) SyncManager() *netsync.SyncManager {
329         return n.syncManager
330 }
331
332 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
333 func bytomdRPCCheck() bool {
334         type Req struct {
335                 BlockHeight uint64 `json:"block_height"`
336         }
337         if util.ValidatePegin {
338                 for {
339                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
340                         if err != nil {
341                                 log.Error("Call mainchain interface get-block-header failed")
342                                 time.Sleep(time.Millisecond * 1000)
343                                 continue
344                         }
345                         tmp, _ := json.Marshal(resp)
346                         var blockHeader api.GetBlockHeaderResp
347                         json.Unmarshal(tmp, &blockHeader)
348                         hash := blockHeader.BlockHeader.Hash()
349                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
350                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
351                                 return false
352                         }
353                         break
354                 }
355         }
356
357         return true
358 }
359
360 func initConsensusConfig(config *cfg.Config) {
361         if config.ConsensusConfigFile == "" {
362                 // poa
363         } else {
364                 //
365                 file, err := os.Open(config.ConsensusConfigFile)
366                 if err != nil {
367                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
368                 }
369                 defer file.Close()
370
371                 if err := json.NewDecoder(file).Decode(config); err != nil {
372                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
373                 }
374
375                 for _, v := range config.Consensus.SelfVoteSigners {
376                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
377                         if err != nil {
378                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
379                         }
380                         config.Consensus.Signers = append(config.Consensus.Signers, address)
381                 }
382         }
383 }
384
385 func initDpos(chain *protocol.Chain, config *cfg.Config) {
386         header := chain.BestBlockHeader()
387         height := header.Height
388         hash := header.Hash()
389         maxSignerCount := config.Consensus.MaxSignerCount
390         period := config.Consensus.Period
391         err := chain.Engine.Init(chain, maxSignerCount, period, height, hash)
392
393         if height > 0 {
394                 oldBlockHeight := chain.Engine.GetOldBlockHeight()
395                 oldBlockHash := chain.Engine.GetOldBlockHash()
396                 if err != nil {
397                         oldBlockHeight = 0
398                         header, _ = chain.GetHeaderByHeight(oldBlockHeight)
399                         oldBlockHash = header.Hash()
400                 }
401
402                 if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
403                         cmn.Exit(cmn.Fmt("initVote failed: %v", err))
404                 }
405         }
406 }
407
408 func initDatabaseTable(db dbm.SQLDB) {
409         db.Db().AutoMigrate(&orm.BlockHeader{}, &orm.Transaction{}, &orm.BlockStoreState{}, &orm.ClaimTxState{}, &orm.Utxo{})
410 }