OSDN Git Service

Mempool: add no btm input tx filter (#1605)
[bytom/bytom.git] / netsync / handle.go
index 1c78903..643ae0a 100644 (file)
@@ -1,27 +1,19 @@
 package netsync
 
 import (
-       "encoding/hex"
        "errors"
-       "net"
-       "path"
        "reflect"
-       "strconv"
-       "strings"
 
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       cmn "github.com/tendermint/tmlibs/common"
 
        cfg "github.com/bytom/config"
        "github.com/bytom/consensus"
        "github.com/bytom/event"
        "github.com/bytom/p2p"
-       "github.com/bytom/p2p/discover"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/version"
+       "github.com/tendermint/go-crypto"
 )
 
 const (
@@ -31,6 +23,10 @@ const (
        maxFilterAddressCount = 1000
 )
 
+var (
+       errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
+)
+
 // Chain is the interface for Bytom core
 type Chain interface {
        BestBlockHeader() *types.BlockHeader
@@ -46,71 +42,72 @@ type Chain interface {
        ValidateTx(*types.Tx) (bool, error)
 }
 
+type Switch interface {
+       AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+       NodeInfo() *p2p.NodeInfo
+       Start() (bool, error)
+       Stop() bool
+       IsListening() bool
+       DialPeerWithAddress(addr *p2p.NetAddress) error
+       Peers() *p2p.PeerSet
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       sw          *p2p.Switch
-       genesisHash bc.Hash
-
-       privKey      crypto.PrivKeyEd25519 // local node's p2p key
+       sw           Switch
+       genesisHash  bc.Hash
        chain        Chain
        txPool       *core.TxPool
        blockFetcher *blockFetcher
        blockKeeper  *blockKeeper
        peers        *peerSet
 
-       newTxCh  chan *types.Tx
        txSyncCh chan *txSyncMsg
        quitSync chan struct{}
        config   *cfg.Config
 
        eventDispatcher *event.Dispatcher
        minedBlockSub   *event.Subscription
+       txMsgSub        *event.Subscription
 }
 
-//NewSyncManager create a sync manager
+// CreateSyncManager create sync manager and set switch.
 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
-       genesisHeader, err := chain.GetHeaderByHeight(0)
+       sw, err := p2p.NewSwitch(config)
        if err != nil {
                return nil, err
        }
 
-       sw := p2p.NewSwitch(config)
+       return newSyncManager(config, sw, chain, txPool, dispatcher)
+}
+
+//NewSyncManager create a sync manager
+func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
+       genesisHeader, err := chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
        peers := newPeerSet(sw)
        manager := &SyncManager{
                sw:              sw,
                genesisHash:     genesisHeader.Hash(),
                txPool:          txPool,
                chain:           chain,
-               privKey:         crypto.GenPrivKeyEd25519(),
                blockFetcher:    newBlockFetcher(chain, peers),
                blockKeeper:     newBlockKeeper(chain, peers),
                peers:           peers,
-               newTxCh:         make(chan *types.Tx, maxTxChanSize),
                txSyncCh:        make(chan *txSyncMsg),
                quitSync:        make(chan struct{}),
                config:          config,
                eventDispatcher: dispatcher,
        }
 
-       protocolReactor := NewProtocolReactor(manager, manager.peers)
-       manager.sw.AddReactor("PROTOCOL", protocolReactor)
-
-       // Create & add listener
-       var listenerStatus bool
-       var l p2p.Listener
        if !config.VaultMode {
-               p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
-               manager.sw.AddListener(l)
-
-               discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
-               if err != nil {
-                       return nil, err
-               }
-               manager.sw.SetDiscv(discv)
+               protocolReactor := NewProtocolReactor(manager, peers)
+               manager.sw.AddReactor("PROTOCOL", protocolReactor)
        }
-       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
-       manager.sw.SetNodePrivKey(manager.privKey)
        return manager, nil
 }
 
@@ -123,9 +120,16 @@ func (sm *SyncManager) BestPeer() *PeerInfo {
        return nil
 }
 
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
-       return sm.newTxCh
+func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
+       if sm.config.VaultMode {
+               return errVaultModeDialPeer
+       }
+
+       return sm.sw.DialPeerWithAddress(addr)
+}
+
+func (sm *SyncManager) GetNetwork() string {
+       return sm.config.ChainID
 }
 
 //GetPeerInfos return peer info of all peers
@@ -139,11 +143,6 @@ func (sm *SyncManager) IsCaughtUp() bool {
        return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
 }
 
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
 //StopPeer try to stop peer by given ID
 func (sm *SyncManager) StopPeer(peerID string) error {
        if peer := sm.peers.getPeer(peerID); peer == nil {
@@ -153,11 +152,6 @@ func (sm *SyncManager) StopPeer(peerID string) error {
        return nil
 }
 
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}
-
 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
@@ -332,11 +326,7 @@ func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusRes
        }
 
        if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
-               log.WithFields(log.Fields{
-                       "module":         logModule,
-                       "remote genesis": genesisHash.String(),
-                       "local genesis":  sm.genesisHash.String(),
-               }).Warn("fail hand shake due to differnt genesis")
+               log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
                return
        }
 
@@ -350,11 +340,32 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
                return
        }
 
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
+       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
 }
 
+func (sm *SyncManager) IsListening() bool {
+       if sm.config.VaultMode {
+               return false
+       }
+       return sm.sw.IsListening()
+}
+
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       if sm.config.VaultMode {
+               return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
+       }
+       return sm.sw.NodeInfo()
+}
+
+func (sm *SyncManager) PeerCount() int {
+       if sm.config.VaultMode {
+               return 0
+       }
+       return len(sm.sw.Peers().List())
+}
+
 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
        peer := sm.peers.getPeer(basePeer.ID())
        if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
@@ -420,106 +431,38 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        }
 }
 
-// Defaults to tcp
-func protocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
-       }
-       return p, address
-}
-
-func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: sm.config.Moniker,
-               Network: sm.config.ChainID,
-               Version: version.Version,
-               Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
-       }
-
-       if !sm.sw.IsListening() {
-               return nodeInfo
-       }
-
-       p2pListener := sm.sw.Listeners()[0]
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       if listenerStatus {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
-       } else {
-               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
+func (sm *SyncManager) Start() error {
+       var err error
+       if _, err = sm.sw.Start(); err != nil {
+               log.Error("switch start err")
+               return err
        }
-       return nodeInfo
-}
 
-//Start start sync manager service
-func (sm *SyncManager) Start() {
-       _, err := sm.sw.Start()
+       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
        if err != nil {
-               cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
+               return err
        }
-       // broadcast transactions
-       go sm.txBroadcastLoop()
 
-       sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
+       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
        if err != nil {
-               cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
+               return err
        }
 
+       // broadcast transactions
+       go sm.txBroadcastLoop()
        go sm.minedBroadcastLoop()
        go sm.txSyncLoop()
+
+       return nil
 }
 
 //Stop stop sync manager
 func (sm *SyncManager) Stop() {
        close(sm.quitSync)
        sm.minedBlockSub.Unsubscribe()
-       sm.sw.Stop()
-}
-
-func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
-       addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
-       if err != nil {
-               return nil, err
-       }
-
-       conn, err := net.ListenUDP("udp", addr)
-       if err != nil {
-               return nil, err
-       }
-
-       realaddr := conn.LocalAddr().(*net.UDPAddr)
-       ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
-       if err != nil {
-               return nil, err
-       }
-
-       seeds, err := p2p.QueryDNSSeeds(net.LookupHost)
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on query dns seeds")
-       }
-
-       seeds = append(seeds, strings.Split(config.P2P.Seeds, ",")...)
-
-       if len(seeds) == 0 {
-               return ntab, nil
-       }
-
-       var nodes []*discover.Node
-       for _, seed := range seeds {
-               version.Status.AddSeed(seed)
-               url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
-               nodes = append(nodes, discover.MustParseNode(url))
-       }
-
-       if err = ntab.SetFallbackNodes(nodes); err != nil {
-               return nil, err
+       if !sm.config.VaultMode {
+               sm.sw.Stop()
        }
-       return ntab, nil
 }
 
 func (sm *SyncManager) minedBroadcastLoop() {