OSDN Git Service

Merge remote-tracking branch 'zjb/monday-zjb' into dev
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         "os"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/kr/secureheader"
14         log "github.com/sirupsen/logrus"
15         crypto "github.com/tendermint/go-crypto"
16         wire "github.com/tendermint/go-wire"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19         _ "net/http/pprof"
20
21         bc "github.com/bytom/blockchain"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pin"
25         "github.com/bytom/blockchain/pseudohsm"
26         "github.com/bytom/blockchain/txdb"
27         cfg "github.com/bytom/config"
28         "github.com/bytom/consensus"
29         "github.com/bytom/env"
30         "github.com/bytom/errors"
31         p2p "github.com/bytom/p2p"
32         "github.com/bytom/protocol"
33         "github.com/bytom/protocol/bc/legacy"
34         "github.com/bytom/types"
35         "github.com/bytom/version"
36 )
37
38 const (
39         httpReadTimeout  = 2 * time.Minute
40         httpWriteTimeout = time.Hour
41 )
42
43 type Node struct {
44         cmn.BaseService
45
46         // config
47         config *cfg.Config
48
49         // network
50         privKey  crypto.PrivKeyEd25519 // local node's p2p key
51         sw       *p2p.Switch           // p2p connections
52         addrBook *p2p.AddrBook         // known peers
53
54         // services
55         evsw types.EventSwitch // pub/sub for services
56         //    blockStore       *bc.MemStore
57         blockStore *txdb.Store
58         bcReactor  *bc.BlockchainReactor
59         accounts   *account.Manager
60         assets     *asset.Registry
61 }
62
63 var (
64         // config vars
65         rootCAs       = env.String("ROOT_CA_CERTS", "") // file path
66         splunkAddr    = os.Getenv("SPLUNKADDR")
67         logFile       = os.Getenv("LOGFILE")
68         logSize       = env.Int("LOGSIZE", 5e6) // 5MB
69         logCount      = env.Int("LOGCOUNT", 9)
70         logQueries    = env.Bool("LOG_QUERIES", false)
71         maxDBConns    = env.Int("MAXDBCONNS", 10)           // set to 100 in prod
72         rpsToken      = env.Int("RATELIMIT_TOKEN", 0)       // reqs/sec
73         rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
74         indexTxs      = env.Bool("INDEX_TRANSACTIONS", true)
75         home          = bc.HomeDirFromEnvironment()
76         bootURL       = env.String("BOOTURL", "")
77         // build vars; initialized by the linker
78         buildTag    = "?"
79         buildCommit = "?"
80         buildDate   = "?"
81         race        []interface{} // initialized in race.go
82 )
83
84 func NewNodeDefault(config *cfg.Config) *Node {
85         return NewNode(config)
86 }
87
88 func RedirectHandler(next http.Handler) http.Handler {
89         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
90                 if req.URL.Path == "/" {
91                         http.Redirect(w, req, "/dashboard/", http.StatusFound)
92                         return
93                 }
94                 next.ServeHTTP(w, req)
95         })
96 }
97
98 type waitHandler struct {
99         h  http.Handler
100         wg sync.WaitGroup
101 }
102
103 func (wh *waitHandler) Set(h http.Handler) {
104         wh.h = h
105         wh.wg.Done()
106 }
107
108 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
109         wh.wg.Wait()
110         wh.h.ServeHTTP(w, req)
111 }
112
113 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
114         // The waitHandler accepts incoming requests, but blocks until its underlying
115         // handler is set, when the second phase is complete.
116         var coreHandler waitHandler
117         coreHandler.wg.Add(1)
118         mux := http.NewServeMux()
119         mux.Handle("/", &coreHandler)
120
121         var handler http.Handler = mux
122         handler = RedirectHandler(handler)
123
124         secureheader.DefaultConfig.PermitClearLoopback = true
125         secureheader.DefaultConfig.HTTPSRedirect = false
126         secureheader.DefaultConfig.Next = handler
127
128         server := &http.Server{
129                 // Note: we should not set TLSConfig here;
130                 // we took care of TLS with the listener in maybeUseTLS.
131                 Handler:      secureheader.DefaultConfig,
132                 ReadTimeout:  httpReadTimeout,
133                 WriteTimeout: httpWriteTimeout,
134                 // Disable HTTP/2 for now until the Go implementation is more stable.
135                 // https://github.com/golang/go/issues/16450
136                 // https://github.com/golang/go/issues/17071
137                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
138         }
139         listenAddr := env.String("LISTEN", config.ApiAddress)
140         listener, _ := net.Listen("tcp", *listenAddr)
141
142         // The `Serve` call has to happen in its own goroutine because
143         // it's blocking and we need to proceed to the rest of the core setup after
144         // we call it.
145         go func() {
146                 err := server.Serve(listener)
147                 log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
148         }()
149         coreHandler.Set(h)
150 }
151
152 func NewNode(config *cfg.Config) *Node {
153         ctx := context.Background()
154
155         // Get store
156         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
157         store := txdb.NewStore(txDB)
158
159         privKey := crypto.GenPrivKeyEd25519()
160
161         // Make event switch
162         eventSwitch := types.NewEventSwitch()
163         _, err := eventSwitch.Start()
164         if err != nil {
165                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
166         }
167
168         sw := p2p.NewSwitch(config.P2P)
169
170         fastSync := config.FastSync
171
172         genesisBlock := &legacy.Block{
173                 BlockHeader:  legacy.BlockHeader{},
174                 Transactions: []*legacy.Tx{},
175         }
176         genesisBlock.UnmarshalText(consensus.InitBlock())
177
178         txPool := protocol.NewTxPool()
179         chain, err := protocol.NewChain(ctx, genesisBlock.Hash(), store, txPool, nil)
180
181         if store.Height() < 1 {
182                 if err := chain.AddBlock(nil, genesisBlock); err != nil {
183                         cmn.Exit(cmn.Fmt("Failed to add genesisBlock to Chain: %v", err))
184                 }
185         }
186
187         var accounts *account.Manager = nil
188         var assets *asset.Registry = nil
189         var pinStore *pin.Store = nil
190
191         if config.Wallet.Enable {
192                 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
193                 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
194                 pinStore = pin.NewStore(accUTXODB)
195                 err = pinStore.LoadAll(ctx)
196                 if err != nil {
197                         log.WithField("error", err).Error("load pin store")
198                         return nil
199                 }
200
201                 pinHeight := store.Height()
202                 if pinHeight > 0 {
203                         pinHeight = pinHeight - 1
204                 }
205
206                 pins := []string{account.PinName, account.DeleteSpentsPinName}
207                 for _, p := range pins {
208                         err = pinStore.CreatePin(ctx, p, pinHeight)
209                         if err != nil {
210                                 log.WithField("error", err).Error("Create pin")
211                         }
212                 }
213
214                 accounts = account.NewManager(accountsDB, chain, pinStore)
215                 go accounts.ProcessBlocks(ctx)
216
217                 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
218                 assets = asset.NewRegistry(assetsDB, chain)
219         }
220         //Todo HSM
221         /*
222                 if config.HsmUrl != ""{
223                         // todo remoteHSM
224                         cmn.Exit(cmn.Fmt("not implement"))
225                 } else {
226                         hsm, err = pseudohsm.New(config.KeysDir())
227                         if err != nil {
228                                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
229                         }
230                 }*/
231
232         hsm, err := pseudohsm.New(config.KeysDir())
233         if err != nil {
234                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
235         }
236         bcReactor := bc.NewBlockchainReactor(
237                 store,
238                 chain,
239                 txPool,
240                 accounts,
241                 assets,
242                 sw,
243                 hsm,
244                 fastSync,
245                 pinStore)
246
247         sw.AddReactor("BLOCKCHAIN", bcReactor)
248
249         rpcInit(bcReactor, config)
250         // Optionally, start the pex reactor
251         var addrBook *p2p.AddrBook
252         if config.P2P.PexReactor {
253                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
254                 pexReactor := p2p.NewPEXReactor(addrBook)
255                 sw.AddReactor("PEX", pexReactor)
256         }
257
258         // add the event switch to all services
259         // they should all satisfy events.Eventable
260         //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
261
262         // run the profile server
263         profileHost := config.ProfListenAddress
264         if profileHost != "" {
265
266                 go func() {
267                         log.WithField("error", http.ListenAndServe(profileHost, nil)).Error("Profile server")
268                 }()
269         }
270
271         node := &Node{
272                 config: config,
273
274                 privKey:  privKey,
275                 sw:       sw,
276                 addrBook: addrBook,
277
278                 evsw:       eventSwitch,
279                 bcReactor:  bcReactor,
280                 blockStore: store,
281                 accounts:   accounts,
282                 assets:     assets,
283         }
284         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
285         return node
286 }
287
288 func (n *Node) OnStart() error {
289         // Create & add listener
290         protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
291         l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
292         n.sw.AddListener(l)
293
294         // Start the switch
295         n.sw.SetNodeInfo(n.makeNodeInfo())
296         n.sw.SetNodePrivKey(n.privKey)
297         _, err := n.sw.Start()
298         if err != nil {
299                 return err
300         }
301
302         // If seeds exist, add them to the address book and dial out
303         if n.config.P2P.Seeds != "" {
304                 // dial out
305                 seeds := strings.Split(n.config.P2P.Seeds, ",")
306                 if err := n.DialSeeds(seeds); err != nil {
307                         return err
308                 }
309         }
310         return nil
311 }
312
313 func (n *Node) OnStop() {
314         n.BaseService.OnStop()
315
316         log.Info("Stopping Node")
317         // TODO: gracefully disconnect from peers.
318         n.sw.Stop()
319
320 }
321
322 func (n *Node) RunForever() {
323         // Sleep forever and then...
324         cmn.TrapSignal(func() {
325                 n.Stop()
326         })
327 }
328
329 // Add the event switch to reactors, mempool, etc.
330 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
331         for _, e := range eventables {
332                 e.SetEventSwitch(evsw)
333         }
334 }
335
336 // Add a Listener to accept inbound peer connections.
337 // Add listeners before starting the Node.
338 // The first listener is the primary listener (in NodeInfo)
339 func (n *Node) AddListener(l p2p.Listener) {
340         n.sw.AddListener(l)
341 }
342
343 func (n *Node) Switch() *p2p.Switch {
344         return n.sw
345 }
346
347 func (n *Node) EventSwitch() types.EventSwitch {
348         return n.evsw
349 }
350
351 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
352         nodeInfo := &p2p.NodeInfo{
353                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
354                 Moniker: n.config.Moniker,
355                 Network: "bytom",
356                 Version: version.Version,
357                 Other: []string{
358                         cmn.Fmt("wire_version=%v", wire.Version),
359                         cmn.Fmt("p2p_version=%v", p2p.Version),
360                 },
361         }
362
363         if !n.sw.IsListening() {
364                 return nodeInfo
365         }
366
367         p2pListener := n.sw.Listeners()[0]
368         p2pHost := p2pListener.ExternalAddress().IP.String()
369         p2pPort := p2pListener.ExternalAddress().Port
370         //rpcListenAddr := n.config.RPC.ListenAddress
371
372         // We assume that the rpcListener has the same ExternalAddress.
373         // This is probably true because both P2P and RPC listeners use UPnP,
374         // except of course if the rpc is only bound to localhost
375         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
376         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
377         return nodeInfo
378 }
379
380 //------------------------------------------------------------------------------
381
382 func (n *Node) NodeInfo() *p2p.NodeInfo {
383         return n.sw.NodeInfo()
384 }
385
386 func (n *Node) DialSeeds(seeds []string) error {
387         return n.sw.DialSeeds(n.addrBook, seeds)
388 }
389
390 // Defaults to tcp
391 func ProtocolAndAddress(listenAddr string) (string, string) {
392         protocol, address := "tcp", listenAddr
393         parts := strings.SplitN(address, "://", 2)
394         if len(parts) == 2 {
395                 protocol, address = parts[0], parts[1]
396         }
397         return protocol, address
398 }
399
400 //------------------------------------------------------------------------------