OSDN Git Service

Disconnect only for diff major version (#1203)
[bytom/bytom-spv.git] / p2p / pex / pex_reactor.go
1 package pex
2
3 import (
4         "errors"
5         "reflect"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10
11         "github.com/bytom/p2p"
12         "github.com/bytom/p2p/connection"
13         "github.com/bytom/p2p/discover"
14 )
15
16 const (
17         // PexChannel is a channel for PEX messages
18         PexChannel = byte(0x00)
19
20         minNumOutboundPeers = 5
21         maxPexMessageSize   = 1048576 // 1MB
22 )
23
24 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
25 type PEXReactor struct {
26         p2p.BaseReactor
27         discv *discover.Network
28 }
29
30 // NewPEXReactor creates new PEX reactor.
31 func NewPEXReactor(discv *discover.Network) *PEXReactor {
32         r := &PEXReactor{
33                 discv: discv,
34         }
35         r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
36         return r
37 }
38
39 // OnStart implements BaseService
40 func (r *PEXReactor) OnStart() error {
41         r.BaseReactor.OnStart()
42         go r.ensurePeersRoutine()
43         return nil
44 }
45
46 // OnStop implements BaseService
47 func (r *PEXReactor) OnStop() {
48         r.BaseReactor.OnStop()
49 }
50
51 // GetChannels implements Reactor
52 func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor {
53         return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{
54                 ID:                PexChannel,
55                 Priority:          1,
56                 SendQueueCapacity: 10,
57         }}
58 }
59
60 // AddPeer adding peer to the address book
61 func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
62         if r.Switch.Peers().Size() <= r.Switch.Config.P2P.MaxNumPeers {
63                 return nil
64         }
65
66         nodes := make([]*discover.Node, 10)
67         if n := r.discv.ReadRandomNodes(nodes); n == 0 {
68                 return nil
69         }
70
71         if r.SendAddrs(p, nodes) {
72                 <-time.After(1 * time.Second)
73                 r.Switch.StopPeerGracefully(p.Key)
74         }
75         return errors.New("addPeer: reach the max peer, exchange then close")
76 }
77
78 // Receive implements Reactor by handling incoming PEX messages.
79 func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
80         _, msg, err := DecodeMessage(rawMsg)
81         if err != nil {
82                 log.WithField("error", err).Error("failed to decoding pex message")
83                 r.Switch.StopPeerGracefully(p.Key)
84                 return
85         }
86
87         switch msg := msg.(type) {
88         case *pexRequestMessage:
89                 nodes := make([]*discover.Node, 10)
90                 if n := r.discv.ReadRandomNodes(nodes); n == 0 {
91                         return
92                 }
93
94                 if !r.SendAddrs(p, nodes) {
95                         log.Error("failed to send pex address message")
96                 }
97
98         case *pexAddrsMessage:
99         default:
100                 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
101         }
102 }
103
104 // RemovePeer implements Reactor.
105 func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
106
107 // SendAddrs sends addrs to the peer.
108 func (r *PEXReactor) SendAddrs(p *p2p.Peer, nodes []*discover.Node) bool {
109         addrs := []*p2p.NetAddress{}
110         for _, node := range nodes {
111                 if node == nil {
112                         break
113                 }
114                 addrs = append(addrs, p2p.NewNetAddressIPPort(node.IP, node.TCP))
115         }
116
117         ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
118         if !ok {
119                 r.Switch.StopPeerGracefully(p.Key)
120         }
121         return ok
122 }
123
124 func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
125         if err := r.Switch.DialPeerWithAddress(a); err != nil {
126                 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
127         }
128         wg.Done()
129 }
130
131 func (r *PEXReactor) ensurePeers() {
132         numOutPeers, _, numDialing := r.Switch.NumPeers()
133         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
134         log.WithFields(log.Fields{
135                 "numOutPeers": numOutPeers,
136                 "numDialing":  numDialing,
137                 "numToDial":   numToDial,
138         }).Debug("ensure peers")
139         if numToDial <= 0 {
140                 return
141         }
142
143         connectedPeers := make(map[string]struct{})
144         for _, peer := range r.Switch.Peers().List() {
145                 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
146         }
147
148         var wg sync.WaitGroup
149         nodes := make([]*discover.Node, numToDial)
150         n := r.discv.ReadRandomNodes(nodes)
151         for i := 0; i < n; i++ {
152                 try := p2p.NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
153                 if r.Switch.NodeInfo().ListenAddr == try.String() {
154                         continue
155                 }
156                 if dialling := r.Switch.IsDialing(try); dialling {
157                         continue
158                 }
159                 if _, ok := connectedPeers[try.IP.String()]; ok {
160                         continue
161                 }
162
163                 log.Debug("Will dial address addr:", try)
164                 wg.Add(1)
165                 go r.dialPeerWorker(try, &wg)
166         }
167         wg.Wait()
168 }
169
170 func (r *PEXReactor) ensurePeersRoutine() {
171         r.ensurePeers()
172         ticker := time.NewTicker(120 * time.Second)
173         quickTicker := time.NewTicker(3 * time.Second)
174
175         for {
176                 select {
177                 case <-ticker.C:
178                         r.ensurePeers()
179                 case <-quickTicker.C:
180                         if r.Switch.Peers().Size() < 3 {
181                                 r.ensurePeers()
182                         }
183                 case <-r.Quit:
184                         return
185                 }
186         }
187 }