OSDN Git Service

Merge branch 'dev' into dev-verify
[bytom/bytom.git] / node / node.go
index f4d413e..5f82e93 100644 (file)
@@ -2,42 +2,38 @@ package node
 
 import (
        "context"
-       "crypto/tls"
-       "net"
        "net/http"
-       "os"
-       "strings"
-       "sync"
+       _ "net/http/pprof"
        "time"
 
-       "github.com/kr/secureheader"
        log "github.com/sirupsen/logrus"
-       crypto "github.com/tendermint/go-crypto"
-       wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
-       _ "net/http/pprof"
+       browser "github.com/toqueteos/webbrowser"
 
-       bc "github.com/bytom/blockchain"
-       "github.com/bytom/blockchain/account"
-       "github.com/bytom/blockchain/asset"
-       "github.com/bytom/blockchain/pin"
+       "github.com/bytom/accesstoken"
+       "github.com/bytom/account"
+       "github.com/bytom/api"
+       "github.com/bytom/asset"
        "github.com/bytom/blockchain/pseudohsm"
-       "github.com/bytom/blockchain/txdb"
+       "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
        "github.com/bytom/consensus"
+       "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
-       "github.com/bytom/errors"
-       p2p "github.com/bytom/p2p"
+       "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
+       "github.com/bytom/netsync"
        "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc/legacy"
+       "github.com/bytom/protocol/bc"
        "github.com/bytom/types"
-       "github.com/bytom/version"
+       w "github.com/bytom/wallet"
 )
 
 const (
-       httpReadTimeout  = 2 * time.Minute
-       httpWriteTimeout = time.Hour
+       webAddress               = "http://127.0.0.1:9888"
+       expireReservationsPeriod = time.Second
+       maxNewBlockChSize        = 1024
 )
 
 type Node struct {
@@ -46,277 +42,151 @@ type Node struct {
        // config
        config *cfg.Config
 
-       // network
-       privKey  crypto.PrivKeyEd25519 // local node's p2p key
-       sw       *p2p.Switch           // p2p connections
-       addrBook *p2p.AddrBook         // known peers
+       syncManager *netsync.SyncManager
 
-       // services
        evsw types.EventSwitch // pub/sub for services
-       //    blockStore       *bc.MemStore
-       blockStore *txdb.Store
-       bcReactor  *bc.BlockchainReactor
-       accounts   *account.Manager
-       assets     *asset.Registry
-}
-
-var (
-       // config vars
-       rootCAs       = env.String("ROOT_CA_CERTS", "") // file path
-       splunkAddr    = os.Getenv("SPLUNKADDR")
-       logFile       = os.Getenv("LOGFILE")
-       logSize       = env.Int("LOGSIZE", 5e6) // 5MB
-       logCount      = env.Int("LOGCOUNT", 9)
-       logQueries    = env.Bool("LOG_QUERIES", false)
-       maxDBConns    = env.Int("MAXDBCONNS", 10)           // set to 100 in prod
-       rpsToken      = env.Int("RATELIMIT_TOKEN", 0)       // reqs/sec
-       rpsRemoteAddr = env.Int("RATELIMIT_REMOTE_ADDR", 0) // reqs/sec
-       indexTxs      = env.Bool("INDEX_TRANSACTIONS", true)
-       home          = bc.HomeDirFromEnvironment()
-       bootURL       = env.String("BOOTURL", "")
-       // build vars; initialized by the linker
-       buildTag    = "?"
-       buildCommit = "?"
-       buildDate   = "?"
-       race        []interface{} // initialized in race.go
-)
-
-func NewNodeDefault(config *cfg.Config) *Node {
-       return NewNode(config)
-}
-
-func RedirectHandler(next http.Handler) http.Handler {
-       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-               if req.URL.Path == "/" {
-                       http.Redirect(w, req, "/dashboard/", http.StatusFound)
-                       return
-               }
-               next.ServeHTTP(w, req)
-       })
-}
-
-type waitHandler struct {
-       h  http.Handler
-       wg sync.WaitGroup
-}
-
-func (wh *waitHandler) Set(h http.Handler) {
-       wh.h = h
-       wh.wg.Done()
-}
-
-func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       wh.wg.Wait()
-       wh.h.ServeHTTP(w, req)
-}
-
-func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) {
-       // The waitHandler accepts incoming requests, but blocks until its underlying
-       // handler is set, when the second phase is complete.
-       var coreHandler waitHandler
-       coreHandler.wg.Add(1)
-       mux := http.NewServeMux()
-       mux.Handle("/", &coreHandler)
-
-       var handler http.Handler = mux
-       handler = RedirectHandler(handler)
-
-       secureheader.DefaultConfig.PermitClearLoopback = true
-       secureheader.DefaultConfig.HTTPSRedirect = false
-       secureheader.DefaultConfig.Next = handler
-
-       server := &http.Server{
-               // Note: we should not set TLSConfig here;
-               // we took care of TLS with the listener in maybeUseTLS.
-               Handler:      secureheader.DefaultConfig,
-               ReadTimeout:  httpReadTimeout,
-               WriteTimeout: httpWriteTimeout,
-               // Disable HTTP/2 for now until the Go implementation is more stable.
-               // https://github.com/golang/go/issues/16450
-               // https://github.com/golang/go/issues/17071
-               TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
-       }
-       listenAddr := env.String("LISTEN", config.ApiAddress)
-       listener, _ := net.Listen("tcp", *listenAddr)
-
-       // The `Serve` call has to happen in its own goroutine because
-       // it's blocking and we need to proceed to the rest of the core setup after
-       // we call it.
-       go func() {
-               err := server.Serve(listener)
-               log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
-       }()
-       coreHandler.Set(h)
+       //bcReactor    *bc.BlockchainReactor
+       wallet       *w.Wallet
+       accessTokens *accesstoken.CredentialStore
+       api          *api.API
+       chain        *protocol.Chain
+       txfeed       *txfeed.Tracker
+       cpuMiner     *cpuminer.CPUMiner
+       miningPool   *miningpool.MiningPool
+       miningEnable bool
 }
 
 func NewNode(config *cfg.Config) *Node {
        ctx := context.Background()
-
+       initActiveNetParams(config)
        // Get store
-       txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
-       store := txdb.NewStore(txDB)
+       coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+       store := leveldb.NewStore(coreDB)
 
-       privKey := crypto.GenPrivKeyEd25519()
+       tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
+       accessTokens := accesstoken.NewStore(tokenDB)
 
        // Make event switch
        eventSwitch := types.NewEventSwitch()
-       _, err := eventSwitch.Start()
-       if err != nil {
+       if _, err := eventSwitch.Start(); err != nil {
                cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
        }
 
-       sw := p2p.NewSwitch(config.P2P)
-
-       fastSync := config.FastSync
-
-       genesisBlock := &legacy.Block{
-               BlockHeader:  legacy.BlockHeader{},
-               Transactions: []*legacy.Tx{},
-       }
-       genesisBlock.UnmarshalText(consensus.InitBlock())
-
        txPool := protocol.NewTxPool()
-       chain, err := protocol.NewChain(ctx, genesisBlock.Hash(), store, txPool, nil)
-
-       if store.Height() < 1 {
-               if err := chain.AddBlock(nil, genesisBlock); err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to add genesisBlock to Chain: %v", err))
-               }
+       chain, err := protocol.NewChain(store, txPool)
+       if err != nil {
+               cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
 
        var accounts *account.Manager = nil
        var assets *asset.Registry = nil
-       var pinStore *pin.Store = nil
+       var wallet *w.Wallet = nil
+       var txFeed *txfeed.Tracker = nil
 
-       if config.Wallet.Enable {
-               accountsDB := dbm.NewDB("account", config.DBBackend, config.DBDir())
-               accUTXODB := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir())
-               pinStore = pin.NewStore(accUTXODB)
-               err = pinStore.LoadAll(ctx)
-               if err != nil {
-                       log.WithField("error", err).Error("load pin store")
-                       return nil
-               }
+       txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
+       txFeed = txfeed.NewTracker(txFeedDB, chain)
 
-               pinHeight := store.Height()
-               if pinHeight > 0 {
-                       pinHeight = pinHeight - 1
-               }
-
-               pins := []string{account.PinName, account.DeleteSpentsPinName}
-               for _, p := range pins {
-                       err = pinStore.CreatePin(ctx, p, pinHeight)
-                       if err != nil {
-                               log.WithField("error", err).Error("Create pin")
-                       }
-               }
-
-               accounts = account.NewManager(accountsDB, chain, pinStore)
-               go accounts.ProcessBlocks(ctx)
-
-               assetsDB := dbm.NewDB("asset", config.DBBackend, config.DBDir())
-               assets = asset.NewRegistry(assetsDB, chain)
+       if err = txFeed.Prepare(ctx); err != nil {
+               log.WithField("error", err).Error("start txfeed")
+               return nil
        }
-       //Todo HSM
-       /*
-               if config.HsmUrl != ""{
-                       // todo remoteHSM
-                       cmn.Exit(cmn.Fmt("not implement"))
-               } else {
-                       hsm, err = pseudohsm.New(config.KeysDir())
-                       if err != nil {
-                               cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
-                       }
-               }*/
 
        hsm, err := pseudohsm.New(config.KeysDir())
        if err != nil {
                cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
        }
-       bcReactor := bc.NewBlockchainReactor(
-               store,
-               chain,
-               txPool,
-               accounts,
-               assets,
-               sw,
-               hsm,
-               fastSync,
-               pinStore)
-
-       sw.AddReactor("BLOCKCHAIN", bcReactor)
-
-       rpcInit(bcReactor, config)
-       // Optionally, start the pex reactor
-       var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(addrBook)
-               sw.AddReactor("PEX", pexReactor)
+
+       if !config.Wallet.Disable {
+               walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+               accounts = account.NewManager(walletDB, chain)
+               assets = asset.NewRegistry(walletDB, chain)
+               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
+               if err != nil {
+                       log.WithField("error", err).Error("init NewWallet")
+               }
+
+               // Clean up expired UTXO reservations periodically.
+               go accounts.ExpireReservations(ctx, expireReservationsPeriod)
        }
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       // add the event switch to all services
-       // they should all satisfy events.Eventable
-       //SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
+       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
        // run the profile server
        profileHost := config.ProfListenAddress
        if profileHost != "" {
-
+               // Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
+               // go tool pprof http://profileHose/debug/pprof/heap
                go func() {
-                       log.WithField("error", http.ListenAndServe(profileHost, nil)).Error("Profile server")
+                       http.ListenAndServe(profileHost, nil)
                }()
        }
 
        node := &Node{
-               config: config,
+               config:       config,
+               syncManager:  syncManager,
+               evsw:         eventSwitch,
+               accessTokens: accessTokens,
+               wallet:       wallet,
+               chain:        chain,
+               txfeed:       txFeed,
+               miningEnable: config.Mining,
+       }
 
-               privKey:  privKey,
-               sw:       sw,
-               addrBook: addrBook,
+       node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+       node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
 
-               evsw:       eventSwitch,
-               bcReactor:  bcReactor,
-               blockStore: store,
-               accounts:   accounts,
-               assets:     assets,
-       }
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
+
        return node
 }
 
-func (n *Node) OnStart() error {
-       // Create & add listener
-       protocol, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
-       l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, nil)
-       n.sw.AddListener(l)
-
-       // Start the switch
-       n.sw.SetNodeInfo(n.makeNodeInfo())
-       n.sw.SetNodePrivKey(n.privKey)
-       _, err := n.sw.Start()
-       if err != nil {
-               return err
+func initActiveNetParams(config *cfg.Config) {
+       var exist bool
+       consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
+       if !exist {
+               cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
        }
+}
 
-       // If seeds exist, add them to the address book and dial out
-       if n.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(n.config.P2P.Seeds, ",")
-               if err := n.DialSeeds(seeds); err != nil {
-                       return err
-               }
+// Lanch web broser or not
+func lanchWebBroser() {
+       log.Info("Launching System Browser with :", webAddress)
+       if err := browser.Open(webAddress); err != nil {
+               log.Error(err.Error())
+               return
        }
+}
+
+func (n *Node) initAndstartApiServer() {
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
+
+       listenAddr := env.String("LISTEN", n.config.ApiAddress)
+       env.Parse()
+       n.api.StartServer(*listenAddr)
+}
+
+func (n *Node) OnStart() error {
+       if n.miningEnable {
+               n.cpuMiner.Start()
+       }
+       n.syncManager.Start()
+       n.initAndstartApiServer()
+       if !n.config.Web.Closed {
+               lanchWebBroser()
+       }
+
        return nil
 }
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
-
+       if n.miningEnable {
+               n.cpuMiner.Stop()
+       }
+       n.syncManager.Stop()
        log.Info("Stopping Node")
        // TODO: gracefully disconnect from peers.
-       n.sw.Stop()
-
 }
 
 func (n *Node) RunForever() {
@@ -326,75 +196,14 @@ func (n *Node) RunForever() {
        })
 }
 
-// Add the event switch to reactors, mempool, etc.
-func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
-       for _, e := range eventables {
-               e.SetEventSwitch(evsw)
-       }
-}
-
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
-       n.sw.AddListener(l)
-}
-
-func (n *Node) Switch() *p2p.Switch {
-       return n.sw
-}
-
 func (n *Node) EventSwitch() types.EventSwitch {
        return n.evsw
 }
 
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: n.config.Moniker,
-               Network: "bytom",
-               Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
-       }
-
-       if !n.sw.IsListening() {
-               return nodeInfo
-       }
-
-       p2pListener := n.sw.Listeners()[0]
-       p2pHost := p2pListener.ExternalAddress().IP.String()
-       p2pPort := p2pListener.ExternalAddress().Port
-       //rpcListenAddr := n.config.RPC.ListenAddress
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
-       return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
-       return n.sw.NodeInfo()
+func (n *Node) SyncManager() *netsync.SyncManager {
+       return n.syncManager
 }
 
-func (n *Node) DialSeeds(seeds []string) error {
-       return n.sw.DialSeeds(n.addrBook, seeds)
+func (n *Node) MiningPool() *miningpool.MiningPool {
+       return n.miningPool
 }
-
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
-       protocol, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               protocol, address = parts[0], parts[1]
-       }
-       return protocol, address
-}
-
-//------------------------------------------------------------------------------