OSDN Git Service

61c51aeeb76b1e76c996677c513ef2ee7036fb28
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "net"
8         "net/http"
9         _ "net/http/pprof"
10         "os"
11         "path/filepath"
12         "strings"
13         "time"
14
15         "github.com/prometheus/prometheus/util/flock"
16         log "github.com/sirupsen/logrus"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         browser "github.com/toqueteos/webbrowser"
20
21         "github.com/vapor/accesstoken"
22         "github.com/vapor/account"
23         "github.com/vapor/api"
24         "github.com/vapor/asset"
25         "github.com/vapor/blockchain/pseudohsm"
26         "github.com/vapor/blockchain/txfeed"
27         cfg "github.com/vapor/config"
28         "github.com/vapor/consensus"
29         "github.com/vapor/crypto/ed25519/chainkd"
30         "github.com/vapor/database/leveldb"
31         "github.com/vapor/env"
32         "github.com/vapor/mining/cpuminer"
33         "github.com/vapor/mining/miningpool"
34         "github.com/vapor/net/websocket"
35         "github.com/vapor/netsync"
36         "github.com/vapor/protocol"
37         "github.com/vapor/protocol/bc"
38         "github.com/vapor/util"
39         w "github.com/vapor/wallet"
40 )
41
42 const (
43         webHost           = "http://127.0.0.1"
44         maxNewBlockChSize = 1024
45 )
46
47 type Node struct {
48         cmn.BaseService
49
50         // config
51         config *cfg.Config
52
53         syncManager *netsync.SyncManager
54
55         //bcReactor    *bc.BlockchainReactor
56         wallet          *w.Wallet
57         accessTokens    *accesstoken.CredentialStore
58         notificationMgr *websocket.WSNotificationManager
59         api             *api.API
60         chain           *protocol.Chain
61         txfeed          *txfeed.Tracker
62         cpuMiner        *cpuminer.CPUMiner
63         miningPool      *miningpool.MiningPool
64         miningEnable    bool
65
66         newBlockCh chan *bc.Hash
67 }
68
69 func NewNode(config *cfg.Config) *Node {
70         ctx := context.Background()
71         if err := lockDataDirectory(config); err != nil {
72                 cmn.Exit("Error: " + err.Error())
73         }
74         initLogFile(config)
75         initActiveNetParams(config)
76         initCommonConfig(config)
77         util.MainchainConfig = config.MainChain
78         util.ValidatePegin = config.ValidatePegin
79         // Get store
80         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
81                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
82         }
83         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
84         store := leveldb.NewStore(coreDB)
85
86         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
87         accessTokens := accesstoken.NewStore(tokenDB)
88
89         txPool := protocol.NewTxPool(store)
90         chain, err := protocol.NewChain(store, txPool)
91         if err != nil {
92                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
93         }
94
95         var accounts *account.Manager = nil
96         var assets *asset.Registry = nil
97         var wallet *w.Wallet = nil
98         var txFeed *txfeed.Tracker = nil
99
100         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
101         txFeed = txfeed.NewTracker(txFeedDB, chain)
102
103         if err = txFeed.Prepare(ctx); err != nil {
104                 log.WithField("error", err).Error("start txfeed")
105                 return nil
106         }
107
108         hsm, err := pseudohsm.New(config.KeysDir())
109         if err != nil {
110                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
111         }
112
113         if !config.Wallet.Disable {
114                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
115                 accounts = account.NewManager(walletDB, chain)
116                 assets = asset.NewRegistry(walletDB, chain)
117                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
118                 if err != nil {
119                         log.WithField("error", err).Error("init NewWallet")
120                 }
121
122                 // trigger rescan wallet
123                 if config.Wallet.Rescan {
124                         wallet.RescanBlocks()
125                 }
126         }
127         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
128
129         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
130
131         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
132
133         // get transaction from txPool and send it to syncManager and wallet
134         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
135
136         // run the profile server
137         profileHost := config.ProfListenAddress
138         if profileHost != "" {
139                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
140                 // go tool pprof http://profileHose/debug/pprof/heap
141                 go func() {
142                         if err = http.ListenAndServe(profileHost, nil); err != nil {
143                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
144                         }
145                 }()
146         }
147
148         node := &Node{
149                 config:       config,
150                 syncManager:  syncManager,
151                 accessTokens: accessTokens,
152                 wallet:       wallet,
153                 chain:        chain,
154                 txfeed:       txFeed,
155                 miningEnable: config.Mining,
156
157                 newBlockCh:      newBlockCh,
158                 notificationMgr: notificationMgr,
159         }
160
161         node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
162         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
163
164         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
165
166         return node
167 }
168
169 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
170 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
171         txMsgCh := txPool.GetMsgCh()
172         syncManagerTxCh := syncManager.GetNewTxCh()
173
174         for {
175                 msg := <-txMsgCh
176                 switch msg.MsgType {
177                 case protocol.MsgNewTx:
178                         syncManagerTxCh <- msg.Tx
179                         if wallet != nil {
180                                 wallet.AddUnconfirmedTx(msg.TxDesc)
181                         }
182                         notificationMgr.NotifyMempoolTx(msg.Tx)
183                 case protocol.MsgRemoveTx:
184                         if wallet != nil {
185                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
186                         }
187                 default:
188                         log.Warn("got unknow message type from the txPool channel")
189                 }
190         }
191 }
192
193 // Lock data directory after daemonization
194 func lockDataDirectory(config *cfg.Config) error {
195         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
196         if err != nil {
197                 return errors.New("datadir already used by another process")
198         }
199         return nil
200 }
201
202 func initActiveNetParams(config *cfg.Config) {
203         var exist bool
204         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
205         if !exist {
206                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
207         }
208         var federationRedeemXPubs []chainkd.XPub
209         if fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ","); len(fedpegXPubs) > 0 {
210                 for _, xpubStr := range fedpegXPubs {
211                         var xpub chainkd.XPub
212                         xpub.UnmarshalText([]byte(xpubStr))
213                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
214                 }
215         }
216
217         var signBlockXPubs []chainkd.XPub
218         if xPubs := strings.Split(config.Side.SignBlockXPubs, ","); len(xPubs) > 0 {
219                 for _, xpubStr := range xPubs {
220                         var xpub chainkd.XPub
221                         xpub.UnmarshalText([]byte(xpubStr))
222                         signBlockXPubs = append(signBlockXPubs, xpub)
223                 }
224         }
225
226         consensus.ActiveNetParams.Signer = config.Signer
227         consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
228         consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
229         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
230         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
231 }
232
233 func initLogFile(config *cfg.Config) {
234         if config.LogFile == "" {
235                 return
236         }
237         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
238         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
239         if err == nil {
240                 log.SetOutput(file)
241         } else {
242                 log.WithField("err", err).Info("using default")
243         }
244
245 }
246
247 func initCommonConfig(config *cfg.Config) {
248         cfg.CommonConfig = config
249 }
250
251 // Lanch web broser or not
252 func launchWebBrowser(port string) {
253         webAddress := webHost + ":" + port
254         log.Info("Launching System Browser with :", webAddress)
255         if err := browser.Open(webAddress); err != nil {
256                 log.Error(err.Error())
257                 return
258         }
259 }
260
261 func (n *Node) initAndstartApiServer() {
262         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
263
264         listenAddr := env.String("LISTEN", n.config.ApiAddress)
265         env.Parse()
266         n.api.StartServer(*listenAddr)
267 }
268
269 func (n *Node) OnStart() error {
270         if n.miningEnable {
271                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
272                         n.miningEnable = false
273                         log.Error(err)
274                 } else {
275                         n.cpuMiner.Start()
276                 }
277         }
278         if !n.config.VaultMode {
279                 n.syncManager.Start()
280         }
281         n.initAndstartApiServer()
282         n.notificationMgr.Start()
283         if !n.config.Web.Closed {
284                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
285                 if err != nil {
286                         log.Error("Invalid api address")
287                         return err
288                 }
289                 launchWebBrowser(port)
290         }
291         go bytomdRPCCheck()
292         return nil
293 }
294
295 func (n *Node) OnStop() {
296         n.notificationMgr.Shutdown()
297         n.notificationMgr.WaitForShutdown()
298         n.BaseService.OnStop()
299         if n.miningEnable {
300                 n.cpuMiner.Stop()
301         }
302         if !n.config.VaultMode {
303                 n.syncManager.Stop()
304         }
305 }
306
307 func (n *Node) RunForever() {
308         // Sleep forever and then...
309         cmn.TrapSignal(func() {
310                 n.Stop()
311         })
312 }
313
314 func (n *Node) SyncManager() *netsync.SyncManager {
315         return n.syncManager
316 }
317
318 func (n *Node) MiningPool() *miningpool.MiningPool {
319         return n.miningPool
320 }
321
322 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
323 func bytomdRPCCheck() bool {
324         type Req struct {
325                 BlockHeight uint64 `json:"block_height"`
326         }
327         if util.ValidatePegin {
328                 for {
329                         resp, err := util.CallRPC("/get-block-header", &Req{BlockHeight: 0})
330                         if err != nil {
331                                 log.Error("Call mainchain interface get-block-header failed")
332                                 time.Sleep(time.Millisecond * 1000)
333                                 continue
334                         }
335                         tmp, _ := json.Marshal(resp)
336                         var blockHeader api.GetBlockHeaderResp
337                         json.Unmarshal(tmp, &blockHeader)
338                         hash := blockHeader.BlockHeader.Hash()
339                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
340                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?")
341                                 return false
342                         }
343                         break
344                 }
345         }
346
347         return true
348 }