10 log "github.com/sirupsen/logrus"
11 cmn "github.com/tendermint/tmlibs/common"
15 // PexChannel is the channel ID on which PEX messages are sent (see RequestAddrs and SendAddrs).
16 PexChannel = byte(0x00)
18 defaultEnsurePeersPeriod = 120 * time.Second // period of the main ticker created in ensurePeersRoutine
19 minNumOutboundPeers = 5 // outbound-peer target that ensurePeers uses to compute how many addresses to dial
20 maxPexMessageSize = 1048576 // 1MB (1024 * 1024)
21 defaultMaxMsgCountByPeer = uint16(1000) // per-address message count at which reachedMaxMsgLimit reports true
22 msgCountByPeerFlushInterval = 1 * time.Hour // period of the ticker created in flushMsgCountByPeer
25 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
26 type PEXReactor struct {
// msgCountByPeer maps a peer's remote address string to a uint16 count of
// the messages received from it (written by incrementMsgCount, read by
// reachedMaxMsgLimit, emptied by flushMsgCountByPeer).
29 msgCountByPeer *cmn.CMap
32 // NewPEXReactor creates a new PEX reactor with an empty per-peer message
// counter map and initialises its embedded BaseReactor under the name
// "PEXReactor".
//
// NOTE(review): b is presumably stored as the reactor's address book (r.book);
// the assignment is not visible in this excerpt — confirm.
33 func NewPEXReactor(b *AddrBook) *PEXReactor {
36 msgCountByPeer: cmn.NewCMap(),
38 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
42 // OnStart implements BaseService. It calls the embedded BaseReactor's
// OnStart and then launches ensurePeersRoutine and flushMsgCountByPeer, each
// on its own goroutine.
//
// NOTE(review): any value returned by BaseReactor.OnStart is discarded here —
// confirm that is intended.
43 func (r *PEXReactor) OnStart() error {
44 r.BaseReactor.OnStart()
46 go r.ensurePeersRoutine() // periodic peer top-up loop
47 go r.flushMsgCountByPeer() // periodic reset of the per-peer message counters
51 // OnStop implements BaseService. It calls the embedded BaseReactor's OnStop.
//
// NOTE(review): how the goroutines launched in OnStart are told to exit is not
// visible in this excerpt — confirm they observe the stop.
52 func (r *PEXReactor) OnStop() {
53 r.BaseReactor.OnStop()
57 // GetChannels implements Reactor. It returns a slice of channel descriptors
// for this reactor; the descriptor shown here has a send queue capacity of 10
// (its other fields are set on lines not shown in this excerpt).
58 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
59 return []*ChannelDescriptor{&ChannelDescriptor{
62 SendQueueCapacity: 10,
66 // AddPeer adds the peer to the address book and, when needed, exchanges
// addresses with it.
//
// From the lines visible here:
//   - if the address book needs more addresses, it requests addresses from p
//     and returns an error when that request could not be sent;
//   - it parses p.ListenAddr into a NetAddress (returning an error on an
//     invalid address) and adds it to the address book with itself as source;
//   - if the switch already holds MaxNumPeers or more peers, it sends p a
//     selection of known addresses, waits one second, stops the peer
//     gracefully and returns an error.
//
// NOTE(review): several lines of the body are not shown, so the exact nesting
// and conditions of these steps must be confirmed against the full file.
67 func (r *PEXReactor) AddPeer(p *Peer) error {
69 if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
70 return errors.New("Send pex message fail")
75 addr, err := NewNetAddressString(p.ListenAddr)
77 return errors.New("addPeer: invalid peer address")
80 r.book.AddAddress(addr, addr) // NOTE(review): AddAddress returns an error (see Receive); it is ignored here
81 if r.Switch.peers.Size() >= r.Switch.config.MaxNumPeers {
82 if r.SendAddrs(p, r.book.GetSelection()) {
83 <-time.After(1 * time.Second) // blocks the caller for one second; presumably lets the address message go out before the peer is stopped — confirm
84 r.Switch.StopPeerGracefully(p)
86 return errors.New("addPeer: reach the max peer, exchange then close")
91 // Receive implements Reactor by handling incoming PEX messages.
//
// From the lines visible here, it:
//   - counts the message against the sender's remote address and, once the
//     per-peer limit is reached, logs an error and stops the peer;
//   - decodes rawMsg, logging an error and stopping the peer when decoding
//     fails;
//   - answers a pexRequestMessage with a selection of addresses from the
//     address book;
//   - adds every address of a pexAddrsMessage to the address book with the
//     sender's address as source, stopping the peer when an add fails;
//   - logs any other message type as unknown.
//
// NOTE(review): the return statements that presumably follow each
// StopPeerGracefully call are not visible in this excerpt — confirm.
92 func (r *PEXReactor) Receive(chID byte, p *Peer, rawMsg []byte) {
93 srcAddr := p.Connection().RemoteAddress
94 srcAddrStr := srcAddr.String()
95 r.incrementMsgCount(srcAddrStr) // the counter is keyed by the remote address string
96 if r.reachedMaxMsgLimit(srcAddrStr) {
97 log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
98 r.Switch.StopPeerGracefully(p)
102 _, msg, err := DecodeMessage(rawMsg)
104 log.WithField("error", err).Error("failed to decoding pex message")
105 r.Switch.StopPeerGracefully(p)
109 switch msg := msg.(type) {
110 case *pexRequestMessage:
// NOTE(review): src is not declared in any visible line of this method; the
// peer parameter is named p. Confirm whether this should be p.
111 if !r.SendAddrs(src, r.book.GetSelection()) {
112 log.Error("failed to send pex address message")
115 case *pexAddrsMessage:
116 for _, addr := range msg.Addrs {
117 if err := r.book.AddAddress(addr, srcAddr); err != nil {
118 log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
119 r.Switch.StopPeerGracefully(p)
125 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
129 // RemovePeer implements Reactor. It is a no-op: the body is empty, so neither p nor reason is used.
130 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {}
132 // RequestAddrs asks peer p for more addresses by trying to send it a
// pexRequestMessage on PexChannel.
//
// NOTE(review): the condition guarding StopPeerGracefully and the return
// statement are not visible in this excerpt; presumably the peer is stopped
// when TrySend fails and the TrySend result is returned — confirm.
133 func (r *PEXReactor) RequestAddrs(p *Peer) bool {
134 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
136 r.Switch.StopPeerGracefully(p)
141 // SendAddrs tries to send addrs to peer p as a pexAddrsMessage on PexChannel.
//
// NOTE(review): the condition guarding StopPeerGracefully and the return
// statement are not visible in this excerpt; presumably the peer is stopped
// when TrySend fails and the TrySend result is returned — confirm.
142 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
143 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
145 r.Switch.StopPeerGracefully(p)
// dialPeerWorker dials address a through the switch and, when the dial returns
// an error, marks an attempt for a in the address book. ensurePeers runs it on
// its own goroutine, one per address.
//
// NOTE(review): wg is not used in the visible lines; presumably the worker
// signals it when finished — confirm.
150 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
151 if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
152 r.book.MarkAttempt(a)
// ensurePeers tries to bring the number of outbound peers up to
// minNumOutboundPeers.
//
// From the lines visible here, it:
//   - derives numToDial from the gap between minNumOutboundPeers and the
//     current outbound plus dialing count, multiplied by 5;
//   - picks candidate addresses from the address book (at most numToDial*3
//     attempts), checking whether each is already selected, already being
//     dialed or already connected;
//   - starts one dialPeerWorker goroutine per selected address;
//   - when the address book still needs addresses, picks a random connected
//     peer (presumably to request addresses from it — that line is not shown).
//
// NOTE(review): the bodies of the per-candidate checks and any early return
// for numToDial <= 0 are not visible in this excerpt — confirm.
159 func (r *PEXReactor) ensurePeers() {
160 numOutPeers, _, numDialing := r.Switch.NumPeers()
161 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5 // zero or negative once outbound+dialing reaches the minimum
162 log.WithFields(log.Fields{
163 "numOutPeers": numOutPeers,
164 "numDialing": numDialing,
165 "numToDial": numToDial,
166 }).Debug("ensure peers")
171 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 // grows by 10 per outbound peer and is capped at 90
172 toDial := make(map[string]*NetAddress)
173 maxAttempts := numToDial * 3
174 for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
175 try := r.book.PickAddress(newBias)
// toDial is keyed by the IP string only, so two addresses that share an IP
// but differ in port count as the same candidate.
179 if _, selected := toDial[try.IP.String()]; selected {
182 if dialling := r.Switch.IsDialing(try); dialling {
185 if connected := r.Switch.Peers().Has(try.ID); connected {
189 log.Debug("Will dial address addr:", try)
190 toDial[try.IP.String()] = try
193 var wg sync.WaitGroup
194 for _, item := range toDial {
196 go r.dialPeerWorker(item, &wg)
200 if r.book.NeedMoreAddrs() {
201 if peers := r.Switch.Peers().List(); len(peers) > 0 {
202 peer := peers[rand.Int()%len(peers)] // random index into the current peer list
// ensurePeersRoutine is the loop started by OnStart on its own goroutine. It
// creates two tickers: one firing every defaultEnsurePeersPeriod and a quick
// one firing every second. On each quick tick it checks whether the switch has
// fewer than 3 peers.
//
// NOTE(review): what runs on each tick (presumably ensurePeers), the exit
// condition of the loop, and any Stop call on the two tickers are not visible
// in this excerpt — confirm.
208 func (r *PEXReactor) ensurePeersRoutine() {
210 ticker := time.NewTicker(defaultEnsurePeersPeriod)
211 quickTicker := time.NewTicker(time.Second * 1)
217 case <-quickTicker.C:
218 if r.Switch.peers.Size() < 3 {
// flushMsgCountByPeer is started by OnStart on its own goroutine. It creates a
// ticker with period msgCountByPeerFlushInterval and clears every per-peer
// message counter.
//
// NOTE(review): the loop around Clear, its exit condition and any Stop call on
// the ticker are not visible in this excerpt; presumably Clear runs once per
// tick — confirm.
227 func (r *PEXReactor) flushMsgCountByPeer() {
228 ticker := time.NewTicker(msgCountByPeerFlushInterval)
232 r.msgCountByPeer.Clear()
// incrementMsgCount updates the message counter stored for addr: it loads the
// current uint16 count when an entry exists and writes a count back under the
// same key.
//
// NOTE(review): the declaration of count and the increment itself are on lines
// not shown in this excerpt. Get and Set are two separate map calls, so
// concurrent calls for the same addr could lose an update, and a uint16
// counter wraps after 65535 — confirm neither matters here.
239 func (r *PEXReactor) incrementMsgCount(addr string) {
241 if countI := r.msgCountByPeer.Get(addr); countI != nil {
242 count = countI.(uint16)
245 r.msgCountByPeer.Set(addr, count)
// reachedMaxMsgLimit reports whether the counter stored for addr is at or above
// defaultMaxMsgCountByPeer.
//
// NOTE(review): the type assertion is unchecked, so it panics when Get returns
// nil (no entry for addr). Receive calls incrementMsgCount right before this,
// but flushMsgCountByPeer runs on another goroutine and may Clear the map in
// between — consider the two-value form `count, ok := ...(uint16)`.
248 func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
249 return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer