OSDN Git Service

Revert "update master (#487)" (#518)
[bytom/bytom.git] / node / node.go
old mode 100644 (file)
new mode 100755 (executable)
index 80d91ba..043a801
@@ -5,47 +5,41 @@ import (
        "crypto/tls"
        "net"
        "net/http"
-       "os"
+       _ "net/http/pprof"
        "strings"
        "sync"
        "time"
 
-       bc "github.com/bytom/blockchain"
-       cfg "github.com/bytom/config"
-       "github.com/bytom/consensus"
-       p2p "github.com/bytom/p2p"
-       "github.com/bytom/protocol/bc/legacy"
-       rpccore "github.com/bytom/rpc/core"
-       grpccore "github.com/bytom/rpc/grpc"
-       "github.com/bytom/types"
-       "github.com/bytom/version"
+       "github.com/kr/secureheader"
+       log "github.com/sirupsen/logrus"
        crypto "github.com/tendermint/go-crypto"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
-       "github.com/tendermint/tmlibs/log"
-       //rpc "github.com/blockchain/rpc/lib"
+
+       bc "github.com/bytom/blockchain"
+       "github.com/bytom/blockchain/accesstoken"
        "github.com/bytom/blockchain/account"
        "github.com/bytom/blockchain/asset"
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txdb"
-       "github.com/bytom/net/http/reqid"
-       "github.com/bytom/protocol"
-       rpcserver "github.com/bytom/rpc/lib/server"
-       //      "github.com/bytom/net/http/static"
-       //      "github.com/bytom/generated/dashboard"
-
+       "github.com/bytom/blockchain/txfeed"
+       w "github.com/bytom/blockchain/wallet"
+       cfg "github.com/bytom/config"
        "github.com/bytom/env"
        "github.com/bytom/errors"
-       bytomlog "github.com/bytom/log"
-       "github.com/kr/secureheader"
-
-       _ "net/http/pprof"
+       "github.com/bytom/p2p"
+       "github.com/bytom/protocol"
+       "github.com/bytom/types"
+       "github.com/bytom/util/browser"
+       "github.com/bytom/version"
 )
 
 const (
-       httpReadTimeout  = 2 * time.Minute
-       httpWriteTimeout = time.Hour
+       httpReadTimeout          = 2 * time.Minute
+       httpWriteTimeout         = time.Hour
+       webAddress               = "http://127.0.0.1:9888"
+       expireReservationsPeriod = time.Second
 )
 
 type Node struct {
@@ -59,39 +53,15 @@ type Node struct {
        sw       *p2p.Switch           // p2p connections
        addrBook *p2p.AddrBook         // known peers
 
-       // services
-       evsw types.EventSwitch // pub/sub for services
-       //    blockStore       *bc.MemStore
-       blockStore   *txdb.Store
-       bcReactor    *bc.BlockchainReactor
-       accounts     *account.Manager
-       assets       *asset.Registry
-       rpcListeners []net.Listener // rpc servers
+       evsw       types.EventSwitch // pub/sub for services
+       blockStore *txdb.Store
+       bcReactor  *bc.BlockchainReactor
+       accounts   *account.Manager
+       assets     *asset.Registry
 }
 
-var (
-       // config vars
-       rootCAs       = env.String("ROOT_CA_CERTS", "") // file path
-       splunkAddr    = os.Getenv("SPLUNKADDR")
-       logFile       = os.Getenv("LOGFILE")
-       logSize       = env.Int("LOGSIZE", 5e6) // 5MB
-       logCount      = env.Int("LOGCOUNT", 9)
-       logQueries    = env.Bool("LOG_QUERIES", false)
-       maxDBConns    = env.Int("MAXDBCONNS", 10)           // set to 100 in prod
-       rpsToken      = env.Int("RATELIMIT_TOKEN", 0)       // reqs/sec
-       rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
-       indexTxs      = env.Bool("INDEX_TRANSACTIONS", true)
-       home          = bc.HomeDirFromEnvironment()
-       bootURL       = env.String("BOOTURL", "")
-       // build vars; initialized by the linker
-       buildTag    = "?"
-       buildCommit = "?"
-       buildDate   = "?"
-       race        []interface{} // initialized in race.go
-)
-
-func NewNodeDefault(config *cfg.Config, logger log.Logger) *Node {
-       return NewNode(config, logger)
+func NewNodeDefault(config *cfg.Config) *Node {
+       return NewNode(config)
 }
 
 func RedirectHandler(next http.Handler) http.Handler {
@@ -119,7 +89,7 @@ func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        wh.h.ServeHTTP(w, req)
 }
 
-func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
+func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
        // The waitHandler accepts incoming requests, but blocks until its underlying
        // handler is set, when the second phase is complete.
        var coreHandler waitHandler
@@ -128,9 +98,11 @@ func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
        mux.Handle("/", &coreHandler)
 
        var handler http.Handler = mux
-       //handler = core.AuthHandler(handler, raftDB, accessTokens, tlsConfig)
+
+       if config.Auth.Disable == false {
+               handler = bc.AuthHandler(handler, accessTokens)
+       }
        handler = RedirectHandler(handler)
-       handler = reqid.Handler(handler)
 
        secureheader.DefaultConfig.PermitClearLoopback = true
        secureheader.DefaultConfig.HTTPSRedirect = false
@@ -148,114 +120,112 @@ func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
                TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
        }
        listenAddr := env.String("LISTEN", config.ApiAddress)
-       listener, _ := net.Listen("tcp", *listenAddr)
+       log.WithField("api address:", config.ApiAddress).Info("Rpc listen")
+       listener, err := net.Listen("tcp", *listenAddr)
+       if err != nil {
+               cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
+       }
 
        // The `Serve` call has to happen in its own goroutine because
        // it's blocking and we need to proceed to the rest of the core setup after
        // we call it.
        go func() {
-               err := server.Serve(listener)
-               bytomlog.Fatalkv(context.Background(), bytomlog.KeyError, errors.Wrap(err, "Serve"))
+               if err := server.Serve(listener); err != nil {
+                       log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
+               }
        }()
        coreHandler.Set(h)
 }
 
-func NewNode(config *cfg.Config, logger log.Logger) *Node {
+func NewNode(config *cfg.Config) *Node {
+       ctx := context.Background()
+
        // Get store
-       tx_db := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
-       store := txdb.NewStore(tx_db)
+       txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
+       store := txdb.NewStore(txDB)
+
+       tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
+       accessTokens := accesstoken.NewStore(tokenDB)
 
        privKey := crypto.GenPrivKeyEd25519()
 
        // Make event switch
        eventSwitch := types.NewEventSwitch()
-       eventSwitch.SetLogger(logger.With("module", "types"))
        _, err := eventSwitch.Start()
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
        }
 
-       p2pLogger := logger.With("module", "p2p")
-
        sw := p2p.NewSwitch(config.P2P)
-       sw.SetLogger(p2pLogger)
-
-       fastSync := config.FastSync
 
-       genesisBlock := &legacy.Block{
-               BlockHeader:  legacy.BlockHeader{},
-               Transactions: []*legacy.Tx{},
-       }
-       genesisBlock.UnmarshalText(consensus.InitBlock())
+       genesisBlock := cfg.GenerateGenesisBlock()
 
-       chain, err := protocol.NewChain(context.Background(), genesisBlock.Hash(), store, nil)
-       genesisSnap, err := chain.ApplyValidBlock(genesisBlock)
+       txPool := protocol.NewTxPool()
+       chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
        if err != nil {
-               cmn.Exit(cmn.Fmt("Failed to apply valid block: %v", err))
+               cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
-       if err := chain.CommitAppliedBlock(nil, genesisBlock, genesisSnap); err != nil {
-               cmn.Exit(cmn.Fmt("Failed to commit applied block: %v", err))
+
+       if chain.BestBlockHash() == nil {
+               if err := chain.SaveBlock(genesisBlock); err != nil {
+                       cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
+               }
+               if err := chain.ConnectBlock(genesisBlock); err != nil {
+                       cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
+               }
        }
 
-       txPool := protocol.NewTxPool()
-       /* if err != nil {
-            cmn.Exit(cmn.Fmt("protocol new chain failed: %v", err))
-          }
-          err = chain.CommitAppliedBlock(context.Background(), block, state.Empty())
-          if err != nil {
-            cmn.Exit(cmn.Fmt("commit block failed: %v", err))
-          }
-          chain.MaxIssuanceWindow = bc.MillisDuration(c.MaxIssuanceWindowMs)
-       */
-
-       accounts_db := dbm.NewDB("account", config.DBBackend, config.DBDir())
-       accounts := account.NewManager(accounts_db, chain)
-       assets_db := dbm.NewDB("asset", config.DBBackend, config.DBDir())
-       assets := asset.NewRegistry(assets_db, chain)
-
-       //Todo HSM
-       /*
-               if config.HsmUrl != ""{
-                       // todo remoteHSM
-                       cmn.Exit(cmn.Fmt("not implement"))
-
-               } else {
-                       hsm, err = pseudohsm.New(config.KeysDir())
-                       if err != nil {
-                               cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
-                       }
-               }*/
+       var accounts *account.Manager = nil
+       var assets *asset.Registry = nil
+       var wallet *w.Wallet = nil
+       var txFeed *txfeed.Tracker = nil
+
+       txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
+       txFeed = txfeed.NewTracker(txFeedDB, chain)
+
+       if err = txFeed.Prepare(ctx); err != nil {
+               log.WithField("error", err).Error("start txfeed")
+               return nil
+       }
 
        hsm, err := pseudohsm.New(config.KeysDir())
        if err != nil {
                cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
        }
-       bcReactor := bc.NewBlockchainReactor(store, chain, txPool, accounts, assets, hsm, fastSync)
 
-       bcReactor.SetLogger(logger.With("module", "blockchain"))
+       if !config.Wallet.Disable {
+               xpubs, _ := hsm.ListKeys()
+               walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+               accounts = account.NewManager(walletDB, chain)
+               assets = asset.NewRegistry(walletDB, chain)
+               wallet, err = w.NewWallet(walletDB, accounts, assets, chain, xpubs)
+               if err != nil {
+                       log.WithField("error", err).Error("init NewWallet")
+               }
+               // Clean up expired UTXO reservations periodically.
+               go accounts.ExpireReservations(ctx, expireReservationsPeriod)
+       }
+
+       bcReactor := bc.NewBlockchainReactor(chain, txPool, accounts, assets, sw, hsm, wallet, txFeed, accessTokens, config.Mining)
+
        sw.AddReactor("BLOCKCHAIN", bcReactor)
 
-       rpcInit(bcReactor, config)
+       rpcInit(bcReactor, config, accessTokens)
        // Optionally, start the pex reactor
        var addrBook *p2p.AddrBook
        if config.P2P.PexReactor {
                addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
                pexReactor := p2p.NewPEXReactor(addrBook)
-               pexReactor.SetLogger(p2pLogger)
                sw.AddReactor("PEX", pexReactor)
        }
 
-       // add the event switch to all services
-       // they should all satisfy events.Eventable
-       //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
-
        // run the profile server
        profileHost := config.ProfListenAddress
        if profileHost != "" {
-
+               // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
+               // go tool pprof http://profileHose/debug/pprof/heap
                go func() {
-                       logger.Error("Profile server", "error", http.ListenAndServe(profileHost, nil))
+                       http.ListenAndServe(profileHost, nil)
                }()
        }
 
@@ -272,14 +242,26 @@ func NewNode(config *cfg.Config, logger log.Logger) *Node {
                accounts:   accounts,
                assets:     assets,
        }
-       node.BaseService = *cmn.NewBaseService(logger, "Node", node)
+       node.BaseService = *cmn.NewBaseService(nil, "Node", node)
+
        return node
 }
 
+// Lanch web broser or not
+func lanchWebBroser(lanch bool) {
+       if lanch {
+               log.Info("Launching System Browser with :", webAddress)
+               if err := browser.Open(webAddress); err != nil {
+                       log.Error(err.Error())
+                       return
+               }
+       }
+}
+
 func (n *Node) OnStart() error {
        // Create & add listener
-       protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
-       l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
+       p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
+       l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
        n.sw.AddListener(l)
 
        // Start the switch
@@ -298,31 +280,17 @@ func (n *Node) OnStart() error {
                        return err
                }
        }
-       // Run the RPC server
-       if n.config.RPC.ListenAddress != "" {
-               listeners, err := n.startRPC()
-               if err != nil {
-                       return err
-               }
-               n.rpcListeners = listeners
-       }
-
+       lanchWebBroser(!n.config.Web.Closed)
        return nil
 }
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
 
-       n.Logger.Info("Stopping Node")
+       log.Info("Stopping Node")
        // TODO: gracefully disconnect from peers.
        n.sw.Stop()
 
-       for _, l := range n.rpcListeners {
-               n.Logger.Info("Closing rpc listener", "listener", l)
-               if err := l.Close(); err != nil {
-                       n.Logger.Error("Error closing listener", "listener", l, "error", err)
-               }
-       }
 }
 
 func (n *Node) RunForever() {
@@ -332,13 +300,6 @@ func (n *Node) RunForever() {
        })
 }
 
-// Add the event switch to reactors, mempool, etc.
-func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
-       for _, e := range eventables {
-               e.SetEventSwitch(evsw)
-       }
-}
-
 // Add a Listener to accept inbound peer connections.
 // Add listeners before starting the Node.
 // The first listener is the primary listener (in NodeInfo)
@@ -346,57 +307,6 @@ func (n *Node) AddListener(l p2p.Listener) {
        n.sw.AddListener(l)
 }
 
-// ConfigureRPC sets all variables in rpccore so they will serve
-// rpc calls from this node
-func (n *Node) ConfigureRPC() {
-       rpccore.SetEventSwitch(n.evsw)
-       rpccore.SetBlockStore(n.blockStore)
-       //rpccore.SetConsensusState(n.consensusState)
-       //rpccore.SetMempool(n.mempoolReactor.Mempool)
-       rpccore.SetSwitch(n.sw)
-       //rpccore.SetGenesisDoc(n.genesisDoc)
-       rpccore.SetAddrBook(n.addrBook)
-       //rpccore.SetProxyAppQuery(n.proxyApp.Query())
-       //rpccore.SetTxIndexer(n.txIndexer)
-       rpccore.SetLogger(n.Logger.With("module", "rpc"))
-}
-
-func (n *Node) startRPC() ([]net.Listener, error) {
-       n.ConfigureRPC()
-       listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
-
-       if n.config.RPC.Unsafe {
-               rpccore.AddUnsafeRoutes()
-       }
-
-       // we may expose the rpc over both a unix and tcp socket
-       listeners := make([]net.Listener, len(listenAddrs))
-       for i, listenAddr := range listenAddrs {
-               mux := http.NewServeMux()
-               wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
-               rpcLogger := n.Logger.With("module", "rpc-server")
-               wm.SetLogger(rpcLogger)
-               mux.HandleFunc("/websocket", wm.WebsocketHandler)
-               rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
-               listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
-               if err != nil {
-                       return nil, err
-               }
-               listeners[i] = listener
-       }
-
-       // we expose a simplified api over grpc for convenience to app devs
-       grpcListenAddr := n.config.RPC.GRPCListenAddress
-       if grpcListenAddr != "" {
-               listener, err := grpccore.StartGRPCServer(grpcListenAddr)
-               if err != nil {
-                       return nil, err
-               }
-               listeners = append(listeners, listener)
-       }
-       return listeners, nil
-}
-
 func (n *Node) Switch() *p2p.Switch {
        return n.sw
 }
@@ -409,7 +319,7 @@ func (n *Node) makeNodeInfo() *p2p.NodeInfo {
        nodeInfo := &p2p.NodeInfo{
                PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
                Moniker: n.config.Moniker,
-               Network: "chain0",
+               Network: "bytom",
                Version: version.Version,
                Other: []string{
                        cmn.Fmt("wire_version=%v", wire.Version),
@@ -424,13 +334,13 @@ func (n *Node) makeNodeInfo() *p2p.NodeInfo {
        p2pListener := n.sw.Listeners()[0]
        p2pHost := p2pListener.ExternalAddress().IP.String()
        p2pPort := p2pListener.ExternalAddress().Port
-       rpcListenAddr := n.config.RPC.ListenAddress
+       //rpcListenAddr := n.config.RPC.ListenAddress
 
        // We assume that the rpcListener has the same ExternalAddress.
        // This is probably true because both P2P and RPC listeners use UPnP,
        // except of course if the rpc is only bound to localhost
        nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
+       //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
        return nodeInfo
 }
 
@@ -446,12 +356,12 @@ func (n *Node) DialSeeds(seeds []string) error {
 
 // Defaults to tcp
 func ProtocolAndAddress(listenAddr string) (string, string) {
-       protocol, address := "tcp", listenAddr
+       p, address := "tcp", listenAddr
        parts := strings.SplitN(address, "://", 2)
        if len(parts) == 2 {
-               protocol, address = parts[0], parts[1]
+               p, address = parts[0], parts[1]
        }
-       return protocol, address
+       return p, address
 }
 
 //------------------------------------------------------------------------------