OSDN Git Service

Mempool: add no btm input tx filter (#1605)
[bytom/bytom.git] / netsync / handle.go
index ac1619b..643ae0a 100644 (file)
 package netsync
 
 import (
-       "strings"
+       "errors"
+       "reflect"
 
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       "github.com/tendermint/go-wire"
-       cmn "github.com/tendermint/tmlibs/common"
-       dbm "github.com/tendermint/tmlibs/db"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
+       "github.com/bytom/event"
        "github.com/bytom/p2p"
-       "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
-       "github.com/bytom/version"
+       "github.com/bytom/protocol/bc/types"
+       "github.com/tendermint/go-crypto"
+)
+
+const (
+       logModule             = "netsync"
+       maxTxChanSize         = 10000
+       maxFilterAddressSize  = 50
+       maxFilterAddressCount = 1000
+)
+
+var (
+       errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
 )
 
+// Chain is the interface for Bytom core
+type Chain interface {
+       BestBlockHeader() *types.BlockHeader
+       BestBlockHeight() uint64
+       CalcNextSeed(*bc.Hash) (*bc.Hash, error)
+       GetBlockByHash(*bc.Hash) (*types.Block, error)
+       GetBlockByHeight(uint64) (*types.Block, error)
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+       GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
+       InMainChain(bc.Hash) bool
+       ProcessBlock(*types.Block) (bool, error)
+       ValidateTx(*types.Tx) (bool, error)
+}
+
+type Switch interface {
+       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+       NodeInfo() *p2p.NodeInfo
+       Start() (bool, error)
+       Stop() bool
+       IsListening() bool
+       DialPeerWithAddress(addr *p2p.NetAddress) error
+       Peers() *p2p.PeerSet
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       networkID uint64
-       sw        *p2p.Switch
+       sw           Switch
+       genesisHash  bc.Hash
+       chain        Chain
+       txPool       *core.TxPool
+       blockFetcher *blockFetcher
+       blockKeeper  *blockKeeper
+       peers        *peerSet
 
-       privKey     crypto.PrivKeyEd25519 // local node's p2p key
-       chain       *core.Chain
-       txPool      *core.TxPool
-       fetcher     *Fetcher
-       blockKeeper *blockKeeper
-       peers       *peerSet
+       txSyncCh chan *txSyncMsg
+       quitSync chan struct{}
+       config   *cfg.Config
 
-       newBlockCh    chan *bc.Hash
-       newPeerCh     chan struct{}
-       txSyncCh      chan *txsync
-       dropPeerCh    chan *string
-       quitSync      chan struct{}
-       config        *cfg.Config
-       synchronising int32
+       eventDispatcher *event.Dispatcher
+       minedBlockSub   *event.Subscription
+       txMsgSub        *event.Subscription
+}
+
+// CreateSyncManager create sync manager and set switch.
+func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+       sw, err := p2p.NewSwitch(config)
+       if err != nil {
+               return nil, err
+       }
+
+       return newSyncManager(config, sw, chain, txPool, dispatcher)
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
-       // Create the protocol manager with the base fields
+func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+       genesisHeader, err := chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+       peers := newPeerSet(sw)
        manager := &SyncManager{
-               txPool:     txPool,
-               chain:      chain,
-               privKey:    crypto.GenPrivKeyEd25519(),
-               config:     config,
-               quitSync:   make(chan struct{}),
-               newBlockCh: newBlockCh,
-               newPeerCh:  make(chan struct{}),
-               txSyncCh:   make(chan *txsync),
-               dropPeerCh: make(chan *string, maxQuitReq),
-               peers:      newPeerSet(),
-       }
-
-       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-       addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-       manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
-
-       pexReactor := pex.NewPEXReactor(addrBook)
-       manager.sw.AddReactor("PEX", pexReactor)
-
-       manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
-       manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
-       protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
-       manager.sw.AddReactor("PROTOCOL", protocolReactor)
-
-       // Create & add listener
-       var listenerStatus bool
-       var l p2p.Listener
-       if !config.VaultMode {
-               p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
-               manager.sw.AddListener(l)
+               sw:              sw,
+               genesisHash:     genesisHeader.Hash(),
+               txPool:          txPool,
+               chain:           chain,
+               blockFetcher:    newBlockFetcher(chain, peers),
+               blockKeeper:     newBlockKeeper(chain, peers),
+               peers:           peers,
+               txSyncCh:        make(chan *txSyncMsg),
+               quitSync:        make(chan struct{}),
+               config:          config,
+               eventDispatcher: dispatcher,
        }
-       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
-       manager.sw.SetNodePrivKey(manager.privKey)
 
+       if !config.VaultMode {
+               protocolReactor := NewProtocolReactor(manager, peers)
+               manager.sw.AddReactor("PROTOCOL", protocolReactor)
+       }
        return manager, nil
 }
 
-// Defaults to tcp
-func protocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+       bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+       if bestPeer != nil {
+               return bestPeer.getPeerInfo()
+       }
+       return nil
+}
+
+func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
+       if sm.config.VaultMode {
+               return errVaultModeDialPeer
+       }
+
+       return sm.sw.DialPeerWithAddress(addr)
+}
+
+func (sm *SyncManager) GetNetwork() string {
+       return sm.config.ChainID
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+       return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+       peer := sm.peers.bestPeer(consensus.SFFullNode)
+       return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+       if peer := sm.peers.getPeer(peerID); peer == nil {
+               return errors.New("peerId not exist")
+       }
+       sm.peers.removePeer(peerID)
+       return nil
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+       block, err := msg.GetBlock()
+       if err != nil {
+               return
+       }
+       sm.blockKeeper.processBlock(peer.ID(), block)
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+       blocks, err := msg.GetBlocks()
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
+               return
+       }
+
+       sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
+       peer.addFilterAddress(msg.Address)
+}
+
+func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
+       peer.filterAdds.Clear()
+}
+
+func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
+       peer.addFilterAddresses(msg.Addresses)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+       var block *types.Block
+       var err error
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
+       } else {
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
+               return
+       }
+
+       ok, err := peer.sendBlock(block)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
        }
-       return p, address
 }
 
-func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: sm.config.Moniker,
-               Network: sm.config.ChainID,
-               Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+       blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(blocks) == 0 {
+               return
        }
 
-       if !sm.sw.IsListening() {
-               return nodeInfo
+       totalSize := 0
+       sendBlocks := []*types.Block{}
+       for _, block := range blocks {
+               rawData, err := block.MarshalText()
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
+                       continue
+               }
+
+               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+                       break
+               }
+               totalSize += len(rawData)
+               sendBlocks = append(sendBlocks, block)
        }
 
-       p2pListener := sm.sw.Listeners()[0]
+       ok, err := peer.sendBlocks(sendBlocks)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
+       }
+}
 
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       if listenerStatus {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+       headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(headers) == 0 {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
+               return
+       }
+
+       ok, err := peer.sendHeaders(headers)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
+       var err error
+       var block *types.Block
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
        } else {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
+               return
+       }
+
+       blockHash := block.Hash()
+       txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
+               return
+       }
+
+       ok, err := peer.sendMerkleBlock(block, txStatus)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+               return
+       }
+
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+       headers, err := msg.GetHeaders()
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
+               return
+       }
+
+       sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
+       block, err := msg.GetMineBlock()
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
+               return
        }
-       return nodeInfo
+
+       hash := block.Hash()
+       peer.markBlock(&hash)
+       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
+       peer.setStatus(block.Height, &hash)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+       bestHeader := sm.chain.BestBlockHeader()
+       genesisBlock, err := sm.chain.GetBlockByHeight(0)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
+       }
+
+       genesisHash := genesisBlock.Hash()
+       msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+       if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+               peer.setStatus(msg.Height, msg.GetHash())
+               return
+       }
+
+       if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+               log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
+               return
+       }
+
+       sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+       tx, err := msg.GetTransaction()
+       if err != nil {
+               sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               return
+       }
+
+       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
+               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+       }
+}
+
+func (sm *SyncManager) IsListening() bool {
+       if sm.config.VaultMode {
+               return false
+       }
+       return sm.sw.IsListening()
 }
 
-func (sm *SyncManager) netStart() error {
-       _, err := sm.sw.Start()
-       return err
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       if sm.config.VaultMode {
+               return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
+       }
+       return sm.sw.NodeInfo()
+}
+
+func (sm *SyncManager) PeerCount() int {
+       if sm.config.VaultMode {
+               return 0
+       }
+       return len(sm.sw.Peers().List())
+}
+
+func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+       peer := sm.peers.getPeer(basePeer.ID())
+       if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
+               return
+       }
+
+       log.WithFields(log.Fields{
+               "module":  logModule,
+               "peer":    basePeer.Addr(),
+               "type":    reflect.TypeOf(msg),
+               "message": msg.String(),
+       }).Info("receive message from peer")
+
+       switch msg := msg.(type) {
+       case *GetBlockMessage:
+               sm.handleGetBlockMsg(peer, msg)
+
+       case *BlockMessage:
+               sm.handleBlockMsg(peer, msg)
+
+       case *StatusRequestMessage:
+               sm.handleStatusRequestMsg(basePeer)
+
+       case *StatusResponseMessage:
+               sm.handleStatusResponseMsg(basePeer, msg)
+
+       case *TransactionMessage:
+               sm.handleTransactionMsg(peer, msg)
+
+       case *MineBlockMessage:
+               sm.handleMineBlockMsg(peer, msg)
+
+       case *GetHeadersMessage:
+               sm.handleGetHeadersMsg(peer, msg)
+
+       case *HeadersMessage:
+               sm.handleHeadersMsg(peer, msg)
+
+       case *GetBlocksMessage:
+               sm.handleGetBlocksMsg(peer, msg)
+
+       case *BlocksMessage:
+               sm.handleBlocksMsg(peer, msg)
+
+       case *FilterLoadMessage:
+               sm.handleFilterLoadMsg(peer, msg)
+
+       case *FilterAddMessage:
+               sm.handleFilterAddMsg(peer, msg)
+
+       case *FilterClearMessage:
+               sm.handleFilterClearMsg(peer)
+
+       case *GetMerkleBlockMessage:
+               sm.handleGetMerkleBlockMsg(peer, msg)
+
+       default:
+               log.WithFields(log.Fields{
+                       "module":       logModule,
+                       "peer":         basePeer.Addr(),
+                       "message_type": reflect.TypeOf(msg),
+               }).Error("unhandled message type")
+       }
 }
 
-//Start start sync manager service
-func (sm *SyncManager) Start() {
-       go sm.netStart()
+func (sm *SyncManager) Start() error {
+       var err error
+       if _, err = sm.sw.Start(); err != nil {
+               log.Error("switch start err")
+               return err
+       }
+
+       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
+       if err != nil {
+               return err
+       }
+
+       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       if err != nil {
+               return err
+       }
+
        // broadcast transactions
        go sm.txBroadcastLoop()
-
-       // broadcast mined blocks
        go sm.minedBroadcastLoop()
+       go sm.txSyncLoop()
 
-       // start sync handlers
-       go sm.syncer()
-
-       go sm.txsyncLoop()
+       return nil
 }
 
 //Stop stop sync manager
 func (sm *SyncManager) Stop() {
        close(sm.quitSync)
-       sm.sw.Stop()
-}
-
-func (sm *SyncManager) txBroadcastLoop() {
-       newTxCh := sm.txPool.GetNewTxCh()
-       for {
-               select {
-               case newTx := <-newTxCh:
-                       peers, err := sm.peers.BroadcastTx(newTx)
-                       if err != nil {
-                               log.Errorf("Broadcast new tx error. %v", err)
-                               return
-                       }
-                       for _, smPeer := range peers {
-                               if smPeer == nil {
-                                       continue
-                               }
-                               swPeer := smPeer.getPeer()
-                               log.Info("Tx broadcast error. Stop Peer.")
-                               sm.sw.StopPeerGracefully(swPeer)
-                       }
-               case <-sm.quitSync:
-                       return
-               }
+       sm.minedBlockSub.Unsubscribe()
+       if !sm.config.VaultMode {
+               sm.sw.Stop()
        }
 }
 
 func (sm *SyncManager) minedBroadcastLoop() {
        for {
                select {
-               case blockHash := <-sm.newBlockCh:
-                       block, err := sm.chain.GetBlockByHash(blockHash)
-                       if err != nil {
-                               log.Errorf("Failed on mined broadcast loop get block %v", err)
+               case obj, ok := <-sm.minedBlockSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
                                return
                        }
-                       peers, err := sm.peers.BroadcastMinedBlock(block)
-                       if err != nil {
-                               log.Errorf("Broadcast mine block error. %v", err)
-                               return
+
+                       ev, ok := obj.Data.(event.NewMinedBlockEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
                        }
-                       for _, smPeer := range peers {
-                               if smPeer == nil {
-                                       continue
-                               }
-                               swPeer := smPeer.getPeer()
-                               log.Info("New mined block broadcast error. Stop Peer.")
-                               sm.sw.StopPeerGracefully(swPeer)
+
+                       if err := sm.peers.broadcastMinedBlock(ev.Block); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+                               continue
                        }
+
                case <-sm.quitSync:
                        return
                }
        }
 }
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
-       return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
-       return sm.peers
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}