11 log "github.com/sirupsen/logrus"
12 wire "github.com/tendermint/go-wire"
13 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/bytom/errors"
19 // PexChannel is a channel for PEX messages
20 PexChannel = byte(0x00)
22 // period to ensure peers connected
23 defaultEnsurePeersPeriod = 30 * time.Second
// target number of outbound peers (connected plus dialing); ensurePeers
// computes how many addresses to dial as the shortfall against this value
24 minNumOutboundPeers = 10
// size limit passed to wire.ReadBinary when DecodeMessage reads a PEX message
25 maxPexMessageSize = 1048576 // 1MB
27 // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
28 defaultMaxMsgCountByPeer = 1000
// interval of the ticker created in flushMsgCountByPeer, which clears the
// per-peer message counters
29 msgCountByPeerFlushInterval = 1 * time.Hour
// NOTE(review): the enclosing `const (` and `)` lines are not visible in this listing.
32 // PEXReactor handles PEX (peer exchange) and ensures that an
33 // adequate number of peers are connected to the switch.
35 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
37 // ## Preventing abuse
39 // For now, it just limits the number of messages from one peer to
40 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000 messages per hour with the defaults).
44 // Limiting is fine for now. Maybe down the road we want to keep track of the
45 // quality of peer messages so if peerA keeps telling us about peers we can't
46 // connect to then maybe we should care less about peerA. But I don't think
47 // that kind of complexity is priority right now.
48 type PEXReactor struct {
// NOTE(review): some field lines are not visible in this listing; the methods
// of this type also use r.BaseReactor, r.Switch and r.book.
// period used by ensurePeersRoutine, both for its randomized initial delay
// and for its ticker
53 ensurePeersPeriod time.Duration
55 // tracks message count by peer, so we can prevent abuse
56 msgCountByPeer *cmn.CMap
// per-peer limit that ReachedMaxMsgCountForPeer compares the stored count
// against; counts are stored as uint16 as well
57 maxMsgCountByPeer uint16
60 // NewPEXReactor creates new PEX reactor.
// The visible initializers set ensurePeersPeriod to defaultEnsurePeersPeriod,
// msgCountByPeer to a new CMap and maxMsgCountByPeer to
// defaultMaxMsgCountByPeer; r.BaseReactor is then built with the name
// "PEXReactor".
// NOTE(review): the lines that allocate r, consume b and return are not
// visible in this listing. Presumably b becomes r.book - confirm.
61 func NewPEXReactor(b *AddrBook) *PEXReactor {
64 ensurePeersPeriod: defaultEnsurePeersPeriod,
65 msgCountByPeer: cmn.NewCMap(),
66 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
68 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
72 // OnStart implements BaseService
// It calls BaseReactor.OnStart and launches two background goroutines:
// ensurePeersRoutine and flushMsgCountByPeer.
// NOTE(review): some lines of this function (including its return) are not
// visible in this listing, and neither is the way the goroutines are stopped.
73 func (r *PEXReactor) OnStart() error {
// any value returned by BaseReactor.OnStart is not used on this line
74 r.BaseReactor.OnStart()
76 go r.ensurePeersRoutine()
77 go r.flushMsgCountByPeer()
81 // OnStop implements BaseService
// It delegates to BaseReactor.OnStop.
// NOTE(review): one further line of this function is not visible in this
// listing - confirm what else is shut down here.
82 func (r *PEXReactor) OnStop() {
83 r.BaseReactor.OnStop()
87 // GetChannels implements Reactor
// It returns the channel descriptors this reactor wants registered.
// NOTE(review): only the SendQueueCapacity field of the descriptor is visible
// in this listing; presumably the descriptor is for PexChannel - confirm.
88 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
89 return []*ChannelDescriptor{
93 SendQueueCapacity: 10,
98 // AddPeer implements Reactor by adding peer to the address book (if inbound)
99 // or by requesting more addresses (if outbound).
// For an inbound peer, p.ListenAddr is parsed with NewNetAddressString and the
// resulting address is added to the book with itself as the source. When the
// address is reported invalid, the failure is logged and an error with the
// same text is returned.
// NOTE(review): the conditions guarding these branches (outbound check, error
// check) and the final return are not visible in this listing.
100 func (r *PEXReactor) AddPeer(p *Peer) error {
102 // For outbound peers, the address is already in the books.
103 // Either it was added in DialSeeds or when we
104 // received the peer's address in r.Receive
105 if r.book.NeedMoreAddrs() {
108 } else { // For inbound connections, the peer is its own source
109 addr, err := NewNetAddressString(p.ListenAddr)
111 // this should never happen
112 log.WithFields(log.Fields{
113 "addr": p.ListenAddr,
115 }).Error("Error in AddPeer: Invalid peer address")
116 return errors.New("Error in AddPeer: Invalid peer address")
// the peer vouches for its own address: addr is both the entry and its source
118 r.book.AddAddress(addr, addr)
123 // RemovePeer implements Reactor.
124 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
125 // If we aren't keeping track of local temp data for each peer here, then we
126 // don't have to do anything.
129 // Receive implements Reactor by handling incoming PEX messages.
// Each call is first counted against the sender's remote address; once the
// per-peer limit is reached an error is logged. The payload is then decoded
// with DecodeMessage and dispatched on its concrete type:
//   - *pexRequestMessage: reply to src with r.book.GetSelection()
//   - *pexAddrsMessage:   add the announced addresses to the book, with the
//     sender's address recorded as their source
//   - anything else:      log the unexpected type
// chID is not used on any visible line.
// NOTE(review): the early returns and some conditions of this function are not
// visible in this listing.
130 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
131 srcAddr := src.Connection().RemoteAddress
132 srcAddrStr := srcAddr.String()
// NOTE(review): flushMsgCountByPeer runs in its own goroutine and clears the
// counters, so the entry written by the increment below may already be gone
// when the limit check reads it back.
134 r.IncrementMsgCountForPeer(srcAddrStr)
135 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
136 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
137 // TODO remove src from peers?
141 _, msg, err := DecodeMessage(msgBytes)
143 log.WithField("error", err).Error("Error decoding message")
// NOTE(review): the log text below is misspelled ("Reveived"); fixing it
// changes a runtime string, so it is only flagged here.
146 log.WithField("msg", msg).Info("Reveived message")
148 switch msg := msg.(type) {
149 case *pexRequestMessage:
150 // src requested some peers.
151 r.SendAddrs(src, r.book.GetSelection())
152 case *pexAddrsMessage:
153 // We received some peer addresses from src.
154 // (We don't want to get spammed with bad peers)
155 for _, addr := range msg.Addrs {
157 r.book.AddAddress(addr, srcAddr)
161 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
165 // RequestPEX asks peer for more addresses.
166 func (r *PEXReactor) RequestPEX(p *Peer) {
167 p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
170 // SendAddrs sends addrs to the peer.
171 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
172 p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
175 // SetEnsurePeersPeriod sets period to ensure peers connected.
176 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
177 r.ensurePeersPeriod = d
180 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
181 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
182 r.maxMsgCountByPeer = v
185 // ReachedMaxMsgCountForPeer returns true if we received too many
186 // messages from peer with address `addr`.
187 // NOTE: assumes the value in the CMap is non-nil
188 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
189 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
192 // Increment or initialize the msg count for the peer in the CMap
// The current value for addr is fetched from msgCountByPeer, asserted to
// uint16 and the resulting count is written back under the same key.
// NOTE(review): the declaration of count, the nil check and the increment
// itself are not visible in this listing. The counter is a uint16, so an
// unguarded increment would wrap to 0 after 65535 - confirm whether that can
// be reached within one flush interval.
193 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
195 countI := r.msgCountByPeer.Get(addr)
197 count = countI.(uint16)
200 r.msgCountByPeer.Set(addr, count)
203 // Ensures that sufficient peers are connected. (continuous)
// The routine first sleeps for a random duration below ensurePeersPeriod
// (drawn in whole milliseconds) and then creates a ticker with that period.
// NOTE(review): the loop driven by the ticker and the exit condition are not
// visible in this listing.
204 func (r *PEXReactor) ensurePeersRoutine() {
205 // Randomize when routine starts
206 ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
// NOTE(review): rand.Int63n panics when its argument is <= 0, which happens
// here if ensurePeersPeriod is below one millisecond (it can be changed
// through SetEnsurePeersPeriod).
207 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
209 // fire once immediately.
213 ticker := time.NewTicker(r.ensurePeersPeriod)
226 // ensurePeers ensures that sufficient peers are connected. (once)
228 // Old bucket / New bucket are arbitrary categories to denote whether an
229 // address is vetted or not, and this needs to be determined over time via a
230 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
231 // the node operator. It should not be used to compute what addresses are
232 // already connected or not.
234 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
235 // What we're currently doing in terms of marking good/bad peers is just a
236 // placeholder. It should not be the case that an address becomes old/vetted
237 // upon a single successful connection.
// Outline of the visible steps: compute how many outbound peers are missing,
// pick that many candidate addresses from the book (at most three attempts
// each, skipping addresses already selected, being dialed or connected), dial
// the picked addresses, and finally ask a random connected peer for more
// addresses when the book reports it needs more.
// NOTE(review): several lines of this function are not visible in this
// listing (e.g. any early return, the nil check on a picked address, the
// assignment to picked, and the send of the PEX request).
238 func (r *PEXReactor) ensurePeers() {
239 numOutPeers, _, numDialing := r.Switch.NumPeers()
// shortfall against the outbound target; may be zero or negative, in which
// case the picking loop below does not run
240 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
241 log.WithFields(log.Fields{
242 "numOutPeers": numOutPeers,
243 "numDialing": numDialing,
244 "numToDial": numToDial,
245 }).Info("Ensure peers")
// bias handed to PickAddress: 10 with no outbound peers, rising by 10 per
// outbound peer up to 90 at eight or more
250 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
// candidates keyed by IP string, so at most one address per IP is dialed
251 toDial := make(map[string]*NetAddress)
253 // Try to pick numToDial addresses to dial.
254 for i := 0; i < numToDial; i++ {
255 // The purpose of newBias is to first prioritize old (more vetted) peers
256 // when we have few connections, but to allow for new (less vetted) peers
257 // if we already have many connections. This algorithm isn't perfect, but
258 // it somewhat ensures that we prioritize connecting to more-vetted peers.
261 var picked *NetAddress
262 // Try to fetch a new peer 3 times.
263 // This caps the maximum number of tries to 3 * numToDial.
264 for j := 0; j < 3; j++ {
265 try := r.book.PickAddress(newBias)
269 _, alreadySelected := toDial[try.IP.String()]
270 alreadyDialing := r.Switch.IsDialing(try)
271 var alreadyConnected bool
// Connected peers are matched by IP only; ports are not compared.
// NOTE(review): this reads the peer set's unexported `list` field directly,
// while the code further down goes through Peers().List() - confirm the direct
// access is safe. strings.Compare(a, b) == 0 is equivalent to a == b.
273 for _, v := range r.Switch.Peers().list {
274 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
275 alreadyConnected = true
279 if alreadySelected || alreadyDialing || alreadyConnected {
282 log.WithField("addr", try).Info("Will dial address")
290 toDial[picked.IP.String()] = picked
293 // Dial picked addresses
294 for _, item := range toDial {
// a failed dial is recorded in the address book as an attempt
295 if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
296 r.book.MarkAttempt(item)
300 // If we need more addresses, pick a random peer and ask for more.
301 if r.book.NeedMoreAddrs() {
302 if peers := r.Switch.Peers().List(); len(peers) > 0 {
// random index into the current peer list (non-empty per the check above)
303 i := rand.Int() % len(peers)
305 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
// flushMsgCountByPeer resets the per-peer message counters: it creates a
// ticker with period msgCountByPeerFlushInterval and clears msgCountByPeer.
// It is started as a goroutine from OnStart.
// NOTE(review): the loop around the ticker and the exit condition are not
// visible in this listing - presumably the map is cleared on every tick.
311 func (r *PEXReactor) flushMsgCountByPeer() {
312 ticker := time.NewTicker(msgCountByPeerFlushInterval)
317 r.msgCountByPeer.Clear()
325 //-----------------------------------------------------------------------------
// Type bytes under which the concrete PEX messages are registered with
// go-wire: msgTypeRequest for *pexRequestMessage, msgTypeAddrs for
// *pexAddrsMessage.
// NOTE(review): the enclosing `const (` and `)` lines are not visible in this listing.
329 msgTypeRequest = byte(0x01)
330 msgTypeAddrs = byte(0x02)
// PexMessage is the umbrella type for everything exchanged on the PEX
// channel. The value behind it is expected to be one of the two registered
// concrete messages: *pexRequestMessage or *pexAddrsMessage.
type PexMessage interface{}
337 var _ = wire.RegisterInterface(
338 struct{ PexMessage }{},
339 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
340 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
343 // DecodeMessage implements interface registered above.
// It reads bz through wire.ReadBinary (limited to maxPexMessageSize) into the
// struct{ PexMessage } wrapper and returns the wrapped message; decode
// failures are reported through err, which ReadBinary receives by pointer.
// NOTE(review): the lines that set msgType, declare n and return are not
// visible in this listing. Also confirm what ReadBinary returns on failure:
// the unchecked type assertion below panics if the result is not a
// struct{ PexMessage }.
344 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
347 r := bytes.NewReader(bz)
348 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
// pexRequestMessage is the PEX message that requests additional peer
// addresses. It has no fields.
type pexRequestMessage struct{}

// String returns the fixed textual form of a PEX request.
func (m *pexRequestMessage) String() string {
	const text = "[pexRequest]"
	return text
}
363 A message with announced peer addresses.
// NOTE(review): the field lines of this struct are not visible in this
// listing. Other code reads and sets an Addrs field holding []*NetAddress
// (see SendAddrs and Receive).
365 type pexAddrsMessage struct {
369 func (m *pexAddrsMessage) String() string {
370 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)