OSDN Git Service

89edc87b9f24c222291a9ad8f74295c78637d56a
[bytom/bytom.git] / p2p / pex / pex_reactor.go
1 package pex
2
3 import (
4         "errors"
5         "math/rand"
6         "reflect"
7         "strings"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         "github.com/bytom/p2p"
15 )
16
const (
	// PexChannel identifies the p2p channel that carries PEX messages.
	PexChannel byte = 0x00

	// minNumOutboundPeers is the outbound-peer target that ensurePeers
	// compares against when deciding how many addresses to dial.
	minNumOutboundPeers = 5
	// maxPexMessageSize is the PEX message size cap in bytes (1MB).
	// NOTE(review): not referenced in this part of the file; presumably
	// enforced where messages are decoded — confirm.
	maxPexMessageSize = 1 << 20
	// defaultMaxMsgCountByPeer is the per-address message count at which
	// Receive disconnects the sending peer.
	defaultMaxMsgCountByPeer uint16 = 1000
)
25
26 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
27 type PEXReactor struct {
28         p2p.BaseReactor
29         book           *AddrBook
30         msgCountByPeer *cmn.CMap
31 }
32
33 // NewPEXReactor creates new PEX reactor.
34 func NewPEXReactor(b *AddrBook) *PEXReactor {
35         r := &PEXReactor{
36                 book:           b,
37                 msgCountByPeer: cmn.NewCMap(),
38         }
39         r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
40         return r
41 }
42
43 // OnStart implements BaseService
44 func (r *PEXReactor) OnStart() error {
45         r.BaseReactor.OnStart()
46         if _, err := r.book.Start(); err != nil {
47                 return err
48         }
49
50         go r.ensurePeersRoutine()
51         go r.flushMsgCountByPeer()
52         return nil
53 }
54
55 // OnStop implements BaseService
56 func (r *PEXReactor) OnStop() {
57         r.BaseReactor.OnStop()
58         r.book.Stop()
59 }
60
61 // GetChannels implements Reactor
62 func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
63         return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
64                 ID:                PexChannel,
65                 Priority:          1,
66                 SendQueueCapacity: 10,
67         }}
68 }
69
70 // AddPeer adding peer to the address book
71 func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
72         if p.IsOutbound() {
73                 if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
74                         return errors.New("Send pex message fail")
75                 }
76                 return nil
77         }
78
79         addr, err := p2p.NewNetAddressString(p.ListenAddr)
80         if err != nil {
81                 return errors.New("addPeer: invalid peer address")
82         }
83
84         if err := r.book.AddAddress(addr, addr); err != nil {
85                 return err
86         }
87
88         if r.Switch.Peers().Size() >= r.Switch.Config.MaxNumPeers {
89                 if r.SendAddrs(p, r.book.GetSelection()) {
90                         <-time.After(1 * time.Second)
91                         r.Switch.StopPeerGracefully(p)
92                 }
93                 return errors.New("addPeer: reach the max peer, exchange then close")
94         }
95         return nil
96 }
97
98 // Receive implements Reactor by handling incoming PEX messages.
99 func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
100         srcAddr := p.Connection().RemoteAddress
101         srcAddrStr := srcAddr.String()
102         r.incrementMsgCount(srcAddrStr)
103         if r.reachedMaxMsgLimit(srcAddrStr) {
104                 log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
105                 r.Switch.StopPeerGracefully(p)
106                 return
107         }
108
109         _, msg, err := DecodeMessage(rawMsg)
110         if err != nil {
111                 log.WithField("error", err).Error("failed to decoding pex message")
112                 r.Switch.StopPeerGracefully(p)
113                 return
114         }
115
116         switch msg := msg.(type) {
117         case *pexRequestMessage:
118                 if !r.SendAddrs(p, r.book.GetSelection()) {
119                         log.Error("failed to send pex address message")
120                 }
121
122         case *pexAddrsMessage:
123                 for _, addr := range msg.Addrs {
124                         if err := r.book.AddAddress(addr, srcAddr); err != nil {
125                                 log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
126                                 r.Switch.StopPeerGracefully(p)
127                                 return
128                         }
129                 }
130
131         default:
132                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
133         }
134 }
135
136 // RemovePeer implements Reactor.
137 func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
138
139 // RequestAddrs asks peer for more addresses.
140 func (r *PEXReactor) RequestAddrs(p *p2p.Peer) bool {
141         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
142         if !ok {
143                 r.Switch.StopPeerGracefully(p)
144         }
145         return ok
146 }
147
148 // SendAddrs sends addrs to the peer.
149 func (r *PEXReactor) SendAddrs(p *p2p.Peer, addrs []*p2p.NetAddress) bool {
150         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
151         if !ok {
152                 r.Switch.StopPeerGracefully(p)
153         }
154         return ok
155 }
156
157 func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
158         if err := r.Switch.DialPeerWithAddress(a); err != nil {
159                 r.book.MarkAttempt(a)
160         } else {
161                 r.book.MarkGood(a)
162         }
163         wg.Done()
164 }
165
166 func (r *PEXReactor) dialSeeds() {
167         if r.Switch.Config.Seeds == "" {
168                 return
169         }
170
171         seeds := strings.Split(r.Switch.Config.Seeds, ",")
172         netAddrs, err := p2p.NewNetAddressStrings(seeds)
173         if err != nil {
174                 log.WithField("err", err).Error("dialSeeds: fail to decode net address strings")
175         }
176
177         ourAddr, err := p2p.NewNetAddressString(r.Switch.NodeInfo().ListenAddr)
178         if err != nil {
179                 log.WithField("err", err).Error("dialSeeds: fail to get our address")
180         }
181
182         for _, netAddr := range netAddrs {
183                 if netAddr.Equals(ourAddr) {
184                         continue
185                 }
186                 if err := r.book.AddAddress(netAddr, ourAddr); err != nil {
187                         log.WithField("err", err).Warn("dialSeeds: fail to add address")
188                 }
189         }
190
191         if err := r.book.SaveToFile(); err != nil {
192                 log.WithField("err", err).Warn("dialSeeds: fail to save address book")
193         }
194
195         perm := rand.Perm(len(netAddrs))
196         for i := 0; i < len(perm); i += 2 {
197                 if err := r.Switch.DialPeerWithAddress(netAddrs[perm[i]]); err != nil {
198                         log.WithField("err", err).Warn("dialSeeds: fail to dial seed")
199                 }
200         }
201 }
202
203 func (r *PEXReactor) ensurePeers() {
204         numOutPeers, _, numDialing := r.Switch.NumPeers()
205         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 3
206         log.WithFields(log.Fields{
207                 "numOutPeers": numOutPeers,
208                 "numDialing":  numDialing,
209                 "numToDial":   numToDial,
210         }).Debug("ensure peers")
211         if numToDial <= 0 {
212                 return
213         }
214
215         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
216         toDial := make(map[string]*p2p.NetAddress)
217         maxAttempts := numToDial * 3
218
219         connectedPeers := make(map[string]struct{})
220         for _, peer := range r.Switch.Peers().List() {
221                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
222         }
223
224         for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
225                 try := r.book.PickAddress(newBias)
226                 if try == nil {
227                         continue
228                 }
229                 if _, selected := toDial[try.IP.String()]; selected {
230                         continue
231                 }
232                 if dialling := r.Switch.IsDialing(try); dialling {
233                         continue
234                 }
235                 if _, ok := connectedPeers[try.IP.String()]; ok {
236                         continue
237                 }
238
239                 log.Debug("Will dial address addr:", try)
240                 toDial[try.IP.String()] = try
241         }
242
243         var wg sync.WaitGroup
244         for _, item := range toDial {
245                 wg.Add(1)
246                 go r.dialPeerWorker(item, &wg)
247         }
248         wg.Wait()
249
250         if r.book.NeedMoreAddrs() {
251                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
252                         peer := peers[rand.Int()%len(peers)]
253                         r.RequestAddrs(peer)
254                 }
255         }
256 }
257
258 func (r *PEXReactor) ensurePeersRoutine() {
259         r.ensurePeers()
260         if r.Switch.Peers().Size() < 3 {
261                 r.dialSeeds()
262         }
263
264         ticker := time.NewTicker(120 * time.Second)
265         quickTicker := time.NewTicker(3 * time.Second)
266
267         for {
268                 select {
269                 case <-ticker.C:
270                         r.ensurePeers()
271                 case <-quickTicker.C:
272                         if r.Switch.Peers().Size() < 3 {
273                                 r.ensurePeers()
274                         }
275                 case <-r.Quit:
276                         return
277                 }
278         }
279 }
280
281 func (r *PEXReactor) flushMsgCountByPeer() {
282         ticker := time.NewTicker(1 * time.Hour)
283         for {
284                 select {
285                 case <-ticker.C:
286                         r.msgCountByPeer.Clear()
287                 case <-r.Quit:
288                         return
289                 }
290         }
291 }
292
293 func (r *PEXReactor) incrementMsgCount(addr string) {
294         var count uint16
295         if countI := r.msgCountByPeer.Get(addr); countI != nil {
296                 count = countI.(uint16)
297         }
298         count++
299         r.msgCountByPeer.Set(addr, count)
300 }
301
302 func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
303         return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
304 }