11 log "github.com/sirupsen/logrus"
12 wire "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/bytom/errors"
19 // PexChannel is the channel ID byte used for PEX (peer exchange) messages.
20 PexChannel = byte(0x00)
22 // defaultEnsurePeersPeriod is the default ensure-peers interval; NewPEXReactor
// stores it in ensurePeersPeriod, which sets the ticker period in ensurePeersRoutine.
23 defaultEnsurePeersPeriod = 120 * time.Second
// minNumOutboundPeers is the target that ensurePeers compares against the
// outbound-plus-dialing peer count to decide how many addresses to dial.
24 minNumOutboundPeers = 5
25 maxPexMessageSize = 1048576 // 1MB; size limit handed to wire.ReadBinary in DecodeMessage
27 // defaultMaxMsgCountByPeer is the default maximum number of messages one peer
// can send to us during `msgCountByPeerFlushInterval`.
28 defaultMaxMsgCountByPeer = 1000
// msgCountByPeerFlushInterval is the ticker period used by flushMsgCountByPeer,
// which clears the per-peer message counters.
29 msgCountByPeerFlushInterval = 1 * time.Hour
// ErrSendPexFail is returned by AddPeer when RequestPEX reports that the
// address request could not be sent to the peer.
32 var ErrSendPexFail = errors.New("Send pex message fail")
34 // PEXReactor handles PEX (peer exchange) and ensures that an
35 // adequate number of peers are connected to the switch.
37 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
39 // ## Preventing abuse
41 // For now, it just limits the number of messages from one peer to
42 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
// messages per hour with the default values).
46 // Limiting is fine for now. Maybe down the road we want to keep track of the
47 // quality of peer messages so if peerA keeps telling us about peers we can't
48 // connect to then maybe we should care less about peerA. But I don't think
49 // that kind of complexity is a priority right now.
50 type PEXReactor struct {
// ensurePeersPeriod is the ticker interval used by ensurePeersRoutine.
55 ensurePeersPeriod time.Duration
57 // msgCountByPeer tracks the message count per peer address, so we can prevent abuse.
58 msgCountByPeer *cmn.CMap
// maxMsgCountByPeer is the per-peer threshold checked by ReachedMaxMsgCountForPeer.
59 maxMsgCountByPeer uint16
62 // NewPEXReactor creates a new PEX reactor.
// The reactor starts with the default ensure-peers period, a fresh per-peer
// message counter map and the default per-peer message limit; the period and
// limit can be changed later with SetEnsurePeersPeriod and SetMaxMsgCountByPeer.
// NOTE(review): the lines that store b and sw on the reactor and the return
// statement are not visible in this excerpt — confirm against the full file.
63 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
67 ensurePeersPeriod: defaultEnsurePeersPeriod,
68 msgCountByPeer: cmn.NewCMap(),
69 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
// The base reactor is built with a reference back to r, so it is assigned
// only after r itself exists.
71 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
75 // OnStart implements BaseService.
// It starts the BaseReactor and then launches two background goroutines:
// ensurePeersRoutine (keeps enough peers connected) and flushMsgCountByPeer
// (periodically clears the per-peer message counters).
// NOTE(review): the returned error and any statements between these calls are
// not visible in this excerpt.
76 func (r *PEXReactor) OnStart() error {
77 r.BaseReactor.OnStart()
79 go r.ensurePeersRoutine()
80 go r.flushMsgCountByPeer()
84 // OnStop implements BaseService.
// It stops the BaseReactor.
// NOTE(review): any further shutdown steps are not visible in this excerpt.
85 func (r *PEXReactor) OnStop() {
86 r.BaseReactor.OnStop()
90 // GetChannels implements Reactor.
// It returns the channel descriptors this reactor communicates on.
// NOTE(review): presumably a single descriptor for PexChannel — the ID and
// priority lines are not visible in this excerpt.
91 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
92 return []*ChannelDescriptor{
96 SendQueueCapacity: 10,
101 // AddPeer implements Reactor by adding peer to the address book (if inbound)
102 // or by requesting more addresses (if outbound).
//
// It returns ErrSendPexFail when the address request cannot be sent, an error
// when the peer's listen address cannot be parsed, and an error when the
// switch has already reached its maximum number of peers.
// NOTE(review): the outbound/inbound branch condition and the success return
// are not visible in this excerpt.
103 func (r *PEXReactor) AddPeer(p *Peer) error {
105 // For outbound peers, the address is already in the books.
106 // Either it was added in DialSeeds or when we
107 // received the peer's address in r.Receive
108 if r.book.NeedMoreAddrs() {
109 if ok := r.RequestPEX(p); !ok {
110 return ErrSendPexFail
116 // For inbound connections, the peer is its own source
117 addr, err := NewNetAddressString(p.ListenAddr)
119 // this should never happen
120 log.WithFields(log.Fields{
121 "addr": p.ListenAddr,
123 }).Error("Error in AddPeer: Invalid peer address")
124 return errors.New("Error in AddPeer: Invalid peer address")
// The peer's own listen address is recorded as both the address and its source.
126 r.book.AddAddress(addr, addr)
128 // If the switch is already at its peer limit, send the peer a selection of
// known addresses; when that send succeeds, close the connection gracefully.
// Either way the peer is rejected with an error.
129 if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
130 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
131 r.sw.StopPeerGracefully(p)
133 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
138 // RemovePeer implements Reactor.
139 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
140 // If we aren't keeping track of local temp data for each peer here, then we
141 // don't have to do anything.
144 // Receive implements Reactor by handling incoming PEX messages.
//
// Every call first increments the sender's message counter (keyed by the
// string form of its remote address) and checks it against the per-peer limit.
// The message is then decoded and dispatched on its concrete type:
// a pexRequestMessage is answered with a selection from the address book, and
// the addresses in a pexAddrsMessage are added to the book with the sender as
// their source.
// NOTE(review): the early returns after the limit check and after a decode
// error are not visible in this excerpt.
145 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
146 srcAddr := src.Connection().RemoteAddress
147 srcAddrStr := srcAddr.String()
149 r.IncrementMsgCountForPeer(srcAddrStr)
150 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
151 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
152 // TODO remove src from peers?
// The message-type byte returned by DecodeMessage is not needed here.
156 _, msg, err := DecodeMessage(msgBytes)
158 log.WithField("error", err).Error("Error decoding message")
// NOTE(review): the log text below misspells "Received".
161 log.WithField("msg", msg).Info("Reveived message")
163 switch msg := msg.(type) {
164 case *pexRequestMessage:
165 // src requested some peers.
166 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
167 log.Info("Send address message failed. Stop peer.")
169 case *pexAddrsMessage:
170 // We received some peer addresses from src.
171 // (We don't want to get spammed with bad peers)
172 for _, addr := range msg.Addrs {
174 r.book.AddAddress(addr, srcAddr)
// Any other concrete type is logged as an error.
178 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
182 // RequestPEX asks peer for more addresses by sending a pexRequestMessage
// on PexChannel via p.TrySend.
// NOTE(review): the peer is stopped gracefully — presumably only when the send
// fails, with the send result returned; the guarding condition and the return
// statement are not visible in this excerpt.
183 func (r *PEXReactor) RequestPEX(p *Peer) bool {
// The message is wrapped in struct{ PexMessage }, the same wrapper type that
// is registered with go-wire.
184 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
186 r.sw.StopPeerGracefully(p)
191 // SendAddrs sends addrs to the peer as a pexAddrsMessage on PexChannel via
// p.TrySend.
// NOTE(review): the peer is stopped gracefully — presumably only when the send
// fails, with the send result returned; the guarding condition and the return
// statement are not visible in this excerpt.
192 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
// The message is wrapped in struct{ PexMessage }, the same wrapper type that
// is registered with go-wire.
193 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
195 r.sw.StopPeerGracefully(p)
200 // SetEnsurePeersPeriod sets period to ensure peers connected.
201 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
202 r.ensurePeersPeriod = d
205 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
206 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
207 r.maxMsgCountByPeer = v
210 // ReachedMaxMsgCountForPeer returns true if we received too many
211 // messages from peer with address `addr`.
212 // NOTE: assumes the value in the CMap is non-nil
213 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
214 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
217 // Increment or initialize the msg count for the peer in the CMap
218 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
220 countI := r.msgCountByPeer.Get(addr)
222 count = countI.(uint16)
225 r.msgCountByPeer.Set(addr, count)
228 // ensurePeersRoutine ensures that sufficient peers are connected. (continuous)
// NOTE(review): the immediate first check and the loop that consumes the
// ticker are not visible in this excerpt.
229 func (r *PEXReactor) ensurePeersRoutine() {
230 // Randomize when routine starts: sleep for a random duration below 10 seconds.
// Despite its name, this bound is a fixed 10000 ms and is not derived from
// r.ensurePeersPeriod.
231 ensurePeersPeriodMs := int64(10000)
232 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
234 // fire once immediately.
// The ticker period is whatever r.ensurePeersPeriod holds at this moment.
238 ticker := time.NewTicker(r.ensurePeersPeriod)
251 // ensurePeers ensures that sufficient peers are connected. (once)
253 // Old bucket / New bucket are arbitrary categories to denote whether an
254 // address is vetted or not, and this needs to be determined over time via a
255 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
256 // the node operator. It should not be used to compute what addresses are
257 // already connected or not.
259 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
260 // What we're currently doing in terms of marking good/bad peers is just a
261 // placeholder. It should not be the case that an address becomes old/vetted
262 // upon a single successful connection.
// NOTE(review): several control-flow lines (early return, nil checks,
// continue/break, else branches) are not visible in this excerpt.
263 func (r *PEXReactor) ensurePeers() {
264 numOutPeers, _, numDialing := r.Switch.NumPeers()
// Peers that are currently being dialed count towards the outbound target.
265 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
266 log.WithFields(log.Fields{
267 "numOutPeers": numOutPeers,
268 "numDialing": numDialing,
269 "numToDial": numToDial,
270 }).Info("Ensure peers")
// newBias is 10 with no outbound peers and rises by 10 per outbound peer,
// capped at 90 once there are 8 or more.
275 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
// toDial is keyed by IP string, so at most one address per IP is selected.
276 toDial := make(map[string]*NetAddress)
278 // Try to pick numToDial addresses to dial.
279 for i := 0; i < numToDial; i++ {
280 // The purpose of newBias is to first prioritize old (more vetted) peers
281 // when we have few connections, but to allow for new (less vetted) peers
282 // if we already have many connections. This algorithm isn't perfect, but
283 // it somewhat ensures that we prioritize connecting to more-vetted
286 var picked *NetAddress
287 // Try to fetch a new peer 3 times.
288 // This caps the maximum number of tries to 3 * numToDial.
289 for j := 0; j < 3; j++ {
290 try := r.book.PickAddress(newBias)
294 ka := r.book.addrLookup[try.String()]
// A candidate is rejected when its IP is already selected in this round,
// is being dialed, or belongs to a currently connected peer.
300 _, alreadySelected := toDial[try.IP.String()]
301 alreadyDialing := r.Switch.IsDialing(try)
302 var alreadyConnected bool
// Scan the connected peers and compare remote IPs as strings.
304 for _, v := range r.Switch.Peers().list {
305 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
306 alreadyConnected = true
310 if alreadySelected || alreadyDialing || alreadyConnected {
313 log.WithField("addr", try).Info("Will dial address")
321 toDial[picked.IP.String()] = picked
324 // Dial picked addresses
325 for _, item := range toDial {
// A failed dial is recorded as an attempt in the address book.
// NOTE(review): MarkGood presumably sits in the else branch (successful dial);
// the branch line is not visible in this excerpt.
326 if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
327 r.book.MarkAttempt(item)
329 r.book.MarkGood(item)
333 // If we need more addresses, pick a random peer and ask for more.
334 if r.book.NeedMoreAddrs() {
335 if peers := r.Switch.Peers().List(); len(peers) > 0 {
336 i := rand.Int() % len(peers)
338 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
339 if ok := r.RequestPEX(peer); !ok {
340 log.Info("Send request address message failed. Stop peer.")
// flushMsgCountByPeer clears all per-peer message counters, driven by a ticker
// with period msgCountByPeerFlushInterval. OnStart runs it in its own goroutine.
// NOTE(review): the loop and exit conditions around Clear are not visible in
// this excerpt.
346 func (r *PEXReactor) flushMsgCountByPeer() {
347 ticker := time.NewTicker(msgCountByPeerFlushInterval)
// Dropping every entry resets the rate limit for all peers at once.
352 r.msgCountByPeer.Clear()
360 //-----------------------------------------------------------------------------
// Type bytes under which the concrete PEX message types are registered with
// go-wire: msgTypeRequest for pexRequestMessage, msgTypeAddrs for pexAddrsMessage.
364 msgTypeRequest = byte(0x01)
365 msgTypeAddrs = byte(0x02)
// PexMessage is the umbrella type for messages exchanged on the PEX channel.
// The concrete value behind it is either a pexRequestMessage or a
// pexAddrsMessage.
type PexMessage interface{}
// Register the concrete PEX message types with go-wire at package
// initialization, each under its own type byte. The wrapper struct
// struct{ PexMessage } is the same one used when sending and decoding.
372 var _ = wire.RegisterInterface(
373 struct{ PexMessage }{},
374 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
375 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
378 // DecodeMessage decodes bz into a PexMessage using the go-wire registration
// of struct{ PexMessage }. maxPexMessageSize is passed to wire.ReadBinary as
// its limit argument, and err is passed by address for it to fill in.
// NOTE(review): how msgType and n are set is not visible in this excerpt.
379 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
382 r := bytes.NewReader(bz)
// ReadBinary yields the wrapper struct; unwrap it to get the concrete message.
383 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
// pexRequestMessage requests additional peer addresses from the receiver.
// It carries no payload.
type pexRequestMessage struct{}

// String returns a fixed, human-readable tag for the message.
func (m *pexRequestMessage) String() string {
	const tag = "[pexRequest]"
	return tag
}
398 A message with announced peer addresses.
400 type pexAddrsMessage struct {
404 func (m *pexAddrsMessage) String() string {
405 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)