OSDN Git Service

Mempool: add no btm input tx filter (#1605)
[bytom/bytom.git] / netsync / handle.go
index eea401b..643ae0a 100644 (file)
@@ -1,34 +1,32 @@
 package netsync
 
 import (
-       "encoding/hex"
        "errors"
-       "net"
-       "path"
        "reflect"
-       "strconv"
-       "strings"
 
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       cmn "github.com/tendermint/tmlibs/common"
 
        cfg "github.com/bytom/config"
        "github.com/bytom/consensus"
+       "github.com/bytom/event"
        "github.com/bytom/p2p"
-       "github.com/bytom/p2p/discover"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/version"
+       "github.com/tendermint/go-crypto"
 )
 
 const (
+       logModule             = "netsync"
        maxTxChanSize         = 10000
        maxFilterAddressSize  = 50
        maxFilterAddressCount = 1000
 )
 
+var (
+       errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
+)
+
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeader() *types.BlockHeader
@@ -44,69 +42,72 @@ type Chain interface {
        ValidateTx(*types.Tx) (bool, error)
 }
 
+type Switch interface {
+       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+       NodeInfo() *p2p.NodeInfo
+       Start() (bool, error)
+       Stop() bool
+       IsListening() bool
+       DialPeerWithAddress(addr *p2p.NetAddress) error
+       Peers() *p2p.PeerSet
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       sw          *p2p.Switch
-       genesisHash bc.Hash
-
-       privKey      crypto.PrivKeyEd25519 // local node's p2p key
+       sw           Switch
+       genesisHash  bc.Hash
        chain        Chain
        txPool       *core.TxPool
        blockFetcher *blockFetcher
        blockKeeper  *blockKeeper
        peers        *peerSet
 
-       newTxCh    chan *types.Tx
-       newBlockCh chan *bc.Hash
-       txSyncCh   chan *txSyncMsg
-       quitSync   chan struct{}
-       config     *cfg.Config
+       txSyncCh chan *txSyncMsg
+       quitSync chan struct{}
+       config   *cfg.Config
+
+       eventDispatcher *event.Dispatcher
+       minedBlockSub   *event.Subscription
+       txMsgSub        *event.Subscription
+}
+
+// CreateSyncManager create sync manager and set switch.
+func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+       sw, err := p2p.NewSwitch(config)
+       if err != nil {
+               return nil, err
+       }
+
+       return newSyncManager(config, sw, chain, txPool, dispatcher)
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }
-
-       sw := p2p.NewSwitch(config)
        peers := newPeerSet(sw)
        manager := &SyncManager{
-               sw:           sw,
-               genesisHash:  genesisHeader.Hash(),
-               txPool:       txPool,
-               chain:        chain,
-               privKey:      crypto.GenPrivKeyEd25519(),
-               blockFetcher: newBlockFetcher(chain, peers),
-               blockKeeper:  newBlockKeeper(chain, peers),
-               peers:        peers,
-               newTxCh:      make(chan *types.Tx, maxTxChanSize),
-               newBlockCh:   newBlockCh,
-               txSyncCh:     make(chan *txSyncMsg),
-               quitSync:     make(chan struct{}),
-               config:       config,
-       }
-
-       protocolReactor := NewProtocolReactor(manager, manager.peers)
-       manager.sw.AddReactor("PROTOCOL", protocolReactor)
-
-       // Create & add listener
-       var listenerStatus bool
-       var l p2p.Listener
-       if !config.VaultMode {
-               p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
-               manager.sw.AddListener(l)
+               sw:              sw,
+               genesisHash:     genesisHeader.Hash(),
+               txPool:          txPool,
+               chain:           chain,
+               blockFetcher:    newBlockFetcher(chain, peers),
+               blockKeeper:     newBlockKeeper(chain, peers),
+               peers:           peers,
+               txSyncCh:        make(chan *txSyncMsg),
+               quitSync:        make(chan struct{}),
+               config:          config,
+               eventDispatcher: dispatcher,
+       }
 
-               discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
-               if err != nil {
-                       return nil, err
-               }
-               manager.sw.SetDiscv(discv)
+       if !config.VaultMode {
+               protocolReactor := NewProtocolReactor(manager, peers)
+               manager.sw.AddReactor("PROTOCOL", protocolReactor)
        }
-       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
-       manager.sw.SetNodePrivKey(manager.privKey)
        return manager, nil
 }
 
@@ -119,9 +120,16 @@ func (sm *SyncManager) BestPeer() *PeerInfo {
        return nil
 }
 
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
-       return sm.newTxCh
+func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
+       if sm.config.VaultMode {
+               return errVaultModeDialPeer
+       }
+
+       return sm.sw.DialPeerWithAddress(addr)
+}
+
+func (sm *SyncManager) GetNetwork() string {
+       return sm.config.ChainID
 }
 
 //GetPeerInfos return peer info of all peers
@@ -135,11 +143,6 @@ func (sm *SyncManager) IsCaughtUp() bool {
        return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
 }
 
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
 //StopPeer try to stop peer by given ID
 func (sm *SyncManager) StopPeer(peerID string) error {
        if peer := sm.peers.getPeer(peerID); peer == nil {
@@ -149,11 +152,6 @@ func (sm *SyncManager) StopPeer(peerID string) error {
        return nil
 }
 
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}
-
 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
@@ -165,7 +163,7 @@ func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
        blocks, err := msg.GetBlocks()
        if err != nil {
-               log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
                return
        }
 
@@ -193,7 +191,7 @@ func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
                block, err = sm.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
-               log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
                return
        }
 
@@ -202,7 +200,7 @@ func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
                sm.peers.removePeer(peer.ID())
        }
        if err != nil {
-               log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
        }
 }
 
@@ -217,7 +215,7 @@ func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
        for _, block := range blocks {
                rawData, err := block.MarshalText()
                if err != nil {
-                       log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
                        continue
                }
 
@@ -233,14 +231,14 @@ func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
                sm.peers.removePeer(peer.ID())
        }
        if err != nil {
-               log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
        }
 }
 
 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
        headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(headers) == 0 {
-               log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
                return
        }
 
@@ -249,7 +247,7 @@ func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
                sm.peers.removePeer(peer.ID())
        }
        if err != nil {
-               log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
        }
 }
 
@@ -262,20 +260,20 @@ func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMe
                block, err = sm.chain.GetBlockByHash(msg.GetHash())
        }
        if err != nil {
-               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get block from chain")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
                return
        }
 
        blockHash := block.Hash()
        txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
        if err != nil {
-               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get transaction status")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
                return
        }
 
        ok, err := peer.sendMerkleBlock(block, txStatus)
        if err != nil {
-               log.WithField("err", err).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
                return
        }
 
@@ -287,7 +285,7 @@ func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMe
 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
-               log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
                return
        }
 
@@ -297,7 +295,7 @@ func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
        block, err := msg.GetMineBlock()
        if err != nil {
-               log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
                return
        }
 
@@ -311,7 +309,7 @@ func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
        bestHeader := sm.chain.BestBlockHeader()
        genesisBlock, err := sm.chain.GetBlockByHeight(0)
        if err != nil {
-               log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
        }
 
        genesisHash := genesisBlock.Hash()
@@ -328,10 +326,7 @@ func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusRes
        }
 
        if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
-               log.WithFields(log.Fields{
-                       "remote genesis": genesisHash.String(),
-                       "local genesis":  sm.genesisHash.String(),
-               }).Warn("fail hand shake due to differnt genesis")
+               log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
                return
        }
 
@@ -345,17 +340,45 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
                return
        }
 
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
+       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
 }
 
+func (sm *SyncManager) IsListening() bool {
+       if sm.config.VaultMode {
+               return false
+       }
+       return sm.sw.IsListening()
+}
+
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       if sm.config.VaultMode {
+               return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
+       }
+       return sm.sw.NodeInfo()
+}
+
+func (sm *SyncManager) PeerCount() int {
+       if sm.config.VaultMode {
+               return 0
+       }
+       return len(sm.sw.Peers().List())
+}
+
 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
        peer := sm.peers.getPeer(basePeer.ID())
        if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
                return
        }
 
+       log.WithFields(log.Fields{
+               "module":  logModule,
+               "peer":    basePeer.Addr(),
+               "type":    reflect.TypeOf(msg),
+               "message": msg.String(),
+       }).Info("receive message from peer")
+
        switch msg := msg.(type) {
        case *GetBlockMessage:
                sm.handleGetBlockMsg(peer, msg)
@@ -400,109 +423,68 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
                sm.handleGetMerkleBlockMsg(peer, msg)
 
        default:
-               log.Errorf("unknown message type %v", reflect.TypeOf(msg))
-       }
-}
-
-// Defaults to tcp
-func protocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
+               log.WithFields(log.Fields{
+                       "module":       logModule,
+                       "peer":         basePeer.Addr(),
+                       "message_type": reflect.TypeOf(msg),
+               }).Error("unhandled message type")
        }
-       return p, address
 }
 
-func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: sm.config.Moniker,
-               Network: sm.config.ChainID,
-               Version: version.Version,
-               Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
+func (sm *SyncManager) Start() error {
+       var err error
+       if _, err = sm.sw.Start(); err != nil {
+               log.Error("switch start err")
+               return err
        }
 
-       if !sm.sw.IsListening() {
-               return nodeInfo
+       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
+       if err != nil {
+               return err
        }
 
-       p2pListener := sm.sw.Listeners()[0]
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       if listenerStatus {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
-       } else {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
+       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       if err != nil {
+               return err
        }
-       return nodeInfo
-}
 
-//Start start sync manager service
-func (sm *SyncManager) Start() {
-       if _, err := sm.sw.Start(); err != nil {
-               cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
-       }
        // broadcast transactions
        go sm.txBroadcastLoop()
        go sm.minedBroadcastLoop()
        go sm.txSyncLoop()
+
+       return nil
 }
 
 //Stop stop sync manager
 func (sm *SyncManager) Stop() {
        close(sm.quitSync)
-       sm.sw.Stop()
-}
-
-func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
-       addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
-       if err != nil {
-               return nil, err
-       }
-
-       conn, err := net.ListenUDP("udp", addr)
-       if err != nil {
-               return nil, err
+       sm.minedBlockSub.Unsubscribe()
+       if !sm.config.VaultMode {
+               sm.sw.Stop()
        }
-
-       realaddr := conn.LocalAddr().(*net.UDPAddr)
-       ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
-       if err != nil {
-               return nil, err
-       }
-
-       // add the seeds node to the discover table
-       if config.P2P.Seeds == "" {
-               return ntab, nil
-       }
-       nodes := []*discover.Node{}
-       for _, seed := range strings.Split(config.P2P.Seeds, ",") {
-               version.Status.AddSeed(seed)
-               url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
-               nodes = append(nodes, discover.MustParseNode(url))
-       }
-       if err = ntab.SetFallbackNodes(nodes); err != nil {
-               return nil, err
-       }
-       return ntab, nil
 }
 
 func (sm *SyncManager) minedBroadcastLoop() {
        for {
                select {
-               case blockHash := <-sm.newBlockCh:
-                       block, err := sm.chain.GetBlockByHash(blockHash)
-                       if err != nil {
-                               log.Errorf("Failed on mined broadcast loop get block %v", err)
+               case obj, ok := <-sm.minedBlockSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
                                return
                        }
-                       if err := sm.peers.broadcastMinedBlock(block); err != nil {
-                               log.Errorf("Broadcast mine block error. %v", err)
-                               return
+
+                       ev, ok := obj.Data.(event.NewMinedBlockEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if err := sm.peers.broadcastMinedBlock(ev.Block); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
+                               continue
                        }
+
                case <-sm.quitSync:
                        return
                }