10 log "github.com/sirupsen/logrus"
11 cmn "github.com/tendermint/tmlibs/common"
13 cfg "github.com/vapor/config"
14 "github.com/vapor/consensus"
15 "github.com/vapor/crypto/sha3pool"
16 "github.com/vapor/errors"
17 "github.com/vapor/event"
18 "github.com/vapor/p2p/connection"
19 "github.com/vapor/p2p/discover/dht"
20 "github.com/vapor/p2p/discover/mdns"
21 "github.com/vapor/p2p/netutil"
22 security "github.com/vapor/p2p/security"
23 "github.com/vapor/p2p/signlib"
24 "github.com/vapor/version"
30 minNumOutboundPeers = 3
32 // magicNumber is appended to the genesis block hash when deriving the unique netID.
33 magicNumber = uint64(0x054c5638)
36 // Predefined errors returned when connecting to a peer fails.
38 ErrDuplicatePeer = errors.New("Duplicate peer")
39 ErrConnectSelf = errors.New("Connect self")
40 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
// discv is the node-discovery dependency of the switch. NewSwitchMaybeDiscover
// supplies a *dht.Network for it.
43 type discv interface {
// ReadRandomNodes writes nodes into buf and returns the count n; the caller in
// ensureOutboundPeers treats buf[:n] as the populated entries.
44 ReadRandomNodes(buf []*dht.Node) (n int)
// lanDiscv is the LAN peer-discovery dependency of the switch.
// NewSwitchMaybeDiscover supplies a *mdns.LANDiscover for it.
47 type lanDiscv interface {
// Subscribe returns an event subscription or an error. connectLANPeersRoutine
// reads mdns.LANPeerEvent values from the subscription's channel.
48 Subscribe() (*event.Subscription, error)
// Security is the peer filtering and banning facility used by the switch.
// NOTE(review): only part of the method set is visible in this excerpt.
52 type Security interface {
// DoFilter checks a remote IP / public key pair. AddPeer and
// DialPeerWithAddress test the returned error before continuing.
53 DoFilter(ip string, pubKey string) error
// IsBanned is what Switch.IsBanned forwards to. The meaning of level and
// reason is not visible here — TODO confirm against the security package.
54 IsBanned(ip string, level byte, reason string) bool
// RegisterFilter adds a filter. OnStart registers the node info and the peer set.
55 RegisterFilter(filter security.Filter)
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
67 peerConfig *PeerConfig // per-peer settings; NewSwitch fills it from DefaultPeerConfig(config.P2P)
69 reactors map[string]Reactor // reactors keyed by the name passed to AddReactor
70 chDescs []*connection.ChannelDescriptor // channel descriptors collected from every added reactor
71 reactorsByCh map[byte]Reactor // channel ID -> owning reactor; AddReactor panics on a duplicate ID
74 nodeInfo *NodeInfo // our node info
75 nodePrivKey signlib.PrivKey // our node privkey
81 // NewSwitchMaybeDiscover creates a new Switch and, depending on the config, its discovery services.
// It derives the network ID, reads the node private key from config and hands
// everything to NewSwitch, whose result it returns.
//
// NOTE(review): discv and lanDiscv are declared as concrete pointer types and
// passed to NewSwitch as interface parameters. If they are still nil at that
// point (e.g. in vault mode), the resulting interface values are non-nil
// "typed nil" values — confirm nothing relies on a nil check of those fields.
82 func NewSwitchMaybeDiscover(config *cfg.Config) (*Switch, error) {
86 var discv *dht.Network
87 var lanDiscv *mdns.LANDiscover
89 // Generate a unique netID: append the genesis block hash and the big-endian
// magicNumber to data, hash it with sha3pool.Sum256, and read the first
// 8 bytes of the digest as a big-endian uint64.
92 data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
93 magic := make([]byte, 8)
94 binary.BigEndian.PutUint64(magic, magicNumber)
95 data = append(data, magic[:]...)
96 sha3pool.Sum256(h[:], data)
97 netID := binary.BigEndian.Uint64(h[:8])
99 privateKey := config.PrivateKey()
// The listener and the DHT discovery are only set up when vault mode is off.
100 if !config.VaultMode {
102 l, listenAddr = GetListener(config.P2P)
// Discovery is created with the listener's external port and the netID.
103 discv, err = dht.NewDiscover(config, *privateKey, l.ExternalAddress().Port, netID)
// LAN discovery is optional and uses the same external port.
107 if config.P2P.LANDiscover {
108 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(config.ChainID), int(l.ExternalAddress().Port))
112 return NewSwitch(config, discv, lanDiscv, l, *privateKey, listenAddr, netID)
115 // NewSwitch creates a new Switch with the given config.
// discv and lanDiscv are the discovery services, l the listener, privKey the
// node private key, listenAddr the address used in the node info and netID the
// network identifier. The visible initializers create empty reactor/channel
// tables, the dialing set, the node info and the security module.
116 func NewSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, l Listener, privKey signlib.PrivKey, listenAddr string, netID uint64) (*Switch, error) {
119 peerConfig: DefaultPeerConfig(config.P2P),
120 reactors: make(map[string]Reactor),
121 chDescs: make([]*connection.ChannelDescriptor, 0),
122 reactorsByCh: make(map[byte]Reactor),
124 dialing: cmn.NewCMap(),
125 nodePrivKey: privKey,
// The node info is built from our public key, listen address and netID.
128 nodeInfo: NewNodeInfo(config, privKey.XPub(), listenAddr, netID),
129 security: security.NewSecurity(config),
// Register the switch itself as the implementation behind the base service.
133 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
134 log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
// GetDiscv exposes the switch's discovery service.
// NOTE(review): body not visible in this excerpt — presumably returns sw.discv; confirm.
138 func (sw *Switch) GetDiscv() discv {
// GetNodeInfo exposes the node info of this switch.
// NOTE(review): body not visible in this excerpt — presumably returns sw.nodeInfo; confirm.
142 func (sw *Switch) GetNodeInfo() *NodeInfo {
// GetPeers exposes the switch's peer set.
// NOTE(review): body not visible in this excerpt — presumably returns sw.peers,
// which would make it a duplicate of Peers; confirm.
146 func (sw *Switch) GetPeers() *PeerSet {
// GetReactors exposes the reactors registered on the switch, keyed by name.
// NOTE(review): body not visible in this excerpt — presumably returns the
// internal sw.reactors map rather than a copy; confirm before mutating the result.
150 func (sw *Switch) GetReactors() map[string]Reactor {
// GetSecurity exposes the switch's security module.
// NOTE(review): body not visible in this excerpt — presumably returns sw.security; confirm.
154 func (sw *Switch) GetSecurity() Security {
158 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
// In order: every reactor is started, the node info and the peer set are
// registered as security filters, the security module is started, and then one
// goroutine per listener plus the outbound-peer and LAN-peer routines are launched.
159 func (sw *Switch) OnStart() error {
160 for _, reactor := range sw.reactors {
161 if _, err := reactor.Start(); err != nil {
// Filters must be registered before the security module is started.
166 sw.security.RegisterFilter(sw.nodeInfo)
167 sw.security.RegisterFilter(sw.peers)
168 if err := sw.security.Start(); err != nil {
// Each listener gets its own accept loop.
172 for _, listener := range sw.listeners {
173 go sw.listenerRoutine(listener)
// Background routines that keep the outbound and LAN peer sets populated.
175 go sw.ensureOutboundPeersRoutine()
176 go sw.connectLANPeersRoutine()
181 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// NOTE(review): the stop calls themselves are not visible in this excerpt; only
// the loops and the removal of each peer from the peer set are shown.
182 func (sw *Switch) OnStop() {
// LAN discovery is only touched when it was enabled in the config.
183 if sw.Config.P2P.LANDiscover {
187 for _, listener := range sw.listeners {
// Iterate over the slice returned by List() while removing peers from the set.
192 for _, peer := range sw.peers.List() {
194 sw.peers.Remove(peer)
197 for _, reactor := range sw.reactors {
202 // AddPeer performs the P2P handshake with a peer
203 // that already has a SecretConnection. If all goes well,
204 // it starts the peer and adds it to the switch.
205 // NOTE: This performs a blocking handshake before the peer is added.
206 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
// NOTE(review): the function returns only an error, so "peer is nil" above looks
// stale. addPeerWithConnection closes the connection itself when AddPeer fails —
// confirm who owns closing the connection.
// isLAN marks the peer as discovered on the local network.
207 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
// Exchange node info with the remote side, bounded by the configured timeout.
208 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
// Compare our version with the peer's version via the update checker.
213 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
// Check that the peer's node info is compatible with ours.
217 if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
// The peer reports errors back through StopPeerForError.
221 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
// Run the registered security filters on the peer's host and public key.
222 if err := sw.security.DoFilter(peer.RemoteAddrHost(), peer.PubKey()); err != nil {
// Outbound connections are only kept to peers advertising the full-node service flag.
226 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
227 return ErrConnectSpvPeer
// Start the peer and announce it to every reactor before adding it to the set.
232 if err := sw.startInitPeer(peer); err != nil {
237 return sw.peers.Add(peer)
240 // AddReactor adds the given reactor to the switch.
// Every channel the reactor reports is recorded in chDescs and reactorsByCh,
// the reactor is stored under name, and reactor.SetSwitch(sw) is called.
// It panics (cmn.PanicSanity) when a channel ID is already owned by another reactor.
241 // NOTE: Not goroutine safe.
242 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
243 // Validate the reactor.
244 // No two reactors can share the same channel.
245 for _, chDesc := range reactor.GetChannels() {
247 if sw.reactorsByCh[chID] != nil {
248 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
250 sw.chDescs = append(sw.chDescs, chDesc)
251 sw.reactorsByCh[chID] = reactor
// A later AddReactor call with the same name replaces this map entry.
253 sw.reactors[name] = reactor
254 reactor.SetSwitch(sw)
258 // AddListener adds the given listener to the switch for listening to incoming peer connections.
// The listener is appended to sw.listeners; OnStart launches one listenerRoutine
// goroutine per entry.
259 // NOTE: Not goroutine safe.
260 func (sw *Switch) AddListener(l Listener) {
261 sw.listeners = append(sw.listeners, l)
264 // DialPeerWithAddress dials the node at the given net address and adds it as an outbound peer.
// The address IP is recorded in the dialing set for the duration of the call,
// the security filter is consulted with the IP and an empty public key, an
// outbound peer connection is created and then handed to AddPeer.
// NOTE(review): the log field key " err" has a leading space, unlike the "err"
// key used by other functions in this file.
265 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
266 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
// Mark the IP as being dialed; the deferred delete clears it on return.
267 sw.dialing.Set(addr.IP.String(), addr)
268 defer sw.dialing.Delete(addr.IP.String())
// The public key is not known before the handshake, so only the IP is filtered here.
269 if err := sw.security.DoFilter(addr.IP.String(), ""); err != nil {
273 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
275 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Warn("DialPeer fail on newOutboundPeerConn")
// addr.isLAN is passed through so the peer is flagged as a LAN peer.
279 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
280 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
284 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
// IsBanned forwards ip, level and reason unchanged to the security module's
// IsBanned and returns its result.
288 func (sw *Switch) IsBanned(ip string, level byte, reason string) bool {
289 return sw.security.IsBanned(ip, level, reason)
292 // IsDialing reports whether a dial to addr's IP is in progress; used to prevent duplicate dialing.
// The dialing set is keyed by IP string only, so the port is not considered.
293 func (sw *Switch) IsDialing(addr *NetAddress) bool {
294 return sw.dialing.Has(addr.IP.String())
297 // IsListening returns true if the switch has at least one listener.
// It only checks that sw.listeners is non-empty.
298 // NOTE: Not goroutine safe.
299 func (sw *Switch) IsListening() bool {
300 return len(sw.listeners) > 0
303 // Listeners returns the list of listeners the switch listens on.
// NOTE(review): body not visible in this excerpt — presumably returns the
// internal sw.listeners slice rather than a copy; confirm before mutating it.
304 // NOTE: Not goroutine safe.
305 func (sw *Switch) Listeners() []Listener {
309 // NumPeers returns the count of LAN, outbound, inbound and outbound-dialing peers.
// dialing is the size of the dialing set. The visible branch tests peers that
// are outbound and not LAN.
// NOTE(review): the branches counting lan and inbound are not visible in this excerpt.
310 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
311 peers := sw.peers.List()
312 for _, peer := range peers {
313 if peer.outbound && !peer.isLAN {
322 dialing = sw.dialing.Size()
326 // Peers returns the switch's peer set.
// NOTE(review): body not visible in this excerpt — presumably returns sw.peers; confirm.
327 func (sw *Switch) Peers() *PeerSet {
331 // StopPeerForError disconnects from a peer due to external error.
// reason is logged at debug level and passed on to stopAndRemovePeer. NewPeer is
// given this method as the peer's error callback (see AddPeer).
332 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
333 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
334 sw.stopAndRemovePeer(peer, reason)
337 // StopPeerGracefully disconnects from the peer with the given ID gracefully.
// It does nothing when no peer with peerID is in the peer set; otherwise the
// peer is stopped and removed with a nil reason.
338 func (sw *Switch) StopPeerGracefully(peerID string) {
339 if peer := sw.peers.Get(peerID); peer != nil {
340 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted connection as an inbound peer
// connection and adds it via AddPeer as a non-LAN peer. The visible failure
// handling closes conn and logs any error returned by Close.
344 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
345 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
// Close the raw connection; a failing Close is only logged.
347 if err := conn.Close(); err != nil {
348 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
// Inbound peers are always added with isLAN == false.
353 if err = sw.AddPeer(peerConn, false); err != nil {
354 if err := conn.Close(); err != nil {
355 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
360 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
// connectLANPeers dials the addresses announced in a LAN peer event. One LAN
// address is built per IP in the event, all sharing the event's port, and the
// list is handed to DialPeers.
// NOTE(review): how numToDial limits dialing is not visible in this excerpt.
364 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
365 lanPeers, _, _, numDialing := sw.NumPeers()
// Remaining LAN peer slots.
366 numToDial := maxNumLANPeers - lanPeers
367 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
371 addresses := make([]*NetAddress, 0)
372 for i := 0; i < len(lanPeer.IP); i++ {
373 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
375 sw.DialPeers(addresses)
// connectLANPeersRoutine subscribes to LAN discovery events and calls
// connectLANPeers for each mdns.LANPeerEvent received. OnStart runs it as a
// goroutine.
378 func (sw *Switch) connectLANPeersRoutine() {
// Checked first when LAN discovery is disabled in the config.
379 if !sw.Config.P2P.LANDiscover {
383 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
385 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
391 case obj, ok := <-lanPeerEventSub.Chan():
// ok reports whether the subscription channel is still open.
393 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
// Events carrying any other payload type are reported as an error.
396 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
398 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
401 sw.connectLANPeers(LANPeer)
// listenerRoutine receives inbound connections from l. A connection arriving
// while the peer set already holds MaxNumPeers peers is closed; otherwise it is
// handed to addPeerWithConnection. OnStart runs one such goroutine per listener.
408 func (sw *Switch) listenerRoutine(l Listener) {
// ok reports whether the listener's connection channel is still open.
410 inConn, ok := <-l.Connections()
415 // disconnect if we already have MaxNumPeers
416 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
417 if err := inConn.Close(); err != nil {
418 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
420 log.Info("Ignoring inbound connection: already have enough peers.")
424 // New inbound connection!
425 if err := sw.addPeerWithConnection(inConn); err != nil {
426 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// dialPeerWorker dials a single address and logs a warning when the dial fails.
// DialPeers starts it as a goroutine.
// NOTE(review): the use of wg is not visible in this excerpt — presumably the
// worker signals completion on it; confirm.
432 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
433 if err := sw.DialPeerWithAddress(a); err != nil {
434 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Warn("dialPeerWorker fail on dial peer")
// DialPeers dials the given addresses concurrently, one dialPeerWorker goroutine
// per address. Each address is first checked against our own listen address,
// the dialing set and the hosts of already connected peers.
// NOTE(review): the bodies of those checks and the wait on wg are not visible in
// this excerpt.
439 func (sw *Switch) DialPeers(addresses []*NetAddress) {
// Snapshot of the remote hosts we are already connected to.
440 connectedPeers := make(map[string]struct{})
441 for _, peer := range sw.Peers().List() {
442 connectedPeers[peer.RemoteAddrHost()] = struct{}{}
445 var wg sync.WaitGroup
446 for _, address := range addresses {
// Our own address.
447 if sw.nodeInfo.ListenAddr == address.String() {
// A dial to this IP is already in flight.
450 if dialling := sw.IsDialing(address); dialling {
// Already connected to this IP.
453 if _, ok := connectedPeers[address.IP.String()]; ok {
458 go sw.dialPeerWorker(address, &wg)
// ensureKeepConnectPeers dials the addresses configured in P2P.KeepDial. The
// setting is split by netutil.CheckAndSplitAddresses, each entry is parsed into
// a NetAddress, and a parse failure is logged as a warning.
463 func (sw *Switch) ensureKeepConnectPeers() {
464 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
465 addresses := make([]*NetAddress, 0)
466 for _, keepDial := range keepDials {
467 address, err := NewNetAddressString(keepDial)
469 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
472 addresses = append(addresses, address)
// DialPeers itself checks for addresses that are already connected or being dialed.
475 sw.DialPeers(addresses)
// ensureOutboundPeers tops up outbound connections towards minNumOutboundPeers.
// It asks the discovery service for up to numToDial nodes and dials the ones
// returned.
// NOTE(review): make panics on a negative length, so a guard for numToDial <= 0
// is expected in the lines not visible in this excerpt — confirm.
478 func (sw *Switch) ensureOutboundPeers() {
479 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
// Peers currently being dialed count towards the target.
480 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
481 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
486 nodes := make([]*dht.Node, numToDial)
// Only the first n entries of nodes are used.
487 n := sw.discv.ReadRandomNodes(nodes)
488 addresses := make([]*NetAddress, 0)
489 for i := 0; i < n; i++ {
// Dial the node's TCP port.
490 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
491 addresses = append(addresses, address)
493 sw.DialPeers(addresses)
// ensureOutboundPeersRoutine keeps the configured keep-dial peers and the
// minimum number of outbound peers connected. It runs both checks once up
// front and creates a 10-second ticker for repeating them. OnStart runs it as a
// goroutine.
// NOTE(review): the loop around the ticker and its exit condition are not
// visible in this excerpt.
496 func (sw *Switch) ensureOutboundPeersRoutine() {
497 sw.ensureKeepConnectPeers()
498 sw.ensureOutboundPeers()
500 ticker := time.NewTicker(10 * time.Second)
506 sw.ensureKeepConnectPeers()
507 sw.ensureOutboundPeers()
// startInitPeer starts the peer and then announces it to every registered
// reactor through reactor.AddPeer. A failing peer.Start is logged as an error.
// AddPeer calls it before inserting the peer into the peer set.
514 func (sw *Switch) startInitPeer(peer *Peer) error {
515 // spawn send/recv routines
516 if _, err := peer.Start(); err != nil {
517 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
// Map iteration order is random, so reactors are notified in no fixed order.
520 for _, reactor := range sw.reactors {
521 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer removes the peer from the peer set, tells every reactor
// that the peer is gone (passing reason through) and logs the peer's traffic
// statistics.
// NOTE(review): the call that actually stops the peer is not visible in this excerpt.
528 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
529 sw.peers.Remove(peer)
530 for _, reactor := range sw.reactors {
531 reactor.RemovePeer(peer, reason)
// Gather sent/received statistics for the disconnect log entry.
535 sentStatus, receivedStatus := peer.TrafficStatus()
536 log.WithFields(log.Fields{
538 "address": peer.Addr().String(),
// The logged duration is taken from the sent-side status.
540 "duration": sentStatus.Duration.String(),
541 "total_sent": sentStatus.Bytes,
542 "total_received": receivedStatus.Bytes,
543 "average_sent_rate": sentStatus.AvgRate,
544 "average_received_rate": receivedStatus.AvgRate,
// Peer count after the removal above.
545 "peer num": sw.peers.Size(),
546 }).Info("disconnect with peer")