OSDN Git Service

Add rpc token authenticate function (#135)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/kr/secureheader"
14         log "github.com/sirupsen/logrus"
15         crypto "github.com/tendermint/go-crypto"
16         wire "github.com/tendermint/go-wire"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19
20         bc "github.com/bytom/blockchain"
21         "github.com/bytom/blockchain/accesstoken"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pseudohsm"
25         "github.com/bytom/blockchain/txdb"
26         "github.com/bytom/blockchain/txfeed"
27         w "github.com/bytom/blockchain/wallet"
28         cfg "github.com/bytom/config"
29         "github.com/bytom/env"
30         "github.com/bytom/errors"
31         "github.com/bytom/net/http/authn"
32         "github.com/bytom/p2p"
33         "github.com/bytom/protocol"
34         "github.com/bytom/types"
35         "github.com/bytom/version"
36 )
37
38 const (
39         httpReadTimeout  = 2 * time.Minute
40         httpWriteTimeout = time.Hour
41 )
42
43 type Node struct {
44         cmn.BaseService
45
46         // config
47         config *cfg.Config
48
49         // network
50         privKey  crypto.PrivKeyEd25519 // local node's p2p key
51         sw       *p2p.Switch           // p2p connections
52         addrBook *p2p.AddrBook         // known peers
53
54         evsw       types.EventSwitch // pub/sub for services
55         blockStore *txdb.Store
56         bcReactor  *bc.BlockchainReactor
57         accounts   *account.Manager
58         assets     *asset.Registry
59 }
60
61 func NewNodeDefault(config *cfg.Config) *Node {
62         return NewNode(config)
63 }
64
65 func RedirectHandler(next http.Handler) http.Handler {
66         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
67                 if req.URL.Path == "/" {
68                         http.Redirect(w, req, "/dashboard/", http.StatusFound)
69                         return
70                 }
71                 next.ServeHTTP(w, req)
72         })
73 }
74
75 type waitHandler struct {
76         h  http.Handler
77         wg sync.WaitGroup
78 }
79
80 func (wh *waitHandler) Set(h http.Handler) {
81         wh.h = h
82         wh.wg.Done()
83 }
84
85 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
86         wh.wg.Wait()
87         wh.h.ServeHTTP(w, req)
88 }
89
90 func AuthHandler(handler http.Handler, accessTokens *accesstoken.CredentialStore) http.Handler {
91
92         authenticator := authn.NewAPI(accessTokens)
93
94         return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
95                 // TODO(tessr): check that this path exists; return early if this path isn't legit
96                 req, err := authenticator.Authenticate(req)
97                 if err != nil {
98                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Authenticate fail")
99
100                         return
101                 }
102                 handler.ServeHTTP(rw, req)
103         })
104 }
105
106 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
107         // The waitHandler accepts incoming requests, but blocks until its underlying
108         // handler is set, when the second phase is complete.
109         var coreHandler waitHandler
110         coreHandler.wg.Add(1)
111         mux := http.NewServeMux()
112         mux.Handle("/", &coreHandler)
113
114         var handler http.Handler = mux
115         handler = AuthHandler(handler, accessTokens)
116         handler = RedirectHandler(handler)
117
118         secureheader.DefaultConfig.PermitClearLoopback = true
119         secureheader.DefaultConfig.HTTPSRedirect = false
120         secureheader.DefaultConfig.Next = handler
121
122         server := &http.Server{
123                 // Note: we should not set TLSConfig here;
124                 // we took care of TLS with the listener in maybeUseTLS.
125                 Handler:      secureheader.DefaultConfig,
126                 ReadTimeout:  httpReadTimeout,
127                 WriteTimeout: httpWriteTimeout,
128                 // Disable HTTP/2 for now until the Go implementation is more stable.
129                 // https://github.com/golang/go/issues/16450
130                 // https://github.com/golang/go/issues/17071
131                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
132         }
133         listenAddr := env.String("LISTEN", config.ApiAddress)
134         listener, err := net.Listen("tcp", *listenAddr)
135         if err != nil {
136                 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
137         }
138
139         // The `Serve` call has to happen in its own goroutine because
140         // it's blocking and we need to proceed to the rest of the core setup after
141         // we call it.
142         go func() {
143                 if err := server.Serve(listener); err != nil {
144                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
145                 }
146         }()
147         coreHandler.Set(h)
148 }
149
150 func NewNode(config *cfg.Config) *Node {
151         ctx := context.Background()
152
153         // Get store
154         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
155         store := txdb.NewStore(txDB)
156
157         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
158         accessTokens := accesstoken.NewStore(tokenDB)
159
160         privKey := crypto.GenPrivKeyEd25519()
161
162         // Make event switch
163         eventSwitch := types.NewEventSwitch()
164         _, err := eventSwitch.Start()
165         if err != nil {
166                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
167         }
168
169         sw := p2p.NewSwitch(config.P2P)
170
171         genesisBlock := cfg.GenerateGenesisBlock()
172
173         txPool := protocol.NewTxPool()
174         chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
175         if err != nil {
176                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
177         }
178
179         if chain.Height() == 0 {
180                 if err := chain.SaveBlock(genesisBlock); err != nil {
181                         cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
182                 }
183                 if err := chain.ConnectBlock(genesisBlock); err != nil {
184                         cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
185                 }
186         }
187
188         var accounts *account.Manager = nil
189         var assets *asset.Registry = nil
190         var wallet *w.Wallet = nil
191         var txFeed *txfeed.Tracker = nil
192
193         if config.Wallet.Enable {
194                 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
195                 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
196                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
197
198                 accounts = account.NewManager(accountsDB, walletDB, w.GetWalletHeight, chain)
199                 assets = asset.NewRegistry(assetsDB, chain)
200
201                 wallet = w.InitWallet(walletDB, accounts, assets)
202                 wallet.Ind.RegisterAnnotator(accounts.AnnotateTxs)
203                 wallet.Ind.RegisterAnnotator(assets.AnnotateTxs)
204
205                 go wallet.WalletUpdate(chain)
206
207                 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
208                 txFeed = txfeed.NewTracker(txFeedDB, chain)
209
210                 if err = txFeed.Prepare(ctx); err != nil {
211                         log.WithField("error", err).Error("start txfeed")
212                         return nil
213                 }
214
215         }
216         //Todo HSM
217         /*
218                 if config.HsmUrl != ""{
219                         // todo remoteHSM
220                         cmn.Exit(cmn.Fmt("not implement"))
221                 } else {
222                         hsm, err = pseudohsm.New(config.KeysDir())
223                         if err != nil {
224                                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
225                         }
226                 }*/
227
228         hsm, err := pseudohsm.New(config.KeysDir())
229         if err != nil {
230                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
231         }
232         bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens)
233
234         sw.AddReactor("BLOCKCHAIN", bcReactor)
235
236         rpcInit(bcReactor, config, accessTokens)
237         // Optionally, start the pex reactor
238         var addrBook *p2p.AddrBook
239         if config.P2P.PexReactor {
240                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
241                 pexReactor := p2p.NewPEXReactor(addrBook)
242                 sw.AddReactor("PEX", pexReactor)
243         }
244
245         // run the profile server
246         profileHost := config.ProfListenAddress
247         if profileHost != "" {
248                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
249                 // go tool pprof http://profileHose/debug/pprof/heap
250                 go func() {
251                         http.ListenAndServe(profileHost, nil)
252                 }()
253         }
254
255         node := &Node{
256                 config: config,
257
258                 privKey:  privKey,
259                 sw:       sw,
260                 addrBook: addrBook,
261
262                 evsw:       eventSwitch,
263                 bcReactor:  bcReactor,
264                 blockStore: store,
265                 accounts:   accounts,
266                 assets:     assets,
267         }
268         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
269         return node
270 }
271
272 func (n *Node) OnStart() error {
273         // Create & add listener
274         protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
275         l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
276         n.sw.AddListener(l)
277
278         // Start the switch
279         n.sw.SetNodeInfo(n.makeNodeInfo())
280         n.sw.SetNodePrivKey(n.privKey)
281         _, err := n.sw.Start()
282         if err != nil {
283                 return err
284         }
285
286         // If seeds exist, add them to the address book and dial out
287         if n.config.P2P.Seeds != "" {
288                 // dial out
289                 seeds := strings.Split(n.config.P2P.Seeds, ",")
290                 if err := n.DialSeeds(seeds); err != nil {
291                         return err
292                 }
293         }
294         return nil
295 }
296
297 func (n *Node) OnStop() {
298         n.BaseService.OnStop()
299
300         log.Info("Stopping Node")
301         // TODO: gracefully disconnect from peers.
302         n.sw.Stop()
303
304 }
305
306 func (n *Node) RunForever() {
307         // Sleep forever and then...
308         cmn.TrapSignal(func() {
309                 n.Stop()
310         })
311 }
312
313 // Add the event switch to reactors, mempool, etc.
314 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
315         for _, e := range eventables {
316                 e.SetEventSwitch(evsw)
317         }
318 }
319
320 // Add a Listener to accept inbound peer connections.
321 // Add listeners before starting the Node.
322 // The first listener is the primary listener (in NodeInfo)
323 func (n *Node) AddListener(l p2p.Listener) {
324         n.sw.AddListener(l)
325 }
326
327 func (n *Node) Switch() *p2p.Switch {
328         return n.sw
329 }
330
331 func (n *Node) EventSwitch() types.EventSwitch {
332         return n.evsw
333 }
334
335 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
336         nodeInfo := &p2p.NodeInfo{
337                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
338                 Moniker: n.config.Moniker,
339                 Network: "bytom",
340                 Version: version.Version,
341                 Other: []string{
342                         cmn.Fmt("wire_version=%v", wire.Version),
343                         cmn.Fmt("p2p_version=%v", p2p.Version),
344                 },
345         }
346
347         if !n.sw.IsListening() {
348                 return nodeInfo
349         }
350
351         p2pListener := n.sw.Listeners()[0]
352         p2pHost := p2pListener.ExternalAddress().IP.String()
353         p2pPort := p2pListener.ExternalAddress().Port
354         //rpcListenAddr := n.config.RPC.ListenAddress
355
356         // We assume that the rpcListener has the same ExternalAddress.
357         // This is probably true because both P2P and RPC listeners use UPnP,
358         // except of course if the rpc is only bound to localhost
359         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
360         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
361         return nodeInfo
362 }
363
364 //------------------------------------------------------------------------------
365
366 func (n *Node) NodeInfo() *p2p.NodeInfo {
367         return n.sw.NodeInfo()
368 }
369
370 func (n *Node) DialSeeds(seeds []string) error {
371         return n.sw.DialSeeds(n.addrBook, seeds)
372 }
373
374 // Defaults to tcp
375 func ProtocolAndAddress(listenAddr string) (string, string) {
376         protocol, address := "tcp", listenAddr
377         parts := strings.SplitN(address, "://", 2)
378         if len(parts) == 2 {
379                 protocol, address = parts[0], parts[1]
380         }
381         return protocol, address
382 }
383
384 //------------------------------------------------------------------------------