OSDN Git Service

trace updater (#2109)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "errors"
5         "net"
6         "net/http"
7         _ "net/http/pprof"
8         "path/filepath"
9
10         log "github.com/sirupsen/logrus"
11         cmn "github.com/tendermint/tmlibs/common"
12         browser "github.com/toqueteos/webbrowser"
13
14         "github.com/bytom/bytom/proposal/blockproposer"
15         "github.com/prometheus/prometheus/util/flock"
16
17         "github.com/bytom/bytom/accesstoken"
18         "github.com/bytom/bytom/account"
19         "github.com/bytom/bytom/api"
20         "github.com/bytom/bytom/asset"
21         "github.com/bytom/bytom/blockchain/pseudohsm"
22         cfg "github.com/bytom/bytom/config"
23         "github.com/bytom/bytom/consensus"
24         "github.com/bytom/bytom/contract"
25         "github.com/bytom/bytom/database"
26         dbm "github.com/bytom/bytom/database/leveldb"
27         "github.com/bytom/bytom/env"
28         "github.com/bytom/bytom/event"
29         bytomLog "github.com/bytom/bytom/log"
30         "github.com/bytom/bytom/net/websocket"
31         "github.com/bytom/bytom/netsync"
32         "github.com/bytom/bytom/protocol"
33         w "github.com/bytom/bytom/wallet"
34 )
35
// webHost is the scheme and loopback host that launchWebBrowser combines
// with a port to form the address it opens.
const webHost = "http://127.0.0.1"

// logModule is the value this package logs under the "module" field.
const logModule = "node"
40
41 // Node represent bytom node
42 type Node struct {
43         cmn.BaseService
44
45         config          *cfg.Config
46         eventDispatcher *event.Dispatcher
47         syncManager     *netsync.SyncManager
48
49         wallet          *w.Wallet
50         accessTokens    *accesstoken.CredentialStore
51         notificationMgr *websocket.WSNotificationManager
52         api             *api.API
53         chain           *protocol.Chain
54         blockProposer   *blockproposer.BlockProposer
55         miningEnable    bool
56 }
57
58 // NewNode create bytom node
59 func NewNode(config *cfg.Config) *Node {
60         if err := initNodeConfig(config); err != nil {
61                 cmn.Exit(cmn.Fmt("Failed to init config: %v", err))
62         }
63
64         // Get store
65         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
66                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
67         }
68         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
69         store := database.NewStore(coreDB)
70
71         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
72         accessTokens := accesstoken.NewStore(tokenDB)
73
74         dispatcher := event.NewDispatcher()
75         txPool := protocol.NewTxPool(store, dispatcher)
76
77         chain, err := protocol.NewChain(store, txPool, dispatcher)
78         if err != nil {
79                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
80         }
81
82         startTraceUpdater(chain, config)
83
84         var accounts *account.Manager
85         var assets *asset.Registry
86         var wallet *w.Wallet
87
88         hsm, err := pseudohsm.New(config.KeysDir())
89         if err != nil {
90                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
91         }
92
93         if !config.Wallet.Disable {
94                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
95                 accounts = account.NewManager(walletDB, chain)
96                 assets = asset.NewRegistry(walletDB, chain)
97                 contracts := contract.NewRegistry(walletDB)
98                 wallet, err = w.NewWallet(walletDB, accounts, assets, contracts, hsm, chain, dispatcher, config.Wallet.TxIndex)
99                 if err != nil {
100                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
101                 }
102
103                 // trigger rescan wallet
104                 if config.Wallet.Rescan {
105                         wallet.RescanBlocks()
106                 }
107         }
108
109         fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
110         syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB)
111         if err != nil {
112                 cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
113         }
114
115         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
116
117         // run the profile server
118         profileHost := config.ProfListenAddress
119         if profileHost != "" {
120                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
121                 // go tool pprof http://profileHose/debug/pprof/heap
122                 go func() {
123                         if err = http.ListenAndServe(profileHost, nil); err != nil {
124                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
125                         }
126                 }()
127         }
128
129         node := &Node{
130                 eventDispatcher: dispatcher,
131                 config:          config,
132                 syncManager:     syncManager,
133                 accessTokens:    accessTokens,
134                 wallet:          wallet,
135                 chain:           chain,
136                 miningEnable:    config.Mining,
137                 notificationMgr: notificationMgr,
138         }
139
140         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
141         node.blockProposer = blockproposer.NewBlockProposer(chain, accounts, dispatcher)
142         return node
143 }
144
145 func startTraceUpdater(chain *protocol.Chain, cfg *cfg.Config) {
146         db := dbm.NewDB("trace", cfg.DBBackend, cfg.DBDir())
147         store := contract.NewTraceStore(db)
148         tracerService := contract.NewTraceService(contract.NewInfrastructure(chain, store))
149         traceUpdater := contract.NewTraceUpdater(tracerService, chain)
150         go traceUpdater.Sync()
151 }
152
153 func initNodeConfig(config *cfg.Config) error {
154         if err := lockDataDirectory(config); err != nil {
155                 cmn.Exit("Error: " + err.Error())
156         }
157
158         if err := bytomLog.InitLogFile(config); err != nil {
159                 log.WithField("err", err).Fatalln("InitLogFile failed")
160         }
161
162         initActiveNetParams(config)
163         initCommonConfig(config)
164         return nil
165 }
166
167 // Lock data directory after daemonization
168 func lockDataDirectory(config *cfg.Config) error {
169         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
170         if err != nil {
171                 return errors.New("datadir already used by another process")
172         }
173         return nil
174 }
175
176 func initActiveNetParams(config *cfg.Config) {
177         var exist bool
178         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
179         if !exist {
180                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
181         }
182 }
183
184 func initCommonConfig(config *cfg.Config) {
185         cfg.CommonConfig = config
186 }
187
188 // Lanch web broser or not
189 func launchWebBrowser(port string) {
190         webAddress := webHost + ":" + port
191         log.Info("Launching System Browser with :", webAddress)
192         if err := browser.Open(webAddress); err != nil {
193                 log.Error(err.Error())
194                 return
195         }
196 }
197
198 func (n *Node) initAndstartAPIServer() {
199         n.api = api.NewAPI(n.syncManager, n.wallet, n.blockProposer, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
200
201         listenAddr := env.String("LISTEN", n.config.ApiAddress)
202         env.Parse()
203         n.api.StartServer(*listenAddr)
204 }
205
206 func (n *Node) OnStart() error {
207         if n.miningEnable {
208                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
209                         n.miningEnable = false
210                         log.Error(err)
211                 } else {
212                         n.blockProposer.Start()
213                 }
214         }
215         if !n.config.VaultMode {
216                 if err := n.syncManager.Start(); err != nil {
217                         return err
218                 }
219         }
220
221         n.initAndstartAPIServer()
222         if err := n.notificationMgr.Start(); err != nil {
223                 return err
224         }
225
226         if !n.config.Web.Closed {
227                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
228                 if err != nil {
229                         log.Error("Invalid api address")
230                         return err
231                 }
232                 launchWebBrowser(port)
233         }
234         return nil
235 }
236
237 func (n *Node) OnStop() {
238         n.notificationMgr.Shutdown()
239         n.notificationMgr.WaitForShutdown()
240         n.BaseService.OnStop()
241         if n.miningEnable {
242                 n.blockProposer.Stop()
243         }
244         if !n.config.VaultMode {
245                 n.syncManager.Stop()
246         }
247         n.eventDispatcher.Stop()
248 }
249
250 func (n *Node) RunForever() {
251         // Sleep forever and then...
252         cmn.TrapSignal(func() {
253                 n.Stop()
254         })
255 }