OSDN Git Service

Merge branch 'dev' into dev-verify
[bytom/bytom.git] / node / node.go
old mode 100755 (executable)
new mode 100644 (file)
index 3e73fb7..5f82e93
@@ -2,45 +2,38 @@ package node
 
 import (
        "context"
-       "crypto/tls"
-       "net"
        "net/http"
        _ "net/http/pprof"
-       "strings"
-       "sync"
        "time"
 
-       "github.com/kr/secureheader"
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
+       browser "github.com/toqueteos/webbrowser"
 
-       "github.com/bytom/crypto/ed25519/chainkd"
-       bc "github.com/bytom/blockchain"
-       "github.com/bytom/blockchain/accesstoken"
-       "github.com/bytom/blockchain/account"
-       "github.com/bytom/blockchain/asset"
+       "github.com/bytom/accesstoken"
+       "github.com/bytom/account"
+       "github.com/bytom/api"
+       "github.com/bytom/asset"
        "github.com/bytom/blockchain/pseudohsm"
-       "github.com/bytom/blockchain/txdb"
        "github.com/bytom/blockchain/txfeed"
-       w "github.com/bytom/blockchain/wallet"
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
+       "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
-       "github.com/bytom/errors"
-       "github.com/bytom/p2p"
+       "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
+       "github.com/bytom/netsync"
        "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
        "github.com/bytom/types"
-       "github.com/bytom/util/browser"
-       "github.com/bytom/version"
+       w "github.com/bytom/wallet"
 )
 
 const (
-       httpReadTimeout          = 2 * time.Minute
-       httpWriteTimeout         = time.Hour
        webAddress               = "http://127.0.0.1:9888"
        expireReservationsPeriod = time.Second
+       maxNewBlockChSize        = 1024
 )
 
 type Node struct {
@@ -49,132 +42,42 @@ type Node struct {
        // config
        config *cfg.Config
 
-       // network
-       privKey  crypto.PrivKeyEd25519 // local node's p2p key
-       sw       *p2p.Switch           // p2p connections
-       addrBook *p2p.AddrBook         // known peers
+       syncManager *netsync.SyncManager
 
-       evsw       types.EventSwitch // pub/sub for services
-       bcReactor  *bc.BlockchainReactor
-}
-
-func NewNodeDefault(config *cfg.Config) *Node {
-       return NewNode(config)
-}
-
-func RedirectHandler(next http.Handler) http.Handler {
-       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-               if req.URL.Path == "/" {
-                       http.Redirect(w, req, "/dashboard/", http.StatusFound)
-                       return
-               }
-               next.ServeHTTP(w, req)
-       })
-}
-
-type waitHandler struct {
-       h  http.Handler
-       wg sync.WaitGroup
-}
-
-func (wh *waitHandler) Set(h http.Handler) {
-       wh.h = h
-       wh.wg.Done()
-}
-
-func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       wh.wg.Wait()
-       wh.h.ServeHTTP(w, req)
-}
-
-func rpcInit(h *bc.BlockchainReactor, config *cfg.Config, accessTokens *accesstoken.CredentialStore) {
-       // The waitHandler accepts incoming requests, but blocks until its underlying
-       // handler is set, when the second phase is complete.
-       var coreHandler waitHandler
-       coreHandler.wg.Add(1)
-       mux := http.NewServeMux()
-       mux.Handle("/", &coreHandler)
-
-       var handler http.Handler = mux
-
-       if config.Auth.Disable == false {
-               handler = bc.AuthHandler(handler, accessTokens)
-       }
-       handler = RedirectHandler(handler)
-
-       secureheader.DefaultConfig.PermitClearLoopback = true
-       secureheader.DefaultConfig.HTTPSRedirect = false
-       secureheader.DefaultConfig.Next = handler
-
-       server := &http.Server{
-               // Note: we should not set TLSConfig here;
-               // we took care of TLS with the listener in maybeUseTLS.
-               Handler:      secureheader.DefaultConfig,
-               ReadTimeout:  httpReadTimeout,
-               WriteTimeout: httpWriteTimeout,
-               // Disable HTTP/2 for now until the Go implementation is more stable.
-               // https://github.com/golang/go/issues/16450
-               // https://github.com/golang/go/issues/17071
-               TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
-       }
-       listenAddr := env.String("LISTEN", config.ApiAddress)
-       log.WithField("api address:", config.ApiAddress).Info("Rpc listen")
-       listener, err := net.Listen("tcp", *listenAddr)
-       if err != nil {
-               cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
-       }
-
-       // The `Serve` call has to happen in its own goroutine because
-       // it's blocking and we need to proceed to the rest of the core setup after
-       // we call it.
-       go func() {
-               if err := server.Serve(listener); err != nil {
-                       log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
-               }
-       }()
-       coreHandler.Set(h)
+       evsw types.EventSwitch // pub/sub for services
+       //bcReactor    *bc.BlockchainReactor
+       wallet       *w.Wallet
+       accessTokens *accesstoken.CredentialStore
+       api          *api.API
+       chain        *protocol.Chain
+       txfeed       *txfeed.Tracker
+       cpuMiner     *cpuminer.CPUMiner
+       miningPool   *miningpool.MiningPool
+       miningEnable bool
 }
 
 func NewNode(config *cfg.Config) *Node {
        ctx := context.Background()
-
+       initActiveNetParams(config)
        // Get store
-       txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
-       store := txdb.NewStore(txDB)
+       coreDB := dbm.NewDB("core", config.DBBackend, config.DBDir())
+       store := leveldb.NewStore(coreDB)
 
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       privKey := crypto.GenPrivKeyEd25519()
-
        // Make event switch
        eventSwitch := types.NewEventSwitch()
-       _, err := eventSwitch.Start()
-       if err != nil {
+       if _, err := eventSwitch.Start(); err != nil {
                cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
        }
 
-       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-
-       sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
-
-       genesisBlock := cfg.GenerateGenesisBlock()
-
        txPool := protocol.NewTxPool()
-       chain, err := protocol.NewChain(genesisBlock.Hash(), store, txPool)
+       chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
 
-       if chain.BestBlockHash() == nil {
-               if err := chain.SaveBlock(genesisBlock); err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to save genesisBlock to store: %v", err))
-               }
-               if err := chain.ConnectBlock(genesisBlock); err != nil {
-                       cmn.Exit(cmn.Fmt("Failed to connect genesisBlock to chain: %v", err))
-               }
-       }
-
        var accounts *account.Manager = nil
        var assets *asset.Registry = nil
        var wallet *w.Wallet = nil
@@ -202,26 +105,12 @@ func NewNode(config *cfg.Config) *Node {
                        log.WithField("error", err).Error("init NewWallet")
                }
 
-               if err := initOrRecoverAccount(hsm, wallet); err != nil {
-                       log.WithField("error", err).Error("initialize or recover account")
-               }
-
                // Clean up expired UTXO reservations periodically.
                go accounts.ExpireReservations(ctx, expireReservationsPeriod)
        }
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       bcReactor := bc.NewBlockchainReactor(chain, txPool,sw, wallet, txFeed, accessTokens, config.Mining)
-
-       sw.AddReactor("BLOCKCHAIN", bcReactor)
-
-       rpcInit(bcReactor, config, accessTokens)
-       // Optionally, start the pex reactor
-       var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(addrBook)
-               sw.AddReactor("PEX", pexReactor)
-       }
+       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -234,94 +123,70 @@ func NewNode(config *cfg.Config) *Node {
        }
 
        node := &Node{
-               config: config,
+               config:       config,
+               syncManager:  syncManager,
+               evsw:         eventSwitch,
+               accessTokens: accessTokens,
+               wallet:       wallet,
+               chain:        chain,
+               txfeed:       txFeed,
+               miningEnable: config.Mining,
+       }
 
-               privKey:  privKey,
-               sw:       sw,
-               addrBook: addrBook,
+       node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+       node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
 
-               evsw:       eventSwitch,
-               bcReactor:  bcReactor,
-       }
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
 
        return node
 }
 
-func initOrRecoverAccount(hsm *pseudohsm.HSM, wallet *w.Wallet) error {
-       xpubs := hsm.ListKeys()
-
-       if len(xpubs) == 0 {
-               xpub, err := hsm.XCreate("default", "123456")
-               if err != nil {
-                       return err
-               }
-
-               wallet.AccountMgr.Create(nil, []chainkd.XPub{xpub.XPub}, 1, "default", nil)
-               return nil
-       }
-
-       accounts, err := wallet.AccountMgr.ListAccounts("")
-       if err != nil {
-               return err
-       }
-
-       if len(accounts) > 0 {
-               return nil
+func initActiveNetParams(config *cfg.Config) {
+       var exist bool
+       consensus.ActiveNetParams, exist = consensus.NetParams[config.ChainID]
+       if !exist {
+               cmn.Exit(cmn.Fmt("chain_id[%v] don't exist", config.ChainID))
        }
-
-       for i, xPub := range xpubs {
-               if err := wallet.ImportAccountXpubKey(i, xPub, w.RecoveryIndex); err != nil {
-                       return err
-               }
-       }
-       return nil
 }
 
 // Lanch web broser or not
-func lanchWebBroser(lanch bool) {
-       if lanch {
-               log.Info("Launching System Browser with :", webAddress)
-               if err := browser.Open(webAddress); err != nil {
-                       log.Error(err.Error())
-                       return
-               }
+func lanchWebBroser() {
+       log.Info("Launching System Browser with :", webAddress)
+       if err := browser.Open(webAddress); err != nil {
+               log.Error(err.Error())
+               return
        }
 }
 
+func (n *Node) initAndstartApiServer() {
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
+
+       listenAddr := env.String("LISTEN", n.config.ApiAddress)
+       env.Parse()
+       n.api.StartServer(*listenAddr)
+}
+
 func (n *Node) OnStart() error {
-       // Create & add listener
-       p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
-       l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
-       n.sw.AddListener(l)
-
-       // Start the switch
-       n.sw.SetNodeInfo(n.makeNodeInfo())
-       n.sw.SetNodePrivKey(n.privKey)
-       _, err := n.sw.Start()
-       if err != nil {
-               return err
+       if n.miningEnable {
+               n.cpuMiner.Start()
        }
-
-       // If seeds exist, add them to the address book and dial out
-       if n.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(n.config.P2P.Seeds, ",")
-               if err := n.DialSeeds(seeds); err != nil {
-                       return err
-               }
+       n.syncManager.Start()
+       n.initAndstartApiServer()
+       if !n.config.Web.Closed {
+               lanchWebBroser()
        }
-       lanchWebBroser(!n.config.Web.Closed)
+
        return nil
 }
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
-
+       if n.miningEnable {
+               n.cpuMiner.Stop()
+       }
+       n.syncManager.Stop()
        log.Info("Stopping Node")
        // TODO: gracefully disconnect from peers.
-       n.sw.Stop()
-
 }
 
 func (n *Node) RunForever() {
@@ -331,68 +196,14 @@ func (n *Node) RunForever() {
        })
 }
 
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
-       n.sw.AddListener(l)
-}
-
-func (n *Node) Switch() *p2p.Switch {
-       return n.sw
-}
-
 func (n *Node) EventSwitch() types.EventSwitch {
        return n.evsw
 }
 
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: n.config.Moniker,
-               Network: "bytom",
-               Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
-       }
-
-       if !n.sw.IsListening() {
-               return nodeInfo
-       }
-
-       p2pListener := n.sw.Listeners()[0]
-       p2pHost := p2pListener.ExternalAddress().IP.String()
-       p2pPort := p2pListener.ExternalAddress().Port
-       //rpcListenAddr := n.config.RPC.ListenAddress
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
-       return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
-       return n.sw.NodeInfo()
+func (n *Node) SyncManager() *netsync.SyncManager {
+       return n.syncManager
 }
 
-func (n *Node) DialSeeds(seeds []string) error {
-       return n.sw.DialSeeds(n.addrBook, seeds)
+func (n *Node) MiningPool() *miningpool.MiningPool {
+       return n.miningPool
 }
-
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
-       }
-       return p, address
-}
-
-//------------------------------------------------------------------------------