OSDN Git Service

Optimize code logic
[bytom/bytom.git] / p2p / pex_reactor.go
index be6f75d..0fdd20d 100644 (file)
@@ -6,7 +6,9 @@ import (
        "math/rand"
        "reflect"
        "strings"
+       "sync/atomic"
        "time"
+       "sync"
 
        log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
@@ -57,6 +59,8 @@ type PEXReactor struct {
        // tracks message count by peer, so we can prevent abuse
        msgCountByPeer    *cmn.CMap
        maxMsgCountByPeer uint16
+       dialing           int32
+       wg                sync.WaitGroup
 }
 
 // NewPEXReactor creates new PEX reactor.
@@ -128,10 +132,8 @@ func (r *PEXReactor) AddPeer(p *Peer) error {
        // close the connect if connect is big than max limit
        if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
                if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
-                       select {
-                       case <-time.After(1 * time.Second):
-                               r.sw.StopPeerGracefully(p)
-                       }
+                       <-time.After(1 * time.Second)
+                       r.sw.StopPeerGracefully(p)
                }
                return errors.New("Error in AddPeer: reach the max peer, exchange then close")
        }
@@ -236,23 +238,23 @@ func (r *PEXReactor) ensurePeersRoutine() {
        time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
 
        // fire once immediately.
-       r.ensurePeers(r.sw.peers.Size())
+       r.ensurePeers()
 
        // fire periodically
        ticker := time.NewTicker(r.ensurePeersPeriod)
-       quickTicker := time.NewTicker(time.Second * 5)
+       quickTicker := time.NewTicker(time.Second * 1)
+
        for {
                select {
                case <-ticker.C:
-                       if r.sw.peers.Size() >= 3 {
-                               r.ensurePeers(r.sw.peers.Size())
-                       }
+                       r.ensurePeers()
                case <-quickTicker.C:
                        if r.sw.peers.Size() < 3 {
-                               r.ensurePeers(r.sw.peers.Size())
+                               r.ensurePeers()
                        }
                case <-r.Quit:
                        ticker.Stop()
+                       quickTicker.Stop()
                        return
                }
        }
@@ -270,9 +272,15 @@ func (r *PEXReactor) ensurePeersRoutine() {
 // What we're currently doing in terms of marking good/bad peers is just a
 // placeholder. It should not be the case that an address becomes old/vetted
 // upon a single successful connection.
-func (r *PEXReactor) ensurePeers(num int) {
+func (r *PEXReactor) ensurePeers() {
+       if !atomic.CompareAndSwapInt32(&r.dialing, 0, 1) {
+               log.Info("Ensure peers ...")
+               return
+       }
+       defer atomic.StoreInt32(&r.dialing, 0)
+
        numOutPeers, _, numDialing := r.Switch.NumPeers()
-       numToDial := minNumOutboundPeers*(minNumOutboundPeers-num) - (numOutPeers + numDialing)
+       numToDial := minNumOutboundPeers*(minNumOutboundPeers-numOutPeers) - (numOutPeers + numDialing)
        log.WithFields(log.Fields{
                "numOutPeers": numOutPeers,
                "numDialing":  numDialing,
@@ -332,14 +340,17 @@ func (r *PEXReactor) ensurePeers(num int) {
        }
        // Dial picked addresses
        for _, item := range toDial {
+               r.wg.Add(1)
                go func(picked *NetAddress) {
                        if _, err := r.Switch.DialPeerWithAddress(picked, false); err != nil {
                                r.book.MarkAttempt(picked)
                        } else {
                                r.book.MarkGood(picked)
                        }
+                       r.wg.Done()
                }(item)
        }
+       r.wg.Wait()
 
        // If we need more addresses, pick a random peer and ask for more.
        if r.book.NeedMoreAddrs() {