OSDN Git Service

Add go profiling programs (#109)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "os"
10         "strings"
11         "sync"
12         "time"
13
14         "github.com/kr/secureheader"
15         log "github.com/sirupsen/logrus"
16         crypto "github.com/tendermint/go-crypto"
17         wire "github.com/tendermint/go-wire"
18         cmn "github.com/tendermint/tmlibs/common"
19         dbm "github.com/tendermint/tmlibs/db"
20
21         bc "github.com/bytom/blockchain"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pin"
25         "github.com/bytom/blockchain/pseudohsm"
26         "github.com/bytom/blockchain/txdb"
27         "github.com/bytom/blockchain/txfeed"
28         cfg "github.com/bytom/config"
29         "github.com/bytom/consensus"
30         "github.com/bytom/env"
31         "github.com/bytom/errors"
32         "github.com/bytom/p2p"
33         "github.com/bytom/protocol"
34         "github.com/bytom/protocol/bc/legacy"
35         "github.com/bytom/types"
36         "github.com/bytom/version"
37 )
38
// Timeouts applied to the HTTP server that rpcInit builds.
const (
	// httpReadTimeout is assigned to the server's ReadTimeout.
	httpReadTimeout  = 2 * time.Minute
	// httpWriteTimeout is assigned to the server's WriteTimeout.
	httpWriteTimeout = time.Hour
)
43
// Node groups the components assembled by NewNode: the configuration, the
// p2p switch and key, the event switch, the block store, the blockchain
// reactor and the wallet components. It embeds cmn.BaseService, which
// NewNode initializes with the Node itself as the service implementation.
type Node struct {
	cmn.BaseService

	// config
	config *cfg.Config

	// network
	privKey  crypto.PrivKeyEd25519 // local node's p2p key
	sw       *p2p.Switch           // p2p connections
	addrBook *p2p.AddrBook         // known peers; left nil by NewNode unless config.P2P.PexReactor is set

	evsw       types.EventSwitch // pub/sub for services
	blockStore *txdb.Store       // store created over the "txdb" database in NewNode
	bcReactor  *bc.BlockchainReactor
	accounts   *account.Manager // left nil by NewNode unless config.Wallet.Enable is set
	assets     *asset.Registry  // left nil by NewNode unless config.Wallet.Enable is set
}
61
// Package-level settings and build metadata.
//
// NOTE(review): the env.* helpers presumably read the named environment
// variable and fall back to the given default — confirm in the env package.
// None of these variables is referenced by the code visible in this file.
var (
	// config vars
	rootCAs       = env.String("ROOT_CA_CERTS", "") // file path
	splunkAddr    = os.Getenv("SPLUNKADDR")
	logFile       = os.Getenv("LOGFILE")
	logSize       = env.Int("LOGSIZE", 5e6) // 5MB
	logCount      = env.Int("LOGCOUNT", 9)
	logQueries    = env.Bool("LOG_QUERIES", false)
	maxDBConns    = env.Int("MAXDBCONNS", 10)           // set to 100 in prod
	rpsToken      = env.Int("RATELIMIT_TOKEN", 0)       // reqs/sec
	rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
	indexTxs      = env.Bool("INDEX_TRANSACTIONS", true)
	home          = bc.HomeDirFromEnvironment()
	bootURL       = env.String("BOOTURL", "")
	// build vars; initialized by the linker
	// (each defaults to "?" when nothing overrides it)
	buildTag    = "?"
	buildCommit = "?"
	buildDate   = "?"
	race        []interface{} // initialized in race.go
)
82
// NewNodeDefault returns the result of NewNode(config); it applies no
// additional configuration of its own.
func NewNodeDefault(config *cfg.Config) *Node {
	return NewNode(config)
}
86
// RedirectHandler wraps next so that a request whose URL path is exactly "/"
// is answered with a 302 (Found) redirect to "/dashboard/". Every other
// request is passed through to next unchanged.
func RedirectHandler(next http.Handler) http.Handler {
	redirectRoot := func(w http.ResponseWriter, req *http.Request) {
		// Anything but the bare root goes straight to the wrapped handler.
		if req.URL.Path != "/" {
			next.ServeHTTP(w, req)
			return
		}
		http.Redirect(w, req, "/dashboard/", http.StatusFound)
	}
	return http.HandlerFunc(redirectRoot)
}
96
// waitHandler is an http.Handler that holds incoming requests until the real
// handler has been supplied through Set. The owner is expected to call
// wg.Add(1) before requests arrive (rpcInit does so), so that ServeHTTP has
// something to wait on.
type waitHandler struct {
	h  http.Handler
	wg sync.WaitGroup
}

// Set stores h as the underlying handler and then releases every request
// blocked in ServeHTTP.
func (wt *waitHandler) Set(h http.Handler) {
	// Done runs after the assignment, so waiters always observe the handler.
	defer wt.wg.Done()
	wt.h = h
}

// ServeHTTP blocks until Set has been called and then delegates the request
// to the handler that Set installed.
func (wt *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	wt.wg.Wait()
	inner := wt.h
	inner.ServeHTTP(w, req)
}
111
112 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
113         // The waitHandler accepts incoming requests, but blocks until its underlying
114         // handler is set, when the second phase is complete.
115         var coreHandler waitHandler
116         coreHandler.wg.Add(1)
117         mux := http.NewServeMux()
118         mux.Handle("/", &coreHandler)
119
120         var handler http.Handler = mux
121         handler = RedirectHandler(handler)
122
123         secureheader.DefaultConfig.PermitClearLoopback = true
124         secureheader.DefaultConfig.HTTPSRedirect = false
125         secureheader.DefaultConfig.Next = handler
126
127         server := &http.Server{
128                 // Note: we should not set TLSConfig here;
129                 // we took care of TLS with the listener in maybeUseTLS.
130                 Handler:      secureheader.DefaultConfig,
131                 ReadTimeout:  httpReadTimeout,
132                 WriteTimeout: httpWriteTimeout,
133                 // Disable HTTP/2 for now until the Go implementation is more stable.
134                 // https://github.com/golang/go/issues/16450
135                 // https://github.com/golang/go/issues/17071
136                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
137         }
138         listenAddr := env.String("LISTEN", config.ApiAddress)
139         listener, err := net.Listen("tcp", *listenAddr)
140         if err != nil {
141                 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
142         }
143
144         // The `Serve` call has to happen in its own goroutine because
145         // it's blocking and we need to proceed to the rest of the core setup after
146         // we call it.
147         go func() {
148                 if err := server.Serve(listener); err != nil {
149                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
150                 }
151         }()
152         coreHandler.Set(h)
153 }
154
155 func NewNode(config *cfg.Config) *Node {
156         ctx := context.Background()
157
158         // Get store
159         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
160         store := txdb.NewStore(txDB)
161
162         privKey := crypto.GenPrivKeyEd25519()
163
164         // Make event switch
165         eventSwitch := types.NewEventSwitch()
166         _, err := eventSwitch.Start()
167         if err != nil {
168                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
169         }
170
171         sw := p2p.NewSwitch(config.P2P)
172
173         genesisBlock := &legacy.Block{
174                 BlockHeader:  legacy.BlockHeader{},
175                 Transactions: []*legacy.Tx{},
176         }
177         genesisBlock.UnmarshalText(consensus.InitBlock())
178
179         txPool := protocol.NewTxPool()
180         chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
181         if err != nil {
182                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
183         }
184
185         if chain.Height() == 0 {
186                 if err := chain.SaveBlock(genesisBlock); err != nil {
187                         cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
188                 }
189                 if err := chain.ConnectBlock(genesisBlock); err != nil {
190                         cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
191                 }
192         }
193
194         var accounts *account.Manager = nil
195         var assets *asset.Registry = nil
196         var pinStore *pin.Store = nil
197         var txFeed *txfeed.Tracker = nil
198
199         if config.Wallet.Enable {
200                 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
201                 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
202                 pinStore = pin.NewStore(accUTXODB)
203                 if err = pinStore.LoadAll(ctx); err != nil {
204                         log.WithField("error", err).Error("load pin store")
205                         return nil
206                 }
207
208                 pinHeight := chain.Height()
209                 if pinHeight > 0 {
210                         pinHeight = pinHeight - 1
211                 }
212
213                 pins := []string{account.PinName, account.DeleteSpentsPinName}
214                 for _, p := range pins {
215                         if err = pinStore.CreatePin(ctx, p, pinHeight); err != nil {
216                                 log.WithField("error", err).Error("Create pin")
217                         }
218                 }
219
220                 accounts = account.NewManager(accountsDB, chain, pinStore)
221                 go accounts.ProcessBlocks(ctx)
222
223                 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
224                 assets = asset.NewRegistry(assetsDB, chain)
225
226                 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
227                 txFeed = txfeed.NewTracker(txFeedDB, chain)
228
229                 if err = txFeed.Prepare(ctx); err != nil {
230                         log.WithField("error", err).Error("start txfeed")
231                         return nil
232                 }
233
234         }
235         //Todo HSM
236         /*
237                 if config.HsmUrl != ""{
238                         // todo remoteHSM
239                         cmn.Exit(cmn.Fmt("not implement"))
240                 } else {
241                         hsm, err = pseudohsm.New(config.KeysDir())
242                         if err != nil {
243                                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
244                         }
245                 }*/
246
247         hsm, err := pseudohsm.New(config.KeysDir())
248         if err != nil {
249                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
250         }
251         bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore, txFeed)
252
253         sw.AddReactor("BLOCKCHAIN", bcReactor)
254
255         rpcInit(bcReactor, config)
256         // Optionally, start the pex reactor
257         var addrBook *p2p.AddrBook
258         if config.P2P.PexReactor {
259                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
260                 pexReactor := p2p.NewPEXReactor(addrBook)
261                 sw.AddReactor("PEX", pexReactor)
262         }
263
264         // run the profile server
265         profileHost := config.ProfListenAddress
266         if profileHost != "" {
267                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
268                 // go tool pprof http://profileHose/debug/pprof/heap
269                 go func() {
270                         http.ListenAndServe(profileHost, nil)
271                 } ()
272         }
273
274         node := &Node{
275                 config: config,
276
277                 privKey:  privKey,
278                 sw:       sw,
279                 addrBook: addrBook,
280
281                 evsw:       eventSwitch,
282                 bcReactor:  bcReactor,
283                 blockStore: store,
284                 accounts:   accounts,
285                 assets:     assets,
286         }
287         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
288         return node
289 }
290
291 func (n *Node) OnStart() error {
292         // Create & add listener
293         protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
294         l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
295         n.sw.AddListener(l)
296
297         // Start the switch
298         n.sw.SetNodeInfo(n.makeNodeInfo())
299         n.sw.SetNodePrivKey(n.privKey)
300         _, err := n.sw.Start()
301         if err != nil {
302                 return err
303         }
304
305         // If seeds exist, add them to the address book and dial out
306         if n.config.P2P.Seeds != "" {
307                 // dial out
308                 seeds := strings.Split(n.config.P2P.Seeds, ",")
309                 if err := n.DialSeeds(seeds); err != nil {
310                         return err
311                 }
312         }
313         return nil
314 }
315
// OnStop runs the embedded BaseService's OnStop, logs "Stopping Node", and
// then stops the p2p switch.
func (n *Node) OnStop() {
	n.BaseService.OnStop()

	log.Info("Stopping Node")
	// TODO: gracefully disconnect from peers.
	n.sw.Stop()

}
324
// RunForever hands cmn.TrapSignal a callback that stops the node.
// NOTE(review): presumably TrapSignal blocks until a termination signal is
// received and then runs the callback — confirm in tmlibs/common.
func (n *Node) RunForever() {
	// Sleep forever and then...
	cmn.TrapSignal(func() {
		n.Stop()
	})
}
331
332 // Add the event switch to reactors, mempool, etc.
333 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
334         for _, e := range eventables {
335                 e.SetEventSwitch(evsw)
336         }
337 }
338
// AddListener adds a Listener to accept inbound peer connections by
// forwarding l to the p2p switch.
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo)
func (n *Node) AddListener(l p2p.Listener) {
	n.sw.AddListener(l)
}
345
// Switch returns the node's p2p switch.
func (n *Node) Switch() *p2p.Switch {
	return n.sw
}
349
// EventSwitch returns the node's event switch.
func (n *Node) EventSwitch() types.EventSwitch {
	return n.evsw
}
353
354 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
355         nodeInfo := &p2p.NodeInfo{
356                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
357                 Moniker: n.config.Moniker,
358                 Network: "bytom",
359                 Version: version.Version,
360                 Other: []string{
361                         cmn.Fmt("wire_version=%v", wire.Version),
362                         cmn.Fmt("p2p_version=%v", p2p.Version),
363                 },
364         }
365
366         if !n.sw.IsListening() {
367                 return nodeInfo
368         }
369
370         p2pListener := n.sw.Listeners()[0]
371         p2pHost := p2pListener.ExternalAddress().IP.String()
372         p2pPort := p2pListener.ExternalAddress().Port
373         //rpcListenAddr := n.config.RPC.ListenAddress
374
375         // We assume that the rpcListener has the same ExternalAddress.
376         // This is probably true because both P2P and RPC listeners use UPnP,
377         // except of course if the rpc is only bound to localhost
378         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
379         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
380         return nodeInfo
381 }
382
383 //------------------------------------------------------------------------------
384
// NodeInfo returns the node info currently held by the p2p switch.
func (n *Node) NodeInfo() *p2p.NodeInfo {
	return n.sw.NodeInfo()
}
388
// DialSeeds asks the p2p switch to dial the given seeds, passing along the
// node's address book, and returns the switch's error.
// NOTE(review): addrBook is nil when the PEX reactor is disabled in NewNode —
// confirm that Switch.DialSeeds accepts a nil address book.
func (n *Node) DialSeeds(seeds []string) error {
	return n.sw.DialSeeds(n.addrBook, seeds)
}
392
// ProtocolAndAddress splits listenAddr of the form "protocol://address" at
// the first "://" and returns the two parts. When listenAddr contains no
// "://", the protocol defaults to "tcp" and listenAddr is returned unchanged
// as the address.
func ProtocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"

	idx := strings.Index(listenAddr, sep)
	if idx < 0 {
		// No scheme given: defaults to tcp.
		return "tcp", listenAddr
	}
	return listenAddr[:idx], listenAddr[idx+len(sep):]
}
402
403 //------------------------------------------------------------------------------