OSDN Git Service

Add png dir
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         "github.com/vapor/crypto/ed25519/chainkd"
31         "github.com/vapor/database/leveldb"
32         "github.com/vapor/env"
33         "github.com/vapor/mining/miner"
34         "github.com/vapor/net/websocket"
35         "github.com/vapor/netsync"
36         "github.com/vapor/protocol"
37         "github.com/vapor/protocol/bc"
38         "github.com/vapor/util"
39         w "github.com/vapor/wallet"
40 )
41
42 const (
43         webHost           = "http://127.0.0.1"
44         maxNewBlockChSize = 1024
45 )
46
47 type Node struct {
48         cmn.BaseService
49
50         // config
51         config *cfg.Config
52
53         syncManager *netsync.SyncManager
54
55         //bcReactor    *bc.BlockchainReactor
56         wallet          *w.Wallet
57         accessTokens    *accesstoken.CredentialStore
58         notificationMgr *websocket.WSNotificationManager
59         api             *api.API
60         chain           *protocol.Chain
61         txfeed          *txfeed.Tracker
62         //cpuMiner        *cpuminer.CPUMiner
63         miner *miner.Miner
64
65         miningEnable bool
66
67         newBlockCh chan *bc.Hash
68 }
69
70 func NewNode(config *cfg.Config) *Node {
71         ctx := context.Background()
72         if err := lockDataDirectory(config); err != nil {
73                 cmn.Exit("Error: " + err.Error())
74         }
75         initLogFile(config)
76         initActiveNetParams(config)
77         initConsensusConfig(config)
78         initCommonConfig(config)
79
80         util.MainchainConfig = config.MainChain
81         util.ValidatePegin = config.ValidatePegin
82         // Get store
83         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
84                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
85         }
86         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
87         store := leveldb.NewStore(coreDB)
88
89         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
90         accessTokens := accesstoken.NewStore(tokenDB)
91
92         txPool := protocol.NewTxPool(store)
93         chain, err := protocol.NewChain(store, txPool)
94         if err != nil {
95                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
96         }
97
98         switch config.Consensus.Type {
99         case "dpos":
100                 initDpos(chain, config)
101         }
102
103         var accounts *account.Manager = nil
104         var assets *asset.Registry = nil
105         var wallet *w.Wallet = nil
106         var txFeed *txfeed.Tracker = nil
107
108         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
109         txFeed = txfeed.NewTracker(txFeedDB, chain)
110
111         if err = txFeed.Prepare(ctx); err != nil {
112                 log.WithField("error", err).Error("start txfeed")
113                 return nil
114         }
115
116         hsm, err := pseudohsm.New(config.KeysDir())
117         if err != nil {
118                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
119         }
120
121         if !config.Wallet.Disable {
122                 address, err := common.DecodeAddress(config.Consensus.Coinbase, &consensus.ActiveNetParams)
123                 if err != nil {
124                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
125                 }
126                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
127                 accounts = account.NewManager(walletDB, chain)
128                 assets = asset.NewRegistry(walletDB, chain)
129                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
130                 if err != nil {
131                         log.WithField("error", err).Error("init NewWallet")
132                 }
133
134                 // trigger rescan wallet
135                 if config.Wallet.Rescan {
136                         wallet.RescanBlocks()
137                 }
138         }
139         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
140
141         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
142
143         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
144
145         // get transaction from txPool and send it to syncManager and wallet
146         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
147
148         // run the profile server
149         profileHost := config.ProfListenAddress
150         if profileHost != "" {
151                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
152                 // go tool pprof http://profileHose/debug/pprof/heap
153                 go func() {
154                         if err = http.ListenAndServe(profileHost, nil); err != nil {
155                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
156                         }
157                 }()
158         }
159         node := &Node{
160                 config:       config,
161                 syncManager:  syncManager,
162                 accessTokens: accessTokens,
163                 wallet:       wallet,
164                 chain:        chain,
165                 txfeed:       txFeed,
166                 miningEnable: config.Mining,
167
168                 newBlockCh:      newBlockCh,
169                 notificationMgr: notificationMgr,
170         }
171
172         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh)
173         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
174
175         return node
176 }
177
178 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
179 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
180         txMsgCh := txPool.GetMsgCh()
181         syncManagerTxCh := syncManager.GetNewTxCh()
182
183         for {
184                 msg := <-txMsgCh
185                 switch msg.MsgType {
186                 case protocol.MsgNewTx:
187                         syncManagerTxCh <- msg.Tx
188                         if wallet != nil {
189                                 wallet.AddUnconfirmedTx(msg.TxDesc)
190                         }
191                         notificationMgr.NotifyMempoolTx(msg.Tx)
192                 case protocol.MsgRemoveTx:
193                         if wallet != nil {
194                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
195                         }
196                 default:
197                         log.Warn("got unknow message type from the txPool channel")
198                 }
199         }
200 }
201
202 // Lock data directory after daemonization
203 func lockDataDirectory(config *cfg.Config) error {
204         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
205         if err != nil {
206                 return errors.New("datadir already used by another process")
207         }
208         return nil
209 }
210
211 func initActiveNetParams(config *cfg.Config) {
212         var exist bool
213         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
214         if !exist {
215                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
216         }
217         if config.Side.FedpegXPubs != "" {
218                 var federationRedeemXPubs []chainkd.XPub
219                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
220                 for _, xpubStr := range fedpegXPubs {
221                         var xpub chainkd.XPub
222                         xpub.UnmarshalText([]byte(xpubStr))
223                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
224                 }
225                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
226         }
227
228         consensus.ActiveNetParams.Signer = config.Signer
229         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
230         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
231 }
232
233 func initLogFile(config *cfg.Config) {
234         if config.LogFile == "" {
235                 return
236         }
237         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
238         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
239         if err == nil {
240                 log.SetOutput(file)
241         } else {
242                 log.WithField("err", err).Info("using default")
243         }
244
245 }
246
247 func initCommonConfig(config *cfg.Config) {
248         cfg.CommonConfig = config
249 }
250
251 // Lanch web broser or not
252 func launchWebBrowser(port string) {
253         webAddress := webHost + ":" + port
254         log.Info("Launching System Browser with :", webAddress)
255         if err := browser.Open(webAddress); err != nil {
256                 log.Error(err.Error())
257                 return
258         }
259 }
260
261 func (n *Node) initAndstartApiServer() {
262         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
263
264         listenAddr := env.String("LISTEN", n.config.ApiAddress)
265         env.Parse()
266         n.api.StartServer(*listenAddr)
267 }
268
269 func (n *Node) OnStart() error {
270         if n.miningEnable {
271                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
272                         n.miningEnable = false
273                         log.Error(err)
274                 } else {
275                         //n.cpuMiner.Start()
276                         n.miner.Start()
277                 }
278         }
279         if !n.config.VaultMode {
280                 n.syncManager.Start()
281         }
282         n.initAndstartApiServer()
283         n.notificationMgr.Start()
284         if !n.config.Web.Closed {
285                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
286                 if err != nil {
287                         log.Error("Invalid api address")
288                         return err
289                 }
290                 launchWebBrowser(port)
291         }
292         go bytomdRPCCheck()
293         return nil
294 }
295
296 func (n *Node) OnStop() {
297         if err := n.chain.Engine.Finish(); err != nil {
298                 log.Errorf("OnStop: %v", err)
299         }
300
301         n.notificationMgr.Shutdown()
302         n.notificationMgr.WaitForShutdown()
303         n.BaseService.OnStop()
304         if n.miningEnable {
305                 n.miner.Stop()
306         }
307         if !n.config.VaultMode {
308                 n.syncManager.Stop()
309         }
310 }
311
312 func (n *Node) RunForever() {
313         // Sleep forever and then...
314         cmn.TrapSignal(func() {
315                 n.Stop()
316         })
317 }
318
319 func (n *Node) SyncManager() *netsync.SyncManager {
320         return n.syncManager
321 }
322
323 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
324 func bytomdRPCCheck() bool {
325         type Req struct {
326                 BlockHeight uint64 `json:"block_height"`
327         }
328         if util.ValidatePegin {
329                 for {
330                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
331                         if err != nil {
332                                 log.Error("Call mainchain interface get-block-header failed")
333                                 time.Sleep(time.Millisecond * 1000)
334                                 continue
335                         }
336                         tmp, _ := json.Marshal(resp)
337                         var blockHeader api.GetBlockHeaderResp
338                         json.Unmarshal(tmp, &blockHeader)
339                         hash := blockHeader.BlockHeader.Hash()
340                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
341                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
342                                 return false
343                         }
344                         break
345                 }
346         }
347
348         return true
349 }
350
351 func initConsensusConfig(config *cfg.Config) {
352         if config.ConsensusConfigFile == "" {
353                 // poa
354         } else {
355                 //
356                 file, err := os.Open(config.ConsensusConfigFile)
357                 if err != nil {
358                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
359                 }
360                 defer file.Close()
361
362                 if err := json.NewDecoder(file).Decode(config); err != nil {
363                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
364                 }
365
366                 for _, v := range config.Consensus.SelfVoteSigners {
367                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
368                         if err != nil {
369                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
370                         }
371                         config.Consensus.Signers = append(config.Consensus.Signers, address)
372                 }
373         }
374 }
375
376 func initDpos(chain *protocol.Chain, config *cfg.Config) {
377         header := chain.BestBlockHeader()
378         height := header.Height
379         hash := header.Hash()
380         maxSignerCount := config.Consensus.MaxSignerCount
381         period := config.Consensus.Period
382         err := chain.Engine.Init(chain, maxSignerCount, period, height, hash)
383
384         if height > 0 {
385                 oldBlockHeight := chain.Engine.GetOldBlockHeight()
386                 oldBlockHash := chain.Engine.GetOldBlockHash()
387                 if err != nil {
388                         oldBlockHeight = 0
389                         header, _ = chain.GetHeaderByHeight(oldBlockHeight)
390                         oldBlockHash = header.Hash()
391                 }
392
393                 if err := chain.RepairDPoSData(oldBlockHeight, oldBlockHash); err != nil {
394                         cmn.Exit(cmn.Fmt("initVote failed: %v", err))
395                 }
396         }
397 }