OSDN Git Service

fix nil pointer bug
[bytom/bytom.git] / p2p / pex_reactor.go
1 package p2p
2
3 import (
4         "bytes"
5         "fmt"
6         "math/rand"
7         "reflect"
8         "strings"
9         "sync"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         "github.com/bytom/errors"
17 )
18
19 const (
20         // PexChannel is a channel for PEX messages
21         PexChannel = byte(0x00)
22
23         // period to ensure peers connected
24         defaultEnsurePeersPeriod = 120 * time.Second
25         minNumOutboundPeers      = 5
26         maxPexMessageSize        = 1048576 // 1MB
27
28         // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
29         defaultMaxMsgCountByPeer    = 1000
30         msgCountByPeerFlushInterval = 1 * time.Hour
31 )
32
33 var ErrSendPexFail = errors.New("Send pex message fail")
34
35 // PEXReactor handles PEX (peer exchange) and ensures that an
36 // adequate number of peers are connected to the switch.
37 //
38 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
39 //
40 // ## Preventing abuse
41 //
42 // For now, it just limits the number of messages from one peer to
43 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
44 // msg/hour).
45 //
46 // NOTE [2017-01-17]:
47 //   Limiting is fine for now. Maybe down the road we want to keep track of the
48 //   quality of peer messages so if peerA keeps telling us about peers we can't
49 //   connect to then maybe we should care less about peerA. But I don't think
50 //   that kind of complexity is priority right now.
51 type PEXReactor struct {
52         BaseReactor
53
54         sw                *Switch
55         book              *AddrBook
56         ensurePeersPeriod time.Duration
57
58         // tracks message count by peer, so we can prevent abuse
59         msgCountByPeer    *cmn.CMap
60         maxMsgCountByPeer uint16
61 }
62
63 // NewPEXReactor creates new PEX reactor.
64 func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
65         r := &PEXReactor{
66                 sw:                sw,
67                 book:              b,
68                 ensurePeersPeriod: defaultEnsurePeersPeriod,
69                 msgCountByPeer:    cmn.NewCMap(),
70                 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
71         }
72         r.BaseReactor = *NewBaseReactor("PEXReactor", r)
73         return r
74 }
75
76 // OnStart implements BaseService
77 func (r *PEXReactor) OnStart() error {
78         r.BaseReactor.OnStart()
79         r.book.Start()
80         go r.ensurePeersRoutine()
81         go r.flushMsgCountByPeer()
82         return nil
83 }
84
85 // OnStop implements BaseService
86 func (r *PEXReactor) OnStop() {
87         r.BaseReactor.OnStop()
88         r.book.Stop()
89 }
90
91 // GetChannels implements Reactor
92 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
93         return []*ChannelDescriptor{
94                 &ChannelDescriptor{
95                         ID:                PexChannel,
96                         Priority:          1,
97                         SendQueueCapacity: 10,
98                 },
99         }
100 }
101
102 // AddPeer implements Reactor by adding peer to the address book (if inbound)
103 // or by requesting more addresses (if outbound).
104 func (r *PEXReactor) AddPeer(p *Peer) error {
105         if p.IsOutbound() {
106                 // For outbound peers, the address is already in the books.
107                 // Either it was added in DialSeeds or when we
108                 // received the peer's address in r.Receive
109                 if r.book.NeedMoreAddrs() {
110                         if ok := r.RequestPEX(p); !ok {
111                                 return ErrSendPexFail
112                         }
113                 }
114                 return nil
115         }
116
117         // For inbound connections, the peer is its own source
118         addr, err := NewNetAddressString(p.ListenAddr)
119         if err != nil {
120                 // this should never happen
121                 log.WithFields(log.Fields{
122                         "addr":  p.ListenAddr,
123                         "error": err,
124                 }).Error("Error in AddPeer: Invalid peer address")
125                 return errors.New("Error in AddPeer: Invalid peer address")
126         }
127         r.book.AddAddress(addr, addr)
128
129         // close the connect if connect is big than max limit
130         if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
131                 if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
132                         <-time.After(1 * time.Second)
133                         r.sw.StopPeerGracefully(p)
134                 }
135                 return errors.New("Error in AddPeer: reach the max peer, exchange then close")
136         }
137
138         return nil
139 }
140
141 // RemovePeer implements Reactor.
142 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
143         // If we aren't keeping track of local temp data for each peer here, then we
144         // don't have to do anything.
145 }
146
147 // Receive implements Reactor by handling incoming PEX messages.
148 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
149         srcAddr := src.Connection().RemoteAddress
150         srcAddrStr := srcAddr.String()
151
152         r.IncrementMsgCountForPeer(srcAddrStr)
153         if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
154                 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
155                 // TODO remove src from peers?
156                 return
157         }
158
159         _, msg, err := DecodeMessage(msgBytes)
160         if err != nil {
161                 log.WithField("error", err).Error("Error decoding message")
162                 return
163         }
164         log.WithField("msg", msg).Info("Reveived message")
165
166         switch msg := msg.(type) {
167         case *pexRequestMessage:
168                 // src requested some peers.
169                 if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
170                         log.Info("Send address message failed. Stop peer.")
171                 }
172         case *pexAddrsMessage:
173                 // We received some peer addresses from src.
174                 // (We don't want to get spammed with bad peers)
175                 for _, addr := range msg.Addrs {
176                         if addr != nil {
177                                 r.book.AddAddress(addr, srcAddr)
178                         }
179                 }
180         default:
181                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
182         }
183 }
184
185 // RequestPEX asks peer for more addresses.
186 func (r *PEXReactor) RequestPEX(p *Peer) bool {
187         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
188         if !ok {
189                 r.sw.StopPeerGracefully(p)
190         }
191         return ok
192 }
193
194 // SendAddrs sends addrs to the peer.
195 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
196         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
197         if !ok {
198                 r.sw.StopPeerGracefully(p)
199         }
200         return ok
201 }
202
203 // SetEnsurePeersPeriod sets period to ensure peers connected.
204 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
205         r.ensurePeersPeriod = d
206 }
207
208 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
209 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
210         r.maxMsgCountByPeer = v
211 }
212
213 // ReachedMaxMsgCountForPeer returns true if we received too many
214 // messages from peer with address `addr`.
215 // NOTE: assumes the value in the CMap is non-nil
216 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
217         return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
218 }
219
220 // Increment or initialize the msg count for the peer in the CMap
221 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
222         var count uint16
223         countI := r.msgCountByPeer.Get(addr)
224         if countI != nil {
225                 count = countI.(uint16)
226         }
227         count++
228         r.msgCountByPeer.Set(addr, count)
229 }
230
231 // Ensures that sufficient peers are connected. (continuous)
232 func (r *PEXReactor) ensurePeersRoutine() {
233         // Randomize when routine starts
234         ensurePeersPeriodMs := int64(10000)
235         time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
236
237         // fire once immediately.
238         r.ensurePeers()
239
240         // fire periodically
241         ticker := time.NewTicker(r.ensurePeersPeriod)
242         quickTicker := time.NewTicker(time.Second * 1)
243
244         for {
245                 select {
246                 case <-ticker.C:
247                         r.ensurePeers()
248                 case <-quickTicker.C:
249                         if r.sw.peers.Size() < 3 {
250                                 r.ensurePeers()
251                         }
252                 case <-r.Quit:
253                         ticker.Stop()
254                         quickTicker.Stop()
255                         return
256                 }
257         }
258 }
259
260 // ensurePeers ensures that sufficient peers are connected. (once)
261 //
262 // Old bucket / New bucket are arbitrary categories to denote whether an
263 // address is vetted or not, and this needs to be determined over time via a
264 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
265 // the node operator. It should not be used to compute what addresses are
266 // already connected or not.
267 //
268 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
269 // What we're currently doing in terms of marking good/bad peers is just a
270 // placeholder. It should not be the case that an address becomes old/vetted
271 // upon a single successful connection.
272 func (r *PEXReactor) ensurePeers() {
273         numOutPeers, _, numDialing := r.Switch.NumPeers()
274         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
275         log.WithFields(log.Fields{
276                 "numOutPeers": numOutPeers,
277                 "numDialing":  numDialing,
278                 "numToDial":   numToDial,
279         }).Info("Ensure peers")
280         if numToDial <= 0 {
281                 return
282         }
283
284         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
285         toDial := make(map[string]*NetAddress)
286
287         // Try to pick numToDial addresses to dial.
288         for i := 0; i < numToDial; i++ {
289                 // The purpose of newBias is to first prioritize old (more vetted) peers
290                 // when we have few connections, but to allow for new (less vetted) peers
291                 // if we already have many connections. This algorithm isn't perfect, but
292                 // it somewhat ensures that we prioritize connecting to more-vetted
293                 // peers.
294
295                 var picked *NetAddress
296                 // Try to fetch a new peer 3 times.
297                 // This caps the maximum number of tries to 3 * numToDial.
298                 for j := 0; j < 3; j++ {
299                         try := r.book.PickAddress(newBias)
300                         if try == nil {
301                                 break
302                         }
303                         ka := r.book.addrLookup[try.String()]
304                         if ka != nil {
305                                 if ka.isBad() {
306                                         continue
307                                 }
308                         }
309                         _, alreadySelected := toDial[try.IP.String()]
310                         alreadyDialing := r.Switch.IsDialing(try)
311                         var alreadyConnected bool
312
313                         for _, v := range r.Switch.Peers().list {
314                                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
315                                         alreadyConnected = true
316                                         break
317                                 }
318                         }
319                         if alreadySelected || alreadyDialing || alreadyConnected {
320                                 continue
321                         } else {
322                                 log.Debug("Will dial address addr:", try)
323                                 picked = try
324                                 break
325                         }
326                 }
327                 if picked == nil {
328                         continue
329                 }
330                 toDial[picked.IP.String()] = picked
331         }
332
333         var wg sync.WaitGroup
334         for _, item := range toDial {
335                 wg.Add(1)
336                 go r.dialPeerWorker(item, &wg)
337         }
338         wg.Wait()
339
340         // If we need more addresses, pick a random peer and ask for more.
341         if r.book.NeedMoreAddrs() {
342                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
343                         i := rand.Int() % len(peers)
344                         peer := peers[i]
345                         log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
346                         if ok := r.RequestPEX(peer); !ok {
347                                 log.Info("Send request address message failed. Stop peer.")
348                         }
349                 }
350         }
351 }
352
353 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
354         if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
355                 r.book.MarkAttempt(a)
356         } else {
357                 r.book.MarkGood(a)
358         }
359         wg.Done()
360 }
361
362 func (r *PEXReactor) flushMsgCountByPeer() {
363         ticker := time.NewTicker(msgCountByPeerFlushInterval)
364
365         for {
366                 select {
367                 case <-ticker.C:
368                         r.msgCountByPeer.Clear()
369                 case <-r.Quit:
370                         ticker.Stop()
371                         return
372                 }
373         }
374 }
375
376 //-----------------------------------------------------------------------------
377 // Messages
378
379 const (
380         msgTypeRequest = byte(0x01)
381         msgTypeAddrs   = byte(0x02)
382 )
383
384 // PexMessage is a primary type for PEX messages. Underneath, it could contain
385 // either pexRequestMessage, or pexAddrsMessage messages.
386 type PexMessage interface{}
387
388 var _ = wire.RegisterInterface(
389         struct{ PexMessage }{},
390         wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
391         wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
392 )
393
394 // DecodeMessage implements interface registered above.
395 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
396         msgType = bz[0]
397         n := new(int)
398         r := bytes.NewReader(bz)
399         msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
400         return
401 }
402
403 /*
404 A pexRequestMessage requests additional peer addresses.
405 */
406 type pexRequestMessage struct {
407 }
408
409 func (m *pexRequestMessage) String() string {
410         return "[pexRequest]"
411 }
412
413 /*
414 A message with announced peer addresses.
415 */
416 type pexAddrsMessage struct {
417         Addrs []*NetAddress
418 }
419
420 func (m *pexAddrsMessage) String() string {
421         return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
422 }