11 log "github.com/sirupsen/logrus"
12 cmn "github.com/tendermint/tmlibs/common"
14 "github.com/bytom/p2p"
18 // PexChannel is the channel ID byte used for PEX (peer exchange) messages.
19 PexChannel = byte(0x00)
// minNumOutboundPeers is the outbound-peer target that ensurePeers measures
// the current outbound + dialing count against when deciding how many
// addresses to dial.
21 minNumOutboundPeers = 5
// maxPexMessageSize is a size bound for PEX messages.
// NOTE(review): no use of this constant is visible in this excerpt — confirm where it is enforced.
22 maxPexMessageSize = 1048576 // 1 MiB (1024 * 1024 bytes)
// defaultMaxMsgCountByPeer is the per-address message count at which
// reachedMaxMsgLimit reports true and Receive stops the peer.
23 defaultMaxMsgCountByPeer = uint16(1000)
26 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
// NOTE(review): only one field is visible in this excerpt; the methods also
// use r.book, r.Switch and r.BaseReactor, whose declarations are not shown.
27 type PEXReactor struct {
// msgCountByPeer maps a peer's remote address string to the number of PEX
// messages received from it (stored as uint16). It is emptied by
// flushMsgCountByPeer.
30 msgCountByPeer *cmn.CMap
33 // NewPEXReactor creates a new PEX reactor.
// The reactor gets an empty per-peer message counter map and a base reactor
// registered under the name "PEXReactor".
// NOTE(review): how b is stored is not visible in this excerpt — presumably as r.book; confirm.
34 func NewPEXReactor(b *AddrBook) *PEXReactor {
// Fresh cmn.CMap for the per-peer PEX message counts.
37 msgCountByPeer: cmn.NewCMap(),
// The reactor passes itself to the base reactor constructor and stores the
// resulting value (dereferenced) in its BaseReactor field.
39 r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
43 // OnStart implements BaseService.
// It starts the base reactor and the address book, then launches the
// ensurePeersRoutine and flushMsgCountByPeer goroutines.
44 func (r *PEXReactor) OnStart() error {
// NOTE(review): anything returned by BaseReactor.OnStart is discarded here — confirm that is intended.
45 r.BaseReactor.OnStart()
// A failure to start the address book is detected here; the handling
// branch is not visible in this excerpt (presumably the error is returned).
46 if _, err := r.book.Start(); err != nil {
// Background loops: one keeps the peer count up, the other periodically
// clears the per-peer message counters.
50 go r.ensurePeersRoutine()
51 go r.flushMsgCountByPeer()
55 // OnStop implements BaseService.
// It stops the embedded base reactor.
// NOTE(review): nothing visible in this excerpt stops the address book or
// the goroutines launched by OnStart — confirm against the full file.
56 func (r *PEXReactor) OnStop() {
57 r.BaseReactor.OnStop()
61 // GetChannels implements Reactor.
// It returns the channel descriptors this reactor uses, built as a slice
// literal.
62 func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
63 return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
// Send queue capacity of the descriptor; its other fields are not visible
// in this excerpt (presumably the ID is PexChannel — confirm).
66 SendQueueCapacity: 10,
70 // AddPeer handles a newly connected peer: it may request addresses from the
// peer, records the peer's listen address in the address book, and turns the
// peer away once the switch has reached its configured peer limit.
// It returns a non-nil error on each failure path visible below.
// NOTE(review): some branching lines are missing from this excerpt, so which
// steps apply to which kind of peer cannot be determined from here.
71 func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
// If the book wants more addresses, ask this peer; a failed request is
// reported as an error.
73 if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
74 return errors.New("Send pex message fail")
// Parse the listen address advertised by the peer.
79 addr, err := p2p.NewNetAddressString(p.ListenAddr)
81 return errors.New("addPeer: invalid peer address")
// Add the address to the book, passing the same address as the second
// argument as well.
84 if err := r.book.AddAddress(addr, addr); err != nil {
// At or above the configured maximum: send the peer a selection of known
// addresses, then close the connection and report an error.
88 if r.Switch.Peers().Size() >= r.Switch.Config.MaxNumPeers {
89 if r.SendAddrs(p, r.book.GetSelection()) {
// Block for one second before stopping the peer — presumably so the
// address message can be delivered first.
90 <-time.After(1 * time.Second)
91 r.Switch.StopPeerGracefully(p)
93 return errors.New("addPeer: reach the max peer, exchange then close")
98 // Receive implements Reactor by handling incoming PEX messages.
// Each call first counts the message against the sender's remote address and
// stops the peer once the per-address limit is reached; the raw message is
// then decoded and dispatched on its concrete type.
// NOTE(review): the return statements after the stop calls are not visible
// in this excerpt — presumably each failure path returns early.
99 func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
100 srcAddr := p.Connection().RemoteAddress
101 srcAddrStr := srcAddr.String()
// Count first, then check: reachedMaxMsgLimit reads the entry that
// incrementMsgCount has just written for this address.
102 r.incrementMsgCount(srcAddrStr)
103 if r.reachedMaxMsgLimit(srcAddrStr) {
104 log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
105 r.Switch.StopPeerGracefully(p)
// Decode the raw bytes; the first result of DecodeMessage is ignored. The
// log-and-stop lines below belong to the decode-failure path (its guard is
// not visible in this excerpt).
109 _, msg, err := DecodeMessage(rawMsg)
111 log.WithField("error", err).Error("failed to decoding pex message")
112 r.Switch.StopPeerGracefully(p)
116 switch msg := msg.(type) {
// Address request: reply with a selection of addresses from the book.
117 case *pexRequestMessage:
118 if !r.SendAddrs(p, r.book.GetSelection()) {
119 log.Error("failed to send pex address message")
// Address list: add every entry to the book with the sender as its
// source; a failed add is logged and the peer is stopped.
122 case *pexAddrsMessage:
123 for _, addr := range msg.Addrs {
124 if err := r.book.AddAddress(addr, srcAddr); err != nil {
125 log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
126 r.Switch.StopPeerGracefully(p)
// Unrecognised message types are only logged (presumably the default case;
// the case label is not visible in this excerpt).
132 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
136 // RemovePeer implements Reactor. It is a no-op: the body is empty, so
// neither p nor reason is used.
137 func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
139 // RequestAddrs asks peer p for more addresses by sending it a
// pexRequestMessage on PexChannel. It returns a bool.
// NOTE(review): the return statement is not visible in this excerpt —
// presumably the TrySend result is returned; confirm.
140 func (r *PEXReactor) RequestAddrs(p *p2p.Peer) bool {
141 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
// Stop the peer — presumably only when the send failed; the guarding
// condition is not visible in this excerpt.
143 r.Switch.StopPeerGracefully(p)
148 // SendAddrs sends addrs to peer p as a pexAddrsMessage on PexChannel.
// It returns a bool.
// NOTE(review): the return statement is not visible in this excerpt —
// presumably the TrySend result is returned; confirm.
149 func (r *PEXReactor) SendAddrs(p *p2p.Peer, addrs []*p2p.NetAddress) bool {
150 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
// Stop the peer — presumably only when the send failed; the guarding
// condition is not visible in this excerpt.
152 r.Switch.StopPeerGracefully(p)
// dialPeerWorker dials address a through the switch and, if the dial fails,
// records the attempt in the address book. ensurePeers runs it in its own
// goroutine for each candidate address.
// NOTE(review): the use of wg is not visible in this excerpt — presumably
// wg.Done is called when the worker finishes; confirm.
157 func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
158 if err := r.Switch.DialPeerWithAddress(a); err != nil {
159 r.book.MarkAttempt(a)
// dialSeeds adds the configured seed addresses to the address book, saves the
// book to file, and dials a random subset of the seeds. Failures are logged
// rather than returned (the function has no return value).
166 func (r *PEXReactor) dialSeeds() {
// No seeds configured (the branch body is not visible in this excerpt —
// presumably the function returns).
167 if r.Switch.Config.Seeds == "" {
// Seeds are configured as a single comma-separated string.
171 seeds := strings.Split(r.Switch.Config.Seeds, ",")
172 netAddrs, err := p2p.NewNetAddressStrings(seeds)
174 log.WithField("err", err).Error("dialSeeds: fail to decode net address strings")
// Our own listen address: compared against each seed and passed as the
// source when adding seeds to the book.
177 ourAddr, err := p2p.NewNetAddressString(r.Switch.NodeInfo().ListenAddr)
179 log.WithField("err", err).Error("dialSeeds: fail to get our address")
182 for _, netAddr := range netAddrs {
// A seed equal to our own address gets special handling (body not visible
// in this excerpt — presumably it is skipped).
183 if netAddr.Equals(ourAddr) {
186 if err := r.book.AddAddress(netAddr, ourAddr); err != nil {
187 log.WithField("err", err).Warn("dialSeeds: fail to add address")
191 if err := r.book.SaveToFile(); err != nil {
192 log.WithField("err", err).Warn("dialSeeds: fail to save address book")
// Walk a random permutation of the seed indices in steps of two, so only
// every other entry of the permutation is dialed.
195 perm := rand.Perm(len(netAddrs))
196 for i := 0; i < len(perm); i += 2 {
197 if err := r.Switch.DialPeerWithAddress(netAddrs[perm[i]]); err != nil {
198 log.WithField("err", err).Warn("dialSeeds: fail to dial seed")
// ensurePeers picks addresses from the address book and dials them, one
// goroutine per address, sized by how far the outbound + dialing count falls
// short of minNumOutboundPeers.
// NOTE(review): several branch bodies are missing from this excerpt (early
// return, `continue` statements, wait-group calls); comments below describe
// only what the visible lines establish.
203 func (r *PEXReactor) ensurePeers() {
// The second value returned by NumPeers is ignored.
204 numOutPeers, _, numDialing := r.Switch.NumPeers()
// Shortfall against minNumOutboundPeers, multiplied by three.
205 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 3
206 log.WithFields(log.Fields{
207 "numOutPeers": numOutPeers,
208 "numDialing": numDialing,
209 "numToDial": numToDial,
210 }).Debug("ensure peers")
// Bias handed to PickAddress: 10 plus 10 per outbound peer, with the peer
// count capped at 8 (assuming cmn.MinInt returns the smaller argument).
215 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
// Candidates are keyed by IP string, so at most one address per IP is kept.
216 toDial := make(map[string]*p2p.NetAddress)
// Upper bound on the number of picks from the book.
217 maxAttempts := numToDial * 3
// Set of hosts of the currently connected peers.
219 connectedPeers := make(map[string]struct{})
220 for _, peer := range r.Switch.Peers().List() {
221 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
// Keep picking until enough candidates are collected or attempts run out.
224 for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
225 try := r.book.PickAddress(newBias)
// The three checks below detect an address that is already selected, is
// being dialed, or belongs to a connected host (bodies not visible in this
// excerpt — presumably each skips the address).
229 if _, selected := toDial[try.IP.String()]; selected {
232 if dialling := r.Switch.IsDialing(try); dialling {
235 if _, ok := connectedPeers[try.IP.String()]; ok {
239 log.Debug("Will dial address addr:", try)
240 toDial[try.IP.String()] = try
// Dial every candidate in its own goroutine, sharing one wait group.
243 var wg sync.WaitGroup
244 for _, item := range toDial {
246 go r.dialPeerWorker(item, &wg)
// If the book still wants addresses, choose a random connected peer —
// presumably to request addresses from it; the follow-up line is not
// visible in this excerpt.
250 if r.book.NeedMoreAddrs() {
251 if peers := r.Switch.Peers().List(); len(peers) > 0 {
252 peer := peers[rand.Int()%len(peers)]
// ensurePeersRoutine is started as a goroutine by OnStart. It checks whether
// fewer than three peers are connected, both up front and on every tick of a
// 3-second ticker; a second ticker fires every 120 seconds.
// NOTE(review): the loop/select skeleton, the actions taken in each branch
// and any ticker Stop or quit handling are not visible in this excerpt.
258 func (r *PEXReactor) ensurePeersRoutine() {
// Initial check before the tickers are created (body not visible).
260 if r.Switch.Peers().Size() < 3 {
264 ticker := time.NewTicker(120 * time.Second)
265 quickTicker := time.NewTicker(3 * time.Second)
// Quick tick: re-check the low-peer condition (body not visible).
271 case <-quickTicker.C:
272 if r.Switch.Peers().Size() < 3 {
// flushMsgCountByPeer is started as a goroutine by OnStart. It creates a
// one-hour ticker and clears the per-peer message counter map.
// NOTE(review): the loop around the Clear call and any stop handling are not
// visible in this excerpt — presumably the map is cleared on every tick.
281 func (r *PEXReactor) flushMsgCountByPeer() {
282 ticker := time.NewTicker(1 * time.Hour)
// Drops every counter, so each address starts again with no entry.
286 r.msgCountByPeer.Clear()
// incrementMsgCount updates the message counter stored for addr in
// msgCountByPeer.
// NOTE(review): the declaration and increment of count are not visible in
// this excerpt — presumably count starts at zero and is incremented before
// being stored; confirm.
293 func (r *PEXReactor) incrementMsgCount(addr string) {
// Load the existing count, if any; the nil check covers the case where
// Get returns nil for addr.
295 if countI := r.msgCountByPeer.Get(addr); countI != nil {
296 count = countI.(uint16)
// Write the count back under the same key.
299 r.msgCountByPeer.Set(addr, count)
// reachedMaxMsgLimit reports whether the counter stored for addr is at or
// above defaultMaxMsgCountByPeer.
// NOTE(review): the type assertion below is unchecked. incrementMsgCount
// guards against Get returning nil, and flushMsgCountByPeer clears the map
// from another goroutine, so if the entry is absent here this panics —
// consider the two-value assertion form.
302 func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
303 return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer