OSDN Git Service

295320719805f5ab83c1915eba95eff66aa62082
[bytom/bytom-spv.git] / p2p / pex_reactor.go
1 package p2p
2
3 import (
4         "bytes"
5         "fmt"
6         "math/rand"
7         "reflect"
8         "strings"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         "github.com/bytom/errors"
17 )
18
19 const (
20         // PexChannel is a channel for PEX messages
21         PexChannel = byte(0x00)
22
23         // period to ensure peers connected
24         defaultEnsurePeersPeriod = 120 * time.Second
25         minNumOutboundPeers      = 5
26         maxPexMessageSize        = 1048576 // 1MB
27
28         // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
29         defaultMaxMsgCountByPeer    = 1000
30         msgCountByPeerFlushInterval = 1 * time.Hour
31 )
32
33 var ErrSendPexFail = errors.New("Send pex message fail")
34
35 // PEXReactor handles PEX (peer exchange) and ensures that an
36 // adequate number of peers are connected to the switch.
37 //
38 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
39 //
40 // ## Preventing abuse
41 //
42 // For now, it just limits the number of messages from one peer to
43 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
44 // msg/hour).
45 //
46 // NOTE [2017-01-17]:
47 //   Limiting is fine for now. Maybe down the road we want to keep track of the
48 //   quality of peer messages so if peerA keeps telling us about peers we can't
49 //   connect to then maybe we should care less about peerA. But I don't think
50 //   that kind of complexity is priority right now.
51 type PEXReactor struct {
52         BaseReactor
53
54         sw                *Switch
55         book              *AddrBook
56         ensurePeersPeriod time.Duration
57
58         // tracks message count by peer, so we can prevent abuse
59         msgCountByPeer    *cmn.CMap
60         maxMsgCountByPeer uint16
61 }
62
63 // NewPEXReactor creates new PEX reactor.
64 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
65         r := &PEXReactor{
66                 sw:                sw,
67                 book:              b,
68                 ensurePeersPeriod: defaultEnsurePeersPeriod,
69                 msgCountByPeer:    cmn.NewCMap(),
70                 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
71         }
72         r.BaseReactor = *NewBaseReactor("PEXReactor", r)
73         return r
74 }
75
76 // OnStart implements BaseService
77 func (r *PEXReactor) OnStart() error {
78         r.BaseReactor.OnStart()
79         r.book.Start()
80         go r.ensurePeersRoutine()
81         go r.flushMsgCountByPeer()
82         return nil
83 }
84
85 // OnStop implements BaseService
86 func (r *PEXReactor) OnStop() {
87         r.BaseReactor.OnStop()
88         r.book.Stop()
89 }
90
91 // GetChannels implements Reactor
92 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
93         return []*ChannelDescriptor{
94                 &ChannelDescriptor{
95                         ID:                PexChannel,
96                         Priority:          1,
97                         SendQueueCapacity: 10,
98                 },
99         }
100 }
101
102 // AddPeer implements Reactor by adding peer to the address book (if inbound)
103 // or by requesting more addresses (if outbound).
104 func (r *PEXReactor) AddPeer(p *Peer) error {
105         if p.IsOutbound() {
106                 // For outbound peers, the address is already in the books.
107                 // Either it was added in DialSeeds or when we
108                 // received the peer's address in r.Receive
109                 if r.book.NeedMoreAddrs() {
110                         if ok := r.RequestPEX(p); !ok {
111                                 return ErrSendPexFail
112                         }
113                 }
114                 return nil
115         }
116
117         // For inbound connections, the peer is its own source
118         addr, err := NewNetAddressString(p.ListenAddr)
119         if err != nil {
120                 log.WithFields(log.Fields{"addr": p.ListenAddr, "error": err}).Error("Error in AddPeer: Invalid peer address")
121                 return errors.New("Error in AddPeer: Invalid peer address")
122         }
123         r.book.AddAddress(addr, addr)
124
125         // close the connect if connect is big than max limit
126         if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
127                 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
128                         <-time.After(1 * time.Second)
129                         r.sw.StopPeerGracefully(p)
130                 }
131                 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
132         }
133
134         return nil
135 }
136
137 // RemovePeer implements Reactor.
138 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
139         // If we aren't keeping track of local temp data for each peer here, then we
140         // don't have to do anything.
141 }
142
143 // Receive implements Reactor by handling incoming PEX messages.
144 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
145         srcAddr := src.Connection().RemoteAddress
146         srcAddrStr := srcAddr.String()
147
148         r.IncrementMsgCountForPeer(srcAddrStr)
149         if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
150                 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
151                 // TODO remove src from peers?
152                 return
153         }
154
155         _, msg, err := DecodeMessage(msgBytes)
156         if err != nil {
157                 log.WithField("error", err).Error("Error decoding message")
158                 return
159         }
160         log.WithField("msg", msg).Info("Reveived message")
161
162         switch msg := msg.(type) {
163         case *pexRequestMessage:
164                 // src requested some peers.
165                 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
166                         log.Info("Send address message failed. Stop peer.")
167                 }
168         case *pexAddrsMessage:
169                 // We received some peer addresses from src.
170                 // (We don't want to get spammed with bad peers)
171                 for _, addr := range msg.Addrs {
172                         if addr != nil {
173                                 r.book.AddAddress(addr, srcAddr)
174                         }
175                 }
176         default:
177                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
178         }
179 }
180
181 // RequestPEX asks peer for more addresses.
182 func (r *PEXReactor) RequestPEX(p *Peer) bool {
183         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
184         if !ok {
185                 r.sw.StopPeerGracefully(p)
186         }
187         return ok
188 }
189
190 // SendAddrs sends addrs to the peer.
191 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
192         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
193         if !ok {
194                 r.sw.StopPeerGracefully(p)
195         }
196         return ok
197 }
198
199 // SetEnsurePeersPeriod sets period to ensure peers connected.
200 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
201         r.ensurePeersPeriod = d
202 }
203
204 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
205 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
206         r.maxMsgCountByPeer = v
207 }
208
209 // ReachedMaxMsgCountForPeer returns true if we received too many
210 // messages from peer with address `addr`.
211 // NOTE: assumes the value in the CMap is non-nil
212 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
213         return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
214 }
215
216 // Increment or initialize the msg count for the peer in the CMap
217 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
218         var count uint16
219         countI := r.msgCountByPeer.Get(addr)
220         if countI != nil {
221                 count = countI.(uint16)
222         }
223         count++
224         r.msgCountByPeer.Set(addr, count)
225 }
226
227 // Ensures that sufficient peers are connected. (continuous)
228 func (r *PEXReactor) ensurePeersRoutine() {
229         // Randomize when routine starts
230         ensurePeersPeriodMs := int64(10000)
231         time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
232
233         // fire once immediately.
234         r.ensurePeers()
235
236         // fire periodically
237         ticker := time.NewTicker(r.ensurePeersPeriod)
238         quickTicker := time.NewTicker(time.Second * 1)
239
240         for {
241                 select {
242                 case <-ticker.C:
243                         r.ensurePeers()
244                 case <-quickTicker.C:
245                         if r.sw.peers.Size() < 3 {
246                                 r.ensurePeers()
247                         }
248                 case <-r.Quit:
249                         ticker.Stop()
250                         quickTicker.Stop()
251                         return
252                 }
253         }
254 }
255
256 // ensurePeers ensures that sufficient peers are connected. (once)
257 //
258 // Old bucket / New bucket are arbitrary categories to denote whether an
259 // address is vetted or not, and this needs to be determined over time via a
260 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
261 // the node operator. It should not be used to compute what addresses are
262 // already connected or not.
263 //
264 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
265 // What we're currently doing in terms of marking good/bad peers is just a
266 // placeholder. It should not be the case that an address becomes old/vetted
267 // upon a single successful connection.
268 func (r *PEXReactor) ensurePeers() {
269         numOutPeers, _, numDialing := r.Switch.NumPeers()
270         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
271         log.WithFields(log.Fields{
272                 "numOutPeers": numOutPeers,
273                 "numDialing":  numDialing,
274                 "numToDial":   numToDial,
275         }).Info("Ensure peers")
276         if numToDial <= 0 {
277                 return
278         }
279
280         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
281         toDial := make(map[string]*NetAddress)
282
283         // Try to pick numToDial addresses to dial.
284         for i := 0; i < numToDial; i++ {
285                 // The purpose of newBias is to first prioritize old (more vetted) peers
286                 // when we have few connections, but to allow for new (less vetted) peers
287                 // if we already have many connections. This algorithm isn't perfect, but
288                 // it somewhat ensures that we prioritize connecting to more-vetted
289                 // peers.
290
291                 var picked *NetAddress
292                 // Try to fetch a new peer 3 times.
293                 // This caps the maximum number of tries to 3 * numToDial.
294                 for j := 0; j < 3; j++ {
295                         try := r.book.PickAddress(newBias)
296                         if try == nil {
297                                 break
298                         }
299                         ka := r.book.addrLookup[try.String()]
300                         if ka != nil {
301                                 if ka.isBad() {
302                                         continue
303                                 }
304                         }
305                         _, alreadySelected := toDial[try.IP.String()]
306                         alreadyDialing := r.Switch.IsDialing(try)
307                         var alreadyConnected bool
308
309                         for _, v := range r.Switch.Peers().list {
310                                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
311                                         alreadyConnected = true
312                                         break
313                                 }
314                         }
315                         if alreadySelected || alreadyDialing || alreadyConnected {
316                                 continue
317                         } else {
318                                 log.Debug("Will dial address addr:", try)
319                                 picked = try
320                                 break
321                         }
322                 }
323                 if picked == nil {
324                         continue
325                 }
326                 toDial[picked.IP.String()] = picked
327         }
328
329         var wg sync.WaitGroup
330         for _, item := range toDial {
331                 wg.Add(1)
332                 go r.dialPeerWorker(item, &wg)
333         }
334         wg.Wait()
335
336         // If we need more addresses, pick a random peer and ask for more.
337         if r.book.NeedMoreAddrs() {
338                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
339                         i := rand.Int() % len(peers)
340                         peer := peers[i]
341                         log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
342                         if ok := r.RequestPEX(peer); !ok {
343                                 log.Info("Send request address message failed. Stop peer.")
344                         }
345                 }
346         }
347 }
348
349 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
350         if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
351                 r.book.MarkAttempt(a)
352         } else {
353                 r.book.MarkGood(a)
354         }
355         wg.Done()
356 }
357
358 func (r *PEXReactor) flushMsgCountByPeer() {
359         ticker := time.NewTicker(msgCountByPeerFlushInterval)
360
361         for {
362                 select {
363                 case <-ticker.C:
364                         r.msgCountByPeer.Clear()
365                 case <-r.Quit:
366                         ticker.Stop()
367                         return
368                 }
369         }
370 }
371
372 //-----------------------------------------------------------------------------
373 // Messages
374
375 const (
376         msgTypeRequest = byte(0x01)
377         msgTypeAddrs   = byte(0x02)
378 )
379
380 // PexMessage is a primary type for PEX messages. Underneath, it could contain
381 // either pexRequestMessage, or pexAddrsMessage messages.
382 type PexMessage interface{}
383
384 var _ = wire.RegisterInterface(
385         struct{ PexMessage }{},
386         wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
387         wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
388 )
389
390 // DecodeMessage implements interface registered above.
391 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
392         msgType = bz[0]
393         n := new(int)
394         r := bytes.NewReader(bz)
395         msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
396         return
397 }
398
399 /*
400 A pexRequestMessage requests additional peer addresses.
401 */
402 type pexRequestMessage struct {
403 }
404
405 func (m *pexRequestMessage) String() string {
406         return "[pexRequest]"
407 }
408
409 /*
410 A message with announced peer addresses.
411 */
412 type pexAddrsMessage struct {
413         Addrs []*NetAddress
414 }
415
416 func (m *pexAddrsMessage) String() string {
417         return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
418 }