OSDN Git Service

modify side_chain_tool
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         "github.com/vapor/common"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         engine "github.com/vapor/consensus/consensus"
31         "github.com/vapor/consensus/consensus/dpos"
32         "github.com/vapor/crypto/ed25519/chainkd"
33         "github.com/vapor/database/leveldb"
34         "github.com/vapor/env"
35         "github.com/vapor/mining/miner"
36         "github.com/vapor/mining/miningpool"
37         "github.com/vapor/net/websocket"
38         "github.com/vapor/netsync"
39         "github.com/vapor/protocol"
40         "github.com/vapor/protocol/bc"
41         "github.com/vapor/util"
42         w "github.com/vapor/wallet"
43 )
44
const (
	// webHost is the scheme and loopback host that launchWebBrowser combines
	// with the API port to form the address it opens.
	webHost = "http://127.0.0.1"
	// maxNewBlockChSize is the buffer capacity of the new-block hash channel
	// created in NewNode.
	maxNewBlockChSize = 1024
)
49
// consensusEngine is the package-wide consensus engine. NewNode assigns it
// from createConsensusEngine, and GetConsensusEngine returns it; it stays nil
// until NewNode has run.
var consensusEngine engine.Engine
51
// Node groups the services that make up a running node. It embeds
// cmn.BaseService and is assembled by NewNode.
type Node struct {
	cmn.BaseService

	// config is the configuration passed to NewNode.
	config *cfg.Config

	// syncManager is started in OnStart and stopped in OnStop, unless
	// config.VaultMode is set.
	syncManager *netsync.SyncManager

	//bcReactor    *bc.BlockchainReactor
	// wallet is left nil by NewNode when config.Wallet.Disable is set.
	wallet *w.Wallet
	// accessTokens is the credential store opened on the "accesstoken" database.
	accessTokens *accesstoken.CredentialStore
	// notificationMgr is started in OnStart and shut down in OnStop.
	notificationMgr *websocket.WSNotificationManager
	// api is created lazily by initAndstartApiServer, not by NewNode.
	api *api.API
	// chain is the chain built on the "core" database.
	chain *protocol.Chain
	// txfeed is the tracker opened on the "txfeeds" database.
	txfeed *txfeed.Tracker
	//cpuMiner        *cpuminer.CPUMiner
	// miner is started in OnStart only while miningEnable is true.
	miner *miner.Miner

	// miningPool is built in NewNode and exposed through MiningPool.
	miningPool *miningpool.MiningPool
	// miningEnable is copied from config.Mining; OnStart clears it when no
	// mining address can be obtained.
	miningEnable bool

	// newBlockCh is a channel of block hashes buffered with maxNewBlockChSize
	// slots; NewNode hands it to the sync manager, miner and mining pool.
	newBlockCh chan *bc.Hash
}
75
76 func NewNode(config *cfg.Config) *Node {
77         ctx := context.Background()
78         if err := lockDataDirectory(config); err != nil {
79                 cmn.Exit("Error: " + err.Error())
80         }
81         initLogFile(config)
82         initActiveNetParams(config)
83         initConsensusConfig(config)
84         initCommonConfig(config)
85
86         util.MainchainConfig = config.MainChain
87         util.ValidatePegin = config.ValidatePegin
88         // Get store
89         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
90                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
91         }
92         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
93         store := leveldb.NewStore(coreDB)
94
95         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
96         accessTokens := accesstoken.NewStore(tokenDB)
97
98         txPool := protocol.NewTxPool(store)
99         chain, err := protocol.NewChain(store, txPool)
100         if err != nil {
101                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
102         }
103
104         var accounts *account.Manager = nil
105         var assets *asset.Registry = nil
106         var wallet *w.Wallet = nil
107         var txFeed *txfeed.Tracker = nil
108
109         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
110         txFeed = txfeed.NewTracker(txFeedDB, chain)
111
112         if err = txFeed.Prepare(ctx); err != nil {
113                 log.WithField("error", err).Error("start txfeed")
114                 return nil
115         }
116
117         hsm, err := pseudohsm.New(config.KeysDir())
118         if err != nil {
119                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
120         }
121
122         if !config.Wallet.Disable {
123                 address, err := common.DecodeAddress(config.Consensus.Dpos.Coinbase, &consensus.ActiveNetParams)
124                 if err != nil {
125                         cmn.Exit(cmn.Fmt("DecodeAddress: %v", err))
126                 }
127                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
128                 accounts = account.NewManager(walletDB, chain)
129                 assets = asset.NewRegistry(walletDB, chain)
130                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, address)
131                 if err != nil {
132                         log.WithField("error", err).Error("init NewWallet")
133                 }
134
135                 // trigger rescan wallet
136                 if config.Wallet.Rescan {
137                         wallet.RescanBlocks()
138                 }
139         }
140         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
141
142         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
143
144         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
145
146         // get transaction from txPool and send it to syncManager and wallet
147         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
148
149         // run the profile server
150         profileHost := config.ProfListenAddress
151         if profileHost != "" {
152                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
153                 // go tool pprof http://profileHose/debug/pprof/heap
154                 go func() {
155                         if err = http.ListenAndServe(profileHost, nil); err != nil {
156                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
157                         }
158                 }()
159         }
160
161         node := &Node{
162                 config:       config,
163                 syncManager:  syncManager,
164                 accessTokens: accessTokens,
165                 wallet:       wallet,
166                 chain:        chain,
167                 txfeed:       txFeed,
168                 miningEnable: config.Mining,
169
170                 newBlockCh:      newBlockCh,
171                 notificationMgr: notificationMgr,
172         }
173
174         //node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
175         consensusEngine = createConsensusEngine(config, store)
176         node.miner = miner.NewMiner(chain, accounts, txPool, newBlockCh, consensusEngine)
177         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
178
179         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
180
181         return node
182 }
183
184 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
185 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
186         txMsgCh := txPool.GetMsgCh()
187         syncManagerTxCh := syncManager.GetNewTxCh()
188
189         for {
190                 msg := <-txMsgCh
191                 switch msg.MsgType {
192                 case protocol.MsgNewTx:
193                         syncManagerTxCh <- msg.Tx
194                         if wallet != nil {
195                                 wallet.AddUnconfirmedTx(msg.TxDesc)
196                         }
197                         notificationMgr.NotifyMempoolTx(msg.Tx)
198                 case protocol.MsgRemoveTx:
199                         if wallet != nil {
200                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
201                         }
202                 default:
203                         log.Warn("got unknow message type from the txPool channel")
204                 }
205         }
206 }
207
208 // Lock data directory after daemonization
209 func lockDataDirectory(config *cfg.Config) error {
210         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
211         if err != nil {
212                 return errors.New("datadir already used by another process")
213         }
214         return nil
215 }
216
217 func initActiveNetParams(config *cfg.Config) {
218         var exist bool
219         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
220         if !exist {
221                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
222         }
223         if config.Side.FedpegXPubs != "" {
224                 var federationRedeemXPubs []chainkd.XPub
225                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
226                 for _, xpubStr := range fedpegXPubs {
227                         var xpub chainkd.XPub
228                         xpub.UnmarshalText([]byte(xpubStr))
229                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
230                 }
231                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
232         }
233
234         consensus.ActiveNetParams.Signer = config.Signer
235         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
236         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
237 }
238
239 func initLogFile(config *cfg.Config) {
240         if config.LogFile == "" {
241                 return
242         }
243         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
244         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
245         if err == nil {
246                 log.SetOutput(file)
247         } else {
248                 log.WithField("err", err).Info("using default")
249         }
250
251 }
252
// initCommonConfig publishes config as the package-level cfg.CommonConfig.
func initCommonConfig(config *cfg.Config) {
	cfg.CommonConfig = config
}
256
257 // Lanch web broser or not
258 func launchWebBrowser(port string) {
259         webAddress := webHost + ":" + port
260         log.Info("Launching System Browser with :", webAddress)
261         if err := browser.Open(webAddress); err != nil {
262                 log.Error(err.Error())
263                 return
264         }
265 }
266
267 func (n *Node) initAndstartApiServer() {
268         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.miner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
269
270         listenAddr := env.String("LISTEN", n.config.ApiAddress)
271         env.Parse()
272         n.api.StartServer(*listenAddr)
273 }
274
275 func (n *Node) OnStart() error {
276         if n.miningEnable {
277                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
278                         n.miningEnable = false
279                         log.Error(err)
280                 } else {
281                         //n.cpuMiner.Start()
282                         n.miner.Start()
283                 }
284         }
285         if !n.config.VaultMode {
286                 n.syncManager.Start()
287         }
288         n.initAndstartApiServer()
289         n.notificationMgr.Start()
290         if !n.config.Web.Closed {
291                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
292                 if err != nil {
293                         log.Error("Invalid api address")
294                         return err
295                 }
296                 launchWebBrowser(port)
297         }
298         go bytomdRPCCheck()
299         return nil
300 }
301
302 func (n *Node) OnStop() {
303         n.notificationMgr.Shutdown()
304         n.notificationMgr.WaitForShutdown()
305         n.BaseService.OnStop()
306         if n.miningEnable {
307                 n.miner.Stop()
308         }
309         if !n.config.VaultMode {
310                 n.syncManager.Stop()
311         }
312 }
313
314 func (n *Node) RunForever() {
315         // Sleep forever and then...
316         cmn.TrapSignal(func() {
317                 n.Stop()
318         })
319 }
320
// SyncManager returns the node's sync manager.
func (n *Node) SyncManager() *netsync.SyncManager {
	return n.syncManager
}
324
// MiningPool returns the node's mining pool.
func (n *Node) MiningPool() *miningpool.MiningPool {
	return n.miningPool
}
328
329 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
330 func bytomdRPCCheck() bool {
331         type Req struct {
332                 BlockHeight uint64 `json:"block_height"`
333         }
334         if util.ValidatePegin {
335                 for {
336                         resp, err := util.CallRPC("/get-merkle-proof", &Req{BlockHeight: 0})
337                         if err != nil {
338                                 log.Error("Call mainchain interface get-block-header failed")
339                                 time.Sleep(time.Millisecond * 1000)
340                                 continue
341                         }
342                         tmp, _ := json.Marshal(resp)
343                         var blockHeader api.GetBlockHeaderResp
344                         json.Unmarshal(tmp, &blockHeader)
345                         hash := blockHeader.BlockHeader.Hash()
346                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
347                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?", consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String())
348                                 return false
349                         }
350                         break
351                 }
352         }
353
354         return true
355 }
356
357 func initConsensusConfig(config *cfg.Config) {
358         if config.ConsensusConfigFile == "" {
359                 // poa
360         } else {
361                 //
362                 file, err := os.Open(config.ConsensusConfigFile)
363                 if err != nil {
364                         cmn.Exit(cmn.Fmt("Failed to read consensus file: %v", err))
365                 }
366                 defer file.Close()
367
368                 if err := json.NewDecoder(file).Decode(config); err != nil {
369                         cmn.Exit(cmn.Fmt("invalid consensus file: %v", err))
370                 }
371
372                 for _, v := range config.Consensus.Dpos.SelfVoteSigners {
373                         address, err := common.DecodeAddress(v, &consensus.ActiveNetParams)
374                         if err != nil {
375                                 cmn.Exit(cmn.Fmt("Address resolution failed: %v", err))
376                         }
377                         config.Consensus.Dpos.Signers = append(config.Consensus.Dpos.Signers, address)
378                 }
379         }
380 }
381
382 func createConsensusEngine(config *cfg.Config, store protocol.Store) engine.Engine {
383         if config.Consensus.Dpos != nil {
384                 return dpos.New(config.Consensus.Dpos, store)
385         } else {
386                 return nil
387         }
388 }
389
// GetConsensusEngine returns the package-level consensus engine assigned by
// NewNode; it is nil before NewNode has run or when no engine was created.
func GetConsensusEngine() engine.Engine {
	return consensusEngine
}