OSDN Git Service

optimise
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "errors"
6         "net/http"
7         _ "net/http/pprof"
8         "os"
9         "path/filepath"
10         "time"
11
12         "github.com/prometheus/prometheus/util/flock"
13         log "github.com/sirupsen/logrus"
14         cmn "github.com/tendermint/tmlibs/common"
15         dbm "github.com/tendermint/tmlibs/db"
16         browser "github.com/toqueteos/webbrowser"
17
18         "github.com/bytom/accesstoken"
19         "github.com/bytom/account"
20         "github.com/bytom/api"
21         "github.com/bytom/asset"
22         "github.com/bytom/blockchain/pseudohsm"
23         "github.com/bytom/blockchain/txfeed"
24         cfg "github.com/bytom/config"
25         "github.com/bytom/consensus"
26         "github.com/bytom/database/leveldb"
27         "github.com/bytom/env"
28         "github.com/bytom/mining/cpuminer"
29         "github.com/bytom/mining/miningpool"
30         "github.com/bytom/netsync"
31         "github.com/bytom/protocol"
32         "github.com/bytom/protocol/bc"
33         "github.com/bytom/protocol/bc/types"
34         "github.com/bytom/types"
35         w "github.com/bytom/wallet"
36 )
37
38 const (
39         webAddress               = "http://127.0.0.1:9888"
40         expireReservationsPeriod = time.Second
41         maxNewBlockChSize        = 1024
42 )
43
44 type Node struct {
45         cmn.BaseService
46
47         // config
48         config *cfg.Config
49
50         syncManager *netsync.SyncManager
51
52         evsw types.EventSwitch // pub/sub for services
53         //bcReactor    *bc.BlockchainReactor
54         wallet       *w.Wallet
55         accessTokens *accesstoken.CredentialStore
56         api          *api.API
57         chain        *protocol.Chain
58         txfeed       *txfeed.Tracker
59         cpuMiner     *cpuminer.CPUMiner
60         miningPool   *miningpool.MiningPool
61         miningEnable bool
62 }
63
64 func NewNode(config *cfg.Config) *Node {
65         ctx := context.Background()
66         if err := lockDataDirectory(config); err != nil {
67                 cmn.Exit("Error: " + err.Error())
68         }
69         initLogFile(config)
70         initActiveNetParams(config)
71         // Get store
72         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
73         store := leveldb.NewStore(coreDB)
74
75         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
76         accessTokens := accesstoken.NewStore(tokenDB)
77
78         // Make event switch
79         eventSwitch := types.NewEventSwitch()
80         if _, err := eventSwitch.Start(); err != nil {
81                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
82         }
83
84         txPool := protocol.NewTxPool()
85         chain, err := protocol.NewChain(store, txPool)
86         if err != nil {
87                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
88         }
89
90         var accounts *account.Manager = nil
91         var assets *asset.Registry = nil
92         var wallet *w.Wallet = nil
93         var txFeed *txfeed.Tracker = nil
94
95         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
96         txFeed = txfeed.NewTracker(txFeedDB, chain)
97
98         if err = txFeed.Prepare(ctx); err != nil {
99                 log.WithField("error", err).Error("start txfeed")
100                 return nil
101         }
102
103         hsm, err := pseudohsm.New(config.KeysDir())
104         if err != nil {
105                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
106         }
107
108         if !config.Wallet.Disable {
109                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
110                 accounts = account.NewManager(walletDB, chain)
111                 assets = asset.NewRegistry(walletDB, chain)
112                 wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
113                 if err != nil {
114                         log.WithField("error", err).Error("init NewWallet")
115                 }
116
117                 // trigger rescan wallet
118                 if config.Wallet.Rescan {
119                         wallet.RescanBlocks()
120                 }
121
122                 // Clean up expired UTXO reservations periodically.
123                 go accounts.ExpireReservations(ctx, expireReservationsPeriod)
124         }
125         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
126
127         syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
128
129         // get transaction from txPool and send it to syncManager and wallet
130         go newPoolTxListener(txPool, syncManager, wallet)
131
132         // run the profile server
133         profileHost := config.ProfListenAddress
134         if profileHost != "" {
135                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
136                 // go tool pprof http://profileHose/debug/pprof/heap
137                 go func() {
138                         http.ListenAndServe(profileHost, nil)
139                 }()
140         }
141
142         node := &Node{
143                 config:       config,
144                 syncManager:  syncManager,
145                 evsw:         eventSwitch,
146                 accessTokens: accessTokens,
147                 wallet:       wallet,
148                 chain:        chain,
149                 txfeed:       txFeed,
150                 miningEnable: config.Mining,
151         }
152
153         node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
154         node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
155
156         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
157
158         return node
159 }
160
161 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
162 func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
163         newTxCh := txPool.GetNewTxCh()
164         syncManagerTxCh := syncManager.GetNewTxCh()
165
166         for {
167                 newTx := <-newTxCh
168                 syncManagerTxCh <- newTx
169                 if wallet != nil {
170                         wallet.GetNewTxCh() <- newTx
171                 }
172         }
173 }
174
175 // Lock data directory after daemonization
176 func lockDataDirectory(config *cfg.Config) error {
177         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
178         if err != nil {
179                 return errors.New("datadir already used by another process")
180         }
181         return nil
182 }
183
184 func initActiveNetParams(config *cfg.Config) {
185         var exist bool
186         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
187         if !exist {
188                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
189         }
190 }
191
192 func initLogFile(config *cfg.Config) {
193         if config.LogFile == "" {
194                 return
195         }
196         cmn.EnsureDir(filepath.Dir(config.LogFile), 0700)
197         file, err := os.OpenFile(config.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
198         if err == nil {
199                 log.SetOutput(file)
200         } else {
201                 log.WithField("err", err).Info("using default")
202         }
203
204 }
205
206 // Lanch web broser or not
207 func launchWebBrowser() {
208         log.Info("Launching System Browser with :", webAddress)
209         if err := browser.Open(webAddress); err != nil {
210                 log.Error(err.Error())
211                 return
212         }
213 }
214
215 func (n *Node) initAndstartApiServer() {
216         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
217
218         listenAddr := env.String("LISTEN", n.config.ApiAddress)
219         env.Parse()
220         n.api.StartServer(*listenAddr)
221 }
222
223 func (n *Node) OnStart() error {
224         if n.miningEnable {
225                 n.cpuMiner.Start()
226         }
227         if !n.config.VaultMode {
228                 n.syncManager.Start()
229         }
230         n.initAndstartApiServer()
231         if !n.config.Web.Closed {
232                 launchWebBrowser()
233         }
234
235         return nil
236 }
237
238 func (n *Node) OnStop() {
239         n.BaseService.OnStop()
240         if n.miningEnable {
241                 n.cpuMiner.Stop()
242         }
243         if !n.config.VaultMode {
244                 n.syncManager.Stop()
245         }
246 }
247
248 func (n *Node) RunForever() {
249         // Sleep forever and then...
250         cmn.TrapSignal(func() {
251                 n.Stop()
252         })
253 }
254
255 func (n *Node) EventSwitch() types.EventSwitch {
256         return n.evsw
257 }
258
259 func (n *Node) SyncManager() *netsync.SyncManager {
260         return n.syncManager
261 }
262
263 func (n *Node) MiningPool() *miningpool.MiningPool {
264         return n.miningPool
265 }