OSDN Git Service

add pegin address for contract
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "net"
9         "net/http"
10         _ "net/http/pprof"
11         "os"
12         "path/filepath"
13         "strings"
14         "time"
15
16         "github.com/prometheus/prometheus/util/flock"
17         log "github.com/sirupsen/logrus"
18         cmn "github.com/tendermint/tmlibs/common"
19         dbm "github.com/tendermint/tmlibs/db"
20         browser "github.com/toqueteos/webbrowser"
21
22         "github.com/vapor/accesstoken"
23         "github.com/vapor/account"
24         "github.com/vapor/api"
25         "github.com/vapor/asset"
26         "github.com/vapor/blockchain/pseudohsm"
27         "github.com/vapor/blockchain/txfeed"
28         cfg "github.com/vapor/config"
29         "github.com/vapor/consensus"
30         "github.com/vapor/crypto/ed25519/chainkd"
31         "github.com/vapor/database/leveldb"
32         "github.com/vapor/env"
33         "github.com/vapor/mining/cpuminer"
34         "github.com/vapor/mining/miningpool"
35         "github.com/vapor/net/websocket"
36         "github.com/vapor/netsync"
37         "github.com/vapor/protocol"
38         "github.com/vapor/protocol/bc"
39         "github.com/vapor/util"
40         w "github.com/vapor/wallet"
41 )
42
43 const (
44         webHost           = "http://127.0.0.1"
45         maxNewBlockChSize = 1024
46 )
47
48 type Node struct {
49         cmn.BaseService
50
51         // config
52         config *cfg.Config
53
54         syncManager *netsync.SyncManager
55
56         //bcReactor    *bc.BlockchainReactor
57         wallet          *w.Wallet
58         accessTokens    *accesstoken.CredentialStore
59         notificationMgr *websocket.WSNotificationManager
60         api             *api.API
61         chain           *protocol.Chain
62         txfeed          *txfeed.Tracker
63         cpuMiner        *cpuminer.CPUMiner
64         miningPool      *miningpool.MiningPool
65         miningEnable    bool
66
67         newBlockCh chan *bc.Hash
68 }
69
70 func NewNode(config *cfg.Config) *Node {
71         ctx := context.Background()
72         if err := lockDataDirectory(config); err != nil {
73                 cmn.Exit("Error: " + err.Error())
74         }
75         initLogFile(config)
76         initActiveNetParams(config)
77         initCommonConfig(config)
78         util.MainchainConfig = config.MainChain
79         util.ValidatePegin = config.ValidatePegin
80         // Get store
81         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
82                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
83         }
84         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
85         store := leveldb.NewStore(coreDB)
86
87         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
88         accessTokens := accesstoken.NewStore(tokenDB)
89
90         txPool := protocol.NewTxPool(store)
91         chain, err := protocol.NewChain(store, txPool)
92         if err != nil {
93                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
94         }
95
96         var accounts *account.Manager = nil
97         var assets *asset.Registry = nil
98         var wallet *w.Wallet = nil
99         var txFeed *txfeed.Tracker = nil
100
101         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
102         txFeed = txfeed.NewTracker(txFeedDB, chain)
103
104         if err = txFeed.Prepare(ctx); err != nil {
105                 log.WithField("error", err).Error("start txfeed")
106                 return nil
107         }
108
109         hsm, err := pseudohsm.New(config.KeysDir())
110         if err != nil {
111                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
112         }
113
114         if !config.Wallet.Disable {
115                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
116                 accounts = account.NewManager(walletDB, chain)
117                 assets = asset.NewRegistry(walletDB, chain)
118                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
119                 if err != nil {
120                         log.WithField("error", err).Error("init NewWallet")
121                 }
122
123                 // trigger rescan wallet
124                 if config.Wallet.Rescan {
125                         wallet.RescanBlocks()
126                 }
127         }
128         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
129
130         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
131
132         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
133
134         // get transaction from txPool and send it to syncManager and wallet
135         go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
136
137         // run the profile server
138         profileHost := config.ProfListenAddress
139         if profileHost != "" {
140                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
141                 // go tool pprof http://profileHose/debug/pprof/heap
142                 go func() {
143                         if err = http.ListenAndServe(profileHost, nil); err != nil {
144                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
145                         }
146                 }()
147         }
148
149         node := &Node{
150                 config:       config,
151                 syncManager:  syncManager,
152                 accessTokens: accessTokens,
153                 wallet:       wallet,
154                 chain:        chain,
155                 txfeed:       txFeed,
156                 miningEnable: config.Mining,
157
158                 newBlockCh:      newBlockCh,
159                 notificationMgr: notificationMgr,
160         }
161
162         node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
163         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
164
165         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
166
167         return node
168 }
169
170 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
171 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
172         txMsgCh := txPool.GetMsgCh()
173         syncManagerTxCh := syncManager.GetNewTxCh()
174
175         for {
176                 msg := <-txMsgCh
177                 switch msg.MsgType {
178                 case protocol.MsgNewTx:
179                         syncManagerTxCh <- msg.Tx
180                         if wallet != nil {
181                                 wallet.AddUnconfirmedTx(msg.TxDesc)
182                         }
183                         notificationMgr.NotifyMempoolTx(msg.Tx)
184                 case protocol.MsgRemoveTx:
185                         if wallet != nil {
186                                 wallet.RemoveUnconfirmedTx(msg.TxDesc)
187                         }
188                 default:
189                         log.Warn("got unknow message type from the txPool channel")
190                 }
191         }
192 }
193
194 // Lock data directory after daemonization
195 func lockDataDirectory(config *cfg.Config) error {
196         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
197         if err != nil {
198                 return errors.New("datadir already used by another process")
199         }
200         return nil
201 }
202
203 func initActiveNetParams(config *cfg.Config) {
204         var exist bool
205         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
206         if !exist {
207                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
208         }
209         if config.Side.FedpegXPubs != "" {
210                 var federationRedeemXPubs []chainkd.XPub
211                 fedpegXPubs := strings.Split(config.Side.FedpegXPubs, ",")
212                 for _, xpubStr := range fedpegXPubs {
213                         var xpub chainkd.XPub
214                         xpub.UnmarshalText([]byte(xpubStr))
215                         federationRedeemXPubs = append(federationRedeemXPubs, xpub)
216                 }
217                 consensus.ActiveNetParams.FedpegXPubs = federationRedeemXPubs
218         }
219
220         if config.Side.SignBlockXPubs != "" {
221                 var signBlockXPubs []chainkd.XPub
222                 fmt.Println(signBlockXPubs)
223                 xPubs := strings.Split(config.Side.SignBlockXPubs, ",")
224                 for _, xpubStr := range xPubs {
225                         var xpub chainkd.XPub
226                         xpub.UnmarshalText([]byte(xpubStr))
227                         signBlockXPubs = append(signBlockXPubs, xpub)
228                 }
229                 consensus.ActiveNetParams.SignBlockXPubs = signBlockXPubs
230         }
231
232         consensus.ActiveNetParams.Signer = config.Signer
233         consensus.ActiveNetParams.PeginMinDepth = config.Side.PeginMinDepth
234         consensus.ActiveNetParams.ParentGenesisBlockHash = config.Side.ParentGenesisBlockHash
235 }
236
237 func initLogFile(config *cfg.Config) {
238         if config.LogFile == "" {
239                 return
240         }
241         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
242         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
243         if err == nil {
244                 log.SetOutput(file)
245         } else {
246                 log.WithField("err", err).Info("using default")
247         }
248
249 }
250
251 func initCommonConfig(config *cfg.Config) {
252         cfg.CommonConfig = config
253 }
254
255 // Lanch web broser or not
256 func launchWebBrowser(port string) {
257         webAddress := webHost + ":" + port
258         log.Info("Launching System Browser with :", webAddress)
259         if err := browser.Open(webAddress); err != nil {
260                 log.Error(err.Error())
261                 return
262         }
263 }
264
265 func (n *Node) initAndstartApiServer() {
266         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
267
268         listenAddr := env.String("LISTEN", n.config.ApiAddress)
269         env.Parse()
270         n.api.StartServer(*listenAddr)
271 }
272
273 func (n *Node) OnStart() error {
274         if n.miningEnable {
275                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
276                         n.miningEnable = false
277                         log.Error(err)
278                 } else {
279                         n.cpuMiner.Start()
280                 }
281         }
282         if !n.config.VaultMode {
283                 n.syncManager.Start()
284         }
285         n.initAndstartApiServer()
286         n.notificationMgr.Start()
287         if !n.config.Web.Closed {
288                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
289                 if err != nil {
290                         log.Error("Invalid api address")
291                         return err
292                 }
293                 launchWebBrowser(port)
294         }
295         go bytomdRPCCheck()
296         return nil
297 }
298
299 func (n *Node) OnStop() {
300         n.notificationMgr.Shutdown()
301         n.notificationMgr.WaitForShutdown()
302         n.BaseService.OnStop()
303         if n.miningEnable {
304                 n.cpuMiner.Stop()
305         }
306         if !n.config.VaultMode {
307                 n.syncManager.Stop()
308         }
309 }
310
311 func (n *Node) RunForever() {
312         // Sleep forever and then...
313         cmn.TrapSignal(func() {
314                 n.Stop()
315         })
316 }
317
318 func (n *Node) SyncManager() *netsync.SyncManager {
319         return n.syncManager
320 }
321
322 func (n *Node) MiningPool() *miningpool.MiningPool {
323         return n.miningPool
324 }
325
326 /**bytomdRPCCheck Check if bytomd connection via RPC is correctly working*/
327 func bytomdRPCCheck() bool {
328         type Req struct {
329                 BlockHeight uint64 `json:"block_height"`
330         }
331         if util.ValidatePegin {
332                 for {
333                         resp, err := util.CallRPC("/get-block-header", &Req{BlockHeight: 0})
334                         if err != nil {
335                                 log.Error("Call mainchain interface get-block-header failed")
336                                 time.Sleep(time.Millisecond * 1000)
337                                 continue
338                         }
339                         tmp, _ := json.Marshal(resp)
340                         var blockHeader api.GetBlockHeaderResp
341                         json.Unmarshal(tmp, &blockHeader)
342                         hash := blockHeader.BlockHeader.Hash()
343                         if strings.Compare(consensus.ActiveNetParams.ParentGenesisBlockHash, hash.String()) != 0 {
344                                 log.Error("Invalid parent genesis block hash response via RPC. Contacting wrong parent daemon?")
345                                 return false
346                         }
347                         break
348                 }
349         }
350
351         return true
352 }