OSDN Git Service

Remove p2p peer exchange module (#1196)
authoryahtoo <yahtoo.ma@gmail.com>
Tue, 31 Jul 2018 08:23:21 +0000 (16:23 +0800)
committerPaladz <yzhu101@uottawa.ca>
Tue, 31 Jul 2018 08:23:21 +0000 (16:23 +0800)
netsync/handle.go
p2p/pex/pex_message.go [deleted file]
p2p/pex/pex_reactor.go [deleted file]
p2p/switch.go

index 0793a6a..3f45988 100644 (file)
@@ -17,7 +17,6 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/discover"
-       "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -101,9 +100,7 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo
                if err != nil {
                        return nil, err
                }
-
-               pexReactor := pex.NewPEXReactor(discv)
-               manager.sw.AddReactor("PEX", pexReactor)
+               manager.sw.SetDiscv(discv)
        }
        manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
diff --git a/p2p/pex/pex_message.go b/p2p/pex/pex_message.go
deleted file mode 100644 (file)
index 329c2b1..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-package pex
-
-import (
-       "bytes"
-       "fmt"
-
-       wire "github.com/tendermint/go-wire"
-
-       "github.com/bytom/p2p"
-)
-
-const (
-       msgTypeRequest = byte(0x01)
-       msgTypeAddrs   = byte(0x02)
-)
-
-// PexMessage is a primary type for PEX messages. Underneath, it could contain
-// either pexRequestMessage, or pexAddrsMessage messages.
-type PexMessage interface{}
-
-var _ = wire.RegisterInterface(
-       struct{ PexMessage }{},
-       wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
-       wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
-)
-
-// DecodeMessage implements interface registered above.
-func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
-       msgType = bz[0]
-       n := new(int)
-       r := bytes.NewReader(bz)
-       msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
-       return
-}
-
-type pexRequestMessage struct{}
-
-func (m *pexRequestMessage) String() string { return "[pexRequest]" }
-
-type pexAddrsMessage struct {
-       Addrs []*p2p.NetAddress
-}
-
-func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go
deleted file mode 100644 (file)
index b50274f..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-package pex
-
-import (
-       "errors"
-       "reflect"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/bytom/p2p"
-       "github.com/bytom/p2p/connection"
-       "github.com/bytom/p2p/discover"
-)
-
-const (
-       // PexChannel is a channel for PEX messages
-       PexChannel = byte(0x00)
-
-       minNumOutboundPeers = 5
-       maxPexMessageSize   = 1048576 // 1MB
-)
-
-// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
-type PEXReactor struct {
-       p2p.BaseReactor
-       discv *discover.Network
-}
-
-// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(discv *discover.Network) *PEXReactor {
-       r := &PEXReactor{
-               discv: discv,
-       }
-       r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
-       return r
-}
-
-// OnStart implements BaseService
-func (r *PEXReactor) OnStart() error {
-       r.BaseReactor.OnStart()
-       go r.ensurePeersRoutine()
-       return nil
-}
-
-// OnStop implements BaseService
-func (r *PEXReactor) OnStop() {
-       r.BaseReactor.OnStop()
-}
-
-// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor {
-       return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{
-               ID:                PexChannel,
-               Priority:          1,
-               SendQueueCapacity: 10,
-       }}
-}
-
-// AddPeer adding peer to the address book
-func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
-       if r.Switch.Peers().Size() <= r.Switch.Config.P2P.MaxNumPeers {
-               return nil
-       }
-
-       nodes := make([]*discover.Node, 10)
-       if n := r.discv.ReadRandomNodes(nodes); n == 0 {
-               return nil
-       }
-
-       if r.SendAddrs(p, nodes) {
-               <-time.After(1 * time.Second)
-               r.Switch.StopPeerGracefully(p.Key)
-       }
-       return errors.New("addPeer: reach the max peer, exchange then close")
-}
-
-// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
-       _, msg, err := DecodeMessage(rawMsg)
-       if err != nil {
-               log.WithField("error", err).Error("failed to decoding pex message")
-               r.Switch.StopPeerGracefully(p.Key)
-               return
-       }
-
-       switch msg := msg.(type) {
-       case *pexRequestMessage:
-               nodes := make([]*discover.Node, 10)
-               if n := r.discv.ReadRandomNodes(nodes); n == 0 {
-                       return
-               }
-
-               if !r.SendAddrs(p, nodes) {
-                       log.Error("failed to send pex address message")
-               }
-
-       case *pexAddrsMessage:
-       default:
-               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
-       }
-}
-
-// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
-
-// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *p2p.Peer, nodes []*discover.Node) bool {
-       addrs := []*p2p.NetAddress{}
-       for _, node := range nodes {
-               if node == nil {
-                       break
-               }
-               addrs = append(addrs, p2p.NewNetAddressIPPort(node.IP, node.TCP))
-       }
-
-       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
-       if !ok {
-               r.Switch.StopPeerGracefully(p.Key)
-       }
-       return ok
-}
-
-func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
-       if err := r.Switch.DialPeerWithAddress(a); err != nil {
-               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
-       }
-       wg.Done()
-}
-
-func (r *PEXReactor) ensurePeers() {
-       numOutPeers, _, numDialing := r.Switch.NumPeers()
-       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
-       log.WithFields(log.Fields{
-               "numOutPeers": numOutPeers,
-               "numDialing":  numDialing,
-               "numToDial":   numToDial,
-       }).Debug("ensure peers")
-       if numToDial <= 0 {
-               return
-       }
-
-       connectedPeers := make(map[string]struct{})
-       for _, peer := range r.Switch.Peers().List() {
-               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
-       }
-
-       var wg sync.WaitGroup
-       nodes := make([]*discover.Node, numToDial)
-       n := r.discv.ReadRandomNodes(nodes)
-       for i := 0; i < n; i++ {
-               try := p2p.NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
-               if r.Switch.NodeInfo().ListenAddr == try.String() {
-                       continue
-               }
-               if dialling := r.Switch.IsDialing(try); dialling {
-                       continue
-               }
-               if _, ok := connectedPeers[try.IP.String()]; ok {
-                       continue
-               }
-
-               log.Debug("Will dial address addr:", try)
-               wg.Add(1)
-               go r.dialPeerWorker(try, &wg)
-       }
-       wg.Wait()
-}
-
-func (r *PEXReactor) ensurePeersRoutine() {
-       r.ensurePeers()
-       ticker := time.NewTicker(120 * time.Second)
-       quickTicker := time.NewTicker(3 * time.Second)
-
-       for {
-               select {
-               case <-ticker.C:
-                       r.ensurePeers()
-               case <-quickTicker.C:
-                       if r.Switch.Peers().Size() < 3 {
-                               r.ensurePeers()
-                       }
-               case <-r.Quit:
-                       return
-               }
-       }
-}
index 5cbf59c..6a8b944 100644 (file)
@@ -15,12 +15,14 @@ import (
        cfg "github.com/bytom/config"
        "github.com/bytom/errors"
        "github.com/bytom/p2p/connection"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/p2p/trust"
 )
 
 const (
-       bannedPeerKey      = "BannedPeer"
-       defaultBanDuration = time.Hour * 1
+       bannedPeerKey       = "BannedPeer"
+       defaultBanDuration  = time.Hour * 1
+       minNumOutboundPeers = 5
 )
 
 //pre-define errors for connecting fail
@@ -47,6 +49,7 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       discv        *discover.Network
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
@@ -86,6 +89,7 @@ func (sw *Switch) OnStart() error {
        for _, listener := range sw.listeners {
                go sw.listenerRoutine(listener)
        }
+       go sw.ensureOutboundPeersRoutine()
        return nil
 }
 
@@ -341,7 +345,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
                // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
                // be double of MaxNumPeers
-               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers*2 {
+               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
                        inConn.Close()
                        log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
@@ -355,6 +359,67 @@ func (sw *Switch) listenerRoutine(l Listener) {
        }
 }
 
+func (sw *Switch) SetDiscv(discv *discover.Network) {
+       sw.discv = discv
+}
+
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if err := sw.DialPeerWithAddress(a); err != nil {
+               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
+       }
+       wg.Done()
+}
+
+func (sw *Switch) ensureOutboundPeers() {
+       numOutPeers, _, numDialing := sw.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+       log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
+       }
+
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+       }
+
+       var wg sync.WaitGroup
+       nodes := make([]*discover.Node, numToDial)
+       n := sw.discv.ReadRandomNodes(nodes)
+       for i := 0; i < n; i++ {
+               try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
+               }
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
+       }
+       wg.Wait()
+}
+
+func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureOutboundPeers()
+
+       ticker := time.NewTicker(10 * time.Second)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       sw.ensureOutboundPeers()
+               case <-sw.Quit:
+                       return
+               }
+       }
+}
+
 func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start() // spawn send/recv routines
        for _, reactor := range sw.reactors {