"math/rand"
"reflect"
"strings"
+ "sync/atomic"
"time"
+ "sync"
log "github.com/sirupsen/logrus"
wire "github.com/tendermint/go-wire"
// tracks message count by peer, so we can prevent abuse
msgCountByPeer *cmn.CMap
maxMsgCountByPeer uint16
+ dialing int32
+ wg sync.WaitGroup
}
// NewPEXReactor creates new PEX reactor.
// close the connect if connect is big than max limit
if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
- select {
- case <-time.After(1 * time.Second):
- r.sw.StopPeerGracefully(p)
- }
+ <-time.After(1 * time.Second)
+ r.sw.StopPeerGracefully(p)
}
return errors.New("Error in AddPeer: reach the max peer, exchange then close")
}
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
// fire once immediately.
- r.ensurePeers(r.sw.peers.Size())
+ r.ensurePeers()
// fire periodically
ticker := time.NewTicker(r.ensurePeersPeriod)
- quickTicker := time.NewTicker(time.Second * 5)
+ quickTicker := time.NewTicker(time.Second * 1)
+
for {
select {
case <-ticker.C:
- if r.sw.peers.Size() >= 3 {
- r.ensurePeers(r.sw.peers.Size())
- }
+ r.ensurePeers()
case <-quickTicker.C:
if r.sw.peers.Size() < 3 {
- r.ensurePeers(r.sw.peers.Size())
+ r.ensurePeers()
}
case <-r.Quit:
ticker.Stop()
+ quickTicker.Stop()
return
}
}
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
-func (r *PEXReactor) ensurePeers(num int) {
+func (r *PEXReactor) ensurePeers() {
+ if !atomic.CompareAndSwapInt32(&r.dialing, 0, 1) {
+ log.Info("Ensure peers ...")
+ return
+ }
+ defer atomic.StoreInt32(&r.dialing, 0)
+
numOutPeers, _, numDialing := r.Switch.NumPeers()
- numToDial := minNumOutboundPeers*(minNumOutboundPeers-num) - (numOutPeers + numDialing)
+ numToDial := minNumOutboundPeers*(minNumOutboundPeers-numOutPeers) - (numOutPeers + numDialing)
log.WithFields(log.Fields{
"numOutPeers": numOutPeers,
"numDialing": numDialing,
}
// Dial picked addresses
for _, item := range toDial {
+ r.wg.Add(1)
go func(picked *NetAddress) {
if _, err := r.Switch.DialPeerWithAddress(picked, false); err != nil {
r.book.MarkAttempt(picked)
} else {
r.book.MarkGood(picked)
}
+ r.wg.Done()
}(item)
}
+ r.wg.Wait()
// If we need more addresses, pick a random peer and ask for more.
if r.book.NeedMoreAddrs() {