11 log "github.com/sirupsen/logrus"
12 wire "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/bytom/errors"
19 // PexChannel is a channel for PEX messages
20 PexChannel = byte(0x00)
22 // period to ensure peers connected
23 defaultEnsurePeersPeriod = 120 * time.Second
24 minNumOutboundPeers = 10
25 maxPexMessageSize = 1048576 // 1MB
27 // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
28 defaultMaxMsgCountByPeer = 1000
29 msgCountByPeerFlushInterval = 1 * time.Hour
32 var ErrSendPexFail = errors.New("Send pex message fail")
34 // PEXReactor handles PEX (peer exchange) and ensures that an
35 // adequate number of peers are connected to the switch.
37 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
39 // ## Preventing abuse
41 // For now, it just limits the number of messages from one peer to
42 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
46 // Limiting is fine for now. Maybe down the road we want to keep track of the
47 // quality of peer messages so if peerA keeps telling us about peers we can't
48 // connect to then maybe we should care less about peerA. But I don't think
49 // that kind of complexity is priority right now.
50 type PEXReactor struct {
55 ensurePeersPeriod time.Duration
57 // tracks message count by peer, so we can prevent abuse
58 msgCountByPeer *cmn.CMap
59 maxMsgCountByPeer uint16
62 // NewPEXReactor creates new PEX reactor.
63 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
67 ensurePeersPeriod: defaultEnsurePeersPeriod,
68 msgCountByPeer: cmn.NewCMap(),
69 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
71 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
75 // OnStart implements BaseService
76 func (r *PEXReactor) OnStart() error {
77 r.BaseReactor.OnStart()
79 go r.ensurePeersRoutine()
80 go r.flushMsgCountByPeer()
84 // OnStop implements BaseService
85 func (r *PEXReactor) OnStop() {
86 r.BaseReactor.OnStop()
90 // GetChannels implements Reactor
91 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
92 return []*ChannelDescriptor{
96 SendQueueCapacity: 10,
101 // AddPeer implements Reactor by adding peer to the address book (if inbound)
102 // or by requesting more addresses (if outbound).
103 func (r *PEXReactor) AddPeer(p *Peer) error {
105 // For outbound peers, the address is already in the books.
106 // Either it was added in DialSeeds or when we
107 // received the peer's address in r.Receive
108 if r.book.NeedMoreAddrs() {
109 if ok := r.RequestPEX(p); !ok {
110 return ErrSendPexFail
113 } else { // For inbound connections, the peer is its own source
114 addr, err := NewNetAddressString(p.ListenAddr)
116 // this should never happen
117 log.WithFields(log.Fields{
118 "addr": p.ListenAddr,
120 }).Error("Error in AddPeer: Invalid peer address")
121 return errors.New("Error in AddPeer: Invalid peer address")
123 r.book.AddAddress(addr, addr)
128 // RemovePeer implements Reactor.
129 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
130 // If we aren't keeping track of local temp data for each peer here, then we
131 // don't have to do anything.
134 // Receive implements Reactor by handling incoming PEX messages.
135 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
136 srcAddr := src.Connection().RemoteAddress
137 srcAddrStr := srcAddr.String()
139 r.IncrementMsgCountForPeer(srcAddrStr)
140 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
141 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
142 // TODO remove src from peers?
146 _, msg, err := DecodeMessage(msgBytes)
148 log.WithField("error", err).Error("Error decoding message")
151 log.WithField("msg", msg).Info("Reveived message")
153 switch msg := msg.(type) {
154 case *pexRequestMessage:
155 // src requested some peers.
156 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
157 log.Info("Send address message failed. Stop peer.")
159 case *pexAddrsMessage:
160 // We received some peer addresses from src.
161 // (We don't want to get spammed with bad peers)
162 for _, addr := range msg.Addrs {
164 r.book.AddAddress(addr, srcAddr)
168 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
172 // RequestPEX asks peer for more addresses.
173 func (r *PEXReactor) RequestPEX(p *Peer) bool {
174 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
176 r.sw.StopPeerGracefully(p)
181 // SendAddrs sends addrs to the peer.
182 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
183 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
185 r.sw.StopPeerGracefully(p)
190 // SetEnsurePeersPeriod sets period to ensure peers connected.
191 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
192 r.ensurePeersPeriod = d
195 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
196 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
197 r.maxMsgCountByPeer = v
200 // ReachedMaxMsgCountForPeer returns true if we received too many
201 // messages from peer with address `addr`.
202 // NOTE: assumes the value in the CMap is non-nil
203 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
204 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
207 // Increment or initialize the msg count for the peer in the CMap
208 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
210 countI := r.msgCountByPeer.Get(addr)
212 count = countI.(uint16)
215 r.msgCountByPeer.Set(addr, count)
218 // Ensures that sufficient peers are connected. (continuous)
219 func (r *PEXReactor) ensurePeersRoutine() {
220 // Randomize when routine starts
221 ensurePeersPeriodMs := int64(10000)
222 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
224 // fire once immediately.
228 ticker := time.NewTicker(r.ensurePeersPeriod)
241 // ensurePeers ensures that sufficient peers are connected. (once)
243 // Old bucket / New bucket are arbitrary categories to denote whether an
244 // address is vetted or not, and this needs to be determined over time via a
245 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
246 // the node operator. It should not be used to compute what addresses are
247 // already connected or not.
249 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
250 // What we're currently doing in terms of marking good/bad peers is just a
251 // placeholder. It should not be the case that an address becomes old/vetted
252 // upon a single successful connection.
253 func (r *PEXReactor) ensurePeers() {
254 numOutPeers, _, numDialing := r.Switch.NumPeers()
255 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
256 log.WithFields(log.Fields{
257 "numOutPeers": numOutPeers,
258 "numDialing": numDialing,
259 "numToDial": numToDial,
260 }).Info("Ensure peers")
265 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
266 toDial := make(map[string]*NetAddress)
268 // Try to pick numToDial addresses to dial.
269 for i := 0; i < numToDial; i++ {
270 // The purpose of newBias is to first prioritize old (more vetted) peers
271 // when we have few connections, but to allow for new (less vetted) peers
272 // if we already have many connections. This algorithm isn't perfect, but
273 // it somewhat ensures that we prioritize connecting to more-vetted
276 var picked *NetAddress
277 // Try to fetch a new peer 3 times.
278 // This caps the maximum number of tries to 3 * numToDial.
279 for j := 0; j < 3; j++ {
280 try := r.book.PickAddress(newBias)
284 ka := r.book.addrLookup[try.String()]
290 _, alreadySelected := toDial[try.IP.String()]
291 alreadyDialing := r.Switch.IsDialing(try)
292 var alreadyConnected bool
294 for _, v := range r.Switch.Peers().list {
295 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
296 alreadyConnected = true
300 if alreadySelected || alreadyDialing || alreadyConnected {
303 log.WithField("addr", try).Info("Will dial address")
311 toDial[picked.IP.String()] = picked
314 // Dial picked addresses
315 for _, item := range toDial {
316 if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
317 r.book.MarkAttempt(item)
319 r.book.MarkGood(item)
323 // If we need more addresses, pick a random peer and ask for more.
324 if r.book.NeedMoreAddrs() {
325 if peers := r.Switch.Peers().List(); len(peers) > 0 {
326 i := rand.Int() % len(peers)
328 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
329 if ok := r.RequestPEX(peer); !ok {
330 log.Info("Send request address message failed. Stop peer.")
336 func (r *PEXReactor) flushMsgCountByPeer() {
337 ticker := time.NewTicker(msgCountByPeerFlushInterval)
342 r.msgCountByPeer.Clear()
350 //-----------------------------------------------------------------------------
354 msgTypeRequest = byte(0x01)
355 msgTypeAddrs = byte(0x02)
358 // PexMessage is a primary type for PEX messages. Underneath, it could contain
359 // either pexRequestMessage, or pexAddrsMessage messages.
360 type PexMessage interface{}
362 var _ = wire.RegisterInterface(
363 struct{ PexMessage }{},
364 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
365 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
368 // DecodeMessage implements interface registered above.
369 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
372 r := bytes.NewReader(bz)
373 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
378 A pexRequestMessage requests additional peer addresses.
380 type pexRequestMessage struct {
383 func (m *pexRequestMessage) String() string {
384 return "[pexRequest]"
388 A message with announced peer addresses.
390 type pexAddrsMessage struct {
394 func (m *pexAddrsMessage) String() string {
395 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)