OSDN Git Service

Sync for late node (#527)
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "net/http"
8         // debug tool
9         _ "net/http/pprof"
10         "path/filepath"
11         "reflect"
12
13         "github.com/prometheus/prometheus/util/flock"
14         log "github.com/sirupsen/logrus"
15         cmn "github.com/tendermint/tmlibs/common"
16         browser "github.com/toqueteos/webbrowser"
17
18         "github.com/bytom/vapor/accesstoken"
19         "github.com/bytom/vapor/account"
20         "github.com/bytom/vapor/api"
21         "github.com/bytom/vapor/application/mov"
22         "github.com/bytom/vapor/asset"
23         "github.com/bytom/vapor/blockchain/pseudohsm"
24         cfg "github.com/bytom/vapor/config"
25         "github.com/bytom/vapor/consensus"
26         "github.com/bytom/vapor/database"
27         dbm "github.com/bytom/vapor/database/leveldb"
28         "github.com/bytom/vapor/env"
29         "github.com/bytom/vapor/event"
30         vaporLog "github.com/bytom/vapor/log"
31         "github.com/bytom/vapor/net/websocket"
32         "github.com/bytom/vapor/netsync"
33         "github.com/bytom/vapor/proposal/blockproposer"
34         "github.com/bytom/vapor/protocol"
35         "github.com/bytom/vapor/protocol/bc/types"
36         w "github.com/bytom/vapor/wallet"
37 )
38
// webHost is the scheme and loopback host that launchWebBrowser prefixes to
// the API port when opening the dashboard.
const webHost = "http://127.0.0.1"

// logModule is the value this package logs under the "module" field.
const logModule = "node"
43
44 // Node represent bytom node
45 type Node struct {
46         cmn.BaseService
47
48         config          *cfg.Config
49         eventDispatcher *event.Dispatcher
50         syncManager     *netsync.SyncManager
51
52         wallet          *w.Wallet
53         accessTokens    *accesstoken.CredentialStore
54         notificationMgr *websocket.WSNotificationManager
55         api             *api.API
56         chain           *protocol.Chain
57         cpuMiner        *blockproposer.BlockProposer
58         miningEnable    bool
59 }
60
61 // NewNode create bytom node
62 func NewNode(config *cfg.Config) *Node {
63         initNodeConfig(config)
64
65         if err := vaporLog.InitLogFile(config); err != nil {
66                 log.WithField("err", err).Fatalln("InitLogFile failed")
67         }
68
69         log.WithFields(log.Fields{
70                 "module":             logModule,
71                 "pubkey":             config.PrivateKey().XPub(),
72                 "fed_xpubs":          config.Federation.Xpubs,
73                 "fed_quorum":         config.Federation.Quorum,
74                 "fed_controlprogram": hex.EncodeToString(cfg.FederationWScript(config)),
75         }).Info()
76
77         // Get store
78         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
79                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
80         }
81         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
82         store := database.NewStore(coreDB)
83
84         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
85         accessTokens := accesstoken.NewStore(tokenDB)
86
87         dispatcher := event.NewDispatcher()
88         movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
89         assetFilter := protocol.NewAssetFilter(config.CrossChain.AssetWhitelist)
90         txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore, assetFilter}, dispatcher)
91         chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher)
92         if err != nil {
93                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
94         }
95
96         if err := checkConfig(chain, config); err != nil {
97                 panic(err)
98         }
99
100         var accounts *account.Manager
101         var assets *asset.Registry
102         var wallet *w.Wallet
103
104         hsm, err := pseudohsm.New(config.KeysDir())
105         if err != nil {
106                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
107         }
108
109         if !config.Wallet.Disable {
110                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
111                 walletStore := database.NewWalletStore(walletDB)
112                 accountStore := database.NewAccountStore(walletDB)
113                 accounts = account.NewManager(accountStore, chain)
114                 assets = asset.NewRegistry(walletDB, chain)
115                 wallet, err = w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
116                 if err != nil {
117                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
118                 }
119
120                 if err = wallet.Run(); err != nil {
121                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet work running thread")
122                 }
123
124                 // trigger rescan wallet
125                 if config.Wallet.Rescan {
126                         wallet.RescanBlocks()
127                 }
128         }
129         fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
130         syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB)
131         if err != nil {
132                 cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
133         }
134
135         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
136
137         // run the profile server
138         profileHost := config.ProfListenAddress
139         if profileHost != "" {
140                 // Profiling vapord programs.see (https://blog.golang.org/profiling-go-programs)
141                 // go tool pprof http://profileHose/debug/pprof/heap
142                 go func() {
143                         if err = http.ListenAndServe(profileHost, nil); err != nil {
144                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
145                         }
146                 }()
147         }
148
149         node := &Node{
150                 eventDispatcher: dispatcher,
151                 config:          config,
152                 syncManager:     syncManager,
153                 accessTokens:    accessTokens,
154                 wallet:          wallet,
155                 chain:           chain,
156                 miningEnable:    config.Mining,
157
158                 notificationMgr: notificationMgr,
159         }
160
161         node.cpuMiner = blockproposer.NewBlockProposer(chain, accounts, txPool, dispatcher)
162         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
163         return node
164 }
165
166 // Rollback rollback chain from one height to targetHeight
167 func Rollback(config *cfg.Config, targetHeight uint64) error {
168         if err := initNodeConfig(config); err != nil {
169                 return err
170         }
171
172         // Get store
173         if config.DBBackend != "leveldb" {
174                 return errors.New("Param db_backend is invalid, use leveldb")
175         }
176
177         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
178         store := database.NewStore(coreDB)
179
180         dispatcher := event.NewDispatcher()
181         movCore := mov.NewCore(config.DBBackend, config.DBDir(), consensus.ActiveNetParams.MovStartHeight)
182         txPool := protocol.NewTxPool(store, []protocol.DustFilterer{movCore}, dispatcher)
183         chain, err := protocol.NewChain(store, txPool, []protocol.SubProtocol{movCore}, dispatcher)
184         if err != nil {
185                 return err
186         }
187
188         hsm, err := pseudohsm.New(config.KeysDir())
189         if err != nil {
190                 return err
191         }
192
193         walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
194         walletStore := database.NewWalletStore(walletDB)
195         accountStore := database.NewAccountStore(walletDB)
196         accounts := account.NewManager(accountStore, chain)
197         assets := asset.NewRegistry(walletDB, chain)
198         wallet, err := w.NewWallet(walletStore, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
199         if err != nil {
200                 return err
201         }
202
203         if err := wallet.Rollback(targetHeight); err != nil {
204                 return err
205         }
206
207         return chain.Rollback(targetHeight)
208 }
209
210 func initNodeConfig(config *cfg.Config) error {
211         if err := lockDataDirectory(config); err != nil {
212                 log.WithField("err", err).Info("Error: " + err.Error())
213                 return err
214         }
215
216         if err := cfg.LoadFederationFile(config.FederationFile(), config); err != nil {
217                 log.WithField("err", err).Info("Failed to load federated information")
218                 return err
219         }
220
221         if err := consensus.InitActiveNetParams(config.ChainID); err != nil {
222                 log.Fatalf("Failed to init ActiveNetParams:[%s]", err.Error())
223         }
224
225         cfg.CommonConfig = config
226         return nil
227 }
228
229 // find whether config xpubs equal genesis block xpubs
230 func checkConfig(chain *protocol.Chain, config *cfg.Config) error {
231         fedpegScript := cfg.FederationWScript(config)
232         genesisBlock, err := chain.GetBlockByHeight(0)
233         if err != nil {
234                 return err
235         }
236         typedInput := genesisBlock.Transactions[0].Inputs[0].TypedInput
237         if v, ok := typedInput.(*types.CoinbaseInput); ok {
238                 if !reflect.DeepEqual(fedpegScript, v.Arbitrary) {
239                         return errors.New("config xpubs don't equal genesis block xpubs")
240                 }
241         }
242         return nil
243 }
244
245 // Lock data directory after daemonization
246 func lockDataDirectory(config *cfg.Config) error {
247         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
248         if err != nil {
249                 return errors.New("datadir already used by another process")
250         }
251         return nil
252 }
253
254 func initCommonConfig(config *cfg.Config) {
255         cfg.CommonConfig = config
256 }
257
258 // Lanch web broser or not
259 func launchWebBrowser(port string) {
260         webAddress := webHost + ":" + port
261         log.Info("Launching System Browser with :", webAddress)
262         if err := browser.Open(webAddress); err != nil {
263                 log.Error(err.Error())
264                 return
265         }
266 }
267
268 func (n *Node) initAndstartAPIServer() {
269         n.api = api.NewAPI(n.syncManager, n.wallet, n.cpuMiner, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
270
271         listenAddr := env.String("LISTEN", n.config.ApiAddress)
272         env.Parse()
273         n.api.StartServer(*listenAddr)
274 }
275
276 // OnStart implements BaseService
277 func (n *Node) OnStart() error {
278         if n.miningEnable {
279                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
280                         n.miningEnable = false
281                         log.Error(err)
282                 } else {
283                         n.cpuMiner.Start()
284                 }
285         }
286         if !n.config.VaultMode {
287                 if err := n.syncManager.Start(); err != nil {
288                         return err
289                 }
290         }
291
292         n.initAndstartAPIServer()
293         if err := n.notificationMgr.Start(); err != nil {
294                 return err
295         }
296
297         if !n.config.Web.Closed {
298                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
299                 if err != nil {
300                         log.Error("Invalid api address")
301                         return err
302                 }
303                 launchWebBrowser(port)
304         }
305         return nil
306 }
307
308 // OnStop implements BaseService
309 func (n *Node) OnStop() {
310         n.notificationMgr.Shutdown()
311         n.notificationMgr.WaitForShutdown()
312         n.BaseService.OnStop()
313         if n.miningEnable {
314                 n.cpuMiner.Stop()
315         }
316         if !n.config.VaultMode {
317                 n.syncManager.Stop()
318         }
319         n.eventDispatcher.Stop()
320 }
321
322 // RunForever listen to the stop signal
323 func (n *Node) RunForever() {
324         // Sleep forever and then...
325         cmn.TrapSignal(func() {
326                 n.Stop()
327         })
328 }