OSDN Git Service

Merge pull request #89 from Bytom/dev
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "os"
10         "strings"
11         "sync"
12         "time"
13
14         "github.com/kr/secureheader"
15         log "github.com/sirupsen/logrus"
16         crypto "github.com/tendermint/go-crypto"
17         wire "github.com/tendermint/go-wire"
18         cmn "github.com/tendermint/tmlibs/common"
19         dbm "github.com/tendermint/tmlibs/db"
20
21         bc "github.com/bytom/blockchain"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pin"
25         "github.com/bytom/blockchain/pseudohsm"
26         "github.com/bytom/blockchain/txdb"
27         cfg "github.com/bytom/config"
28         "github.com/bytom/consensus"
29         "github.com/bytom/env"
30         "github.com/bytom/errors"
31         "github.com/bytom/p2p"
32         "github.com/bytom/protocol"
33         "github.com/bytom/protocol/bc/legacy"
34         "github.com/bytom/types"
35         "github.com/bytom/version"
36 )
37
// httpReadTimeout is the ReadTimeout applied to the RPC HTTP server.
const httpReadTimeout = 2 * time.Minute

// httpWriteTimeout is the WriteTimeout applied to the RPC HTTP server.
const httpWriteTimeout = time.Hour
42
43 type Node struct {
44         cmn.BaseService
45
46         // config
47         config *cfg.Config
48
49         // network
50         privKey  crypto.PrivKeyEd25519 // local node's p2p key
51         sw       *p2p.Switch           // p2p connections
52         addrBook *p2p.AddrBook         // known peers
53
54         evsw       types.EventSwitch // pub/sub for services
55         blockStore *txdb.Store
56         bcReactor  *bc.BlockchainReactor
57         accounts   *account.Manager
58         assets     *asset.Registry
59 }
60
61 var (
62         // config vars
63         rootCAs       = env.String("ROOT_CA_CERTS", "") // file path
64         splunkAddr    = os.Getenv("SPLUNKADDR")
65         logFile       = os.Getenv("LOGFILE")
66         logSize       = env.Int("LOGSIZE", 5e6) // 5MB
67         logCount      = env.Int("LOGCOUNT", 9)
68         logQueries    = env.Bool("LOG_QUERIES", false)
69         maxDBConns    = env.Int("MAXDBCONNS", 10)           // set to 100 in prod
70         rpsToken      = env.Int("RATELIMIT_TOKEN", 0)       // reqs/sec
71         rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
72         indexTxs      = env.Bool("INDEX_TRANSACTIONS", true)
73         home          = bc.HomeDirFromEnvironment()
74         bootURL       = env.String("BOOTURL", "")
75         // build vars; initialized by the linker
76         buildTag    = "?"
77         buildCommit = "?"
78         buildDate   = "?"
79         race        []interface{} // initialized in race.go
80 )
81
82 func NewNodeDefault(config *cfg.Config) *Node {
83         return NewNode(config)
84 }
85
// RedirectHandler wraps next so that a request for exactly "/" is answered
// with a 302 (Found) redirect to "/dashboard/". Every other request is passed
// through to next untouched.
func RedirectHandler(next http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, req *http.Request) {
		if req.URL.Path != "/" {
			next.ServeHTTP(w, req)
			return
		}
		http.Redirect(w, req, "/dashboard/", http.StatusFound)
	}
	return http.HandlerFunc(serve)
}
95
// waitHandler is an http.Handler that holds incoming requests until a
// delegate handler has been installed with Set.
//
// The owner is expected to call wg.Add(1) before the handler starts serving
// and to call Set once afterwards; Set releases the wait group.
type waitHandler struct {
	h  http.Handler
	wg sync.WaitGroup
}

// Set installs h as the delegate and unblocks requests waiting in ServeHTTP.
func (gate *waitHandler) Set(h http.Handler) {
	gate.h = h
	gate.wg.Done()
}

// ServeHTTP waits until the delegate has been installed and then forwards the
// request to it.
func (gate *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	gate.wg.Wait()
	delegate := gate.h
	delegate.ServeHTTP(w, req)
}
110
111 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
112         // The waitHandler accepts incoming requests, but blocks until its underlying
113         // handler is set, when the second phase is complete.
114         var coreHandler waitHandler
115         coreHandler.wg.Add(1)
116         mux := http.NewServeMux()
117         mux.Handle("/", &coreHandler)
118
119         var handler http.Handler = mux
120         handler = RedirectHandler(handler)
121
122         secureheader.DefaultConfig.PermitClearLoopback = true
123         secureheader.DefaultConfig.HTTPSRedirect = false
124         secureheader.DefaultConfig.Next = handler
125
126         server := &http.Server{
127                 // Note: we should not set TLSConfig here;
128                 // we took care of TLS with the listener in maybeUseTLS.
129                 Handler:      secureheader.DefaultConfig,
130                 ReadTimeout:  httpReadTimeout,
131                 WriteTimeout: httpWriteTimeout,
132                 // Disable HTTP/2 for now until the Go implementation is more stable.
133                 // https://github.com/golang/go/issues/16450
134                 // https://github.com/golang/go/issues/17071
135                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
136         }
137         listenAddr := env.String("LISTEN", config.ApiAddress)
138         listener, err := net.Listen("tcp", *listenAddr)
139         if err != nil {
140                 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
141         }
142
143         // The `Serve` call has to happen in its own goroutine because
144         // it's blocking and we need to proceed to the rest of the core setup after
145         // we call it.
146         go func() {
147                 if err := server.Serve(listener); err != nil {
148                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
149                 }
150         }()
151         coreHandler.Set(h)
152 }
153
154 func NewNode(config *cfg.Config) *Node {
155         ctx := context.Background()
156
157         // Get store
158         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
159         store := txdb.NewStore(txDB)
160
161         privKey := crypto.GenPrivKeyEd25519()
162
163         // Make event switch
164         eventSwitch := types.NewEventSwitch()
165         _, err := eventSwitch.Start()
166         if err != nil {
167                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
168         }
169
170         sw := p2p.NewSwitch(config.P2P)
171
172         genesisBlock := &legacy.Block{
173                 BlockHeader:  legacy.BlockHeader{},
174                 Transactions: []*legacy.Tx{},
175         }
176         genesisBlock.UnmarshalText(consensus.InitBlock())
177
178         txPool := protocol.NewTxPool()
179         chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
180         if err != nil {
181                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
182         }
183
184         if chain.Height() == 0 {
185                 if err := chain.SaveBlock(genesisBlock); err != nil {
186                         cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
187                 }
188                 if err := chain.ConnectBlock(genesisBlock); err != nil {
189                         cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
190                 }
191         }
192
193         var accounts *account.Manager = nil
194         var assets *asset.Registry = nil
195         var pinStore *pin.Store = nil
196
197         if config.Wallet.Enable {
198                 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
199                 accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
200                 pinStore = pin.NewStore(accUTXODB)
201                 if err = pinStore.LoadAll(ctx); err != nil {
202                         log.WithField("error", err).Error("load pin store")
203                         return nil
204                 }
205
206                 pinHeight := chain.Height()
207                 if pinHeight > 0 {
208                         pinHeight = pinHeight - 1
209                 }
210
211                 pins := []string{account.PinName, account.DeleteSpentsPinName}
212                 for _, p := range pins {
213                         if err = pinStore.CreatePin(ctx, p, pinHeight); err != nil {
214                                 log.WithField("error", err).Error("Create pin")
215                         }
216                 }
217
218                 accounts = account.NewManager(accountsDB, chain, pinStore)
219                 go accounts.ProcessBlocks(ctx)
220
221                 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
222                 assets = asset.NewRegistry(assetsDB, chain)
223         }
224         //Todo HSM
225         /*
226                 if config.HsmUrl != ""{
227                         // todo remoteHSM
228                         cmn.Exit(cmn.Fmt("not implement"))
229                 } else {
230                         hsm, err = pseudohsm.New(config.KeysDir())
231                         if err != nil {
232                                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
233                         }
234                 }*/
235
236         hsm, err := pseudohsm.New(config.KeysDir())
237         if err != nil {
238                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
239         }
240         bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, pinStore)
241
242         sw.AddReactor("BLOCKCHAIN", bcReactor)
243
244         rpcInit(bcReactor, config)
245         // Optionally, start the pex reactor
246         var addrBook *p2p.AddrBook
247         if config.P2P.PexReactor {
248                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
249                 pexReactor := p2p.NewPEXReactor(addrBook)
250                 sw.AddReactor("PEX", pexReactor)
251         }
252
253         // add the event switch to all services
254         // they should all satisfy events.Eventable
255         //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
256
257         // run the profile server
258         profileHost := config.ProfListenAddress
259         if profileHost != "" {
260
261                 go func() {
262                         log.WithField("error", http.ListenAndServe(profileHost, nil)).Error("Profile server")
263                 }()
264         }
265
266         node := &Node{
267                 config: config,
268
269                 privKey:  privKey,
270                 sw:       sw,
271                 addrBook: addrBook,
272
273                 evsw:       eventSwitch,
274                 bcReactor:  bcReactor,
275                 blockStore: store,
276                 accounts:   accounts,
277                 assets:     assets,
278         }
279         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
280         return node
281 }
282
283 func (n *Node) OnStart() error {
284         // Create & add listener
285         protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
286         l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
287         n.sw.AddListener(l)
288
289         // Start the switch
290         n.sw.SetNodeInfo(n.makeNodeInfo())
291         n.sw.SetNodePrivKey(n.privKey)
292         _, err := n.sw.Start()
293         if err != nil {
294                 return err
295         }
296
297         // If seeds exist, add them to the address book and dial out
298         if n.config.P2P.Seeds != "" {
299                 // dial out
300                 seeds := strings.Split(n.config.P2P.Seeds, ",")
301                 if err := n.DialSeeds(seeds); err != nil {
302                         return err
303                 }
304         }
305         return nil
306 }
307
308 func (n *Node) OnStop() {
309         n.BaseService.OnStop()
310
311         log.Info("Stopping Node")
312         // TODO: gracefully disconnect from peers.
313         n.sw.Stop()
314
315 }
316
317 func (n *Node) RunForever() {
318         // Sleep forever and then...
319         cmn.TrapSignal(func() {
320                 n.Stop()
321         })
322 }
323
324 // Add the event switch to reactors, mempool, etc.
325 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
326         for _, e := range eventables {
327                 e.SetEventSwitch(evsw)
328         }
329 }
330
331 // Add a Listener to accept inbound peer connections.
332 // Add listeners before starting the Node.
333 // The first listener is the primary listener (in NodeInfo)
334 func (n *Node) AddListener(l p2p.Listener) {
335         n.sw.AddListener(l)
336 }
337
338 func (n *Node) Switch() *p2p.Switch {
339         return n.sw
340 }
341
342 func (n *Node) EventSwitch() types.EventSwitch {
343         return n.evsw
344 }
345
346 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
347         nodeInfo := &p2p.NodeInfo{
348                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
349                 Moniker: n.config.Moniker,
350                 Network: "bytom",
351                 Version: version.Version,
352                 Other: []string{
353                         cmn.Fmt("wire_version=%v", wire.Version),
354                         cmn.Fmt("p2p_version=%v", p2p.Version),
355                 },
356         }
357
358         if !n.sw.IsListening() {
359                 return nodeInfo
360         }
361
362         p2pListener := n.sw.Listeners()[0]
363         p2pHost := p2pListener.ExternalAddress().IP.String()
364         p2pPort := p2pListener.ExternalAddress().Port
365         //rpcListenAddr := n.config.RPC.ListenAddress
366
367         // We assume that the rpcListener has the same ExternalAddress.
368         // This is probably true because both P2P and RPC listeners use UPnP,
369         // except of course if the rpc is only bound to localhost
370         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
371         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
372         return nodeInfo
373 }
374
375 //------------------------------------------------------------------------------
376
377 func (n *Node) NodeInfo() *p2p.NodeInfo {
378         return n.sw.NodeInfo()
379 }
380
381 func (n *Node) DialSeeds(seeds []string) error {
382         return n.sw.DialSeeds(n.addrBook, seeds)
383 }
384
// ProtocolAndAddress splits a listen address of the form "protocol://address"
// at the first "://" and returns the two parts. When listenAddr contains no
// "://" the protocol defaults to "tcp" and the address is listenAddr itself.
func ProtocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"
	if i := strings.Index(listenAddr, sep); i >= 0 {
		return listenAddr[:i], listenAddr[i+len(sep):]
	}
	return "tcp", listenAddr
}
394
395 //------------------------------------------------------------------------------