10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
14 cfg "github.com/bytom/bytom/config"
15 "github.com/bytom/bytom/consensus"
16 "github.com/bytom/bytom/errors"
17 "github.com/bytom/bytom/event"
18 "github.com/bytom/bytom/p2p/connection"
19 "github.com/bytom/bytom/p2p/discover/dht"
20 "github.com/bytom/bytom/p2p/discover/mdns"
21 "github.com/bytom/bytom/p2p/netutil"
22 "github.com/bytom/bytom/p2p/security"
23 "github.com/bytom/bytom/version"
// minNumOutboundPeers is the outbound-connection target: ensureOutboundPeers
// computes the number of nodes to dial as this value minus
// (outbound peers + dials currently in flight).
29 minNumOutboundPeers = 4
33 // Pre-defined errors for failed connection attempts.
// ErrDuplicatePeer and ErrConnectSelf are not referenced in the visible part
// of this file; presumably they are produced by the registered filters or the
// peer set — TODO confirm.
35 ErrDuplicatePeer = errors.New("Duplicate peer")
36 ErrConnectSelf = errors.New("Connect self")
// ErrConnectSpvPeer is returned by AddPeer when an outbound connection reaches
// a peer whose service flag does not enable consensus.SFFullNode.
37 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
// discv is the node-discovery dependency of the Switch.
40 type discv interface {
// ReadRandomNodes writes discovered nodes into buf and returns how many were
// written; ensureOutboundPeers only reads the first n entries of buf.
41 ReadRandomNodes(buf []*dht.Node) (n int)
// lanDiscv is the LAN-discovery dependency of the Switch.
44 type lanDiscv interface {
// Subscribe returns a subscription whose channel is consumed by
// connectLANPeersRoutine; that routine expects each event's Data to be an
// mdns.LANPeerEvent.
45 Subscribe() (*event.Subscription, error)
// Security is the peer-filtering dependency of the Switch.
49 type Security interface {
// DoFilter vets a peer identified by IP and public key; callers check the
// returned error. DialPeerWithAddress calls it with an empty pubKey before
// connecting, AddPeer calls it again with the peer's real key.
50 DoFilter(ip string, pubKey string) error
// IsBanned is exposed to callers through Switch.IsBanned.
// NOTE(review): the meaning of level and reason is defined by the
// implementation and is not visible in this file.
51 IsBanned(ip string, level byte, reason string) bool
// RegisterFilter adds a filter; OnStart registers the node info and the peer
// set through it.
52 RegisterFilter(filter security.Filter)
56 // Switch handles peer connections and exposes an API to receive incoming messages
57 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
58 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
59 // incoming messages are received on the reactor.
// peerConfig holds the per-peer connection settings built by DefaultPeerConfig
// in newSwitch; AddPeer reads its HandshakeTimeout.
64 peerConfig *PeerConfig
// reactors maps the name passed to AddReactor to the registered reactor.
66 reactors map[string]Reactor
// chDescs accumulates the channel descriptors of every registered reactor.
67 chDescs []*connection.ChannelDescriptor
// reactorsByCh maps a channel ID to the reactor that owns it; AddReactor
// refuses to register a channel that already has an owner.
68 reactorsByCh map[byte]Reactor
71 nodeInfo *NodeInfo // our node info
72 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
78 // NewSwitch creates a new Switch from config and sets up discovery.
// Unless config.VaultMode is set it obtains a listener through GetListener and
// creates the DHT discovery on the listener's external port; LAN discovery may
// additionally be created when config.P2P.LANDiscover is set. The result of
// newSwitch is returned unchanged.
//
// NOTE(review): discv and lanDiscv are declared as concrete pointers and then
// passed to interface-typed parameters of newSwitch. If either is still nil at
// that point the resulting interface value is non-nil; confirm that no code
// compares the stored interfaces with nil.
79 func NewSwitch(config *cfg.Config) (*Switch, error) {
83 var discv *dht.Network
84 var lanDiscv *mdns.LANDiscover
// The same private-key bytes are used twice: as the ed25519 key handed to the
// DHT discovery, and (copied into newKey) as the crypto.PrivKeyEd25519 that is
// passed to newSwitch.
86 bytes := config.PrivateKey().Bytes()
88 copy(newKey[:], bytes)
89 privKey := crypto.PrivKeyEd25519(newKey)
90 if !config.VaultMode {
92 l, listenAddr = GetListener(config.P2P)
93 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
97 if config.P2P.LANDiscover {
// LAN discovery announces the same external port the listener reports.
98 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
102 return newSwitch(config, discv, lanDiscv, l, privKey, listenAddr)
105 // newSwitch creates a new Switch with the given config.
// Discovery, LAN discovery, listener, private key and listen address are
// supplied by the caller instead of being created here.
106 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
109 peerConfig: DefaultPeerConfig(config.P2P),
110 reactors: make(map[string]Reactor),
111 chDescs: make([]*connection.ChannelDescriptor, 0),
112 reactorsByCh: make(map[byte]Reactor),
// dialing tracks addresses with a dial in flight (see DialPeerWithAddress).
114 dialing: cmn.NewCMap(),
// Our node info is built from the public half of priv and the listen address.
118 nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
119 security: security.NewSecurity(config),
// sw is handed to the base service as its implementation; OnStart and OnStop
// below are documented as the BaseService hooks.
123 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
127 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
// In order: every registered reactor is started, the node info and the peer
// set are registered as security filters, the security module is started, and
// then the background goroutines are launched (one accept loop per listener,
// plus the outbound-peer and LAN-peer maintenance routines).
128 func (sw *Switch) OnStart() error {
129 for _, reactor := range sw.reactors {
130 if err := reactor.Start(); err != nil {
// Filters must be registered before the security module is started below.
135 sw.security.RegisterFilter(sw.nodeInfo)
136 sw.security.RegisterFilter(sw.peers)
137 if err := sw.security.Start(); err != nil {
141 for _, listener := range sw.listeners {
142 go sw.listenerRoutine(listener)
144 go sw.ensureOutboundPeersRoutine()
145 go sw.connectLANPeersRoutine()
150 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// NOTE(review): only the loop headers are visible here; the individual stop
// calls are on lines outside this view.
151 func (sw *Switch) OnStop() {
// LAN-discovery teardown is conditional on the same config flag that enables it.
152 if sw.Config.P2P.LANDiscover {
156 for _, listener := range sw.listeners {
161 for _, peer := range sw.peers.List() {
// Each peer is also dropped from the peer set during shutdown.
163 sw.peers.Remove(peer)
166 for _, reactor := range sw.reactors {
171 // AddPeer performs the P2P handshake with a peer
172 // that already has a SecretConnection. If all goes well,
173 // it starts the peer and adds it to the switch.
174 // NOTE: This performs a blocking handshake before the peer is added.
175 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
// NOTE(review): AddPeer returns only an error (no peer), and in
// addPeerWithConnection it is the caller that closes the connection when
// AddPeer fails — the CONTRACT line above looks stale; confirm.
//
// isLAN is forwarded to newPeer to mark the peer as discovered on the LAN.
176 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
// Exchange node info with the remote side, bounded by the configured timeout.
177 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
// Compare our version with the peer's reported version.
182 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
185 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
189 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
// Second filter pass, now with the peer's real public key.
190 if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey().String()); err != nil {
// Outbound connections are only kept to peers that enable the full-node flag.
194 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
195 return ErrConnectSpvPeer
200 if err := sw.startInitPeer(peer); err != nil {
// NOTE(review): the peer has already been started and announced to the
// reactors at this point; if peers.Add fails, confirm that something stops it.
205 return sw.peers.Add(peer)
208 // AddReactor adds the given reactor to the switch.
// It panics via cmn.PanicSanity if one of the reactor's channels is already
// owned by a registered reactor. Registering under an existing name replaces
// the entry for that name in the reactors map.
209 // NOTE: Not goroutine safe.
210 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
211 // Validate the reactor.
212 // No two reactors can share the same channel.
213 for _, chDesc := range reactor.GetChannels() {
215 if sw.reactorsByCh[chID] != nil {
216 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
218 sw.chDescs = append(sw.chDescs, chDesc)
219 sw.reactorsByCh[chID] = reactor
221 sw.reactors[name] = reactor
// Give the reactor a back-reference to this switch.
222 reactor.SetSwitch(sw)
226 // AddListener adds the given listener to the switch for listening to incoming peer connections.
// OnStart launches an accept goroutine for each listener present in
// sw.listeners at that time.
227 // NOTE: Not goroutine safe.
228 func (sw *Switch) AddListener(l Listener) {
229 sw.listeners = append(sw.listeners, l)
232 // DialPeerWithAddress dials the node at the given net address and, on
// success, adds it as an outbound peer through AddPeer.
233 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
234 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
// Mark the address as in flight for the duration of this call. The key is the
// IP only (the port is ignored), which is what IsDialing looks up.
235 sw.dialing.Set(addr.IP.String(), addr)
236 defer sw.dialing.Delete(addr.IP.String())
// The remote public key is not known yet, so filter on the IP alone.
237 if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
241 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
243 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
247 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
248 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on switch AddPeer")
252 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
// IsBanned forwards ip, level and reason to the security module and returns
// its result unchanged.
256 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
257 return sw.security.IsBanned(ip, level, reason)
260 // IsDialing reports whether a dial to addr's IP is currently in flight, so
// callers can avoid duplicate dialing. Only the IP is compared; the port is
// not part of the lookup key.
261 func (sw *Switch) IsDialing(addr *NetAddress) bool {
262 return sw.dialing.Has(addr.IP.String())
265 // IsListening returns true if the switch has at least one listener.
// It only checks that a listener has been added, not its state.
266 // NOTE: Not goroutine safe.
267 func (sw *Switch) IsListening() bool {
268 return len(sw.listeners) > 0
271 // Listeners returns the list of listeners the switch listens on.
// NOTE(review): the body is outside this view; whether the internal slice or
// a copy is returned cannot be confirmed from here.
272 // NOTE: Not goroutine safe.
273 func (sw *Switch) Listeners() []Listener {
277 // NumPeers returns the counts of LAN, outbound and inbound peers, plus the
// number of addresses currently being dialed.
278 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
279 peers := sw.peers.List()
280 for _, peer := range peers {
// This branch is taken only for outbound peers that are not LAN peers.
// NOTE(review): the remaining classification branches are outside this view.
281 if peer.outbound && !peer.isLAN {
// dialing counts the entries DialPeerWithAddress has in flight.
290 dialing = sw.dialing.Size()
294 // NodeInfo returns the switch's NodeInfo.
// dialPeers reads ListenAddr from the returned value to recognise our own
// address.
295 // NOTE: Not goroutine safe.
296 func (sw *Switch) NodeInfo() *NodeInfo {
300 // Peers returns the switch's peer set.
301 func (sw *Switch) Peers() *PeerSet {
305 // StopPeerForError disconnects from a peer due to external error.
// reason is logged and then forwarded to every reactor through
// stopAndRemovePeer. AddPeer also passes this method to newPeer.
306 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
307 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
308 sw.stopAndRemovePeer(peer, reason)
311 // StopPeerGracefully disconnect from a peer gracefully.
// The peer is looked up by ID in the peer set; an unknown ID is ignored.
// Reactors are notified with a nil reason.
312 func (sw *Switch) StopPeerGracefully(peerID string) {
313 if peer := sw.peers.Get(peerID); peer != nil {
314 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted connection into an inbound peerConn
// and hands it to AddPeer as a non-LAN peer. On the failure paths visible here
// the raw connection is closed by this function; an error from Close itself is
// only logged.
318 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
319 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
321 if err := conn.Close(); err != nil {
322 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
// Inbound connections are never flagged as LAN peers.
327 if err = sw.AddPeer(peerConn, false); err != nil {
328 if err := conn.Close(); err != nil {
329 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
334 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
// connectLANPeers dials the node announced by a LAN peer event: every IP in
// the event is paired with the event's port and the resulting addresses are
// passed to dialPeers.
338 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
339 lanPeers, _, _, numDialing := sw.NumPeers()
// Remaining LAN capacity.
// NOTE(review): the guard that acts on numToDial is outside this view.
340 numToDial := maxNumLANPeers - lanPeers
341 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
345 addresses := make([]*NetAddress, 0)
346 for i := 0; i < len(lanPeer.IP); i++ {
347 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
349 sw.dialPeers(addresses)
// connectLANPeersRoutine subscribes to LAN discovery events and calls
// connectLANPeers for each received mdns.LANPeerEvent. It is started as a
// goroutine by OnStart.
352 func (sw *Switch) connectLANPeersRoutine() {
// LAN discovery is optional; this check runs before sw.lanDiscv is used.
353 if !sw.Config.P2P.LANDiscover {
357 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
359 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
365 case obj, ok := <-lanPeerEventSub.Chan():
367 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
// Events whose payload is not an mdns.LANPeerEvent are reported as an error.
370 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
372 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
375 sw.connectLANPeers(LANPeer)
// listenerRoutine receives inbound connections from l and tries to add each
// one as a peer. It is started once per listener by OnStart.
382 func (sw *Switch) listenerRoutine(l Listener) {
// ok reports whether the connections channel is still open.
384 inConn, ok := <-l.Connections()
389 // disconnect if we already have MaxNumPeers
390 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
391 if err := inConn.Close(); err != nil {
392 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
394 log.Info("Ignoring inbound connection: already have enough peers.")
398 // New inbound connection!
// A failure to add the peer is only logged.
399 if err := sw.addPeerWithConnection(inConn); err != nil {
400 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// dialPeerWorker dials a single address on behalf of dialPeers, which starts
// it as a goroutine. A dial failure is logged and not propagated.
// NOTE(review): wg is not used on the visible lines; presumably the worker
// signals completion on it on a line outside this view — confirm.
406 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
407 if err := sw.DialPeerWithAddress(a); err != nil {
408 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
// dialPeers dials the given addresses concurrently, one dialPeerWorker
// goroutine per address. Before dialing, each address is checked against our
// own listen address, the set of in-flight dials, and the hosts of already
// connected peers.
// NOTE(review): the bodies of those three checks are outside this view;
// presumably a match skips the address — confirm.
413 func (sw *Switch) dialPeers(addresses []*NetAddress) {
// Record the remote hosts of the current peers once, so the per-address check
// below is a map lookup.
414 connectedPeers := make(map[string]struct{})
415 for _, peer := range sw.Peers().List() {
416 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
419 var wg sync.WaitGroup
420 for _, address := range addresses {
// Our own address is compared in full string form.
421 if sw.NodeInfo().ListenAddr == address.String() {
424 if dialling := sw.IsDialing(address); dialling {
// Connected peers are matched by IP only.
427 if _, ok := connectedPeers[address.IP.String()]; ok {
432 go sw.dialPeerWorker(address, &wg)
// ensureKeepConnectPeers parses the addresses configured in
// Config.P2P.KeepDial and passes them to dialPeers. It runs on every pass of
// ensureOutboundPeersRoutine.
437 func (sw *Switch) ensureKeepConnectPeers() {
438 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
439 addresses := make([]*NetAddress, 0)
440 for _, keepDial := range keepDials {
441 address, err := NewNetAddressString(keepDial)
// A parse failure is logged with the offending address string.
443 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
446 addresses = append(addresses, address)
449 sw.dialPeers(addresses)
// ensureOutboundPeers tops up outbound connections towards
// minNumOutboundPeers by asking the discovery service for random nodes and
// dialing them.
452 func (sw *Switch) ensureOutboundPeers() {
453 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
// Dials already in flight count towards the target so it is not overshot.
454 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
455 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
// NOTE(review): make panics on a negative length, so the lines outside this
// view must return before this point when numToDial is negative — confirm.
460 nodes := make([]*dht.Node, numToDial)
// Discovery may return fewer nodes than requested; only the first n are used.
461 n := sw.discv.ReadRandomNodes(nodes)
462 addresses := make([]*NetAddress, 0)
463 for i := 0; i < n; i++ {
// Peers are dialed on the node's TCP port.
464 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
465 addresses = append(addresses, address)
467 sw.dialPeers(addresses)
// ensureOutboundPeersRoutine runs one connection-maintenance pass immediately
// (keep-dial addresses first, then discovered peers) and repeats the same pass
// driven by a 10-second ticker. It is started as a goroutine by OnStart.
// NOTE(review): the loop's exit condition and any ticker.Stop call are outside
// this view — confirm the ticker is released on shutdown.
470 func (sw *Switch) ensureOutboundPeersRoutine() {
471 sw.ensureKeepConnectPeers()
472 sw.ensureOutboundPeers()
474 ticker := time.NewTicker(10 * time.Second)
480 sw.ensureKeepConnectPeers()
481 sw.ensureOutboundPeers()
// startInitPeer starts the peer and then announces it to every registered
// reactor through AddPeer. AddPeer (on the Switch) calls it before the peer is
// inserted into the peer set.
488 func (sw *Switch) startInitPeer(peer *Peer) error {
489 // spawn send/recv routines
490 if err := peer.Start(); err != nil {
491 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
// Reactors are visited in map-iteration order, which Go does not define.
494 for _, reactor := range sw.reactors {
495 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer removes peer from the peer set, notifies every reactor
// through RemovePeer with the given reason, and logs the peer's traffic
// statistics. reason is nil when called from StopPeerGracefully.
// NOTE(review): the call that actually stops the peer is not on the visible
// lines — confirm it sits between the reactor loop and the log statement.
502 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
503 sw.peers.Remove(peer)
504 for _, reactor := range sw.reactors {
505 reactor.RemovePeer(peer, reason)
509 sentStatus, receivedStatus := peer.TrafficStatus()
510 log.WithFields(log.Fields{
512 "address": peer.Addr().String(),
// The connection duration is taken from the send-side status.
514 "duration": sentStatus.Duration.String(),
515 "total_sent": sentStatus.Bytes,
516 "total_received": receivedStatus.Bytes,
517 "average_sent_rate": sentStatus.AvgRate,
518 "average_received_rate": receivedStatus.AvgRate,
// Read after the Remove above, so this is the count without the removed peer.
519 "peer num": sw.peers.Size(),
520 }).Info("disconnect with peer")