OSDN Git Service

fix the code style
[bytom/bytom-spv.git] / p2p / pex / pex_reactor.go
1 package pex
2
3 import (
4         "errors"
5         "math/rand"
6         "reflect"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         cmn "github.com/tendermint/tmlibs/common"
12 )
13
14 const (
15         // PexChannel is a channel for PEX messages
16         PexChannel = byte(0x00)
17
18         defaultEnsurePeersPeriod    = 120 * time.Second // period to ensure peers connected
19         minNumOutboundPeers         = 5
20         maxPexMessageSize           = 1048576 // 1MB
21         defaultMaxMsgCountByPeer    = uint16(1000)
22         msgCountByPeerFlushInterval = 1 * time.Hour
23 )
24
25 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
26 type PEXReactor struct {
27         BaseReactor
28         book           *AddrBook
29         msgCountByPeer *cmn.CMap
30 }
31
32 // NewPEXReactor creates new PEX reactor.
33 func NewPEXReactor(b *AddrBook) *PEXReactor {
34         r := &PEXReactor{
35                 book:           b,
36                 msgCountByPeer: cmn.NewCMap(),
37         }
38         r.BaseReactor = *NewBaseReactor("PEXReactor", r)
39         return r
40 }
41
42 // OnStart implements BaseService
43 func (r *PEXReactor) OnStart() error {
44         r.BaseReactor.OnStart()
45         r.book.Start()
46         go r.ensurePeersRoutine()
47         go r.flushMsgCountByPeer()
48         return nil
49 }
50
51 // OnStop implements BaseService
52 func (r *PEXReactor) OnStop() {
53         r.BaseReactor.OnStop()
54         r.book.Stop()
55 }
56
57 // GetChannels implements Reactor
58 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
59         return []*ChannelDescriptor{&ChannelDescriptor{
60                 ID:                PexChannel,
61                 Priority:          1,
62                 SendQueueCapacity: 10,
63         }}
64 }
65
66 // AddPeer adding peer to the address book
67 func (r *PEXReactor) AddPeer(p *Peer) error {
68         if p.IsOutbound() {
69                 if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
70                         return errors.New("Send pex message fail")
71                 }
72                 return nil
73         }
74
75         addr, err := NewNetAddressString(p.ListenAddr)
76         if err != nil {
77                 return errors.New("addPeer: invalid peer address")
78         }
79
80         r.book.AddAddress(addr, addr)
81         if r.Switch.peers.Size() >= r.Switch.config.MaxNumPeers {
82                 if r.SendAddrs(p, r.book.GetSelection()) {
83                         <-time.After(1 * time.Second)
84                         r.Switch.StopPeerGracefully(p)
85                 }
86                 return errors.New("addPeer: reach the max peer, exchange then close")
87         }
88         return nil
89 }
90
91 // Receive implements Reactor by handling incoming PEX messages.
92 func (r *PEXReactor) Receive(chID byte, p *Peer, rawMsg []byte) {
93         srcAddr := p.Connection().RemoteAddress
94         srcAddrStr := srcAddr.String()
95         r.incrementMsgCount(srcAddrStr)
96         if r.reachedMaxMsgLimit(srcAddrStr) {
97                 log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
98                 r.Switch.StopPeerGracefully(p)
99                 return
100         }
101
102         _, msg, err := DecodeMessage(rawMsg)
103         if err != nil {
104                 log.WithField("error", err).Error("failed to decoding pex message")
105                 r.Switch.StopPeerGracefully(p)
106                 return
107         }
108
109         switch msg := msg.(type) {
110         case *pexRequestMessage:
111                 if !r.SendAddrs(src, r.book.GetSelection()) {
112                         log.Error("failed to send pex address message")
113                 }
114
115         case *pexAddrsMessage:
116                 for _, addr := range msg.Addrs {
117                         if err := r.book.AddAddress(addr, srcAddr); err != nil {
118                                 log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
119                                 r.Switch.StopPeerGracefully(p)
120                                 return
121                         }
122                 }
123
124         default:
125                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
126         }
127 }
128
129 // RemovePeer implements Reactor.
130 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {}
131
132 // RequestPEX asks peer for more addresses.
133 func (r *PEXReactor) RequestAddrs(p *Peer) bool {
134         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
135         if !ok {
136                 r.Switch.StopPeerGracefully(p)
137         }
138         return ok
139 }
140
141 // SendAddrs sends addrs to the peer.
142 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
143         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
144         if !ok {
145                 r.Switch.StopPeerGracefully(p)
146         }
147         return ok
148 }
149
150 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
151         if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
152                 r.book.MarkAttempt(a)
153         } else {
154                 r.book.MarkGood(a)
155         }
156         wg.Done()
157 }
158
159 func (r *PEXReactor) ensurePeers() {
160         numOutPeers, _, numDialing := r.Switch.NumPeers()
161         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
162         log.WithFields(log.Fields{
163                 "numOutPeers": numOutPeers,
164                 "numDialing":  numDialing,
165                 "numToDial":   numToDial,
166         }).Debug("ensure peers")
167         if numToDial <= 0 {
168                 return
169         }
170
171         newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
172         toDial := make(map[string]*NetAddress)
173         maxAttempts := numToDial * 3
174         for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
175                 try := r.book.PickAddress(newBias)
176                 if try == nil {
177                         continue
178                 }
179                 if _, selected := toDial[try.IP.String()]; selected {
180                         continue
181                 }
182                 if dialling := r.Switch.IsDialing(try); dialling {
183                         continue
184                 }
185                 if connected := r.Switch.Peers().Has(try.ID); connected {
186                         continue
187                 }
188
189                 log.Debug("Will dial address addr:", try)
190                 toDial[try.IP.String()] = try
191         }
192
193         var wg sync.WaitGroup
194         for _, item := range toDial {
195                 wg.Add(1)
196                 go r.dialPeerWorker(item, &wg)
197         }
198         wg.Wait()
199
200         if r.book.NeedMoreAddrs() {
201                 if peers := r.Switch.Peers().List(); len(peers) > 0 {
202                         peer := peers[rand.Int()%len(peers)]
203                         r.RequestAddrs(peer)
204                 }
205         }
206 }
207
208 func (r *PEXReactor) ensurePeersRoutine() {
209         r.ensurePeers()
210         ticker := time.NewTicker(defaultEnsurePeersPeriod)
211         quickTicker := time.NewTicker(time.Second * 1)
212
213         for {
214                 select {
215                 case <-ticker.C:
216                         r.ensurePeers()
217                 case <-quickTicker.C:
218                         if r.Switch.peers.Size() < 3 {
219                                 r.ensurePeers()
220                         }
221                 case <-r.Quit:
222                         return
223                 }
224         }
225 }
226
227 func (r *PEXReactor) flushMsgCountByPeer() {
228         ticker := time.NewTicker(msgCountByPeerFlushInterval)
229         for {
230                 select {
231                 case <-ticker.C:
232                         r.msgCountByPeer.Clear()
233                 case <-r.Quit:
234                         return
235                 }
236         }
237 }
238
239 func (r *PEXReactor) incrementMsgCount(addr string) {
240         var count uint16
241         if countI := r.msgCountByPeer.Get(addr); countI != nil {
242                 count = countI.(uint16)
243         }
244         count++
245         r.msgCountByPeer.Set(addr, count)
246 }
247
248 func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
249         return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
250 }