OSDN Git Service

Remove p2p peer exchange module (#1196)
[bytom/bytom-spv.git] / p2p / switch.go
index 5cbf59c..6a8b944 100644 (file)
@@ -15,12 +15,14 @@ import (
        cfg "github.com/bytom/config"
        "github.com/bytom/errors"
        "github.com/bytom/p2p/connection"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/p2p/trust"
 )
 
 const (
-       bannedPeerKey      = "BannedPeer"
-       defaultBanDuration = time.Hour * 1
+       bannedPeerKey       = "BannedPeer"
+       defaultBanDuration  = time.Hour * 1
+       minNumOutboundPeers = 5
 )
 
 //pre-define errors for connecting fail
@@ -47,6 +49,7 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       discv        *discover.Network
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
@@ -86,6 +89,7 @@ func (sw *Switch) OnStart() error {
        for _, listener := range sw.listeners {
                go sw.listenerRoutine(listener)
        }
+       go sw.ensureOutboundPeersRoutine()
        return nil
 }
 
@@ -341,7 +345,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
                // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
                // be double of MaxNumPeers
-               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers*2 {
+               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
                        inConn.Close()
                        log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
@@ -355,6 +359,67 @@ func (sw *Switch) listenerRoutine(l Listener) {
        }
 }
 
+func (sw *Switch) SetDiscv(discv *discover.Network) {
+       sw.discv = discv
+}
+
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if err := sw.DialPeerWithAddress(a); err != nil {
+               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
+       }
+       wg.Done()
+}
+
+func (sw *Switch) ensureOutboundPeers() {
+       numOutPeers, _, numDialing := sw.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+       log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
+       }
+
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+       }
+
+       var wg sync.WaitGroup
+       nodes := make([]*discover.Node, numToDial)
+       n := sw.discv.ReadRandomNodes(nodes)
+       for i := 0; i < n; i++ {
+               try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
+               }
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
+       }
+       wg.Wait()
+}
+
+func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureOutboundPeers()
+
+       ticker := time.NewTicker(10 * time.Second)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       sw.ensureOutboundPeers()
+               case <-sw.Quit:
+                       return
+               }
+       }
+}
+
 func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start() // spawn send/recv routines
        for _, reactor := range sw.reactors {