9 log "github.com/sirupsen/logrus"
11 "github.com/bytom/p2p"
12 "github.com/bytom/p2p/connection"
13 "github.com/bytom/p2p/discover"
17 // PexChannel is a channel for PEX messages
18 PexChannel = byte(0x00)
20 minNumOutboundPeers = 5
21 maxPexMessageSize = 1048576 // 1MB
24 // PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
25 type PEXReactor struct {
27 discv *discover.Network
30 // NewPEXReactor creates new PEX reactor.
31 func NewPEXReactor(discv *discover.Network) *PEXReactor {
35 r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
39 // OnStart implements BaseService
40 func (r *PEXReactor) OnStart() error {
41 r.BaseReactor.OnStart()
42 go r.ensurePeersRoutine()
46 // OnStop implements BaseService
47 func (r *PEXReactor) OnStop() {
48 r.BaseReactor.OnStop()
51 // GetChannels implements Reactor
52 func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor {
53 return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{
56 SendQueueCapacity: 10,
60 // AddPeer adding peer to the address book
61 func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
62 if r.Switch.Peers().Size() <= r.Switch.Config.P2P.MaxNumPeers {
66 nodes := make([]*discover.Node, 10)
67 if n := r.discv.ReadRandomNodes(nodes); n == 0 {
71 if r.SendAddrs(p, nodes) {
72 <-time.After(1 * time.Second)
73 r.Switch.StopPeerGracefully(p.Key)
75 return errors.New("addPeer: reach the max peer, exchange then close")
78 // Receive implements Reactor by handling incoming PEX messages.
79 func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
80 _, msg, err := DecodeMessage(rawMsg)
82 log.WithField("error", err).Error("failed to decoding pex message")
83 r.Switch.StopPeerGracefully(p.Key)
87 switch msg := msg.(type) {
88 case *pexRequestMessage:
89 nodes := make([]*discover.Node, 10)
90 if n := r.discv.ReadRandomNodes(nodes); n == 0 {
94 if !r.SendAddrs(p, nodes) {
95 log.Error("failed to send pex address message")
98 case *pexAddrsMessage:
100 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
104 // RemovePeer implements Reactor.
105 func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
107 // SendAddrs sends addrs to the peer.
108 func (r *PEXReactor) SendAddrs(p *p2p.Peer, nodes []*discover.Node) bool {
109 addrs := []*p2p.NetAddress{}
110 for _, node := range nodes {
114 addrs = append(addrs, p2p.NewNetAddressIPPort(node.IP, node.TCP))
117 ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
119 r.Switch.StopPeerGracefully(p.Key)
124 func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
125 if err := r.Switch.DialPeerWithAddress(a); err != nil {
126 log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
131 func (r *PEXReactor) ensurePeers() {
132 numOutPeers, _, numDialing := r.Switch.NumPeers()
133 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
134 log.WithFields(log.Fields{
135 "numOutPeers": numOutPeers,
136 "numDialing": numDialing,
137 "numToDial": numToDial,
138 }).Debug("ensure peers")
143 connectedPeers := make(map[string]struct{})
144 for _, peer := range r.Switch.Peers().List() {
145 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
148 var wg sync.WaitGroup
149 nodes := make([]*discover.Node, numToDial)
150 n := r.discv.ReadRandomNodes(nodes)
151 for i := 0; i < n; i++ {
152 try := p2p.NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
153 if r.Switch.NodeInfo().ListenAddr == try.String() {
156 if dialling := r.Switch.IsDialing(try); dialling {
159 if _, ok := connectedPeers[try.IP.String()]; ok {
163 log.Debug("Will dial address addr:", try)
165 go r.dialPeerWorker(try, &wg)
170 func (r *PEXReactor) ensurePeersRoutine() {
172 ticker := time.NewTicker(120 * time.Second)
173 quickTicker := time.NewTicker(3 * time.Second)
179 case <-quickTicker.C:
180 if r.Switch.Peers().Size() < 3 {