OSDN Git Service

7640e8ebdfcb4950bfcce4dff5b9625c39e0de4e
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "path/filepath"
10         "reflect"
11
12         "github.com/prometheus/prometheus/util/flock"
13         log "github.com/sirupsen/logrus"
14         cmn "github.com/tendermint/tmlibs/common"
15         browser "github.com/toqueteos/webbrowser"
16
17         "github.com/bytom/vapor/accesstoken"
18         "github.com/bytom/vapor/account"
19         "github.com/bytom/vapor/api"
20         "github.com/bytom/vapor/application/mov"
21         "github.com/bytom/vapor/asset"
22         "github.com/bytom/vapor/blockchain/pseudohsm"
23         cfg "github.com/bytom/vapor/config"
24         "github.com/bytom/vapor/consensus"
25         "github.com/bytom/vapor/database"
26         dbm "github.com/bytom/vapor/database/leveldb"
27         "github.com/bytom/vapor/env"
28         "github.com/bytom/vapor/event"
29         vaporLog "github.com/bytom/vapor/log"
30         "github.com/bytom/vapor/net/websocket"
31         "github.com/bytom/vapor/netsync"
32         "github.com/bytom/vapor/proposal/blockproposer"
33         "github.com/bytom/vapor/protocol"
34         "github.com/bytom/vapor/protocol/bc/types"
35         w "github.com/bytom/vapor/wallet"
36 )
37
const (
	// webHost is the scheme and host part of the URL that launchWebBrowser
	// opens; the port is appended at call time.
	webHost = "http://127.0.0.1"
	// logModule is the value stored under the "module" key of the structured
	// log entries written by this file.
	logModule = "node"
)
42
43 // Node represent bytom node
44 type Node struct {
45         cmn.BaseService
46
47         config          *cfg.Config
48         eventDispatcher *event.Dispatcher
49         syncManager     *netsync.SyncManager
50
51         wallet          *w.Wallet
52         accessTokens    *accesstoken.CredentialStore
53         notificationMgr *websocket.WSNotificationManager
54         api             *api.API
55         chain           *protocol.Chain
56         cpuMiner        *blockproposer.BlockProposer
57         miningEnable    bool
58 }
59
60 // NewNode create bytom node
61 func NewNode(config *cfg.Config) *Node {
62         initNodeConfig(config)
63
64         if err := vaporLog.InitLogFile(config); err != nil {
65                 log.WithField("err", err).Fatalln("InitLogFile failed")
66         }
67
68         log.WithFields(log.Fields{
69                 "module":             logModule,
70                 "pubkey":             config.PrivateKey().XPub(),
71                 "fed_xpubs":          config.Federation.Xpubs,
72                 "fed_quorum":         config.Federation.Quorum,
73                 "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)),
74         }).Info()
75
76         // Get store
77         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
78                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
79         }
80         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
81         store := database.NewStore(coreDB)
82
83         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
84         accessTokens := accesstoken.NewStore(tokenDB)
85
86         dispatcher := event.NewDispatcher()
87         movCore := mov.NewMovCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
88         assetFilter := protocol.NewAssetFilter(config.CrossChain.AssetWhitelist)
89         txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore, assetFilter}, dispatcher)
90         chain, err := protocol.NewChain(store, txPool, []protocol.Protocoler{movCore}, dispatcher)
91         if err != nil {
92                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
93         }
94
95         if err := checkConfig(chain, config); err != nil {
96                 panic(err)
97         }
98
99         var accounts *account.Manager
100         var assets *asset.Registry
101         var wallet *w.Wallet
102
103         hsm, err := pseudohsm.New(config.KeysDir())
104         if err != nil {
105                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
106         }
107
108         if !config.Wallet.Disable {
109                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
110                 walletStore := database.NewWalletStore(walletDB)
111                 accountStore := database.NewAccountStore(walletDB)
112                 accounts = account.NewManager(accountStore, chain)
113                 assets = asset.NewRegistry(walletDB, chain)
114                 wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
115                 if err != nil {
116                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
117                 }
118
119                 if err = wallet.Run(); err != nil {
120                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet work running thread")
121                 }
122
123                 // trigger rescan wallet
124                 if config.Wallet.Rescan {
125                         wallet.RescanBlocks()
126                 }
127         }
128         fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
129         syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB)
130         if err != nil {
131                 cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
132         }
133
134         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
135
136         // run the profile server
137         profileHost := config.ProfListenAddress
138         if profileHost != "" {
139                 // Profiling vapord programs.see (https://blog.golang.org/profiling-go-programs)
140                 // go tool pprof http://profileHose/debug/pprof/heap
141                 go func() {
142                         if err = http.ListenAndServe(profileHost, nil); err != nil {
143                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
144                         }
145                 }()
146         }
147
148         node := &Node{
149                 eventDispatcher: dispatcher,
150                 config:          config,
151                 syncManager:     syncManager,
152                 accessTokens:    accessTokens,
153                 wallet:          wallet,
154                 chain:           chain,
155                 miningEnable:    config.Mining,
156
157                 notificationMgr: notificationMgr,
158         }
159
160         node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
161         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
162         return node
163 }
164
165 // Rollback rollback chain from one height to targetHeight
166 func Rollback(config *cfg.Config, targetHeight uint64) error {
167         if err := initNodeConfig(config); err != nil {
168                 return err
169         }
170
171         // Get store
172         if config.DBBackend != "leveldb" {
173                 return errors.New("Param db_backend is invalid, use leveldb")
174         }
175
176         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
177         store := database.NewStore(coreDB)
178
179         dispatcher := event.NewDispatcher()
180         movCore := mov.NewMovCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
181         txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore}, dispatcher)
182         chain, err := protocol.NewChain(store, txPool, []protocol.Protocoler{movCore}, dispatcher)
183         if err != nil {
184                 return err
185         }
186
187         hsm, err := pseudohsm.New(config.KeysDir())
188         if err != nil {
189                 return err
190         }
191
192         walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
193         walletStore := database.NewWalletStore(walletDB)
194         accountStore := database.NewAccountStore(walletDB)
195         accounts := account.NewManager(accountStore, chain)
196         assets := asset.NewRegistry(walletDB, chain)
197         wallet, err := w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
198         if err != nil {
199                 return err
200         }
201
202         if err := wallet.Rollback(targetHeight); err != nil {
203                 return err
204         }
205
206         return chain.Rollback(targetHeight)
207 }
208
209 func initNodeConfig(config *cfg.Config) error {
210         if err := lockDataDirectory(config); err != nil {
211                 log.WithField("err", err).Info("Error: " + err.Error())
212                 return err
213         }
214
215         if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
216                 log.WithField("err", err).Info("Failed to load federated information")
217                 return err
218         }
219
220         if err := consensus.InitActiveNetParams(config.ChainID); err != nil {
221                 log.Fatalf("Failed to init ActiveNetParams:[%s]", err.Error())
222         }
223
224         cfg.CommonConfig = config
225         return nil
226 }
227
228 // find whether config xpubs equal genesis block xpubs
229 func checkConfig(chain *protocol.Chain, config *cfg.Config) error {
230         fedpegScript := cfg.FederationWScript(config)
231         genesisBlock, err := chain.GetBlockByHeight(0)
232         if err != nil {
233                 return err
234         }
235         typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput
236         if v, ok := typedInput.(*types.CoinbaseInput); ok {
237                 if !reflect.DeepEqual(fedpegScript, v.Arbitrary) {
238                         return errors.New("config xpubs don't equal genesis block xpubs.")
239                 }
240         }
241         return nil
242 }
243
244 // Lock data directory after daemonization
245 func lockDataDirectory(config *cfg.Config) error {
246         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
247         if err != nil {
248                 return errors.New("datadir already used by another process")
249         }
250         return nil
251 }
252
253 func initCommonConfig(config *cfg.Config) {
254         cfg.CommonConfig = config
255 }
256
257 // Lanch web broser or not
258 func launchWebBrowser(port string) {
259         webAddress := webHost + ":" + port
260         log.Info("Launching System Browser with :", webAddress)
261         if err := browser.Open(webAddress); err != nil {
262                 log.Error(err.Error())
263                 return
264         }
265 }
266
267 func (n *Node) initAndstartAPIServer() {
268         n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
269
270         listenAddr := env.String("LISTEN", n.config.ApiAddress)
271         env.Parse()
272         n.api.StartServer(*listenAddr)
273 }
274
275 func (n *Node) OnStart() error {
276         if n.miningEnable {
277                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
278                         n.miningEnable = false
279                         log.Error(err)
280                 } else {
281                         n.cpuMiner.Start()
282                 }
283         }
284         if !n.config.VaultMode {
285                 if err := n.syncManager.Start(); err != nil {
286                         return err
287                 }
288         }
289
290         n.initAndstartAPIServer()
291         if err := n.notificationMgr.Start(); err != nil {
292                 return err
293         }
294
295         if !n.config.Web.Closed {
296                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
297                 if err != nil {
298                         log.Error("Invalid api address")
299                         return err
300                 }
301                 launchWebBrowser(port)
302         }
303         return nil
304 }
305
306 func (n *Node) OnStop() {
307         n.notificationMgr.Shutdown()
308         n.notificationMgr.WaitForShutdown()
309         n.BaseService.OnStop()
310         if n.miningEnable {
311                 n.cpuMiner.Stop()
312         }
313         if !n.config.VaultMode {
314                 n.syncManager.Stop()
315         }
316         n.eventDispatcher.Stop()
317 }
318
319 func (n *Node) RunForever() {
320         // Sleep forever and then...
321         cmn.TrapSignal(func() {
322                 n.Stop()
323         })
324 }