13 "github.com/kr/secureheader"
14 log "github.com/sirupsen/logrus"
15 crypto "github.com/tendermint/go-crypto"
16 wire "github.com/tendermint/go-wire"
17 cmn "github.com/tendermint/tmlibs/common"
18 dbm "github.com/tendermint/tmlibs/db"
20 bc "github.com/bytom/blockchain"
21 "github.com/bytom/blockchain/accesstoken"
22 "github.com/bytom/blockchain/account"
23 "github.com/bytom/blockchain/asset"
24 "github.com/bytom/blockchain/pseudohsm"
25 "github.com/bytom/blockchain/txdb"
26 "github.com/bytom/blockchain/txfeed"
27 w "github.com/bytom/blockchain/wallet"
28 cfg "github.com/bytom/config"
29 "github.com/bytom/env"
30 "github.com/bytom/errors"
31 "github.com/bytom/net/http/authn"
32 "github.com/bytom/p2p"
33 "github.com/bytom/protocol"
34 "github.com/bytom/types"
35 "github.com/bytom/version"
// httpReadTimeout is used as the ReadTimeout of the API http.Server built in rpcInit.
39 httpReadTimeout = 2 * time.Minute
// httpWriteTimeout is used as the WriteTimeout of the API http.Server built in rpcInit.
40 httpWriteTimeout = time.Hour
// NOTE(review): the struct header and some fields are not visible in this excerpt.
50 privKey crypto.PrivKeyEd25519 // local node's p2p key; handed to the switch in OnStart and used for PubKey in makeNodeInfo
51 sw *p2p.Switch // p2p connections
52 addrBook *p2p.AddrBook // known peers; passed to sw.DialSeeds by DialSeeds
54 evsw types.EventSwitch // pub/sub for services
55 blockStore *txdb.Store // NOTE(review): no assignment visible in this excerpt — confirm it is populated
56 bcReactor *bc.BlockchainReactor // set by NewNode from bc.NewBlockchainReactor
57 accounts *account.Manager // NOTE(review): no assignment visible in this excerpt — confirm it is populated
58 assets *asset.Registry // NOTE(review): no assignment visible in this excerpt — confirm it is populated
61 func NewNodeDefault(config *cfg.Config) *Node {
62 return NewNode(config)
// RedirectHandler wraps next so that a request whose URL path is exactly "/"
// is answered with a 302 (http.StatusFound) redirect to "/dashboard/". Every
// other request is passed through to next unchanged.
func RedirectHandler(next http.Handler) http.Handler {
	// The response writer is named rw (as in AuthHandler) rather than w, which
	// would shadow this file's `w` import alias for the wallet package.
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		if req.URL.Path == "/" {
			http.Redirect(rw, req, "/dashboard/", http.StatusFound)
			return
		}
		next.ServeHTTP(rw, req)
	})
}
// waitHandler is an http.Handler wrapper. Per the comment in rpcInit, it accepts
// incoming requests but blocks until its underlying handler is set.
// NOTE(review): the field list is not visible in this excerpt; usage elsewhere
// (wh.h, coreHandler.wg) suggests a handler field h and a wait-group-like field wg — confirm.
75 type waitHandler struct {
// Set receives the handler h for wh.
// NOTE(review): the body is not visible in this excerpt — presumably it stores h
// and releases requests blocked in ServeHTTP; confirm against the full file.
80 func (wh *waitHandler) Set(h http.Handler) {
// ServeHTTP forwards the request to the underlying handler wh.h.
// NOTE(review): a line between the signature and the forwarding call is missing
// from this excerpt — presumably it blocks until Set has been called; confirm.
85 func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
87 wh.h.ServeHTTP(w, req)
// AuthHandler wraps handler with access-token authentication. It builds an
// authenticator from accessTokens via authn.NewAPI and returns an http.Handler
// that runs Authenticate on each request, then calls handler.ServeHTTP with the
// request value returned by Authenticate.
// NOTE(review): the error branch is only partly visible — it logs
// "Authenticate fail"; what is sent to the client and whether the closure
// returns before reaching handler.ServeHTTP cannot be confirmed from this excerpt.
90 func AuthHandler(handler http.Handler, accessTokens *accesstoken.CredentialStore) http.Handler {
92 authenticator := authn.NewAPI(accessTokens)
94 return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
95 // TODO(tessr): check that this path exists; return early if this path isn't legit
// req is redeclared here: the handler below receives the request returned by Authenticate.
96 req, err := authenticator.Authenticate(req)
98 log.WithField("error", errors.Wrap(err, "Serve")).Error("Authenticate fail")
102 handler.ServeHTTP(rw, req)
// rpcInit builds the HTTP API server and starts serving it.
//
// It mounts a waitHandler at "/" on a new ServeMux, wraps the mux with
// AuthHandler (using accessTokens) and then RedirectHandler, installs the result
// as the Next handler of secureheader.DefaultConfig, and serves that on a TCP
// listener. The listen address comes from env.String("LISTEN", config.ApiAddress).
// The cmn.Exit call carries the message for a failed net.Listen; an error
// returned by server.Serve is only logged.
// NOTE(review): h is not referenced in the visible lines; presumably it is
// handed to coreHandler.Set near the end of the function — confirm.
106 func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
107 // The waitHandler accepts incoming requests, but blocks until its underlying
108 // handler is set, when the second phase is complete.
109 var coreHandler waitHandler
110 coreHandler.wg.Add(1)
111 mux := http.NewServeMux()
112 mux.Handle("/", &coreHandler)
// Middleware is layered inside-out: RedirectHandler runs first, then AuthHandler, then the mux.
114 var handler http.Handler = mux
115 handler = AuthHandler(handler, accessTokens)
116 handler = RedirectHandler(handler)
// These assignments mutate the package-level secureheader.DefaultConfig.
118 secureheader.DefaultConfig.PermitClearLoopback = true
119 secureheader.DefaultConfig.HTTPSRedirect = false
120 secureheader.DefaultConfig.Next = handler
122 server := &http.Server{
123 // Note: we should not set TLSConfig here;
124 // we took care of TLS with the listener in maybeUseTLS.
125 Handler: secureheader.DefaultConfig,
126 ReadTimeout: httpReadTimeout,
127 WriteTimeout: httpWriteTimeout,
128 // Disable HTTP/2 for now until the Go implementation is more stable.
129 // https://github.com/golang/go/issues/16450
130 // https://github.com/golang/go/issues/17071
131 TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
// env.String returns a pointer, hence the dereference in net.Listen below.
133 listenAddr := env.String("LISTEN", config.ApiAddress)
134 listener, err := net.Listen("tcp", *listenAddr)
136 cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
139 // The `Serve` call has to happen in its own goroutine because
140 // it's blocking and we need to proceed to the rest of the core setup after
143 if err := server.Serve(listener); err != nil {
144 log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
// NewNode constructs a Node from config. In the lines visible here it:
//   - opens the "txdb" and "accesstoken" databases and builds the block store
//     and the access-token store;
//   - generates a fresh Ed25519 private key for p2p;
//   - starts the event switch and creates the p2p switch;
//   - creates the tx pool and the chain from the generated genesis block,
//     saving and connecting the genesis block when the chain height is 0;
//   - when config.Wallet.Enable is set, opens the account/asset/wallet
//     databases, builds the account manager, asset registry and wallet, and
//     starts wallet.WalletUpdate in a goroutine;
//   - prepares the tx feed tracker, creates the pseudo-HSM, and builds the
//     blockchain reactor, registering it on the switch as "BLOCKCHAIN";
//   - calls rpcInit to start the API server;
//   - optionally adds a PEX reactor and a profiling HTTP server.
//
// The cmn.Exit calls carry the failure messages for the step above each of
// them; a txFeed.Prepare failure is only logged.
// NOTE(review): many lines of this function (error guards, closing braces, most
// of the Node literal, the return) are not visible in this excerpt.
150 func NewNode(config *cfg.Config) *Node {
151 ctx := context.Background()
154 txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
155 store := txdb.NewStore(txDB)
157 tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
158 accessTokens := accesstoken.NewStore(tokenDB)
// A new key is generated on every call; nothing visible here loads a persisted key.
160 privKey := crypto.GenPrivKeyEd25519()
163 eventSwitch := types.NewEventSwitch()
164 _, err := eventSwitch.Start()
166 cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
169 sw := p2p.NewSwitch(config.P2P)
171 genesisBlock := cfg.GenerateGenesisBlock()
173 txPool := protocol.NewTxPool()
174 chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
176 cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
// Height 0 is treated as an empty chain: persist and connect the genesis block.
179 if chain.Height() == 0 {
180 if err := chain.SaveBlock(genesisBlock); err != nil {
181 cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
183 if err := chain.ConnectBlock(genesisBlock); err != nil {
184 cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
// These stay nil unless assigned below; they are passed as-is to bc.NewBlockchainReactor.
188 var accounts *account.Manager = nil
189 var assets *asset.Registry = nil
190 var wallet *w.Wallet = nil
191 var txFeed *txfeed.Tracker = nil
193 if config.Wallet.Enable {
194 accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
195 assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
196 walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
198 accounts = account.NewManager(accountsDB, walletDB, w.GetWalletHeight, chain)
199 assets = asset.NewRegistry(assetsDB, chain)
201 wallet = w.InitWallet(walletDB, accounts, assets)
202 wallet.Ind.RegisterAnnotator(accounts.AnnotateTxs)
203 wallet.Ind.RegisterAnnotator(assets.AnnotateTxs)
// WalletUpdate runs in its own goroutine; nothing visible here stops or waits for it.
205 go wallet.WalletUpdate(chain)
// NOTE(review): whether the tx feed setup below is still inside the wallet-enabled branch cannot be told from this excerpt.
207 txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
208 txFeed = txfeed.NewTracker(txFeedDB, chain)
210 if err = txFeed.Prepare(ctx); err != nil {
211 log.WithField("error", err).Error("start txfeed")
// NOTE(review): the next four lines look like a disabled remote-HSM branch —
// hsm is assigned with `=` before its `:=` declaration further down — so they are
// presumably inside a block comment in the full file; confirm.
218 if config.HsmUrl != ""{
220 cmn.Exit(cmn.Fmt("not implement"))
222 hsm, err = pseudohsm.New(config.KeysDir())
224 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
228 hsm, err := pseudohsm.New(config.KeysDir())
230 cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
232 bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens)
234 sw.AddReactor("BLOCKCHAIN", bcReactor)
236 rpcInit(bcReactor, config, accessTokens)
237 // Optionally, start the pex reactor
// addrBook stays nil when the PEX reactor is disabled.
238 var addrBook *p2p.AddrBook
239 if config.P2P.PexReactor {
240 addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
241 pexReactor := p2p.NewPEXReactor(addrBook)
242 sw.AddReactor("PEX", pexReactor)
245 // run the profile server
246 profileHost := config.ProfListenAddress
247 if profileHost != "" {
248 // Profiling bytomd programs; see https://blog.golang.org/profiling-go-programs
249 // go tool pprof http://profileHost/debug/pprof/heap
// The nil handler means http.DefaultServeMux; the returned error is discarded on this line.
251 http.ListenAndServe(profileHost, nil)
263 bcReactor: bcReactor,
268 node.BaseService = *cmn.NewBaseService(nil, "Node", node)
// OnStart brings up the node's p2p layer. It creates a default listener from
// the configured P2P listen address, sets the switch's node info and private
// key, starts the switch, and — when seeds are configured — dials them.
// Returns an error.
// NOTE(review): the lines that register the listener and the error-return
// paths are not visible in this excerpt.
272 func (n *Node) OnStart() error {
273 // Create & add listener
274 protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
275 l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
279 n.sw.SetNodeInfo(n.makeNodeInfo())
280 n.sw.SetNodePrivKey(n.privKey)
281 _, err := n.sw.Start()
286 // If seeds exist, add them to the address book and dial out
287 if n.config.P2P.Seeds != "" {
// Seeds is a comma-separated list of addresses.
289 seeds := strings.Split(n.config.P2P.Seeds, ",")
290 if err := n.DialSeeds(seeds); err != nil {
// OnStop runs the embedded BaseService's OnStop and logs "Stopping Node".
// NOTE(review): the lines after the TODO are not visible in this excerpt, so
// whether the p2p switch is stopped here cannot be confirmed.
297 func (n *Node) OnStop() {
298 n.BaseService.OnStop()
300 log.Info("Stopping Node")
301 // TODO: gracefully disconnect from peers.
// RunForever registers a callback with cmn.TrapSignal.
// NOTE(review): the callback body is not visible in this excerpt — presumably it
// stops the node when a signal arrives; confirm against the full file.
306 func (n *Node) RunForever() {
307 // Sleep forever and then...
308 cmn.TrapSignal(func() {
313 // Add the event switch to reactors, mempool, etc.
314 func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
315 for _, e := range eventables {
316 e.SetEventSwitch(evsw)
320 // AddListener adds a Listener that accepts inbound peer connections.
321 // Listeners must be added before the Node is started.
322 // The first listener is the primary listener (the one reported in NodeInfo).
// NOTE(review): the body is not visible in this excerpt — presumably it forwards l to the p2p switch; confirm.
323 func (n *Node) AddListener(l p2p.Listener) {
// Switch returns a *p2p.Switch.
// NOTE(review): the body is not visible in this excerpt — presumably it returns n.sw; confirm.
327 func (n *Node) Switch() *p2p.Switch {
// EventSwitch returns a types.EventSwitch.
// NOTE(review): the body is not visible in this excerpt — presumably it returns n.evsw; confirm.
331 func (n *Node) EventSwitch() types.EventSwitch {
// makeNodeInfo builds the p2p.NodeInfo for this node. It fills in the public
// key derived from n.privKey, the configured moniker, the build version and
// "wire_version"/"p2p_version" strings. If the switch is listening, it also
// sets ListenAddr to "host:port" taken from the external address of the first
// listener.
// NOTE(review): at least one struct field line and the return statements are
// not visible in this excerpt.
335 func (n *Node) makeNodeInfo() *p2p.NodeInfo {
336 nodeInfo := &p2p.NodeInfo{
// The unchecked type assertion panics if the unwrapped key is not a crypto.PubKeyEd25519.
337 PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
338 Moniker: n.config.Moniker,
340 Version: version.Version,
342 cmn.Fmt("wire_version=%v", wire.Version),
343 cmn.Fmt("p2p_version=%v", p2p.Version),
// NOTE(review): the body of this branch is not visible — presumably it returns early when the switch has no listeners; confirm.
347 if !n.sw.IsListening() {
// Listener 0 is used as the source of the advertised address.
351 p2pListener := n.sw.Listeners()[0]
352 p2pHost := p2pListener.ExternalAddress().IP.String()
353 p2pPort := p2pListener.ExternalAddress().Port
354 //rpcListenAddr := n.config.RPC.ListenAddress
356 // We assume that the rpcListener has the same ExternalAddress.
357 // This is probably true because both P2P and RPC listeners use UPnP,
358 // except of course if the rpc is only bound to localhost
359 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
360 //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
364 //------------------------------------------------------------------------------
366 func (n *Node) NodeInfo() *p2p.NodeInfo {
367 return n.sw.NodeInfo()
370 func (n *Node) DialSeeds(seeds []string) error {
371 return n.sw.DialSeeds(n.addrBook, seeds)
// ProtocolAndAddress splits a listen address of the form "protocol://address"
// at the first "://" and returns the two halves. When listenAddr contains no
// "://", the protocol defaults to "tcp" and the address is listenAddr unchanged.
func ProtocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"

	idx := strings.Index(listenAddr, sep)
	if idx < 0 {
		// No scheme present: fall back to tcp.
		return "tcp", listenAddr
	}
	return listenAddr[:idx], listenAddr[idx+len(sep):]
}
384 //------------------------------------------------------------------------------