12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
16 "github.com/bytom/errors"
20 // PexChannel is a channel for PEX messages
21 PexChannel = byte(0x00)
23 // period to ensure peers connected
24 defaultEnsurePeersPeriod = 120 * time.Second
25 minNumOutboundPeers = 5
26 maxPexMessageSize = 1048576 // 1MB
28 // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
29 defaultMaxMsgCountByPeer = 1000
30 msgCountByPeerFlushInterval = 1 * time.Hour
33 var ErrSendPexFail = errors.New("Send pex message fail")
35 // PEXReactor handles PEX (peer exchange) and ensures that an
36 // adequate number of peers are connected to the switch.
38 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
40 // ## Preventing abuse
42 // For now, it just limits the number of messages from one peer to
43 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
47 // Limiting is fine for now. Maybe down the road we want to keep track of the
48 // quality of peer messages so if peerA keeps telling us about peers we can't
49 // connect to then maybe we should care less about peerA. But I don't think
50 // that kind of complexity is priority right now.
51 type PEXReactor struct {
56 ensurePeersPeriod time.Duration
58 // tracks message count by peer, so we can prevent abuse
59 msgCountByPeer *cmn.CMap
60 maxMsgCountByPeer uint16
63 // NewPEXReactor creates new PEX reactor.
64 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
68 ensurePeersPeriod: defaultEnsurePeersPeriod,
69 msgCountByPeer: cmn.NewCMap(),
70 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
72 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
76 // OnStart implements BaseService
77 func (r *PEXReactor) OnStart() error {
78 r.BaseReactor.OnStart()
80 go r.ensurePeersRoutine()
81 go r.flushMsgCountByPeer()
85 // OnStop implements BaseService
86 func (r *PEXReactor) OnStop() {
87 r.BaseReactor.OnStop()
91 // GetChannels implements Reactor
92 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
93 return []*ChannelDescriptor{
97 SendQueueCapacity: 10,
102 // AddPeer implements Reactor by adding peer to the address book (if inbound)
103 // or by requesting more addresses (if outbound).
104 func (r *PEXReactor) AddPeer(p *Peer) error {
106 // For outbound peers, the address is already in the books.
107 // Either it was added in DialSeeds or when we
108 // received the peer's address in r.Receive
109 if r.book.NeedMoreAddrs() {
110 if ok := r.RequestPEX(p); !ok {
111 return ErrSendPexFail
117 // For inbound connections, the peer is its own source
118 addr, err := NewNetAddressString(p.ListenAddr)
120 log.WithFields(log.Fields{"addr": p.ListenAddr, "error": err}).Error("Error in AddPeer: Invalid peer address")
121 return errors.New("Error in AddPeer: Invalid peer address")
123 r.book.AddAddress(addr, addr)
125 // close the connect if connect is big than max limit
126 if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
127 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
128 <-time.After(1 * time.Second)
129 r.sw.StopPeerGracefully(p)
131 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
137 // RemovePeer implements Reactor.
138 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
139 // If we aren't keeping track of local temp data for each peer here, then we
140 // don't have to do anything.
143 // Receive implements Reactor by handling incoming PEX messages.
144 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
145 srcAddr := src.Connection().RemoteAddress
146 srcAddrStr := srcAddr.String()
148 r.IncrementMsgCountForPeer(srcAddrStr)
149 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
150 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
151 // TODO remove src from peers?
155 _, msg, err := DecodeMessage(msgBytes)
157 log.WithField("error", err).Error("Error decoding message")
160 log.WithField("msg", msg).Info("Reveived message")
162 switch msg := msg.(type) {
163 case *pexRequestMessage:
164 // src requested some peers.
165 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
166 log.Info("Send address message failed. Stop peer.")
168 case *pexAddrsMessage:
169 // We received some peer addresses from src.
170 // (We don't want to get spammed with bad peers)
171 for _, addr := range msg.Addrs {
173 r.book.AddAddress(addr, srcAddr)
177 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
181 // RequestPEX asks peer for more addresses.
182 func (r *PEXReactor) RequestPEX(p *Peer) bool {
183 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
185 r.sw.StopPeerGracefully(p)
190 // SendAddrs sends addrs to the peer.
191 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
192 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
194 r.sw.StopPeerGracefully(p)
199 // SetEnsurePeersPeriod sets period to ensure peers connected.
200 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
201 r.ensurePeersPeriod = d
204 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
205 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
206 r.maxMsgCountByPeer = v
209 // ReachedMaxMsgCountForPeer returns true if we received too many
210 // messages from peer with address `addr`.
211 // NOTE: assumes the value in the CMap is non-nil
212 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
213 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
216 // Increment or initialize the msg count for the peer in the CMap
217 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
219 countI := r.msgCountByPeer.Get(addr)
221 count = countI.(uint16)
224 r.msgCountByPeer.Set(addr, count)
227 // Ensures that sufficient peers are connected. (continuous)
228 func (r *PEXReactor) ensurePeersRoutine() {
229 // Randomize when routine starts
230 ensurePeersPeriodMs := int64(10000)
231 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
233 // fire once immediately.
237 ticker := time.NewTicker(r.ensurePeersPeriod)
238 quickTicker := time.NewTicker(time.Second * 1)
244 case <-quickTicker.C:
245 if r.sw.peers.Size() < 3 {
256 // ensurePeers ensures that sufficient peers are connected. (once)
258 // Old bucket / New bucket are arbitrary categories to denote whether an
259 // address is vetted or not, and this needs to be determined over time via a
260 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
261 // the node operator. It should not be used to compute what addresses are
262 // already connected or not.
264 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
265 // What we're currently doing in terms of marking good/bad peers is just a
266 // placeholder. It should not be the case that an address becomes old/vetted
267 // upon a single successful connection.
268 func (r *PEXReactor) ensurePeers() {
269 numOutPeers, _, numDialing := r.Switch.NumPeers()
270 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
271 log.WithFields(log.Fields{
272 "numOutPeers": numOutPeers,
273 "numDialing": numDialing,
274 "numToDial": numToDial,
275 }).Info("Ensure peers")
280 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
281 toDial := make(map[string]*NetAddress)
283 // Try to pick numToDial addresses to dial.
284 for i := 0; i < numToDial; i++ {
285 // The purpose of newBias is to first prioritize old (more vetted) peers
286 // when we have few connections, but to allow for new (less vetted) peers
287 // if we already have many connections. This algorithm isn't perfect, but
288 // it somewhat ensures that we prioritize connecting to more-vetted
291 var picked *NetAddress
292 // Try to fetch a new peer 3 times.
293 // This caps the maximum number of tries to 3 * numToDial.
294 for j := 0; j < 3; j++ {
295 try := r.book.PickAddress(newBias)
299 ka := r.book.addrLookup[try.String()]
305 _, alreadySelected := toDial[try.IP.String()]
306 alreadyDialing := r.Switch.IsDialing(try)
307 var alreadyConnected bool
309 for _, v := range r.Switch.Peers().list {
310 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
311 alreadyConnected = true
315 if alreadySelected || alreadyDialing || alreadyConnected {
318 log.Debug("Will dial address addr:", try)
326 toDial[picked.IP.String()] = picked
329 var wg sync.WaitGroup
330 for _, item := range toDial {
332 go r.dialPeerWorker(item, &wg)
336 // If we need more addresses, pick a random peer and ask for more.
337 if r.book.NeedMoreAddrs() {
338 if peers := r.Switch.Peers().List(); len(peers) > 0 {
339 i := rand.Int() % len(peers)
341 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
342 if ok := r.RequestPEX(peer); !ok {
343 log.Info("Send request address message failed. Stop peer.")
349 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
350 if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
351 r.book.MarkAttempt(a)
358 func (r *PEXReactor) flushMsgCountByPeer() {
359 ticker := time.NewTicker(msgCountByPeerFlushInterval)
364 r.msgCountByPeer.Clear()
372 //-----------------------------------------------------------------------------
376 msgTypeRequest = byte(0x01)
377 msgTypeAddrs = byte(0x02)
380 // PexMessage is a primary type for PEX messages. Underneath, it could contain
381 // either pexRequestMessage, or pexAddrsMessage messages.
382 type PexMessage interface{}
384 var _ = wire.RegisterInterface(
385 struct{ PexMessage }{},
386 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
387 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
390 // DecodeMessage implements interface registered above.
391 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
394 r := bytes.NewReader(bz)
395 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
400 A pexRequestMessage requests additional peer addresses.
402 type pexRequestMessage struct {
405 func (m *pexRequestMessage) String() string {
406 return "[pexRequest]"
410 A message with announced peer addresses.
412 type pexAddrsMessage struct {
416 func (m *pexAddrsMessage) String() string {
417 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)