12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
16 "github.com/bytom/errors"
20 // PexChannel identifies the channel on which PEX (peer exchange) messages are sent.
21 PexChannel = byte(0x00)
23 // defaultEnsurePeersPeriod is the initial value of PEXReactor.ensurePeersPeriod.
24 defaultEnsurePeersPeriod = 120 * time.Second
25 minNumOutboundPeers = 5 // outbound target that ensurePeers uses to compute numToDial
26 maxPexMessageSize = 1048576 // 1MB; passed to wire.ReadBinary in DecodeMessage
28 // defaultMaxMsgCountByPeer is the default number of messages one peer can send to us
// during `msgCountByPeerFlushInterval` (initial value of PEXReactor.maxMsgCountByPeer).
29 defaultMaxMsgCountByPeer = 1000
30 msgCountByPeerFlushInterval = 1 * time.Hour // ticker period used by flushMsgCountByPeer
// ErrSendPexFail is returned by AddPeer when RequestPEX reports failure.
33 var ErrSendPexFail = errors.New("Send pex message fail")
35 // PEXReactor handles PEX (peer exchange) and ensures that an
36 // adequate number of peers are connected to the switch.
38 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
40 // ## Preventing abuse
42 // For now, it just limits the number of messages from one peer to
43 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000 messages per hour by default).
47 // Limiting is fine for now. Maybe down the road we want to keep track of the
48 // quality of peer messages so if peerA keeps telling us about peers we can't
49 // connect to then maybe we should care less about peerA. But I don't think
50 // that kind of complexity is a priority right now.
51 type PEXReactor struct {
56 ensurePeersPeriod time.Duration // period of the ticker created in ensurePeersRoutine
58 // tracks message count by peer, so we can prevent abuse
// (keyed by the peer's remote address string; values are uint16 counts)
59 msgCountByPeer *cmn.CMap
60 maxMsgCountByPeer uint16 // threshold compared against in ReachedMaxMsgCountForPeer
63 // NewPEXReactor creates a new PEX reactor with the default ensure-peers
// period, an empty per-peer message counter and the default per-peer limit.
// NOTE(review): how b and sw are stored is not visible in this listing.
64 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
68 ensurePeersPeriod: defaultEnsurePeersPeriod,
69 msgCountByPeer: cmn.NewCMap(),
70 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
// The embedded base reactor is named "PEXReactor" and is handed r itself.
72 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
76 // OnStart implements BaseService. It starts the embedded BaseReactor and
// launches the ensurePeersRoutine and flushMsgCountByPeer goroutines.
77 func (r *PEXReactor) OnStart() error {
78 r.BaseReactor.OnStart()
80 go r.ensurePeersRoutine() // periodically tops up peer connections
81 go r.flushMsgCountByPeer() // periodically clears the per-peer message counters
85 // OnStop implements BaseService by stopping the embedded BaseReactor.
// NOTE(review): any further shutdown steps are not visible in this listing.
86 func (r *PEXReactor) OnStop() {
87 r.BaseReactor.OnStop()
91 // GetChannels implements Reactor. It returns the channel descriptors used
// by this reactor; the visible descriptor field sets a send queue capacity of 10.
92 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
93 return []*ChannelDescriptor{
97 SendQueueCapacity: 10,
102 // AddPeer implements Reactor by adding peer to the address book (if inbound)
103 // or by requesting more addresses (if outbound).
//
// It returns ErrSendPexFail when the address book needs more addresses and
// RequestPEX fails, an error when p.ListenAddr is reported as an invalid peer
// address, and an error when the switch already holds MaxNumPeers or more peers.
// NOTE(review): the inbound/outbound branch condition and the check on err are
// not visible in this listing.
104 func (r *PEXReactor) AddPeer(p *Peer) error {
106 // For outbound peers, the address is already in the books.
107 // Either it was added in DialSeeds or when we
108 // received the peer's address in r.Receive
109 if r.book.NeedMoreAddrs() {
110 if ok := r.RequestPEX(p); !ok {
111 return ErrSendPexFail
117 // For inbound connections, the peer is its own source
118 addr, err := NewNetAddressString(p.ListenAddr)
120 // this should never happen
121 log.WithFields(log.Fields{
122 "addr": p.ListenAddr,
124 }).Error("Error in AddPeer: Invalid peer address")
125 return errors.New("Error in AddPeer: Invalid peer address")
127 r.book.AddAddress(addr, addr) // the peer is recorded as the source of its own address
129 // If the switch already holds MaxNumPeers or more peers, send this peer a
// selection of addresses and then close the connection.
130 if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
131 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
132 <-time.After(1 * time.Second) // presumably lets the addresses go out before the peer is stopped
133 r.sw.StopPeerGracefully(p)
135 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
141 // RemovePeer implements Reactor.
142 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
143 // If we aren't keeping track of local temp data for each peer here, then we
144 // don't have to do anything.
147 // Receive implements Reactor by handling incoming PEX messages.
//
// Every call is counted against the sender's remote address before decoding.
// A pexRequestMessage is answered with a selection from the address book; the
// addresses of a pexAddrsMessage are added to the book with the sender as
// their source. Other message types are logged as unknown.
// NOTE(review): the early returns after the rate-limit and decode-error logs
// are not visible in this listing.
148 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
149 srcAddr := src.Connection().RemoteAddress
150 srcAddrStr := srcAddr.String() // key used for the per-peer message counter
152 r.IncrementMsgCountForPeer(srcAddrStr)
153 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
154 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
155 // TODO remove src from peers?
159 _, msg, err := DecodeMessage(msgBytes) // the decoded message type byte is not needed here
161 log.WithField("error", err).Error("Error decoding message")
// NOTE(review): the log text below misspells "Received"; left unchanged because it is runtime output.
164 log.WithField("msg", msg).Info("Reveived message")
166 switch msg := msg.(type) {
167 case *pexRequestMessage:
168 // src requested some peers.
169 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
170 log.Info("Send address message failed. Stop peer.")
172 case *pexAddrsMessage:
173 // We received some peer addresses from src.
174 // (We don't want to get spammed with bad peers)
175 for _, addr := range msg.Addrs {
177 r.book.AddAddress(addr, srcAddr) // src is recorded as the source of each address
181 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
185 // RequestPEX asks peer for more addresses by sending a pexRequestMessage
// on PexChannel with TrySend. Callers treat a false result as a failed send.
// NOTE(review): the condition guarding StopPeerGracefully and the return
// statement are not visible in this listing.
186 func (r *PEXReactor) RequestPEX(p *Peer) bool {
187 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
189 r.sw.StopPeerGracefully(p)
194 // SendAddrs sends addrs to the peer as a pexAddrsMessage on PexChannel
// with TrySend. Callers treat a false result as a failed send.
// NOTE(review): the condition guarding StopPeerGracefully and the return
// statement are not visible in this listing.
195 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
196 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
198 r.sw.StopPeerGracefully(p)
203 // SetEnsurePeersPeriod sets period to ensure peers connected.
204 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
205 r.ensurePeersPeriod = d
208 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
209 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
210 r.maxMsgCountByPeer = v
213 // ReachedMaxMsgCountForPeer returns true if we received too many
214 // messages from peer with address `addr`.
215 // NOTE: assumes the value in the CMap is non-nil
216 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
217 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
220 // IncrementMsgCountForPeer increments or initializes the msg count for the
// peer with address `addr` in the CMap.
// NOTE(review): the counter is a uint16 — confirm the increment (not visible
// in this listing) cannot wrap past 65535 and reset a throttled peer to zero.
// NOTE(review): Get and Set are separate calls, so concurrent updates for the
// same addr could presumably lose an increment — confirm against callers.
221 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
223 countI := r.msgCountByPeer.Get(addr)
225 count = countI.(uint16) // stored values are uint16 counts
228 r.msgCountByPeer.Set(addr, count)
231 // ensurePeersRoutine ensures that sufficient peers are connected. (continuous)
// It is started as a goroutine by OnStart.
// NOTE(review): the loop body and its exit condition are not visible in this listing.
232 func (r *PEXReactor) ensurePeersRoutine() {
233 // Randomize when routine starts: sleep a random duration below 10 seconds.
234 ensurePeersPeriodMs := int64(10000)
235 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
237 // fire once immediately.
241 ticker := time.NewTicker(r.ensurePeersPeriod) // regular cadence
242 quickTicker := time.NewTicker(time.Second * 1) // one-second cadence for the low-peer check below
248 case <-quickTicker.C:
249 if r.sw.peers.Size() < 3 { // fewer than three peers in the switch
260 // ensurePeers ensures that sufficient peers are connected. (once)
//
// It computes how many addresses to dial from the outbound and dialing peer
// counts, picks candidate addresses from the address book, starts one
// dialPeerWorker goroutine per selected address, and finally asks a random
// connected peer for more addresses when the book needs them.
//
262 // Old bucket / New bucket are arbitrary categories to denote whether an
263 // address is vetted or not, and this needs to be determined over time via a
264 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
265 // the node operator. It should not be used to compute what addresses are
266 // already connected or not.
268 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
269 // What we're currently doing in terms of marking good/bad peers is just a
270 // placeholder. It should not be the case that an address becomes old/vetted
271 // upon a single successful connection.
272 func (r *PEXReactor) ensurePeers() {
273 numOutPeers, _, numDialing := r.Switch.NumPeers()
// Shortfall against minNumOutboundPeers (counting peers being dialed), times 5.
274 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
275 log.WithFields(log.Fields{
276 "numOutPeers": numOutPeers,
277 "numDialing": numDialing,
278 "numToDial": numToDial,
279 }).Info("Ensure peers")
// newBias grows with the outbound peer count: 10 with none, capped at 90 from 8 peers on.
284 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
285 toDial := make(map[string]*NetAddress) // keyed by IP string, so one address per IP
287 // Try to pick numToDial addresses to dial.
288 for i := 0; i < numToDial; i++ {
289 // The purpose of newBias is to first prioritize old (more vetted) peers
290 // when we have few connections, but to allow for new (less vetted) peers
291 // if we already have many connections. This algorithm isn't perfect, but
292 // it somewhat ensures that we prioritize connecting to more-vetted peers.
295 var picked *NetAddress
296 // Try to fetch a new peer 3 times.
297 // This caps the maximum number of tries to 3 * numToDial.
298 for j := 0; j < 3; j++ {
299 try := r.book.PickAddress(newBias)
303 ka := r.book.addrLookup[try.String()]
309 _, alreadySelected := toDial[try.IP.String()]
310 alreadyDialing := r.Switch.IsDialing(try)
311 var alreadyConnected bool
// A candidate counts as connected when any current peer has the same remote IP.
313 for _, v := range r.Switch.Peers().list {
314 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
315 alreadyConnected = true
319 if alreadySelected || alreadyDialing || alreadyConnected {
322 log.Debug("Will dial address addr:", try)
330 toDial[picked.IP.String()] = picked
333 var wg sync.WaitGroup
334 for _, item := range toDial {
336 go r.dialPeerWorker(item, &wg) // one dialing goroutine per selected address
340 // If we need more addresses, pick a random peer and ask for more.
341 if r.book.NeedMoreAddrs() {
342 if peers := r.Switch.Peers().List(); len(peers) > 0 {
343 i := rand.Int() % len(peers)
345 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
346 if ok := r.RequestPEX(peer); !ok {
347 log.Info("Send request address message failed. Stop peer.")
// dialPeerWorker dials address a through the switch and, when the dial
// returns an error, records the attempt in the address book. ensurePeers
// starts it as a goroutine and passes a shared WaitGroup.
// NOTE(review): the rest of the body, including how wg is used, is not
// visible in this listing.
353 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
354 if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
355 r.book.MarkAttempt(a)
// flushMsgCountByPeer clears the per-peer message counters, driven by a
// ticker with period msgCountByPeerFlushInterval. OnStart runs it as a goroutine.
// NOTE(review): the loop and its exit condition are not visible in this listing.
362 func (r *PEXReactor) flushMsgCountByPeer() {
363 ticker := time.NewTicker(msgCountByPeerFlushInterval)
368 r.msgCountByPeer.Clear() // drops every peer's count at once
376 //-----------------------------------------------------------------------------
380 msgTypeRequest = byte(0x01) // type byte registered with go-wire for pexRequestMessage
381 msgTypeAddrs = byte(0x02) // type byte registered with go-wire for pexAddrsMessage
// PexMessage is the common type under which PEX messages are handled. The
// concrete value behind it is either a pexRequestMessage or a pexAddrsMessage.
type PexMessage interface{}
// Register the concrete PEX message types with go-wire under the
// struct{ PexMessage } wrapper, each with its own type byte. DecodeMessage
// reads messages through the same wrapper.
388 var _ = wire.RegisterInterface(
389 struct{ PexMessage }{},
390 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
391 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
394 // DecodeMessage decodes bz into a PexMessage using the go-wire interface
// registered above. The message is read with wire.ReadBinary through the
// struct{ PexMessage } wrapper, with maxPexMessageSize passed as the limit;
// decoding errors are reported through the named result err.
// NOTE(review): how msgType and n are set is not visible in this listing.
395 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
398 r := bytes.NewReader(bz)
399 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
// pexRequestMessage requests additional peer addresses. It carries no payload.
type pexRequestMessage struct{}

// String returns a fixed, human-readable tag for the request message.
func (*pexRequestMessage) String() string {
	return "[pexRequest]"
}
414 A message with announced peer addresses.
// pexAddrsMessage announces peer addresses; SendAddrs fills its Addrs field
// and Receive adds each entry to the address book.
416 type pexAddrsMessage struct {
// String formats the message as "[pexAddrs <Addrs>]" using %v on m.Addrs.
420 func (m *pexAddrsMessage) String() string {
421 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)