"time"
log "github.com/sirupsen/logrus"
- cmn "github.com/tendermint/tmlibs/common"
"github.com/bytom/p2p"
"github.com/bytom/p2p/connection"
// PexChannel is a channel for PEX messages
PexChannel = byte(0x00)
- minNumOutboundPeers = 5
- maxPexMessageSize = 1048576 // 1MB
- defaultMaxMsgCountByPeer = uint16(1000)
+ minNumOutboundPeers = 5
+ maxPexMessageSize = 1048576 // 1MB
)
// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
type PEXReactor struct {
p2p.BaseReactor
- discv *discover.Network
- msgCountByPeer *cmn.CMap
+ discv *discover.Network
}
// NewPEXReactor creates new PEX reactor.
func NewPEXReactor(discv *discover.Network) *PEXReactor {
r := &PEXReactor{
- discv: discv,
- msgCountByPeer: cmn.NewCMap(),
+ discv: discv,
}
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
return r
func (r *PEXReactor) OnStart() error {
r.BaseReactor.OnStart()
go r.ensurePeersRoutine()
- go r.flushMsgCountByPeer()
return nil
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
- r.incrementMsgCount(p.RemoteAddr)
- if r.reachedMaxMsgLimit(p.RemoteAddr) {
- log.WithField("peer", p.RemoteAddr).Error("reached the max pex messages limit")
- r.Switch.StopPeerGracefully(p)
- return
- }
-
_, msg, err := DecodeMessage(rawMsg)
if err != nil {
log.WithField("error", err).Error("failed to decoding pex message")
return
}
- toDial := make(map[string]*p2p.NetAddress)
connectedPeers := make(map[string]struct{})
for _, peer := range r.Switch.Peers().List() {
connectedPeers[peer.RemoteAddrHost()] = struct{}{}
}
+ var wg sync.WaitGroup
nodes := make([]*discover.Node, numToDial)
n := r.discv.ReadRandomNodes(nodes)
- for i := 0; i < n && len(toDial) < numToDial; i++ {
+ for i := 0; i < n; i++ {
try := p2p.NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
- if r.Switch.NodeInfo().RemoteAddr == try.String() {
- continue
- }
- if _, selected := toDial[try.IP.String()]; selected {
+ if r.Switch.NodeInfo().ListenAddr == try.String() {
continue
}
if dialling := r.Switch.IsDialing(try); dialling {
}
log.Debug("Will dial address addr:", try)
- toDial[try.IP.String()] = try
- }
-
- var wg sync.WaitGroup
- for _, item := range toDial {
wg.Add(1)
- go r.dialPeerWorker(item, &wg)
+ go r.dialPeerWorker(try, &wg)
}
wg.Wait()
}
}
}
}
-
-func (r *PEXReactor) flushMsgCountByPeer() {
- ticker := time.NewTicker(1 * time.Hour)
- for {
- select {
- case <-ticker.C:
- r.msgCountByPeer.Clear()
- case <-r.Quit:
- return
- }
- }
-}
-
-func (r *PEXReactor) incrementMsgCount(addr string) {
- var count uint16
- if countI := r.msgCountByPeer.Get(addr); countI != nil {
- count = countI.(uint16)
- }
- count++
- r.msgCountByPeer.Set(addr, count)
-}
-
-func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
- return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
-}