OSDN Git Service

Add initial value
[bytom/bytom.git] / p2p / pex_reactor.go
1 package p2p
2
3 import (
4         "bytes"
5         "fmt"
6         "math/rand"
7         "reflect"
8         "strings"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         wire "github.com/tendermint/go-wire"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         "github.com/bytom/errors"
16 )
17
18 const (
19         // PexChannel is a channel for PEX messages
20         PexChannel = byte(0x00)
21
22         // period to ensure peers connected
23         defaultEnsurePeersPeriod = 120 * time.Second
24         minNumOutboundPeers      = 5
25         maxPexMessageSize        = 1048576 // 1MB
26
27         // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
28         defaultMaxMsgCountByPeer    = 1000
29         msgCountByPeerFlushInterval = 1 * time.Hour
30 )
31
32 var ErrSendPexFail = errors.New("Send pex message fail")
33
34 // PEXReactor handles PEX (peer exchange) and ensures that an
35 // adequate number of peers are connected to the switch.
36 //
37 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
38 //
39 // ## Preventing abuse
40 //
41 // For now, it just limits the number of messages from one peer to
42 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
43 // msg/hour).
44 //
45 // NOTE [2017-01-17]:
46 //   Limiting is fine for now. Maybe down the road we want to keep track of the
47 //   quality of peer messages so if peerA keeps telling us about peers we can't
48 //   connect to then maybe we should care less about peerA. But I don't think
49 //   that kind of complexity is priority right now.
50 type PEXReactor struct {
51         BaseReactor
52
53         sw                *Switch
54         book              *AddrBook
55         ensurePeersPeriod time.Duration
56
57         // tracks message count by peer, so we can prevent abuse
58         msgCountByPeer    *cmn.CMap
59         maxMsgCountByPeer uint16
60 }
61
62 // NewPEXReactor creates new PEX reactor.
63 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
64         r := &PEXReactor{
65                 sw:                sw,
66                 book:              b,
67                 ensurePeersPeriod: defaultEnsurePeersPeriod,
68                 msgCountByPeer:    cmn.NewCMap(),
69                 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
70         }
71         r.BaseReactor = *NewBaseReactor("PEXReactor", r)
72         return r
73 }
74
75 // OnStart implements BaseService
76 func (r *PEXReactor) OnStart() error {
77         r.BaseReactor.OnStart()
78         r.book.Start()
79         go r.ensurePeersRoutine()
80         go r.flushMsgCountByPeer()
81         return nil
82 }
83
84 // OnStop implements BaseService
85 func (r *PEXReactor) OnStop() {
86         r.BaseReactor.OnStop()
87         r.book.Stop()
88 }
89
90 // GetChannels implements Reactor
91 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
92         return []*ChannelDescriptor{
93                 &ChannelDescriptor{
94                         ID:                PexChannel,
95                         Priority:          1,
96                         SendQueueCapacity: 10,
97                 },
98         }
99 }
100
101 // AddPeer implements Reactor by adding peer to the address book (if inbound)
102 // or by requesting more addresses (if outbound).
103 func (r *PEXReactor) AddPeer(p *Peer) error {
104         if p.IsOutbound() {
105                 // For outbound peers, the address is already in the books.
106                 // Either it was added in DialSeeds or when we
107                 // received the peer's address in r.Receive
108                 if r.book.NeedMoreAddrs() {
109                         if ok := r.RequestPEX(p); !ok {
110                                 return ErrSendPexFail
111                         }
112                 }
113                 return nil
114         }
115
116         // For inbound connections, the peer is its own source
117         addr, err := NewNetAddressString(p.ListenAddr)
118         if err != nil {
119                 // this should never happen
120                 log.WithFields(log.Fields{
121                         "addr":  p.ListenAddr,
122                         "error": err,
123                 }).Error("Error in AddPeer: Invalid peer address")
124                 return errors.New("Error in AddPeer: Invalid peer address")
125         }
126         r.book.AddAddress(addr, addr)
127
128         // close the connect if connect is big than max limit
129         if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
130                 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
131                         r.sw.StopPeerGracefully(p)
132                 }
133                 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
134         }
135         return nil
136 }
137
138 // RemovePeer implements Reactor.
139 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
140         // If we aren't keeping track of local temp data for each peer here, then we
141         // don't have to do anything.
142 }
143
144 // Receive implements Reactor by handling incoming PEX messages.
145 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
146         srcAddr := src.Connection().RemoteAddress
147         srcAddrStr := srcAddr.String()
148
149         r.IncrementMsgCountForPeer(srcAddrStr)
150         if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
151                 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
152                 // TODO remove src from peers?
153                 return
154         }
155
156         _, msg, err := DecodeMessage(msgBytes)
157         if err != nil {
158                 log.WithField("error", err).Error("Error decoding message")
159                 return
160         }
161         log.WithField("msg", msg).Info("Reveived message")
162
163         switch msg := msg.(type) {
164         case *pexRequestMessage:
165                 // src requested some peers.
166                 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
167                         log.Info("Send address message failed. Stop peer.")
168                 }
169         case *pexAddrsMessage:
170                 // We received some peer addresses from src.
171                 // (We don't want to get spammed with bad peers)
172                 for _, addr := range msg.Addrs {
173                         if addr != nil {
174                                 r.book.AddAddress(addr, srcAddr)
175                         }
176                 }
177         default:
178                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
179         }
180 }
181
182 // RequestPEX asks peer for more addresses.
183 func (r *PEXReactor) RequestPEX(p *Peer) bool {
184         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
185         if !ok {
186                 r.sw.StopPeerGracefully(p)
187         }
188         return ok
189 }
190
191 // SendAddrs sends addrs to the peer.
192 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
193         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
194         if !ok {
195                 r.sw.StopPeerGracefully(p)
196         }
197         return ok
198 }
199
200 // SetEnsurePeersPeriod sets period to ensure peers connected.
201 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
202         r.ensurePeersPeriod = d
203 }
204
205 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
206 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
207         r.maxMsgCountByPeer = v
208 }
209
210 // ReachedMaxMsgCountForPeer returns true if we received too many
211 // messages from peer with address `addr`.
212 // NOTE: assumes the value in the CMap is non-nil
213 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
214         return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
215 }
216
217 // Increment or initialize the msg count for the peer in the CMap
218 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
219         var count uint16
220         countI := r.msgCountByPeer.Get(addr)
221         if countI != nil {
222                 count = countI.(uint16)
223         }
224         count++
225         r.msgCountByPeer.Set(addr, count)
226 }
227
228 // Ensures that sufficient peers are connected. (continuous)
229 func (r *PEXReactor) ensurePeersRoutine() {
230         // Randomize when routine starts
231         ensurePeersPeriodMs := int64(10000)
232         time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
233
234         // fire once immediately.
235         r.ensurePeers()
236
237         // fire periodically
238         ticker := time.NewTicker(r.ensurePeersPeriod)
239
240         for {
241                 select {
242                 case <-ticker.C:
243                         r.ensurePeers()
244                 case <-r.Quit:
245                         ticker.Stop()
246                         return
247                 }
248         }
249 }
250
251 // ensurePeers ensures that sufficient peers are connected. (once)
252 //
253 // Old bucket / New bucket are arbitrary categories to denote whether an
254 // address is vetted or not, and this needs to be determined over time via a
255 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
256 // the node operator. It should not be used to compute what addresses are
257 // already connected or not.
258 //
259 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
260 // What we're currently doing in terms of marking good/bad peers is just a
261 // placeholder. It should not be the case that an address becomes old/vetted
262 // upon a single successful connection.
263 func (r *PEXReactor) ensurePeers() {
264         numOutPeers, _, numDialing := r.Switch.NumPeers()
265         numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
266         log.WithFields(log.Fields{
267                 "numOutPeers": numOutPeers,
268                 "numDialing":  numDialing,
269                 "numToDial":   numToDial,
270         }).Info("Ensure peers")
271         if numToDial <= 0 {
272                 return
273         }
274
275         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
276         toDial := make(map[string]*NetAddress)
277
278         // Try to pick numToDial addresses to dial.
279         for i := 0; i < numToDial; i++ {
280                 // The purpose of newBias is to first prioritize old (more vetted) peers
281                 // when we have few connections, but to allow for new (less vetted) peers
282                 // if we already have many connections. This algorithm isn't perfect, but
283                 // it somewhat ensures that we prioritize connecting to more-vetted
284                 // peers.
285
286                 var picked *NetAddress
287                 // Try to fetch a new peer 3 times.
288                 // This caps the maximum number of tries to 3 * numToDial.
289                 for j := 0; j < 3; j++ {
290                         try := r.book.PickAddress(newBias)
291                         if try == nil {
292                                 break
293                         }
294                         ka := r.book.addrLookup[try.String()]
295                         if ka != nil {
296                                 if ka.isBad() {
297                                         continue
298                                 }
299                         }
300                         _, alreadySelected := toDial[try.IP.String()]
301                         alreadyDialing := r.Switch.IsDialing(try)
302                         var alreadyConnected bool
303
304                         for _, v := range r.Switch.Peers().list {
305                                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
306                                         alreadyConnected = true
307                                         break
308                                 }
309                         }
310                         if alreadySelected || alreadyDialing || alreadyConnected {
311                                 continue
312                         } else {
313                                 log.WithField("addr", try).Info("Will dial address")
314                                 picked = try
315                                 break
316                         }
317                 }
318                 if picked == nil {
319                         continue
320                 }
321                 toDial[picked.IP.String()] = picked
322         }
323
324         // Dial picked addresses
325         for _, item := range toDial {
326                 if _, err := r.Switch.DialPeerWithAddress(item, false); err != nil {
327                         r.book.MarkAttempt(item)
328                 } else {
329                         r.book.MarkGood(item)
330                 }
331         }
332
333         // If we need more addresses, pick a random peer and ask for more.
334         if r.book.NeedMoreAddrs() {
335                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
336                         i := rand.Int() % len(peers)
337                         peer := peers[i]
338                         log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
339                         if ok := r.RequestPEX(peer); !ok {
340                                 log.Info("Send request address message failed. Stop peer.")
341                         }
342                 }
343         }
344 }
345
346 func (r *PEXReactor) flushMsgCountByPeer() {
347         ticker := time.NewTicker(msgCountByPeerFlushInterval)
348
349         for {
350                 select {
351                 case <-ticker.C:
352                         r.msgCountByPeer.Clear()
353                 case <-r.Quit:
354                         ticker.Stop()
355                         return
356                 }
357         }
358 }
359
360 //-----------------------------------------------------------------------------
361 // Messages
362
363 const (
364         msgTypeRequest = byte(0x01)
365         msgTypeAddrs   = byte(0x02)
366 )
367
368 // PexMessage is a primary type for PEX messages. Underneath, it could contain
369 // either pexRequestMessage, or pexAddrsMessage messages.
370 type PexMessage interface{}
371
372 var _ = wire.RegisterInterface(
373         struct{ PexMessage }{},
374         wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
375         wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
376 )
377
378 // DecodeMessage implements interface registered above.
379 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
380         msgType = bz[0]
381         n := new(int)
382         r := bytes.NewReader(bz)
383         msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
384         return
385 }
386
387 /*
388 A pexRequestMessage requests additional peer addresses.
389 */
390 type pexRequestMessage struct {
391 }
392
393 func (m *pexRequestMessage) String() string {
394         return "[pexRequest]"
395 }
396
397 /*
398 A message with announced peer addresses.
399 */
400 type pexAddrsMessage struct {
401         Addrs []*NetAddress
402 }
403
404 func (m *pexAddrsMessage) String() string {
405         return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
406 }