10 log "github.com/sirupsen/logrus"
11 cmn "github.com/tendermint/tmlibs/common"
13 cfg "github.com/bytom/bytom/config"
14 "github.com/bytom/bytom/consensus"
15 "github.com/bytom/bytom/crypto/ed25519/chainkd"
16 "github.com/bytom/bytom/errors"
17 "github.com/bytom/bytom/event"
18 "github.com/bytom/bytom/p2p/connection"
19 "github.com/bytom/bytom/p2p/discover/dht"
20 "github.com/bytom/bytom/p2p/discover/mdns"
21 "github.com/bytom/bytom/p2p/netutil"
22 "github.com/bytom/bytom/p2p/security"
23 "github.com/bytom/bytom/version"
29 minNumOutboundPeers = 4
33 //pre-define errors for connecting fail
35 ErrDuplicatePeer = errors.New("Duplicate peer")
36 ErrConnectSelf = errors.New("Connect self")
37 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
40 type discv interface {
41 ReadRandomNodes(buf []*dht.Node) (n int)
44 type lanDiscv interface {
45 Subscribe() (*event.Subscription, error)
49 type Security interface {
50 DoFilter(ip string, pubKey string) error
51 IsBanned(ip string, level byte, reason string) bool
52 RegisterFilter(filter security.Filter)
56 // Switch handles peer connections and exposes an API to receive incoming messages
57 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
58 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
59 // incoming messages are received on the reactor.
64 peerConfig *PeerConfig
66 reactors map[string]Reactor
67 chDescs []*connection.ChannelDescriptor
68 reactorsByCh map[byte]Reactor
71 nodeInfo *NodeInfo // our node info
72 nodePrivKey chainkd.XPrv // our node privkey
78 // NewSwitch create a new Switch and set discover.
79 func NewSwitch(config *cfg.Config) (*Switch, error) {
83 var discv *dht.Network
84 var lanDiscv *mdns.LANDiscover
86 xPrv := config.PrivateKey()
87 if !config.VaultMode {
89 l, listenAddr = GetListener(config.P2P)
90 discv, err = dht.NewDiscover(config, *xPrv, l.ExternalAddress().Port)
94 if config.P2P.LANDiscover {
95 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
99 return newSwitch(config, discv, lanDiscv, l, *xPrv, listenAddr)
102 // newSwitch creates a new Switch with the given config.
103 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv chainkd.XPrv, listenAddr string) (*Switch, error) {
106 peerConfig: DefaultPeerConfig(config.P2P),
107 reactors: make(map[string]Reactor),
108 chDescs: make([]*connection.ChannelDescriptor, 0),
109 reactorsByCh: make(map[byte]Reactor),
111 dialing: cmn.NewCMap(),
115 nodeInfo: NewNodeInfo(config, priv.XPub().PublicKey(), listenAddr),
116 security: security.NewSecurity(config),
120 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
124 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
125 func (sw *Switch) OnStart() error {
126 for _, reactor := range sw.reactors {
127 if err := reactor.Start(); err != nil {
132 sw.security.RegisterFilter(sw.nodeInfo)
133 sw.security.RegisterFilter(sw.peers)
134 if err := sw.security.Start(); err != nil {
138 for _, listener := range sw.listeners {
139 go sw.listenerRoutine(listener)
141 go sw.ensureOutboundPeersRoutine()
142 go sw.connectLANPeersRoutine()
147 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
148 func (sw *Switch) OnStop() {
149 if sw.Config.P2P.LANDiscover {
153 for _, listener := range sw.listeners {
158 for _, peer := range sw.peers.List() {
160 sw.peers.Remove(peer)
163 for _, reactor := range sw.reactors {
168 // AddPeer performs the P2P handshake with a peer
169 // that already has a SecretConnection. If all goes well,
170 // it starts the peer and adds it to the switch.
171 // NOTE: This performs a blocking handshake before the peer is added.
172 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
173 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
174 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
179 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
182 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
186 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
187 if err := sw.security.DoFilter(peer.RemoteAddrHost(), hex.EncodeToString(peer.PubKey())); err != nil {
191 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
192 return ErrConnectSpvPeer
197 if err := sw.startInitPeer(peer); err != nil {
202 return sw.peers.Add(peer)
205 // AddReactor adds the given reactor to the switch.
206 // NOTE: Not goroutine safe.
207 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
208 // Validate the reactor.
209 // No two reactors can share the same channel.
210 for _, chDesc := range reactor.GetChannels() {
212 if sw.reactorsByCh[chID] != nil {
213 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
215 sw.chDescs = append(sw.chDescs, chDesc)
216 sw.reactorsByCh[chID] = reactor
218 sw.reactors[name] = reactor
219 reactor.SetSwitch(sw)
223 // AddListener adds the given listener to the switch for listening to incoming peer connections.
224 // NOTE: Not goroutine safe.
225 func (sw *Switch) AddListener(l Listener) {
226 sw.listeners = append(sw.listeners, l)
229 //DialPeerWithAddress dial node from net address
230 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
231 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
232 sw.dialing.Set(addr.IP.String(), addr)
233 defer sw.dialing.Delete(addr.IP.String())
234 if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
238 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
240 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
244 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
245 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on switch AddPeer")
249 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
253 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
254 return sw.security.IsBanned(ip, level, reason)
257 //IsDialing prevent duplicate dialing
258 func (sw *Switch) IsDialing(addr *NetAddress) bool {
259 return sw.dialing.Has(addr.IP.String())
262 // IsListening returns true if the switch has at least one listener.
263 // NOTE: Not goroutine safe.
264 func (sw *Switch) IsListening() bool {
265 return len(sw.listeners) > 0
268 // Listeners returns the list of listeners the switch listens on.
269 // NOTE: Not goroutine safe.
270 func (sw *Switch) Listeners() []Listener {
274 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
275 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
276 peers := sw.peers.List()
277 for _, peer := range peers {
278 if peer.outbound && !peer.isLAN {
287 dialing = sw.dialing.Size()
291 // NodeInfo returns the switch's NodeInfo.
292 // NOTE: Not goroutine safe.
293 func (sw *Switch) NodeInfo() *NodeInfo {
297 //Peers return switch peerset
298 func (sw *Switch) Peers() *PeerSet {
302 // StopPeerForError disconnects from a peer due to external error.
303 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
304 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
305 sw.stopAndRemovePeer(peer, reason)
308 // StopPeerGracefully disconnect from a peer gracefully.
309 func (sw *Switch) StopPeerGracefully(peerID string) {
310 if peer := sw.peers.Get(peerID); peer != nil {
311 sw.stopAndRemovePeer(peer, nil)
315 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
316 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
318 if err := conn.Close(); err != nil {
319 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
324 if err = sw.AddPeer(peerConn, false); err != nil {
325 if err := conn.Close(); err != nil {
326 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
331 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
335 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
336 lanPeers, _, _, numDialing := sw.NumPeers()
337 numToDial := maxNumLANPeers - lanPeers
338 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
342 addresses := make([]*NetAddress, 0)
343 for i := 0; i < len(lanPeer.IP); i++ {
344 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
346 sw.dialPeers(addresses)
349 func (sw *Switch) connectLANPeersRoutine() {
350 if !sw.Config.P2P.LANDiscover {
354 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
356 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
362 case obj, ok := <-lanPeerEventSub.Chan():
364 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
367 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
369 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
372 sw.connectLANPeers(LANPeer)
379 func (sw *Switch) listenerRoutine(l Listener) {
381 inConn, ok := <-l.Connections()
386 // disconnect if we alrady have MaxNumPeers
387 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
388 if err := inConn.Close(); err != nil {
389 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
391 log.Info("Ignoring inbound connection: already have enough peers.")
395 // New inbound connection!
396 if err := sw.addPeerWithConnection(inConn); err != nil {
397 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
403 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
404 if err := sw.DialPeerWithAddress(a); err != nil {
405 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
410 func (sw *Switch) dialPeers(addresses []*NetAddress) {
411 connectedPeers := make(map[string]struct{})
412 for _, peer := range sw.Peers().List() {
413 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
416 var wg sync.WaitGroup
417 for _, address := range addresses {
418 if sw.NodeInfo().ListenAddr == address.String() {
421 if dialling := sw.IsDialing(address); dialling {
424 if _, ok := connectedPeers[address.IP.String()]; ok {
429 go sw.dialPeerWorker(address, &wg)
434 func (sw *Switch) ensureKeepConnectPeers() {
435 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
436 addresses := make([]*NetAddress, 0)
437 for _, keepDial := range keepDials {
438 address, err := NewNetAddressString(keepDial)
440 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
443 addresses = append(addresses, address)
446 sw.dialPeers(addresses)
449 func (sw *Switch) ensureOutboundPeers() {
450 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
451 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
452 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
457 nodes := make([]*dht.Node, numToDial)
458 n := sw.discv.ReadRandomNodes(nodes)
459 addresses := make([]*NetAddress, 0)
460 for i := 0; i < n; i++ {
461 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
462 addresses = append(addresses, address)
464 sw.dialPeers(addresses)
467 func (sw *Switch) ensureOutboundPeersRoutine() {
468 sw.ensureKeepConnectPeers()
469 sw.ensureOutboundPeers()
471 ticker := time.NewTicker(10 * time.Second)
477 sw.ensureKeepConnectPeers()
478 sw.ensureOutboundPeers()
485 func (sw *Switch) startInitPeer(peer *Peer) error {
486 // spawn send/recv routines
487 if err := peer.Start(); err != nil {
488 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
491 for _, reactor := range sw.reactors {
492 if err := reactor.AddPeer(peer); err != nil {
499 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
500 sw.peers.Remove(peer)
501 for _, reactor := range sw.reactors {
502 reactor.RemovePeer(peer, reason)
506 sentStatus, receivedStatus := peer.TrafficStatus()
507 log.WithFields(log.Fields{
509 "address": peer.Addr().String(),
511 "duration": sentStatus.Duration.String(),
512 "total_sent": sentStatus.Bytes,
513 "total_received": receivedStatus.Bytes,
514 "average_sent_rate": sentStatus.AvgRate,
515 "average_received_rate": receivedStatus.AvgRate,
516 "peer num": sw.peers.Size(),
517 }).Info("disconnect with peer")