OSDN Git Service

Add accesstoken authz error return (#266)
[bytom/bytom.git] / node / node.go
1 package node
2
3 import (
4         "context"
5         "crypto/tls"
6         "net"
7         "net/http"
8         _ "net/http/pprof"
9         "strings"
10         "sync"
11         "time"
12
13         "github.com/kr/secureheader"
14         log "github.com/sirupsen/logrus"
15         crypto "github.com/tendermint/go-crypto"
16         wire "github.com/tendermint/go-wire"
17         cmn "github.com/tendermint/tmlibs/common"
18         dbm "github.com/tendermint/tmlibs/db"
19
20         bc "github.com/bytom/blockchain"
21         "github.com/bytom/blockchain/accesstoken"
22         "github.com/bytom/blockchain/account"
23         "github.com/bytom/blockchain/asset"
24         "github.com/bytom/blockchain/pseudohsm"
25         "github.com/bytom/blockchain/txdb"
26         "github.com/bytom/blockchain/txfeed"
27         w "github.com/bytom/blockchain/wallet"
28         cfg "github.com/bytom/config"
29         "github.com/bytom/env"
30         "github.com/bytom/errors"
31         "github.com/bytom/p2p"
32         "github.com/bytom/protocol"
33         "github.com/bytom/types"
34         "github.com/bytom/util/browser"
35         "github.com/bytom/version"
36 )
37
// HTTP settings for the API server plus the address opened in the browser.
const (
	// httpReadTimeout is used as the API server's ReadTimeout.
	httpReadTimeout = 2 * time.Minute
	// httpWriteTimeout is used as the API server's WriteTimeout.
	httpWriteTimeout = time.Hour
	// webAddress is the URL handed to the system browser on start-up.
	webAddress = "http://127.0.0.1:9888"
)
43
44 type Node struct {
45         cmn.BaseService
46
47         // config
48         config *cfg.Config
49
50         // network
51         privKey  crypto.PrivKeyEd25519 // local node's p2p key
52         sw       *p2p.Switch           // p2p connections
53         addrBook *p2p.AddrBook         // known peers
54
55         evsw       types.EventSwitch // pub/sub for services
56         blockStore *txdb.Store
57         bcReactor  *bc.BlockchainReactor
58         accounts   *account.Manager
59         assets     *asset.Registry
60 }
61
62 func NewNodeDefault(config *cfg.Config) *Node {
63         return NewNode(config)
64 }
65
// RedirectHandler wraps next so that a request for exactly "/" is answered
// with a 302 redirect to "/dashboard/"; every other path is passed to next
// untouched.
func RedirectHandler(next http.Handler) http.Handler {
	redirectRoot := func(w http.ResponseWriter, req *http.Request) {
		switch req.URL.Path {
		case "/":
			http.Redirect(w, req, "/dashboard/", http.StatusFound)
		default:
			next.ServeHTTP(w, req)
		}
	}
	return http.HandlerFunc(redirectRoot)
}
75
// waitHandler is an http.Handler that parks incoming requests until a real
// handler has been installed with Set. The owner must call wg.Add(1) before
// serving; Set releases the waiters with the matching Done.
type waitHandler struct {
	h  http.Handler
	wg sync.WaitGroup
}

// Set installs the underlying handler and releases every request that is
// blocked in ServeHTTP.
func (wh *waitHandler) Set(h http.Handler) {
	wh.h = h
	wh.wg.Done()
}

// ServeHTTP blocks until Set has been called, then delegates the request to
// the installed handler.
func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	wh.wg.Wait()
	target := wh.h
	target.ServeHTTP(w, req)
}
90
91 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
92         // The waitHandler accepts incoming requests, but blocks until its underlying
93         // handler is set, when the second phase is complete.
94         var coreHandler waitHandler
95         coreHandler.wg.Add(1)
96         mux := http.NewServeMux()
97         mux.Handle("/", &coreHandler)
98
99         var handler http.Handler = mux
100
101         if config.Auth.Disable == false {
102                 handler = bc.AuthHandler(handler, accessTokens)
103         }
104         handler = RedirectHandler(handler)
105
106         secureheader.DefaultConfig.PermitClearLoopback = true
107         secureheader.DefaultConfig.HTTPSRedirect = false
108         secureheader.DefaultConfig.Next = handler
109
110         server := &http.Server{
111                 // Note: we should not set TLSConfig here;
112                 // we took care of TLS with the listener in maybeUseTLS.
113                 Handler:      secureheader.DefaultConfig,
114                 ReadTimeout:  httpReadTimeout,
115                 WriteTimeout: httpWriteTimeout,
116                 // Disable HTTP/2 for now until the Go implementation is more stable.
117                 // https://github.com/golang/go/issues/16450
118                 // https://github.com/golang/go/issues/17071
119                 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
120         }
121         listenAddr := env.String("LISTEN", config.ApiAddress)
122         listener, err := net.Listen("tcp", *listenAddr)
123         if err != nil {
124                 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
125         }
126
127         // The `Serve` call has to happen in its own goroutine because
128         // it's blocking and we need to proceed to the rest of the core setup after
129         // we call it.
130         go func() {
131                 if err := server.Serve(listener); err != nil {
132                         log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
133                 }
134         }()
135         coreHandler.Set(h)
136 }
137
138 func NewNode(config *cfg.Config) *Node {
139         ctx := context.Background()
140
141         // Get store
142         txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
143         store := txdb.NewStore(txDB)
144
145         tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
146         accessTokens := accesstoken.NewStore(tokenDB)
147
148         privKey := crypto.GenPrivKeyEd25519()
149
150         // Make event switch
151         eventSwitch := types.NewEventSwitch()
152         _, err := eventSwitch.Start()
153         if err != nil {
154                 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
155         }
156
157         sw := p2p.NewSwitch(config.P2P)
158
159         genesisBlock := cfg.GenerateGenesisBlock()
160
161         txPool := protocol.NewTxPool()
162         chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
163         if err != nil {
164                 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
165         }
166
167         if chain.BestBlockHash() == nil {
168                 if err := chain.SaveBlock(genesisBlock); err != nil {
169                         cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
170                 }
171                 if err := chain.ConnectBlock(genesisBlock); err != nil {
172                         cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
173                 }
174         }
175
176         var accounts *account.Manager = nil
177         var assets *asset.Registry = nil
178         var wallet *w.Wallet = nil
179         var txFeed *txfeed.Tracker = nil
180
181         txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
182         txFeed = txfeed.NewTracker(txFeedDB, chain)
183
184         if err = txFeed.Prepare(ctx); err != nil {
185                 log.WithField("error", err).Error("start txfeed")
186                 return nil
187         }
188
189         if !config.Wallet.Disable {
190                 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
191                 accounts = account.NewManager(walletDB, chain)
192                 assets = asset.NewRegistry(walletDB, chain)
193                 wallet, err = w.NewWallet(walletDB, accounts, assets, chain)
194                 if err != nil {
195                         log.WithField("error", err).Error("init NewWallet")
196                 }
197         }
198
199         hsm, err := pseudohsm.New(config.KeysDir())
200         if err != nil {
201                 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
202         }
203         bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens, config.Mining)
204
205         sw.AddReactor("BLOCKCHAIN", bcReactor)
206
207         rpcInit(bcReactor, config, accessTokens)
208         // Optionally, start the pex reactor
209         var addrBook *p2p.AddrBook
210         if config.P2P.PexReactor {
211                 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
212                 pexReactor := p2p.NewPEXReactor(addrBook)
213                 sw.AddReactor("PEX", pexReactor)
214         }
215
216         // run the profile server
217         profileHost := config.ProfListenAddress
218         if profileHost != "" {
219                 // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
220                 // go tool pprof http://profileHose/debug/pprof/heap
221                 go func() {
222                         http.ListenAndServe(profileHost, nil)
223                 }()
224         }
225
226         node := &Node{
227                 config: config,
228
229                 privKey:  privKey,
230                 sw:       sw,
231                 addrBook: addrBook,
232
233                 evsw:       eventSwitch,
234                 bcReactor:  bcReactor,
235                 blockStore: store,
236                 accounts:   accounts,
237                 assets:     assets,
238         }
239         node.BaseService = *cmn.NewBaseService(nil, "Node", node)
240
241         return node
242 }
243
244 // Lanch web broser or not
245 func lanchWebBroser(lanch bool) {
246         if lanch {
247                 log.Info("Launching System Browser with :", webAddress)
248                 if err := browser.Open(webAddress); err != nil {
249                         log.Error(err.Error())
250                         return
251                 }
252         }
253 }
254
255 func (n *Node) OnStart() error {
256         // Create & add listener
257         p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
258         l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
259         n.sw.AddListener(l)
260
261         // Start the switch
262         n.sw.SetNodeInfo(n.makeNodeInfo())
263         n.sw.SetNodePrivKey(n.privKey)
264         _, err := n.sw.Start()
265         if err != nil {
266                 return err
267         }
268
269         // If seeds exist, add them to the address book and dial out
270         if n.config.P2P.Seeds != "" {
271                 // dial out
272                 seeds := strings.Split(n.config.P2P.Seeds, ",")
273                 if err := n.DialSeeds(seeds); err != nil {
274                         return err
275                 }
276         }
277         lanchWebBroser(!n.config.Web.Closed)
278         return nil
279 }
280
281 func (n *Node) OnStop() {
282         n.BaseService.OnStop()
283
284         log.Info("Stopping Node")
285         // TODO: gracefully disconnect from peers.
286         n.sw.Stop()
287
288 }
289
290 func (n *Node) RunForever() {
291         // Sleep forever and then...
292         cmn.TrapSignal(func() {
293                 n.Stop()
294         })
295 }
296
297 // Add a Listener to accept inbound peer connections.
298 // Add listeners before starting the Node.
299 // The first listener is the primary listener (in NodeInfo)
300 func (n *Node) AddListener(l p2p.Listener) {
301         n.sw.AddListener(l)
302 }
303
304 func (n *Node) Switch() *p2p.Switch {
305         return n.sw
306 }
307
308 func (n *Node) EventSwitch() types.EventSwitch {
309         return n.evsw
310 }
311
312 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
313         nodeInfo := &p2p.NodeInfo{
314                 PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
315                 Moniker: n.config.Moniker,
316                 Network: "bytom",
317                 Version: version.Version,
318                 Other: []string{
319                         cmn.Fmt("wire_version=%v", wire.Version),
320                         cmn.Fmt("p2p_version=%v", p2p.Version),
321                 },
322         }
323
324         if !n.sw.IsListening() {
325                 return nodeInfo
326         }
327
328         p2pListener := n.sw.Listeners()[0]
329         p2pHost := p2pListener.ExternalAddress().IP.String()
330         p2pPort := p2pListener.ExternalAddress().Port
331         //rpcListenAddr := n.config.RPC.ListenAddress
332
333         // We assume that the rpcListener has the same ExternalAddress.
334         // This is probably true because both P2P and RPC listeners use UPnP,
335         // except of course if the rpc is only bound to localhost
336         nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
337         //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
338         return nodeInfo
339 }
340
341 //------------------------------------------------------------------------------
342
343 func (n *Node) NodeInfo() *p2p.NodeInfo {
344         return n.sw.NodeInfo()
345 }
346
347 func (n *Node) DialSeeds(seeds []string) error {
348         return n.sw.DialSeeds(n.addrBook, seeds)
349 }
350
// ProtocolAndAddress splits a listen address of the form "proto://addr" at
// the first "://" and returns (proto, addr). When listenAddr contains no
// "://" the protocol defaults to "tcp" and the address is returned as given.
func ProtocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"
	if i := strings.Index(listenAddr, sep); i >= 0 {
		return listenAddr[:i], listenAddr[i+len(sep):]
	}
	return "tcp", listenAddr
}
360
361 //------------------------------------------------------------------------------