OSDN Git Service

delete generate block for api
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         engine "github.com/vapor/consensus/consensus"
31         "github.com/vapor/consensus/consensus/dpos"
32         "github.com/vapor/crypto/ed25519/chainkd"
33         "github.com/vapor/database/leveldb"
34         "github.com/vapor/env"
35         "github.com/vapor/mining/miner"
36         "github.com/vapor/net/websocket"
37         "github.com/vapor/netsync"
38         "github.com/vapor/protocol"
39         "github.com/vapor/protocol/bc"
40         "github.com/vapor/util"
41         w "github.com/vapor/wallet"
42 )
43
const (
	// webHost is the scheme and host used to build the address opened by launchWebBrowser.
	webHost = "http://127.0.0.1"
	// maxNewBlockChSize is the buffer size of the new-block hash channel created in NewNode.
	maxNewBlockChSize = 1024
)

// consensusEngine is the package-wide consensus engine; it is set by NewNode
// and exposed through GetConsensusEngine.
var consensusEngine engine.Engine
50
// Node bundles the long-lived components that NewNode wires together
// (chain, wallet, sync manager, API server, miner, notification manager).
type Node struct {
	cmn.BaseService

	// config
	config *cfg.Config

	syncManager *netsync.SyncManager

	//bcReactor    *bc.BlockchainReactor
	wallet          *w.Wallet // nil when the wallet is disabled in config
	accessTokens    *accesstoken.CredentialStore
	notificationMgr *websocket.WSNotificationManager
	api             *api.API // created in initAndstartApiServer, not in NewNode
	chain           *protocol.Chain
	txfeed          *txfeed.Tracker
	//cpuMiner        *cpuminer.CPUMiner
	miner *miner.Miner

	// miningEnable is seeded from config.Mining and cleared by OnStart when
	// no mining address can be obtained.
	miningEnable bool

	// newBlockCh is shared with the sync manager, the miner and the API.
	newBlockCh chan *bc.Hash
}
73
74 func NewNode(config *cfg.Config) *Node {
75         ctx := context.Background()
76         if err := lockDataDirectory(config); err != nil {
77                 cmn.Exit("Error: " + err.Error())
78         }
79         initLogFile(config)
80         initActiveNetParams(config)
81         initConsensusConfig(config)
82         initCommonConfig(config)
83
84         util.MainchainConfig = config.MainChain
85         util.ValidatePegin = config.ValidatePegin
86         // Get store
87         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
88                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
89         }
90         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
91         store := leveldb.NewStore(coreDB)
92
93         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
94         accessTokens := accesstoken.NewStore(tokenDB)
95
96         txPool := protocol.NewTxPool(store)
97         chain, err := protocol.NewChain(store, txPool)
98         if err != nil {
99                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
100         }
101
102         var accounts *account.Manager = nil
103         var assets *asset.Registry = nil
104         var wallet *w.Wallet = nil
105         var txFeed *txfeed.Tracker = nil
106
107         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
108         txFeed = txfeed.NewTracker(txFeedDB, chain)
109
110         if err = txFeed.Prepare(ctx); err != nil {
111                 log.WithField("error", err).Error("start txfeed")
112                 return nil
113         }
114
115         hsm, err := pseudohsm.New(config.KeysDir())
116         if err != nil {
117                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
118         }
119
120         if !config.Wallet.Disable {
121                 address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams)
122                 if err != nil {
123                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
124                 }
125                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
126                 accounts = account.NewManager(walletDB, chain)
127                 assets = asset.NewRegistry(walletDB, chain)
128                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
129                 if err != nil {
130                         log.WithField("error", err).Error("init NewWallet")
131                 }
132
133                 // trigger rescan wallet
134                 if config.Wallet.Rescan {
135                         wallet.RescanBlocks()
136                 }
137         }
138         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
139
140         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
141
142         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
143
144         // get transaction from txPool and send it to syncManager and wallet
145         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
146
147         // run the profile server
148         profileHost := config.ProfListenAddress
149         if profileHost != "" {
150                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
151                 // go tool pprof http://profileHose/debug/pprof/heap
152                 go func() {
153                         if err = http.ListenAndServe(profileHost, nil); err != nil {
154                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
155                         }
156                 }()
157         }
158
159         node := &Node{
160                 config:       config,
161                 syncManager:  syncManager,
162                 accessTokens: accessTokens,
163                 wallet:       wallet,
164                 chain:        chain,
165                 txfeed:       txFeed,
166                 miningEnable: config.Mining,
167
168                 newBlockCh:      newBlockCh,
169                 notificationMgr: notificationMgr,
170         }
171
172         //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
173         consensusEngine = createConsensusEngine(config, store)
174         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine)
175
176         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
177
178         return node
179 }
180
181 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
182 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
183         txMsgCh := txPool.GetMsgCh()
184         syncManagerTxCh := syncManager.GetNewTxCh()
185
186         for {
187                 msg := <-txMsgCh
188                 switch msg.MsgType {
189                 case protocol.MsgNewTx:
190                         syncManagerTxCh <- msg.Tx
191                         if wallet != nil {
192                                 wallet.AddUnconfirmedTx(msg.TxDesc)
193                         }
194                         notificationMgr.NotifyMempoolTx(msg.Tx)
195                 case protocol.MsgRemoveTx:
196                         if wallet != nil {
197                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
198                         }
199                 default:
200                         log.Warn("got unknow message type from the txPool channel")
201                 }
202         }
203 }
204
205 // Lock data directory after daemonization
206 func lockDataDirectory(config *cfg.Config) error {
207         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
208         if err != nil {
209                 return errors.New("datadir already used by another process")
210         }
211         return nil
212 }
213
214 func initActiveNetParams(config *cfg.Config) {
215         var exist bool
216         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
217         if !exist {
218                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
219         }
220         if config.Side.FedpegXPubs != "" {
221                 var federationRedeemXPubs []chainkd.XPub
222                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
223                 for _, xpubStr := range fedpegXPubs {
224                         var xpub chainkd.XPub
225                         xpub.UnmarshalText([]byte(xpubStr))
226                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
227                 }
228                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
229         }
230
231         consensus.ActiveNetParams.Signer = config.Signer
232         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
233         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
234 }
235
236 func initLogFile(config *cfg.Config) {
237         if config.LogFile == "" {
238                 return
239         }
240         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
241         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
242         if err == nil {
243                 log.SetOutput(file)
244         } else {
245                 log.WithField("err", err).Info("using default")
246         }
247
248 }
249
// initCommonConfig publishes config as the package-level cfg.CommonConfig.
func initCommonConfig(config *cfg.Config) {
	cfg.CommonConfig = config
}
253
254 // Lanch web broser or not
255 func launchWebBrowser(port string) {
256         webAddress := webHost + ":" + port
257         log.Info("Launching System Browser with :", webAddress)
258         if err := browser.Open(webAddress); err != nil {
259                 log.Error(err.Error())
260                 return
261         }
262 }
263
// initAndstartApiServer creates the API from the node's components and starts
// its server. The listen address is n.config.ApiAddress unless overridden by
// the LISTEN environment variable.
func (n *Node) initAndstartApiServer() {
	n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)

	listenAddr := env.String("LISTEN", n.config.ApiAddress)
	// env.Parse must run before listenAddr is dereferenced.
	env.Parse()
	n.api.StartServer(*listenAddr)
}
271
272 func (n *Node) OnStart() error {
273         if n.miningEnable {
274                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
275                         n.miningEnable = false
276                         log.Error(err)
277                 } else {
278                         //n.cpuMiner.Start()
279                         n.miner.Start()
280                 }
281         }
282         if !n.config.VaultMode {
283                 n.syncManager.Start()
284         }
285         n.initAndstartApiServer()
286         n.notificationMgr.Start()
287         if !n.config.Web.Closed {
288                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
289                 if err != nil {
290                         log.Error("Invalid api address")
291                         return err
292                 }
293                 launchWebBrowser(port)
294         }
295         go bytomdRPCCheck()
296         return nil
297 }
298
// OnStop shuts the node down: the notification manager is stopped and waited
// for first, then the base service, then the miner (if mining is still
// enabled) and the sync manager (unless in vault mode).
func (n *Node) OnStop() {
	n.notificationMgr.Shutdown()
	n.notificationMgr.WaitForShutdown()
	n.BaseService.OnStop()
	if n.miningEnable {
		n.miner.Stop()
	}
	if !n.config.VaultMode {
		n.syncManager.Stop()
	}
}
310
// RunForever registers a signal handler via cmn.TrapSignal that stops the node.
func (n *Node) RunForever() {
	// Hand control to the signal trap; the callback stops the node.
	cmn.TrapSignal(func() {
		n.Stop()
	})
}
317
// SyncManager returns the node's sync manager.
func (n *Node) SyncManager() *netsync.SyncManager {
	return n.syncManager
}
321
322 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
323 func bytomdRPCCheck() bool {
324         type Req struct {
325                 BlockHeight uint64 `json:"block_height"`
326         }
327         if util.ValidatePegin {
328                 for {
329                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
330                         if err != nil {
331                                 log.Error("Call mainchain interface get-block-header failed")
332                                 time.Sleep(time.Millisecond * 1000)
333                                 continue
334                         }
335                         tmp, _ := json.Marshal(resp)
336                         var blockHeader api.GetBlockHeaderResp
337                         json.Unmarshal(tmp, &blockHeader)
338                         hash := blockHeader.BlockHeader.Hash()
339                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
340                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, ":", hash.String())
341                                 return false
342                         }
343                         break
344                 }
345         }
346
347         return true
348 }
349
350 func initConsensusConfig(config *cfg.Config) {
351         if config.ConsensusConfigFile == "" {
352                 // poa
353         } else {
354                 //
355                 file, err := os.Open(config.ConsensusConfigFile)
356                 if err != nil {
357                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
358                 }
359                 defer file.Close()
360
361                 if err := json.NewDecoder(file).Decode(config); err != nil {
362                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
363                 }
364
365                 for _, v := range config.Consensus.Dpos.SelfVoteSigners {
366                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
367                         if err != nil {
368                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
369                         }
370                         config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address)
371                 }
372         }
373 }
374
375 func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine {
376         if config.Consensus.Dpos != nil {
377                 return dpos.New(config.Consensus.Dpos, store)
378         } else {
379                 return nil
380         }
381 }
382
// GetConsensusEngine returns the package-level consensus engine, which is nil
// until NewNode has assigned it.
func GetConsensusEngine() engine.Engine {
	return consensusEngine
}