10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
14 cfg "github.com/bytom/bytom/config"
15 "github.com/bytom/bytom/consensus"
16 "github.com/bytom/bytom/crypto/ed25519"
17 "github.com/bytom/bytom/errors"
18 "github.com/bytom/bytom/event"
19 "github.com/bytom/bytom/p2p/connection"
20 "github.com/bytom/bytom/p2p/discover/dht"
21 "github.com/bytom/bytom/p2p/discover/mdns"
22 "github.com/bytom/bytom/p2p/netutil"
23 "github.com/bytom/bytom/p2p/security"
24 "github.com/bytom/bytom/version"
30 minNumOutboundPeers = 4
34 //pre-define errors for connecting fail
36 ErrDuplicatePeer = errors.New("Duplicate peer")
37 ErrConnectSelf = errors.New("Connect self")
38 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
41 type discv interface {
42 ReadRandomNodes(buf []*dht.Node) (n int)
45 type lanDiscv interface {
46 Subscribe() (*event.Subscription, error)
50 type Security interface {
51 DoFilter(ip string, pubKey string) error
52 IsBanned(ip string, level byte, reason string) bool
53 RegisterFilter(filter security.Filter)
57 // Switch handles peer connections and exposes an API to receive incoming messages
58 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
59 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
60 // incoming messages are received on the reactor.
65 peerConfig *PeerConfig
67 reactors map[string]Reactor
68 chDescs []*connection.ChannelDescriptor
69 reactorsByCh map[byte]Reactor
72 nodeInfo *NodeInfo // our node info
73 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
79 // NewSwitch create a new Switch and set discover.
80 func NewSwitch(config *cfg.Config) (*Switch, error) {
84 var discv *dht.Network
85 var lanDiscv *mdns.LANDiscover
87 config.P2P.PrivateKey, err = config.NodeKey()
92 bytes, err := hex.DecodeString(config.P2P.PrivateKey)
98 copy(newKey[:], bytes)
99 privKey := crypto.PrivKeyEd25519(newKey)
100 if !config.VaultMode {
102 l, listenAddr = GetListener(config.P2P)
103 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
107 if config.P2P.LANDiscover {
108 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
112 return newSwitch(config, discv, lanDiscv, l, privKey, listenAddr)
115 // newSwitch creates a new Switch with the given config.
116 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
119 peerConfig: DefaultPeerConfig(config.P2P),
120 reactors: make(map[string]Reactor),
121 chDescs: make([]*connection.ChannelDescriptor, 0),
122 reactorsByCh: make(map[byte]Reactor),
124 dialing: cmn.NewCMap(),
128 nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
129 security: security.NewSecurity(config),
133 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
137 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
138 func (sw *Switch) OnStart() error {
139 for _, reactor := range sw.reactors {
140 if _, err := reactor.Start(); err != nil {
145 sw.security.RegisterFilter(sw.nodeInfo)
146 sw.security.RegisterFilter(sw.peers)
147 if err := sw.security.Start(); err != nil {
151 for _, listener := range sw.listeners {
152 go sw.listenerRoutine(listener)
154 go sw.ensureOutboundPeersRoutine()
155 go sw.connectLANPeersRoutine()
160 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
161 func (sw *Switch) OnStop() {
162 if sw.Config.P2P.LANDiscover {
166 for _, listener := range sw.listeners {
171 for _, peer := range sw.peers.List() {
173 sw.peers.Remove(peer)
176 for _, reactor := range sw.reactors {
181 // AddPeer performs the P2P handshake with a peer
182 // that already has a SecretConnection. If all goes well,
183 // it starts the peer and adds it to the switch.
184 // NOTE: This performs a blocking handshake before the peer is added.
185 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
186 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
187 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
192 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
195 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
199 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
200 if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey().String()); err != nil {
204 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
205 return ErrConnectSpvPeer
210 if err := sw.startInitPeer(peer); err != nil {
215 return sw.peers.Add(peer)
218 // AddReactor adds the given reactor to the switch.
219 // NOTE: Not goroutine safe.
220 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
221 // Validate the reactor.
222 // No two reactors can share the same channel.
223 for _, chDesc := range reactor.GetChannels() {
225 if sw.reactorsByCh[chID] != nil {
226 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
228 sw.chDescs = append(sw.chDescs, chDesc)
229 sw.reactorsByCh[chID] = reactor
231 sw.reactors[name] = reactor
232 reactor.SetSwitch(sw)
236 // AddListener adds the given listener to the switch for listening to incoming peer connections.
237 // NOTE: Not goroutine safe.
238 func (sw *Switch) AddListener(l Listener) {
239 sw.listeners = append(sw.listeners, l)
242 //DialPeerWithAddress dial node from net address
243 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
244 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
245 sw.dialing.Set(addr.IP.String(), addr)
246 defer sw.dialing.Delete(addr.IP.String())
247 if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
251 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
253 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
257 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
258 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on switch AddPeer")
262 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
266 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
267 return sw.security.IsBanned(ip, level, reason)
270 //IsDialing prevent duplicate dialing
271 func (sw *Switch) IsDialing(addr *NetAddress) bool {
272 return sw.dialing.Has(addr.IP.String())
275 // IsListening returns true if the switch has at least one listener.
276 // NOTE: Not goroutine safe.
277 func (sw *Switch) IsListening() bool {
278 return len(sw.listeners) > 0
281 // Listeners returns the list of listeners the switch listens on.
282 // NOTE: Not goroutine safe.
283 func (sw *Switch) Listeners() []Listener {
287 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
288 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
289 peers := sw.peers.List()
290 for _, peer := range peers {
291 if peer.outbound && !peer.isLAN {
300 dialing = sw.dialing.Size()
304 // NodeInfo returns the switch's NodeInfo.
305 // NOTE: Not goroutine safe.
306 func (sw *Switch) NodeInfo() *NodeInfo {
310 //Peers return switch peerset
311 func (sw *Switch) Peers() *PeerSet {
315 // StopPeerForError disconnects from a peer due to external error.
316 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
317 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
318 sw.stopAndRemovePeer(peer, reason)
321 // StopPeerGracefully disconnect from a peer gracefully.
322 func (sw *Switch) StopPeerGracefully(peerID string) {
323 if peer := sw.peers.Get(peerID); peer != nil {
324 sw.stopAndRemovePeer(peer, nil)
328 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
329 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
331 if err := conn.Close(); err != nil {
332 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
337 if err = sw.AddPeer(peerConn, false); err != nil {
338 if err := conn.Close(); err != nil {
339 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
344 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
348 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
349 lanPeers, _, _, numDialing := sw.NumPeers()
350 numToDial := maxNumLANPeers - lanPeers
351 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
355 addresses := make([]*NetAddress, 0)
356 for i := 0; i < len(lanPeer.IP); i++ {
357 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
359 sw.dialPeers(addresses)
362 func (sw *Switch) connectLANPeersRoutine() {
363 if !sw.Config.P2P.LANDiscover {
367 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
369 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
375 case obj, ok := <-lanPeerEventSub.Chan():
377 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
380 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
382 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
385 sw.connectLANPeers(LANPeer)
392 func (sw *Switch) listenerRoutine(l Listener) {
394 inConn, ok := <-l.Connections()
399 // disconnect if we alrady have MaxNumPeers
400 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
401 if err := inConn.Close(); err != nil {
402 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Warn("closes connection err")
404 log.Info("Ignoring inbound connection: already have enough peers.")
408 // New inbound connection!
409 if err := sw.addPeerWithConnection(inConn); err != nil {
410 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
416 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
417 if err := sw.DialPeerWithAddress(a); err != nil {
418 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
423 func (sw *Switch) dialPeers(addresses []*NetAddress) {
424 connectedPeers := make(map[string]struct{})
425 for _, peer := range sw.Peers().List() {
426 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
429 var wg sync.WaitGroup
430 for _, address := range addresses {
431 if sw.NodeInfo().ListenAddr == address.String() {
434 if dialling := sw.IsDialing(address); dialling {
437 if _, ok := connectedPeers[address.IP.String()]; ok {
442 go sw.dialPeerWorker(address, &wg)
447 func (sw *Switch) ensureKeepConnectPeers() {
448 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
449 addresses := make([]*NetAddress, 0)
450 for _, keepDial := range keepDials {
451 address, err := NewNetAddressString(keepDial)
453 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
456 addresses = append(addresses, address)
459 sw.dialPeers(addresses)
462 func (sw *Switch) ensureOutboundPeers() {
463 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
464 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
465 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
470 nodes := make([]*dht.Node, numToDial)
471 n := sw.discv.ReadRandomNodes(nodes)
472 addresses := make([]*NetAddress, 0)
473 for i := 0; i < n; i++ {
474 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
475 addresses = append(addresses, address)
477 sw.dialPeers(addresses)
480 func (sw *Switch) ensureOutboundPeersRoutine() {
481 sw.ensureKeepConnectPeers()
482 sw.ensureOutboundPeers()
484 ticker := time.NewTicker(10 * time.Second)
490 sw.ensureKeepConnectPeers()
491 sw.ensureOutboundPeers()
498 func (sw *Switch) startInitPeer(peer *Peer) error {
499 // spawn send/recv routines
500 if _, err := peer.Start(); err != nil {
501 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
504 for _, reactor := range sw.reactors {
505 if err := reactor.AddPeer(peer); err != nil {
512 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
513 sw.peers.Remove(peer)
514 for _, reactor := range sw.reactors {
515 reactor.RemovePeer(peer, reason)
519 sentStatus, receivedStatus := peer.TrafficStatus()
520 log.WithFields(log.Fields{
522 "address": peer.Addr().String(),
524 "duration": sentStatus.Duration.String(),
525 "total_sent": sentStatus.Bytes,
526 "total_received": receivedStatus.Bytes,
527 "average_sent_rate": sentStatus.AvgRate,
528 "average_received_rate": receivedStatus.AvgRate,
529 "peer num": sw.peers.Size(),
530 }).Info("disconnect with peer")