OSDN Git Service

add_new_casper_for_chain (#1931)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "errors"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "path/filepath"
10
11         log "github.com/sirupsen/logrus"
12         cmn "github.com/tendermint/tmlibs/common"
13         browser "github.com/toqueteos/webbrowser"
14         "github.com/prometheus/prometheus/util/flock"
15
16         "github.com/bytom/bytom/accesstoken"
17         "github.com/bytom/bytom/account"
18         "github.com/bytom/bytom/api"
19         "github.com/bytom/bytom/asset"
20         "github.com/bytom/bytom/blockchain/pseudohsm"
21         "github.com/bytom/bytom/blockchain/txfeed"
22         cfg "github.com/bytom/bytom/config"
23         "github.com/bytom/bytom/consensus"
24         "github.com/bytom/bytom/contract"
25         "github.com/bytom/bytom/database"
26         dbm "github.com/bytom/bytom/database/leveldb"
27         "github.com/bytom/bytom/env"
28         "github.com/bytom/bytom/event"
29         bytomLog "github.com/bytom/bytom/log"
30         "github.com/bytom/bytom/net/websocket"
31         "github.com/bytom/bytom/netsync"
32         "github.com/bytom/bytom/protocol"
33         w "github.com/bytom/bytom/wallet"
34 )
35
36 const (
37         webHost   = "http://127.0.0.1"
38         logModule = "node"
39 )
40
41 // Node represent bytom node
42 type Node struct {
43         cmn.BaseService
44
45         config          *cfg.Config
46         eventDispatcher *event.Dispatcher
47         syncManager     *netsync.SyncManager
48
49         wallet          *w.Wallet
50         accessTokens    *accesstoken.CredentialStore
51         notificationMgr *websocket.WSNotificationManager
52         api             *api.API
53         chain           *protocol.Chain
54         txfeed          *txfeed.Tracker
55 }
56
57 // NewNode create bytom node
58 func NewNode(config *cfg.Config) *Node {
59         ctx := context.Background()
60         if err := lockDataDirectory(config); err != nil {
61                 cmn.Exit("Error: " + err.Error())
62         }
63
64         if err := bytomLog.InitLogFile(config); err != nil {
65                 log.WithField("err", err).Fatalln("InitLogFile failed")
66         }
67
68         initActiveNetParams(config)
69         initCommonConfig(config)
70
71         // Get store
72         if config.DBBackend != "memdb" && config.DBBackend != "leveldb" {
73                 cmn.Exit(cmn.Fmt("Param db_backend [%v] is invalid, use leveldb or memdb", config.DBBackend))
74         }
75         coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
76         store := database.NewStore(coreDB)
77
78         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
79         accessTokens := accesstoken.NewStore(tokenDB)
80
81         dispatcher := event.NewDispatcher()
82         txPool := protocol.NewTxPool(store, dispatcher)
83
84         chain, err := protocol.NewChain(store, txPool)
85         if err != nil {
86                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
87         }
88
89         var accounts *account.Manager
90         var assets *asset.Registry
91         var wallet *w.Wallet
92         var txFeed *txfeed.Tracker
93
94         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
95         txFeed = txfeed.NewTracker(txFeedDB, chain)
96
97         if err = txFeed.Prepare(ctx); err != nil {
98                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("start txfeed")
99                 return nil
100         }
101
102         hsm, err := pseudohsm.New(config.KeysDir())
103         if err != nil {
104                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
105         }
106
107         if !config.Wallet.Disable {
108                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
109                 accounts = account.NewManager(walletDB, chain)
110                 assets = asset.NewRegistry(walletDB, chain)
111                 contracts := contract.NewRegistry(walletDB)
112                 wallet, err = w.NewWallet(walletDB, accounts, assets, contracts, hsm, chain, dispatcher, config.Wallet.TxIndex)
113                 if err != nil {
114                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("init NewWallet")
115                 }
116
117                 // trigger rescan wallet
118                 if config.Wallet.Rescan {
119                         wallet.RescanBlocks()
120                 }
121         }
122
123         fastSyncDB := dbm.NewDB("fastsync", config.DBBackend, config.DBDir())
124         syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher, fastSyncDB)
125         if err != nil {
126                 cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
127         }
128
129         notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
130
131         // run the profile server
132         profileHost := config.ProfListenAddress
133         if profileHost != "" {
134                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
135                 // go tool pprof http://profileHose/debug/pprof/heap
136                 go func() {
137                         if err = http.ListenAndServe(profileHost, nil); err != nil {
138                                 cmn.Exit(cmn.Fmt("Failed to register tcp profileHost: %v", err))
139                         }
140                 }()
141         }
142
143         node := &Node{
144                 eventDispatcher: dispatcher,
145                 config:          config,
146                 syncManager:     syncManager,
147                 accessTokens:    accessTokens,
148                 wallet:          wallet,
149                 chain:           chain,
150                 txfeed:          txFeed,
151
152                 notificationMgr: notificationMgr,
153         }
154
155         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
156
157         return node
158 }
159
160 // Lock data directory after daemonization
161 func lockDataDirectory(config *cfg.Config) error {
162         _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
163         if err != nil {
164                 return errors.New("datadir already used by another process")
165         }
166         return nil
167 }
168
169 func initActiveNetParams(config *cfg.Config) {
170         var exist bool
171         consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
172         if !exist {
173                 cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
174         }
175 }
176
177 func initCommonConfig(config *cfg.Config) {
178         cfg.CommonConfig = config
179 }
180
181 // Lanch web broser or not
182 func launchWebBrowser(port string) {
183         webAddress := webHost + ":" + port
184         log.Info("Launching System Browser with :", webAddress)
185         if err := browser.Open(webAddress); err != nil {
186                 log.Error(err.Error())
187                 return
188         }
189 }
190
191 func (n *Node) initAndstartAPIServer() {
192         n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.chain, n.config, n.accessTokens, n.eventDispatcher, n.notificationMgr)
193
194         listenAddr := env.String("LISTEN", n.config.ApiAddress)
195         env.Parse()
196         n.api.StartServer(*listenAddr)
197 }
198
199 func (n *Node) OnStart() error {
200         if !n.config.VaultMode {
201                 if err := n.syncManager.Start(); err != nil {
202                         return err
203                 }
204         }
205
206         n.initAndstartAPIServer()
207         if err := n.notificationMgr.Start(); err != nil {
208                 return err
209         }
210
211         if !n.config.Web.Closed {
212                 _, port, err := net.SplitHostPort(n.config.ApiAddress)
213                 if err != nil {
214                         log.Error("Invalid api address")
215                         return err
216                 }
217                 launchWebBrowser(port)
218         }
219         return nil
220 }
221
222 func (n *Node) OnStop() {
223         n.notificationMgr.Shutdown()
224         n.notificationMgr.WaitForShutdown()
225         n.BaseService.OnStop()
226         if !n.config.VaultMode {
227                 n.syncManager.Stop()
228         }
229         n.eventDispatcher.Stop()
230 }
231
232 func (n *Node) RunForever() {
233         // Sleep forever and then...
234         cmn.TrapSignal(func() {
235                 n.Stop()
236         })
237 }