OSDN Git Service

Hulk did something
[bytom/vapor.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "errors"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "os"
10         "path/filepath"
11
12         "github.com/prometheus/prometheus/util/flock"
13         log "github.com/sirupsen/logrus"
14         cmn "github.com/tendermint/tmlibs/common"
15         browser "github.com/toqueteos/webbrowser"
16
17         "github.com/vapor/accesstoken"
18         "github.com/vapor/account"
19         "github.com/vapor/api"
20         "github.com/vapor/asset"
21         "github.com/vapor/blockchain/pseudohsm"
22         "github.com/vapor/blockchain/txfeed"
23         cfg "github.com/vapor/config"
24         "github.com/vapor/consensus"
25         "github.com/vapor/database"
26         dbm "github.com/vapor/database/leveldb"
27         "github.com/vapor/env"
28         "github.com/vapor/event"
29         "github.com/vapor/mining/cpuminer"
30         "github.com/vapor/mining/miningpool"
31         "github.com/vapor/mining/tensority"
32         "github.com/vapor/net/websocket"
33         "github.com/vapor/netsync"
34         "github.com/vapor/p2p"
35         "github.com/vapor/protocol"
36         w "github.com/vapor/wallet"
37 )
38
39 const (
40         webHost   = "http://127.0.0.1"
41         logModule = "node"
42 )
43
44 // Node represent bytom node
45 type Node struct {
46         cmn.BaseService
47
48         config          *cfg.Config
49         eventDispatcher *event.Dispatcher
50         syncManager     *netsync.SyncManager
51
52         wallet          *w.Wallet
53         accessTokens    *accesstoken.CredentialStore
54         notificationMgr *websocket.WSNotificationManager
55         api             *api.API
56         chain           *protocol.Chain
57         txfeed          *txfeed.Tracker
58         cpuMiner        *cpuminer.CPUMiner
59         miningPool      *miningpool.MiningPool
60         miningEnable    bool
61 }
62
63 // NewNode create bytom node
64 func NewNode(config *cfg.Config) *Node {
65         ctx := context.Background()
66         if err := lockDataDirectory(config); err != nil {
67                 cmn.Exit("Error: " + err.Error())
68         }
69         initLogFile(config)
70         initActiveNetParams(config)
71         initCommonConfig(config)
72
73         // Get store
74         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
75                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
76         }
77         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
78         store := database.NewStore(coreDB)
79
80         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
81         accessTokens := accesstoken.NewStore(tokenDB)
82
83         dispatcher := event.NewDispatcher()
84         txPool := protocol.NewTxPool(store, dispatcher)
85         chain, err := protocol.NewChain(store, txPool)
86         if err != nil {
87                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
88         }
89
90         var accounts *account.Manager
91         var assets *asset.Registry
92         var wallet *w.Wallet
93         var txFeed *txfeed.Tracker
94
95         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
96         txFeed = txfeed.NewTracker(txFeedDB, chain)
97
98         if err = txFeed.Prepare(ctx); err != nil {
99                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed")
100                 return nil
101         }
102
103         hsm, err := pseudohsm.New(config.KeysDir())
104         if err != nil {
105                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
106         }
107
108         if !config.Wallet.Disable {
109                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
110                 accounts = account.NewManager(walletDB, chain)
111                 assets = asset.NewRegistry(walletDB, chain)
112                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher, config.Wallet.TxIndex)
113                 if err != nil {
114                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
115                 }
116
117                 // trigger rescan wallet
118                 if config.Wallet.Rescan {
119                         wallet.RescanBlocks()
120                 }
121         }
122
123         syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher)
124         if err != nil {
125                 cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
126         }
127
128         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
129
130         // run the profile server
131         profileHost := config.ProfListenAddress
132         if profileHost != "" {
133                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
134                 // go tool pprof http://profileHose/debug/pprof/heap
135                 go func() {
136                         if err = http.ListenAndServe(profileHost, nil); err != nil {
137                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
138                         }
139                 }()
140         }
141
142         node := &Node{
143                 eventDispatcher: dispatcher,
144                 config:          config,
145                 syncManager:     syncManager,
146                 accessTokens:    accessTokens,
147                 wallet:          wallet,
148                 chain:           chain,
149                 txfeed:          txFeed,
150                 miningEnable:    config.Mining,
151
152                 notificationMgr: notificationMgr,
153         }
154
155         node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, dispatcher)
156         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, dispatcher)
157
158         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
159
160         if config.Simd.Enable {
161                 tensority.UseSIMD = true
162         }
163
164         return node
165 }
166
167 // Lock data directory after daemonization
168 func lockDataDirectory(config *cfg.Config) error {
169         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
170         if err != nil {
171                 return errors.New("datadir already used by another process")
172         }
173         return nil
174 }
175
176 func initActiveNetParams(config *cfg.Config) {
177         var exist bool
178         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
179         if !exist {
180                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
181         }
182 }
183
184 func initLogFile(config *cfg.Config) {
185         if config.LogFile == "" {
186                 return
187         }
188         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
189         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
190         if err == nil {
191                 log.SetOutput(file)
192         } else {
193                 log.WithFields(log.Fields{"module": logModule, "err": err}).Info("using default")
194         }
195
196 }
197
198 func initCommonConfig(config *cfg.Config) {
199         cfg.CommonConfig = config
200 }
201
202 // Lanch web broser or not
203 func launchWebBrowser(port string) {
204         webAddress := webHost + ":" + port
205         log.Info("Launching System Browser with :", webAddress)
206         if err := browser.Open(webAddress); err != nil {
207                 log.Error(err.Error())
208                 return
209         }
210 }
211
212 func (n *Node) initAndstartAPIServer() {
213         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
214
215         listenAddr := env.String("LISTEN", n.config.ApiAddress)
216         env.Parse()
217         n.api.StartServer(*listenAddr)
218 }
219
220 func (n *Node) OnStart() error {
221         if n.miningEnable {
222                 if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
223                         n.miningEnable = false
224                         log.Error(err)
225                 } else {
226                         n.cpuMiner.Start()
227                 }
228         }
229         if !n.config.VaultMode {
230                 if err := n.syncManager.Start(); err != nil {
231                         return err
232                 }
233         }
234
235         n.initAndstartAPIServer()
236         if err := n.notificationMgr.Start(); err != nil {
237                 return err
238         }
239
240         if !n.config.Web.Closed {
241                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
242                 if err != nil {
243                         log.Error("Invalid api address")
244                         return err
245                 }
246                 launchWebBrowser(port)
247         }
248         return nil
249 }
250
251 func (n *Node) OnStop() {
252         n.notificationMgr.Shutdown()
253         n.notificationMgr.WaitForShutdown()
254         n.BaseService.OnStop()
255         if n.miningEnable {
256                 n.cpuMiner.Stop()
257         }
258         if !n.config.VaultMode {
259                 n.syncManager.Stop()
260         }
261         n.eventDispatcher.Stop()
262 }
263
264 func (n *Node) RunForever() {
265         // Sleep forever and then...
266         cmn.TrapSignal(func() {
267                 n.Stop()
268         })
269 }
270
271 func (n *Node) NodeInfo() *p2p.NodeInfo {
272         return n.syncManager.NodeInfo()
273 }
274
275 func (n *Node) MiningPool() *miningpool.MiningPool {
276         return n.miningPool
277 }