10 log "github.com/sirupsen/logrus"
11 cmn "github.com/tendermint/tmlibs/common"
13 cfg "github.com/vapor/config"
14 "github.com/vapor/consensus"
15 "github.com/vapor/crypto/sha3pool"
16 "github.com/vapor/errors"
17 "github.com/vapor/event"
18 "github.com/vapor/p2p/connection"
19 "github.com/vapor/p2p/discover/dht"
20 "github.com/vapor/p2p/discover/mdns"
21 "github.com/vapor/p2p/netutil"
22 "github.com/vapor/p2p/signlib"
23 security "github.com/vapor/p2p/security"
24 "github.com/vapor/version"
30 minNumOutboundPeers = 4
32 //magicNumber used to generate unique netID
33 magicNumber = uint64(0x054c5638)
36 //pre-define errors for connecting fail
38 ErrDuplicatePeer = errors.New("Duplicate peer")
39 ErrConnectSelf = errors.New("Connect self")
40 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
43 type discv interface {
44 ReadRandomNodes(buf []*dht.Node) (n int)
47 type lanDiscv interface {
48 Subscribe() (*event.Subscription, error)
52 type Security interface {
53 DoFilter(ip string, pubKey string) error
54 IsBanned(ip string, level byte, reason string) bool
55 RegisterFilter(filter security.Filter)
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
67 peerConfig *PeerConfig
69 reactors map[string]Reactor
70 chDescs []*connection.ChannelDescriptor
71 reactorsByCh map[byte]Reactor
74 nodeInfo *NodeInfo // our node info
75 nodePrivKey signlib.PrivKey // our node privkey
81 // NewSwitch create a new Switch and set discover.
82 func NewSwitch(config *cfg.Config) (*Switch, error) {
86 var discv *dht.Network
87 var lanDiscv *mdns.LANDiscover
89 //generate unique netID
92 data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
93 magic := make([]byte, 8)
94 binary.BigEndian.PutUint64(magic, magicNumber)
95 data = append(data, magic[:]...)
96 sha3pool.Sum256(h[:], data)
97 netID := binary.BigEndian.Uint64(h[:8])
99 privateKey := config.PrivateKey()
100 if !config.VaultMode {
102 l, listenAddr = GetListener(config.P2P)
103 discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID)
107 if config.P2P.LANDiscover {
108 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
112 return newSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
115 // newSwitch creates a new Switch with the given config.
116 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
119 peerConfig: DefaultPeerConfig(config.P2P),
120 reactors: make(map[string]Reactor),
121 chDescs: make([]*connection.ChannelDescriptor, 0),
122 reactorsByCh: make(map[byte]Reactor),
124 dialing: cmn.NewCMap(),
125 nodePrivKey: privKey,
128 nodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
129 security: security.NewSecurity(config),
133 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
134 log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
138 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
139 func (sw *Switch) OnStart() error {
140 for _, reactor := range sw.reactors {
141 if _, err := reactor.Start(); err != nil {
146 sw.security.RegisterFilter(sw.nodeInfo)
147 sw.security.RegisterFilter(sw.peers)
148 if err := sw.security.Start(); err != nil {
152 for _, listener := range sw.listeners {
153 go sw.listenerRoutine(listener)
155 go sw.ensureOutboundPeersRoutine()
156 go sw.connectLANPeersRoutine()
161 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
162 func (sw *Switch) OnStop() {
163 if sw.Config.P2P.LANDiscover {
167 for _, listener := range sw.listeners {
172 for _, peer := range sw.peers.List() {
174 sw.peers.Remove(peer)
177 for _, reactor := range sw.reactors {
182 // AddPeer performs the P2P handshake with a peer
183 // that already has a SecretConnection. If all goes well,
184 // it starts the peer and adds it to the switch.
185 // NOTE: This performs a blocking handshake before the peer is added.
186 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
187 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
188 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
193 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
197 if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
201 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
202 if err := sw.security.DoFilter(peer.remoteAddrHost(), peer.PubKey()); err != nil {
206 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
207 return ErrConnectSpvPeer
212 if err := sw.startInitPeer(peer); err != nil {
217 return sw.peers.Add(peer)
220 // AddReactor adds the given reactor to the switch.
221 // NOTE: Not goroutine safe.
222 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
223 // Validate the reactor.
224 // No two reactors can share the same channel.
225 for _, chDesc := range reactor.GetChannels() {
227 if sw.reactorsByCh[chID] != nil {
228 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
230 sw.chDescs = append(sw.chDescs, chDesc)
231 sw.reactorsByCh[chID] = reactor
233 sw.reactors[name] = reactor
234 reactor.SetSwitch(sw)
238 // AddListener adds the given listener to the switch for listening to incoming peer connections.
239 // NOTE: Not goroutine safe.
240 func (sw *Switch) AddListener(l Listener) {
241 sw.listeners = append(sw.listeners, l)
244 //DialPeerWithAddress dial node from net address
245 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
246 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
247 sw.dialing.Set(addr.IP.String(), addr)
248 defer sw.dialing.Delete(addr.IP.String())
249 if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
253 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
255 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
259 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
260 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
264 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
268 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
269 return sw.security.IsBanned(ip, level, reason)
272 //IsDialing prevent duplicate dialing
273 func (sw *Switch) IsDialing(addr *NetAddress) bool {
274 return sw.dialing.Has(addr.IP.String())
277 // IsListening returns true if the switch has at least one listener.
278 // NOTE: Not goroutine safe.
279 func (sw *Switch) IsListening() bool {
280 return len(sw.listeners) > 0
283 // Listeners returns the list of listeners the switch listens on.
284 // NOTE: Not goroutine safe.
285 func (sw *Switch) Listeners() []Listener {
289 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
290 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
291 peers := sw.peers.List()
292 for _, peer := range peers {
293 if peer.outbound && !peer.isLAN {
302 dialing = sw.dialing.Size()
306 //Peers return switch peerset
307 func (sw *Switch) Peers() *PeerSet {
311 // StopPeerForError disconnects from a peer due to external error.
312 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
313 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
314 sw.stopAndRemovePeer(peer, reason)
317 // StopPeerGracefully disconnect from a peer gracefully.
318 func (sw *Switch) StopPeerGracefully(peerID string) {
319 if peer := sw.peers.Get(peerID); peer != nil {
320 sw.stopAndRemovePeer(peer, nil)
324 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
325 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
327 if err := conn.Close(); err != nil {
328 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
333 if err = sw.AddPeer(peerConn, false); err != nil {
334 if err := conn.Close(); err != nil {
335 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
340 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
344 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
345 lanPeers, _, _, numDialing := sw.NumPeers()
346 numToDial := maxNumLANPeers - lanPeers
347 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
351 addresses := make([]*NetAddress, 0)
352 for i := 0; i < len(lanPeer.IP); i++ {
353 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
355 sw.dialPeers(addresses)
358 func (sw *Switch) connectLANPeersRoutine() {
359 if !sw.Config.P2P.LANDiscover {
363 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
365 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
371 case obj, ok := <-lanPeerEventSub.Chan():
373 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
376 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
378 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
381 sw.connectLANPeers(LANPeer)
388 func (sw *Switch) listenerRoutine(l Listener) {
390 inConn, ok := <-l.Connections()
395 // disconnect if we alrady have MaxNumPeers
396 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
397 if err := inConn.Close(); err != nil {
398 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
400 log.Info("Ignoring inbound connection: already have enough peers.")
404 // New inbound connection!
405 if err := sw.addPeerWithConnection(inConn); err != nil {
406 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
412 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
413 if err := sw.DialPeerWithAddress(a); err != nil {
414 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
419 func (sw *Switch) dialPeers(addresses []*NetAddress) {
420 connectedPeers := make(map[string]struct{})
421 for _, peer := range sw.Peers().List() {
422 connectedPeers[peer.remoteAddrHost()] = struct{}{}
425 var wg sync.WaitGroup
426 for _, address := range addresses {
427 if sw.nodeInfo.ListenAddr == address.String() {
430 if dialling := sw.IsDialing(address); dialling {
433 if _, ok := connectedPeers[address.IP.String()]; ok {
438 go sw.dialPeerWorker(address, &wg)
443 func (sw *Switch) ensureKeepConnectPeers() {
444 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
445 addresses := make([]*NetAddress, 0)
446 for _, keepDial := range keepDials {
447 address, err := NewNetAddressString(keepDial)
449 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
452 addresses = append(addresses, address)
455 sw.dialPeers(addresses)
458 func (sw *Switch) ensureOutboundPeers() {
459 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
460 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
461 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
466 nodes := make([]*dht.Node, numToDial)
467 n := sw.discv.ReadRandomNodes(nodes)
468 addresses := make([]*NetAddress, 0)
469 for i := 0; i < n; i++ {
470 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
471 addresses = append(addresses, address)
473 sw.dialPeers(addresses)
476 func (sw *Switch) ensureOutboundPeersRoutine() {
477 sw.ensureKeepConnectPeers()
478 sw.ensureOutboundPeers()
480 ticker := time.NewTicker(10 * time.Second)
486 sw.ensureKeepConnectPeers()
487 sw.ensureOutboundPeers()
494 func (sw *Switch) startInitPeer(peer *Peer) error {
495 // spawn send/recv routines
496 if _, err := peer.Start(); err != nil {
497 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
500 for _, reactor := range sw.reactors {
501 if err := reactor.AddPeer(peer); err != nil {
508 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
509 sw.peers.Remove(peer)
510 for _, reactor := range sw.reactors {
511 reactor.RemovePeer(peer, reason)
515 sentStatus, receivedStatus := peer.TrafficStatus()
516 log.WithFields(log.Fields{
518 "address": peer.Addr().String(),
520 "duration": sentStatus.Duration.String(),
521 "total_sent": sentStatus.Bytes,
522 "total_received": receivedStatus.Bytes,
523 "average_sent_rate": sentStatus.AvgRate,
524 "average_received_rate": receivedStatus.AvgRate,
525 "peer num": sw.peers.Size(),
526 }).Info("disconnect with peer")