OSDN Git Service

Add p2p network manager function (#397)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/kr/secureheader"
14         log "github.com/sirupsen/logrus"
15         crypto "github.com/tendermint/go-crypto"
16         wire "github.com/tendermint/go-wire"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19
20         bc "github.com/bytom/blockchain"
21         "github.com/bytom/blockchain/accesstoken"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pseudohsm"
25         "github.com/bytom/blockchain/txdb"
26         "github.com/bytom/blockchain/txfeed"
27         w "github.com/bytom/blockchain/wallet"
28         cfg "github.com/bytom/config"
29         "github.com/bytom/env"
30         "github.com/bytom/errors"
31         "github.com/bytom/p2p"
32         "github.com/bytom/protocol"
33         "github.com/bytom/types"
34         "github.com/bytom/util/browser"
35         "github.com/bytom/version"
36 )
37
38 const (
39         httpReadTimeout          = 2 * time.Minute
40         httpWriteTimeout         = time.Hour
41         webAddress               = "http://127.0.0.1:9888"
42         expireReservationsPeriod = time.Second
43 )
44
45 type Node struct {
46         cmn.BaseService
47
48         // config
49         config *cfg.Config
50
51         // network
52         privKey  crypto.PrivKeyEd25519 // local node's p2p key
53         sw       *p2p.Switch           // p2p connections
54         addrBook *p2p.AddrBook         // known peers
55
56         evsw       types.EventSwitch // pub/sub for services
57         blockStore *txdb.Store
58         bcReactor  *bc.BlockchainReactor
59         accounts   *account.Manager
60         assets     *asset.Registry
61 }
62
63 func NewNodeDefault(config *cfg.Config) *Node {
64         return NewNode(config)
65 }
66
67 func RedirectHandler(next http.Handler) http.Handler {
68         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
69                 if req.URL.Path == "/" {
70                         http.Redirect(w, req, "/dashboard/", http.StatusFound)
71                         return
72                 }
73                 next.ServeHTTP(w, req)
74         })
75 }
76
77 type waitHandler struct {
78         h  http.Handler
79         wg sync.WaitGroup
80 }
81
82 func (wh *waitHandler) Set(h http.Handler) {
83         wh.h = h
84         wh.wg.Done()
85 }
86
87 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
88         wh.wg.Wait()
89         wh.h.ServeHTTP(w, req)
90 }
91
92 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
93         // The waitHandler accepts incoming requests, but blocks until its underlying
94         // handler is set, when the second phase is complete.
95         var coreHandler waitHandler
96         coreHandler.wg.Add(1)
97         mux := http.NewServeMux()
98         mux.Handle("/", &coreHandler)
99
100         var handler http.Handler = mux
101
102         if config.Auth.Disable == false {
103                 handler = bc.AuthHandler(handler, accessTokens)
104         }
105         handler = RedirectHandler(handler)
106
107         secureheader.DefaultConfig.PermitClearLoopback = true
108         secureheader.DefaultConfig.HTTPSRedirect = false
109         secureheader.DefaultConfig.Next = handler
110
111         server := &http.Server{
112                 // Note: we should not set TLSConfig here;
113                 // we took care of TLS with the listener in maybeUseTLS.
114                 Handler:      secureheader.DefaultConfig,
115                 ReadTimeout:  httpReadTimeout,
116                 WriteTimeout: httpWriteTimeout,
117                 // Disable HTTP/2 for now until the Go implementation is more stable.
118                 // https://github.com/golang/go/issues/16450
119                 // https://github.com/golang/go/issues/17071
120                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
121         }
122         listenAddr := env.String("LISTEN", config.ApiAddress)
123         log.WithField("api address:", config.ApiAddress).Info("Rpc listen")
124         listener, err := net.Listen("tcp", *listenAddr)
125         if err != nil {
126                 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
127         }
128
129         // The `Serve` call has to happen in its own goroutine because
130         // it's blocking and we need to proceed to the rest of the core setup after
131         // we call it.
132         go func() {
133                 if err := server.Serve(listener); err != nil {
134                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
135                 }
136         }()
137         coreHandler.Set(h)
138 }
139
140 func NewNode(config *cfg.Config) *Node {
141         ctx := context.Background()
142
143         // Get store
144         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
145         store := txdb.NewStore(txDB)
146
147         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
148         accessTokens := accesstoken.NewStore(tokenDB)
149
150         privKey := crypto.GenPrivKeyEd25519()
151
152         // Make event switch
153         eventSwitch := types.NewEventSwitch()
154         _, err := eventSwitch.Start()
155         if err != nil {
156                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
157         }
158
159         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
160
161         sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
162
163         genesisBlock := cfg.GenerateGenesisBlock()
164
165         txPool := protocol.NewTxPool()
166         chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
167         if err != nil {
168                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
169         }
170
171         if chain.BestBlockHash() == nil {
172                 if err := chain.SaveBlock(genesisBlock); err != nil {
173                         cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
174                 }
175                 if err := chain.ConnectBlock(genesisBlock); err != nil {
176                         cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
177                 }
178         }
179
180         var accounts *account.Manager = nil
181         var assets *asset.Registry = nil
182         var wallet *w.Wallet = nil
183         var txFeed *txfeed.Tracker = nil
184
185         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
186         txFeed = txfeed.NewTracker(txFeedDB, chain)
187
188         if err = txFeed.Prepare(ctx); err != nil {
189                 log.WithField("error", err).Error("start txfeed")
190                 return nil
191         }
192
193         hsm, err := pseudohsm.New(config.KeysDir())
194         if err != nil {
195                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
196         }
197
198         if !config.Wallet.Disable {
199                 xpubs, _ := hsm.ListKeys()
200                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
201                 accounts = account.NewManager(walletDB, chain)
202                 assets = asset.NewRegistry(walletDB, chain)
203                 wallet, err = w.NewWallet(walletDB, accounts, assets, chain, xpubs)
204                 if err != nil {
205                         log.WithField("error", err).Error("init NewWallet")
206                 }
207                 // Clean up expired UTXO reservations periodically.
208                 go accounts.ExpireReservations(ctx, expireReservationsPeriod)
209         }
210
211         bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens, config.Mining)
212
213         sw.AddReactor("BLOCKCHAIN", bcReactor)
214
215         rpcInit(bcReactor, config, accessTokens)
216         // Optionally, start the pex reactor
217         var addrBook *p2p.AddrBook
218         if config.P2P.PexReactor {
219                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
220                 pexReactor := p2p.NewPEXReactor(addrBook)
221                 sw.AddReactor("PEX", pexReactor)
222         }
223
224         // run the profile server
225         profileHost := config.ProfListenAddress
226         if profileHost != "" {
227                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
228                 // go tool pprof http://profileHose/debug/pprof/heap
229                 go func() {
230                         http.ListenAndServe(profileHost, nil)
231                 }()
232         }
233
234         node := &Node{
235                 config: config,
236
237                 privKey:  privKey,
238                 sw:       sw,
239                 addrBook: addrBook,
240
241                 evsw:       eventSwitch,
242                 bcReactor:  bcReactor,
243                 blockStore: store,
244                 accounts:   accounts,
245                 assets:     assets,
246         }
247         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
248
249         return node
250 }
251
252 // Lanch web broser or not
253 func lanchWebBroser(lanch bool) {
254         if lanch {
255                 log.Info("Launching System Browser with :", webAddress)
256                 if err := browser.Open(webAddress); err != nil {
257                         log.Error(err.Error())
258                         return
259                 }
260         }
261 }
262
263 func (n *Node) OnStart() error {
264         // Create & add listener
265         p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
266         l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
267         n.sw.AddListener(l)
268
269         // Start the switch
270         n.sw.SetNodeInfo(n.makeNodeInfo())
271         n.sw.SetNodePrivKey(n.privKey)
272         _, err := n.sw.Start()
273         if err != nil {
274                 return err
275         }
276
277         // If seeds exist, add them to the address book and dial out
278         if n.config.P2P.Seeds != "" {
279                 // dial out
280                 seeds := strings.Split(n.config.P2P.Seeds, ",")
281                 if err := n.DialSeeds(seeds); err != nil {
282                         return err
283                 }
284         }
285         lanchWebBroser(!n.config.Web.Closed)
286         return nil
287 }
288
289 func (n *Node) OnStop() {
290         n.BaseService.OnStop()
291
292         log.Info("Stopping Node")
293         // TODO: gracefully disconnect from peers.
294         n.sw.Stop()
295
296 }
297
298 func (n *Node) RunForever() {
299         // Sleep forever and then...
300         cmn.TrapSignal(func() {
301                 n.Stop()
302         })
303 }
304
305 // Add a Listener to accept inbound peer connections.
306 // Add listeners before starting the Node.
307 // The first listener is the primary listener (in NodeInfo)
308 func (n *Node) AddListener(l p2p.Listener) {
309         n.sw.AddListener(l)
310 }
311
312 func (n *Node) Switch() *p2p.Switch {
313         return n.sw
314 }
315
316 func (n *Node) EventSwitch() types.EventSwitch {
317         return n.evsw
318 }
319
320 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
321         nodeInfo := &p2p.NodeInfo{
322                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
323                 Moniker: n.config.Moniker,
324                 Network: "bytom",
325                 Version: version.Version,
326                 Other: []string{
327                         cmn.Fmt("wire_version=%v", wire.Version),
328                         cmn.Fmt("p2p_version=%v", p2p.Version),
329                 },
330         }
331
332         if !n.sw.IsListening() {
333                 return nodeInfo
334         }
335
336         p2pListener := n.sw.Listeners()[0]
337         p2pHost := p2pListener.ExternalAddress().IP.String()
338         p2pPort := p2pListener.ExternalAddress().Port
339         //rpcListenAddr := n.config.RPC.ListenAddress
340
341         // We assume that the rpcListener has the same ExternalAddress.
342         // This is probably true because both P2P and RPC listeners use UPnP,
343         // except of course if the rpc is only bound to localhost
344         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
345         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
346         return nodeInfo
347 }
348
349 //------------------------------------------------------------------------------
350
351 func (n *Node) NodeInfo() *p2p.NodeInfo {
352         return n.sw.NodeInfo()
353 }
354
355 func (n *Node) DialSeeds(seeds []string) error {
356         return n.sw.DialSeeds(n.addrBook, seeds)
357 }
358
359 // Defaults to tcp
360 func ProtocolAndAddress(listenAddr string) (string, string) {
361         p, address := "tcp", listenAddr
362         parts := strings.SplitN(address, "://", 2)
363         if len(parts) == 2 {
364                 p, address = parts[0], parts[1]
365         }
366         return p, address
367 }
368
369 //------------------------------------------------------------------------------