OSDN Git Service

Fix pex stopAndRemovePeer panic
[bytom/bytom.git] / p2p / pex_reactor.go
1 package p2p
2
3 import (
4         "bytes"
5         "fmt"
6         "math/rand"
7         "reflect"
8         "strings"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         "github.com/bytom/errors"
16 )
17
18 const (
19         // PexChannel is a channel for PEX messages
20         PexChannel = byte(0x00)
21
22         // period to ensure peers connected
23         defaultEnsurePeersPeriod = 120 * time.Second
24         minNumOutboundPeers      = 10
25         maxPexMessageSize        = 1048576 // 1MB
26
27         // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
28         defaultMaxMsgCountByPeer    = 1000
29         msgCountByPeerFlushInterval = 1 * time.Hour
30 )
31
32 var ErrSendPexFail = errors.New("Send pex message fail")
33
34 // PEXReactor handles PEX (peer exchange) and ensures that an
35 // adequate number of peers are connected to the switch.
36 //
37 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
38 //
39 // ## Preventing abuse
40 //
41 // For now, it just limits the number of messages from one peer to
42 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
43 // msg/hour).
44 //
45 // NOTE [2017-01-17]:
46 //   Limiting is fine for now. Maybe down the road we want to keep track of the
47 //   quality of peer messages so if peerA keeps telling us about peers we can't
48 //   connect to then maybe we should care less about peerA. But I don't think
49 //   that kind of complexity is priority right now.
50 type PEXReactor struct {
51         BaseReactor
52
53         sw                *Switch
54         book              *AddrBook
55         ensurePeersPeriod time.Duration
56
57         // tracks message count by peer, so we can prevent abuse
58         msgCountByPeer    *cmn.CMap
59         maxMsgCountByPeer uint16
60 }
61
62 // NewPEXReactor creates new PEX reactor.
63 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
64         r := &PEXReactor{
65                 sw:                sw,
66                 book:              b,
67                 ensurePeersPeriod: defaultEnsurePeersPeriod,
68                 msgCountByPeer:    cmn.NewCMap(),
69                 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
70         }
71         r.BaseReactor = *NewBaseReactor("PEXReactor", r)
72         return r
73 }
74
75 // OnStart implements BaseService
76 func (r *PEXReactor) OnStart() error {
77         r.BaseReactor.OnStart()
78         r.book.Start()
79         go r.ensurePeersRoutine()
80         go r.flushMsgCountByPeer()
81         return nil
82 }
83
84 // OnStop implements BaseService
85 func (r *PEXReactor) OnStop() {
86         r.BaseReactor.OnStop()
87         r.book.Stop()
88 }
89
90 // GetChannels implements Reactor
91 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
92         return []*ChannelDescriptor{
93                 &ChannelDescriptor{
94                         ID:                PexChannel,
95                         Priority:          1,
96                         SendQueueCapacity: 10,
97                 },
98         }
99 }
100
101 // AddPeer implements Reactor by adding peer to the address book (if inbound)
102 // or by requesting more addresses (if outbound).
103 func (r *PEXReactor) AddPeer(p *Peer) error {
104         if p.IsOutbound() {
105                 // For outbound peers, the address is already in the books.
106                 // Either it was added in DialSeeds or when we
107                 // received the peer's address in r.Receive
108                 if r.book.NeedMoreAddrs() {
109                         if ok := r.RequestPEX(p); !ok {
110                                 return ErrSendPexFail
111                         }
112                 }
113         } else { // For inbound connections, the peer is its own source
114                 addr, err := NewNetAddressString(p.ListenAddr)
115                 if err != nil {
116                         // this should never happen
117                         log.WithFields(log.Fields{
118                                 "addr":  p.ListenAddr,
119                                 "error": err,
120                         }).Error("Error in AddPeer: Invalid peer address")
121                         return errors.New("Error in AddPeer: Invalid peer address")
122                 }
123                 r.book.AddAddress(addr, addr)
124         }
125         return nil
126 }
127
128 // RemovePeer implements Reactor.
129 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
130         // If we aren't keeping track of local temp data for each peer here, then we
131         // don't have to do anything.
132 }
133
134 // Receive implements Reactor by handling incoming PEX messages.
135 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
136         srcAddr := src.Connection().RemoteAddress
137         srcAddrStr := srcAddr.String()
138
139         r.IncrementMsgCountForPeer(srcAddrStr)
140         if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
141                 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
142                 // TODO remove src from peers?
143                 return
144         }
145
146         _, msg, err := DecodeMessage(msgBytes)
147         if err != nil {
148                 log.WithField("error", err).Error("Error decoding message")
149                 return
150         }
151         log.WithField("msg", msg).Info("Reveived message")
152
153         switch msg := msg.(type) {
154         case *pexRequestMessage:
155                 // src requested some peers.
156                 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
157                         log.Info("Send address message failed. Stop peer.")
158                 }
159         case *pexAddrsMessage:
160                 // We received some peer addresses from src.
161                 // (We don't want to get spammed with bad peers)
162                 for _, addr := range msg.Addrs {
163                         if addr != nil {
164                                 r.book.AddAddress(addr, srcAddr)
165                         }
166                 }
167         default:
168                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
169         }
170 }
171
172 // RequestPEX asks peer for more addresses.
173 func (r *PEXReactor) RequestPEX(p *Peer) bool {
174         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
175         if !ok {
176                 r.sw.StopPeerGracefully(p)
177         }
178         return ok
179 }
180
181 // SendAddrs sends addrs to the peer.
182 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
183         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
184         if !ok {
185                 r.sw.StopPeerGracefully(p)
186         }
187         return ok
188 }
189
190 // SetEnsurePeersPeriod sets period to ensure peers connected.
191 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
192         r.ensurePeersPeriod = d
193 }
194
195 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
196 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
197         r.maxMsgCountByPeer = v
198 }
199
200 // ReachedMaxMsgCountForPeer returns true if we received too many
201 // messages from peer with address `addr`.
202 // NOTE: assumes the value in the CMap is non-nil
203 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
204         return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
205 }
206
207 // Increment or initialize the msg count for the peer in the CMap
208 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
209         var count uint16
210         countI := r.msgCountByPeer.Get(addr)
211         if countI != nil {
212                 count = countI.(uint16)
213         }
214         count++
215         r.msgCountByPeer.Set(addr, count)
216 }
217
218 // Ensures that sufficient peers are connected. (continuous)
219 func (r *PEXReactor) ensurePeersRoutine() {
220         // Randomize when routine starts
221         ensurePeersPeriodMs := int64(10000)
222         time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
223
224         // fire once immediately.
225         r.ensurePeers()
226
227         // fire periodically
228         ticker := time.NewTicker(r.ensurePeersPeriod)
229
230         for {
231                 select {
232                 case <-ticker.C:
233                         r.ensurePeers()
234                 case <-r.Quit:
235                         ticker.Stop()
236                         return
237                 }
238         }
239 }
240
241 // ensurePeers ensures that sufficient peers are connected. (once)
242 //
243 // Old bucket / New bucket are arbitrary categories to denote whether an
244 // address is vetted or not, and this needs to be determined over time via a
245 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
246 // the node operator. It should not be used to compute what addresses are
247 // already connected or not.
248 //
249 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
250 // What we're currently doing in terms of marking good/bad peers is just a
251 // placeholder. It should not be the case that an address becomes old/vetted
252 // upon a single successful connection.
253 func (r *PEXReactor) ensurePeers() {
254         numOutPeers, _, numDialing := r.Switch.NumPeers()
255         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
256         log.WithFields(log.Fields{
257                 "numOutPeers": numOutPeers,
258                 "numDialing":  numDialing,
259                 "numToDial":   numToDial,
260         }).Info("Ensure peers")
261         if numToDial <= 0 {
262                 return
263         }
264
265         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
266         toDial := make(map[string]*NetAddress)
267
268         // Try to pick numToDial addresses to dial.
269         for i := 0; i < numToDial; i++ {
270                 // The purpose of newBias is to first prioritize old (more vetted) peers
271                 // when we have few connections, but to allow for new (less vetted) peers
272                 // if we already have many connections. This algorithm isn't perfect, but
273                 // it somewhat ensures that we prioritize connecting to more-vetted
274                 // peers.
275
276                 var picked *NetAddress
277                 // Try to fetch a new peer 3 times.
278                 // This caps the maximum number of tries to 3 * numToDial.
279                 for j := 0; j < 3; j++ {
280                         try := r.book.PickAddress(newBias)
281                         if try == nil {
282                                 break
283                         }
284                         ka := r.book.addrLookup[try.String()]
285                         if ka != nil {
286                                 if ka.isBad() {
287                                         continue
288                                 }
289                         }
290                         _, alreadySelected := toDial[try.IP.String()]
291                         alreadyDialing := r.Switch.IsDialing(try)
292                         var alreadyConnected bool
293
294                         for _, v := range r.Switch.Peers().list {
295                                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
296                                         alreadyConnected = true
297                                         break
298                                 }
299                         }
300                         if alreadySelected || alreadyDialing || alreadyConnected {
301                                 continue
302                         } else {
303                                 log.WithField("addr", try).Info("Will dial address")
304                                 picked = try
305                                 break
306                         }
307                 }
308                 if picked == nil {
309                         continue
310                 }
311                 toDial[picked.IP.String()] = picked
312         }
313
314         // Dial picked addresses
315         for _, item := range toDial {
316                 if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
317                         r.book.MarkAttempt(item)
318                 } else {
319                         r.book.MarkGood(item)
320                 }
321         }
322
323         // If we need more addresses, pick a random peer and ask for more.
324         if r.book.NeedMoreAddrs() {
325                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
326                         i := rand.Int() % len(peers)
327                         peer := peers[i]
328                         log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
329                         if ok := r.RequestPEX(peer); !ok {
330                                 log.Info("Send request address message failed. Stop peer.")
331                         }
332                 }
333         }
334 }
335
336 func (r *PEXReactor) flushMsgCountByPeer() {
337         ticker := time.NewTicker(msgCountByPeerFlushInterval)
338
339         for {
340                 select {
341                 case <-ticker.C:
342                         r.msgCountByPeer.Clear()
343                 case <-r.Quit:
344                         ticker.Stop()
345                         return
346                 }
347         }
348 }
349
350 //-----------------------------------------------------------------------------
351 // Messages
352
353 const (
354         msgTypeRequest = byte(0x01)
355         msgTypeAddrs   = byte(0x02)
356 )
357
358 // PexMessage is a primary type for PEX messages. Underneath, it could contain
359 // either pexRequestMessage, or pexAddrsMessage messages.
360 type PexMessage interface{}
361
362 var _ = wire.RegisterInterface(
363         struct{ PexMessage }{},
364         wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
365         wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
366 )
367
368 // DecodeMessage implements interface registered above.
369 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
370         msgType = bz[0]
371         n := new(int)
372         r := bytes.NewReader(bz)
373         msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
374         return
375 }
376
377 /*
378 A pexRequestMessage requests additional peer addresses.
379 */
380 type pexRequestMessage struct {
381 }
382
383 func (m *pexRequestMessage) String() string {
384         return "[pexRequest]"
385 }
386
387 /*
388 A message with announced peer addresses.
389 */
390 type pexAddrsMessage struct {
391         Addrs []*NetAddress
392 }
393
394 func (m *pexAddrsMessage) String() string {
395         return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
396 }