12 log "github.com/sirupsen/logrus"
13 crypto "github.com/tendermint/go-crypto"
14 cmn "github.com/tendermint/tmlibs/common"
16 cfg "github.com/vapor/config"
17 "github.com/vapor/consensus"
18 "github.com/vapor/crypto/ed25519"
19 "github.com/vapor/crypto/sha3pool"
20 dbm "github.com/vapor/database/leveldb"
21 "github.com/vapor/errors"
22 "github.com/vapor/event"
23 "github.com/vapor/p2p/connection"
24 "github.com/vapor/p2p/discover/dht"
25 "github.com/vapor/p2p/discover/mdns"
26 "github.com/vapor/p2p/netutil"
27 "github.com/vapor/p2p/trust"
28 "github.com/vapor/version"
// bannedPeerKey is the database key under which the JSON-encoded banned-peer map is stored.
32 bannedPeerKey = "BannedPeer"
// defaultBanDuration is how far in the future AddBannedPeer sets a ban's expiry.
33 defaultBanDuration = time.Hour * 1
// minNumOutboundPeers is the target that ensureOutboundPeers compares against
// the number of outbound plus currently-dialing peers.
36 minNumOutboundPeers = 4
38 // magicNumber is appended (big-endian) to the genesis block hash before hashing to derive the netID.
39 magicNumber = uint64(0x054c5638)
42 // Predefined errors returned when a connection attempt is rejected.
// ErrDuplicatePeer: the peer's key is already present in the switch's peer set.
44 ErrDuplicatePeer = errors.New("Duplicate peer")
// ErrConnectSelf: the target IP equals our own listen host, or the peer's public key equals ours.
45 ErrConnectSelf = errors.New("Connect self")
// ErrConnectBannedPeer: the address has a ban whose expiry time has not passed yet.
46 ErrConnectBannedPeer = errors.New("Connect banned peer")
// ErrConnectSpvPeer: an outbound connection reached a peer without the full-node service flag.
47 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
// discv is the node-discovery dependency of Switch. ensureOutboundPeers uses it
// to obtain candidate nodes to dial.
50 type discv interface {
// ReadRandomNodes writes nodes into buf and returns n; the caller treats the
// first n entries of buf as valid. (Presumably the selection is random, as the
// name says — the implementation is not visible here.)
51 ReadRandomNodes(buf []*dht.Node) (n int)
// lanDiscv is the LAN-discovery dependency of Switch. connectLANPeersRoutine
// subscribes through it and reads LAN peer events from the subscription channel.
54 type lanDiscv interface {
// Subscribe returns an event subscription, or an error if subscribing fails.
55 Subscribe() (*event.Subscription, error)
59 // Switch handles peer connections and exposes an API to receive incoming messages
60 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
61 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
62 // incoming messages are received on the reactor.
// peerConfig holds the per-peer settings (e.g. HandshakeTimeout) used when
// creating outbound connections and performing handshakes.
67 peerConfig *PeerConfig
// reactors maps a reactor's registration name to the reactor.
69 reactors map[string]Reactor
// chDescs accumulates the channel descriptors of every registered reactor.
70 chDescs []*connection.ChannelDescriptor
// reactorsByCh maps a channel ID to the single reactor that owns it.
71 reactorsByCh map[byte]Reactor
74 nodeInfo *NodeInfo // our node info
75 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
// bannedPeer maps a peer address string to the time at which its ban expires.
78 bannedPeer map[string]time.Time
83 // NewSwitch creates a Switch from config: it derives the network ID, opens the
// trust-history database, builds the node private key and, unless the node runs
// in vault mode, sets up the listener plus DHT (and optionally LAN) discovery.
// The assembled pieces are handed to newSwitch.
84 func NewSwitch(config *cfg.Config) (*Switch, error) {
// NOTE(review): discv and lanDiscv are only assigned inside the !VaultMode
// branch below. In vault mode they are passed to newSwitch as nil pointers,
// which become non-nil interface values wrapping nil — confirm callers never
// invoke them in that mode.
88 var discv *dht.Network
89 var lanDiscv *mdns.LANDiscover
91 // Derive the netID: hash(genesis block hash || big-endian magicNumber), then
// interpret the first 8 bytes of the digest as a big-endian uint64.
94 data = append(data, cfg.GenesisBlock().Hash().Bytes()...)
95 magic := make([]byte, 8)
96 binary.BigEndian.PutUint64(magic, magicNumber)
97 data = append(data, magic[:]...)
98 sha3pool.Sum256(h[:], data)
99 netID := binary.BigEndian.Uint64(h[:8])
// Database handed to newSwitch as blacklistDB (used to persist banned peers).
101 blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
// NOTE(review): the error from GenerateKey is discarded, yyy has no visible
// use, and zzz is not defined in the visible lines — confirm where the
// hex-encoded private key actually comes from.
103 _, yyy, _ := ed25519.GenerateKey(nil)
106 bytes, err := hex.DecodeString(zzz)
// Copy the decoded key bytes into the fixed-size array form expected by
// crypto.PrivKeyEd25519.
111 copy(newKey[:], bytes)
112 privKey := crypto.PrivKeyEd25519(newKey)
// Networking (listener + discovery) is only set up outside vault mode.
113 if !config.VaultMode {
115 l, listenAddr = GetListener(config.P2P)
// DHT discovery is bound to the listener's external port and the netID.
116 discv, err = dht.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port, netID)
// LAN discovery is opt-in through config.P2P.LANDiscover.
120 if config.P2P.LANDiscover {
121 lanDiscv = mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
125 return newSwitch(config, discv, lanDiscv, blacklistDB, l, privKey, listenAddr, netID)
128 // newSwitch assembles a Switch from already-constructed dependencies
// (discovery services, blacklist database, listener, private key, listen
// address and network ID), loads previously banned peers, and initialises the
// embedded BaseService. It returns an error if loading banned peers fails
// (presumably — the error branch body is not visible here).
129 func newSwitch(config *cfg.Config, discv discv, lanDiscv lanDiscv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string, netID uint64) (*Switch, error) {
132 peerConfig: DefaultPeerConfig(config.P2P),
133 reactors: make(map[string]Reactor),
134 chDescs: make([]*connection.ChannelDescriptor, 0),
135 reactorsByCh: make(map[byte]Reactor),
// dialing tracks addresses that are currently being dialed (see IsDialing).
137 dialing: cmn.NewCMap(),
// Our advertised node info carries the Ed25519 public key derived from priv.
142 nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr, netID),
143 bannedPeer: make(map[string]time.Time),
// Restore bans persisted by an earlier run.
145 if err := sw.loadBannedPeers(); err != nil {
// The switch registers itself as the implementation of the base service.
150 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
152 log.WithFields(log.Fields{"module": logModule, "nodeInfo": sw.nodeInfo}).Info("init p2p network")
156 // OnStart implements BaseService. It starts every registered reactor, then
// launches one goroutine per listener plus the outbound-peer and LAN-peer
// maintenance goroutines.
157 func (sw *Switch) OnStart() error {
// Start reactors first; a start error is checked per reactor (the handling
// branch is not visible here).
158 for _, reactor := range sw.reactors {
159 if _, err := reactor.Start(); err != nil {
// Each listener gets its own accept loop.
163 for _, listener := range sw.listeners {
164 go sw.listenerRoutine(listener)
// Background routines that keep the peer set populated.
166 go sw.ensureOutboundPeersRoutine()
167 go sw.connectLANPeersRoutine()
172 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// NOTE(review): only the loop headers and the peer removal are visible here;
// the per-item stop calls are assumed from the summary above — confirm.
173 func (sw *Switch) OnStop() {
// LAN discovery gets its own shutdown step when it is enabled.
174 if sw.Config.P2P.LANDiscover {
178 for _, listener := range sw.listeners {
// Iterates over the slice returned by peers.List() while removing each peer
// from the set.
183 for _, peer := range sw.peers.List() {
185 sw.peers.Remove(peer)
188 for _, reactor := range sw.reactors {
193 // AddBannedPeer bans ip for defaultBanDuration starting now and persists the
// whole banned-peer map as JSON under bannedPeerKey.
194 func (sw *Switch) AddBannedPeer(ip string) error {
// sw.mtx guards bannedPeer (the matching Lock call is not visible here).
196 defer sw.mtx.Unlock()
// Store the ban's expiry time, not its start time.
198 sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
199 dataJSON, err := json.Marshal(sw.bannedPeer)
// The full map is rewritten on every change.
204 sw.db.Set([]byte(bannedPeerKey), dataJSON)
208 // AddPeer performs the P2P handshake with a peer
209 // that already has a SecretConnection. If all goes well,
210 // it starts the peer and adds it to the switch.
211 // NOTE: This performs a blocking handshake before the peer is added.
212 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
// NOTE(review): AddPeer returns only an error; in the visible code the inbound
// caller (addPeerWithConnection) is what closes the connection on failure.
//
// isLAN marks the resulting peer as a LAN peer.
213 func (sw *Switch) AddPeer(pc *peerConn, isLAN bool) error {
// Exchange node info, bounded by the configured handshake timeout.
214 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
// Compare our version with the peer's for the update check.
219 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
// Check that the peer's node info is compatible with ours.
223 if err := sw.nodeInfo.compatibleWith(peerNodeInfo, version.CompatibleWith); err != nil {
227 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, isLAN)
// Reject banned peers, ourselves, and duplicates.
228 if err := sw.filterConnByPeer(peer); err != nil {
// We never keep an outbound connection to a peer that is not a full node.
232 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
233 return ErrConnectSpvPeer
// Start the peer and announce it to all reactors.
238 if err := sw.startInitPeer(peer); err != nil {
// NOTE(review): if Add fails here the peer has already been started and handed
// to the reactors; no cleanup is visible — confirm.
243 return sw.peers.Add(peer)
246 // AddReactor registers reactor under name, records its channel descriptors,
// and gives the reactor a back-reference to the switch. It panics if one of
// the reactor's channels is already owned by another reactor.
247 // NOTE: Not goroutine safe.
248 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
249 // Validate the reactor.
250 // No two reactors can share the same channel.
251 for _, chDesc := range reactor.GetChannels() {
// chID is the channel's ID (its assignment is not visible here; presumably
// taken from chDesc).
253 if sw.reactorsByCh[chID] != nil {
254 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
256 sw.chDescs = append(sw.chDescs, chDesc)
257 sw.reactorsByCh[chID] = reactor
259 sw.reactors[name] = reactor
260 reactor.SetSwitch(sw)
264 // AddListener appends l to the switch's listeners; OnStart later spawns an
// accept loop for each registered listener.
265 // NOTE: Not goroutine safe.
266 func (sw *Switch) AddListener(l Listener) {
267 sw.listeners = append(sw.listeners, l)
270 // DialPeerWithAddress dials the node at addr and, on success, adds it as an
// outbound peer. The address is registered in the dialing set for the duration
// of the call so that concurrent dials to the same IP can be detected through
// IsDialing.
271 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
272 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
// The dialing set is keyed by IP only (no port).
273 sw.dialing.Set(addr.IP.String(), addr)
274 defer sw.dialing.Delete(addr.IP.String())
// Refuse to dial ourselves or a banned address before opening a connection.
275 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
279 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
281 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
// Handshake, filter and register the peer.
285 if err = sw.AddPeer(pc, addr.isLAN); err != nil {
286 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
290 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
// ID returns the public key stored in our node info as a 32-byte array.
294 func (sw *Switch) ID() [32]byte {
295 return sw.nodeInfo.PubKey
298 // IsDialing reports whether a dial to addr's IP is currently in progress; it
// is used to prevent duplicate dialing. The lookup is by IP only, so the port
// is ignored.
299 func (sw *Switch) IsDialing(addr *NetAddress) bool {
300 return sw.dialing.Has(addr.IP.String())
303 // IsListening returns true if the switch has at least one listener.
304 // NOTE: Not goroutine safe.
305 func (sw *Switch) IsListening() bool {
306 return len(sw.listeners) > 0
309 // loadBannedPeers restores sw.bannedPeer from the JSON stored under
// bannedPeerKey. When the key has no value (Get returns nil) the map is left
// untouched.
310 func (sw *Switch) loadBannedPeers() error {
311 if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
// Decode straight into the existing map.
312 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
320 // Listeners returns the list of listeners the switch listens on.
321 // NOTE: Not goroutine safe.
// NOTE(review): the body is not visible here; whether the internal slice or a
// copy is returned should be confirmed.
322 func (sw *Switch) Listeners() []Listener {
326 // NumPeers returns the number of LAN peers, outbound peers, inbound peers and
// addresses that are currently being dialed.
327 func (sw *Switch) NumPeers() (lan, outbound, inbound, dialing int) {
328 peers := sw.peers.List()
329 for _, peer := range peers {
// This condition selects outbound peers that are not LAN peers (the other
// classification branches are not visible here).
330 if peer.outbound && !peer.isLAN {
// The dialing count comes from the dialing set, not from the peer list.
339 dialing = sw.dialing.Size()
343 // Peers returns the switch's peer set.
344 func (sw *Switch) Peers() *PeerSet {
348 // StopPeerForError disconnects from a peer due to external error. reason is
// logged and forwarded to the reactors through stopAndRemovePeer.
349 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
350 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
351 sw.stopAndRemovePeer(peer, reason)
354 // StopPeerGracefully disconnects from the peer identified by peerID, passing a
// nil reason to the reactors. It does nothing when no such peer is in the set.
355 func (sw *Switch) StopPeerGracefully(peerID string) {
356 if peer := sw.peers.Get(peerID); peer != nil {
357 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted connection as an inbound peer
// connection and adds it to the switch as a non-LAN peer. The raw connection is
// closed if either step fails.
361 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
362 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
// Failure path of newInboundPeerConn: close the connection and log any close
// error.
364 if err := conn.Close(); err != nil {
365 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
// Inbound peers are never flagged as LAN peers.
370 if err = sw.AddPeer(peerConn, false); err != nil {
371 if err := conn.Close(); err != nil {
372 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
377 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
// checkBannedPeer returns ErrConnectBannedPeer while the ban recorded for peer
// (an address string) has not expired. An expired ban is deleted through
// delBannedPeer.
//
// NOTE(review): this function defers sw.mtx.Unlock() and then calls
// delBannedPeer, which also defers sw.mtx.Unlock(). If both acquire sw.mtx (the
// Lock calls are not visible here) and it is a non-reentrant sync.Mutex, the
// expired-ban path blocks forever on the second Lock. Confirm, and if so delete
// the entry inline or introduce a lock-free helper.
381 func (sw *Switch) checkBannedPeer(peer string) error {
383 defer sw.mtx.Unlock()
385 if banEnd, ok := sw.bannedPeer[peer]; ok {
// Ban still active.
386 if time.Now().Before(banEnd) {
387 return ErrConnectBannedPeer
// Ban expired: remove it from the map and the database.
390 if err := sw.delBannedPeer(peer); err != nil {
// connectLANPeers dials the addresses announced in a LAN peer event. Every IP
// in the event is paired with the event's port and handed to dialPeers.
397 func (sw *Switch) connectLANPeers(lanPeer mdns.LANPeerEvent) {
398 lanPeers, _, _, numDialing := sw.NumPeers()
// Remaining LAN peer slots (how numToDial gates the dial is not visible here).
399 numToDial := maxNumLANPeers - lanPeers
400 log.WithFields(log.Fields{"module": logModule, "numDialing": numDialing, "numToDial": numToDial}).Debug("connect LAN peers")
404 addresses := make([]*NetAddress, 0)
405 for i := 0; i < len(lanPeer.IP); i++ {
// The event's port is narrowed to uint16 for the net address.
406 addresses = append(addresses, NewLANNetAddressIPPort(lanPeer.IP[i], uint16(lanPeer.Port)))
408 sw.dialPeers(addresses)
// connectLANPeersRoutine subscribes to LAN discovery events and dials the
// announced peers as events arrive. It is started as a goroutine by OnStart and
// checks config.P2P.LANDiscover first (presumably returning when LAN discovery
// is disabled — the branch body is not visible here).
411 func (sw *Switch) connectLANPeersRoutine() {
412 if !sw.Config.P2P.LANDiscover {
416 lanPeerEventSub, err := sw.lanDiscv.Subscribe()
418 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("subscribe LAN Peer Event error")
// ok is false once the subscription channel has been closed.
424 case obj, ok := <-lanPeerEventSub.Chan():
426 log.WithFields(log.Fields{"module": logModule}).Warning("LAN peer event subscription channel closed")
// Only mdns.LANPeerEvent payloads are accepted; anything else is logged as an
// error.
429 LANPeer, ok := obj.Data.(mdns.LANPeerEvent)
431 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
434 sw.connectLANPeers(LANPeer)
// delBannedPeer removes addr from the banned-peer map and persists the updated
// map as JSON under bannedPeerKey.
//
// NOTE(review): this function releases sw.mtx on return, so it presumably
// acquires it too (the Lock call is not visible here). checkBannedPeer calls it
// while apparently already holding the same mutex — confirm this cannot
// deadlock.
441 func (sw *Switch) delBannedPeer(addr string) error {
443 defer sw.mtx.Unlock()
// Deleting a missing key is a no-op for Go maps.
445 delete(sw.bannedPeer, addr)
446 datajson, err := json.Marshal(sw.bannedPeer)
451 sw.db.Set([]byte(bannedPeerKey), datajson)
// filterConnByIP rejects a dial target before any connection is opened: it
// returns ErrConnectSelf when ip equals our own listen host, otherwise the
// result of the ban check for ip.
455 func (sw *Switch) filterConnByIP(ip string) error {
456 if ip == sw.nodeInfo.listenHost() {
457 return ErrConnectSelf
459 return sw.checkBannedPeer(ip)
// filterConnByPeer rejects a handshaken peer that is banned, that is ourselves
// (same public key), or that is already in the peer set.
462 func (sw *Switch) filterConnByPeer(peer *Peer) error {
// The ban check uses the host part of the peer's remote address.
463 if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
// Same public key as ours means we connected to ourselves.
467 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
468 return ErrConnectSelf
471 if sw.peers.Has(peer.Key) {
472 return ErrDuplicatePeer
// listenerRoutine is the accept loop for one listener. It receives inbound
// connections from l, refuses them once the peer limit is reached, and
// otherwise tries to add each one as a peer.
477 func (sw *Switch) listenerRoutine(l Listener) {
// ok is false once the listener's connection channel has been closed.
479 inConn, ok := <-l.Connections()
484 // disconnect if we already have MaxNumPeers
485 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
486 if err := inConn.Close(); err != nil {
487 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
489 log.Info("Ignoring inbound connection: already have enough peers.")
493 // New inbound connection!
494 if err := sw.addPeerWithConnection(inConn); err != nil {
495 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// dialPeerWorker dials a and logs any failure; dialPeers runs it as a goroutine
// per address. wg is the caller's WaitGroup (presumably signalled when the dial
// finishes — that call is not visible here).
501 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
502 if err := sw.DialPeerWithAddress(a); err != nil {
503 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
// dialPeers starts a dialPeerWorker goroutine for each address that passes
// three checks: it is not our own listen address, it is not already being
// dialed, and it does not belong to an already-connected peer. (The skip
// statements inside those checks are not visible here.)
508 func (sw *Switch) dialPeers(addresses []*NetAddress) {
// Snapshot the hosts of the currently connected peers for quick lookup.
509 connectedPeers := make(map[string]struct{})
510 for _, peer := range sw.Peers().List() {
511 connectedPeers[peer.remoteAddrHost()] = struct{}{}
514 var wg sync.WaitGroup
515 for _, address := range addresses {
// Own listen address.
516 if sw.nodeInfo.ListenAddr == address.String() {
// A dial to this IP is already in flight.
519 if dialling := sw.IsDialing(address); dialling {
// Already connected to this host.
522 if _, ok := connectedPeers[address.IP.String()]; ok {
527 go sw.dialPeerWorker(address, &wg)
// ensureKeepConnectPeers dials the addresses configured in P2P.KeepDial.
// Entries that cannot be parsed into a NetAddress are logged with a warning.
532 func (sw *Switch) ensureKeepConnectPeers() {
533 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
534 addresses := make([]*NetAddress, 0)
535 for _, keepDial := range keepDials {
536 address, err := NewNetAddressString(keepDial)
538 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
541 addresses = append(addresses, address)
// dialPeers itself filters out addresses that are already connected or being
// dialed.
544 sw.dialPeers(addresses)
// ensureOutboundPeers tops up outbound connections towards
// minNumOutboundPeers. It asks discovery for as many nodes as are missing and
// dials the ones returned.
547 func (sw *Switch) ensureOutboundPeers() {
548 lanPeers, numOutPeers, _, numDialing := sw.NumPeers()
// In-flight dials count towards the target so that we do not over-dial.
549 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
550 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "LANPeers": lanPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
// NOTE(review): make panics on a negative length, so numToDial must be checked
// to be positive before this point (such a guard is not visible here) —
// confirm.
555 nodes := make([]*dht.Node, numToDial)
// Only the first n entries of nodes are filled in.
556 n := sw.discv.ReadRandomNodes(nodes)
557 addresses := make([]*NetAddress, 0)
558 for i := 0; i < n; i++ {
// Use the node's IP and TCP port.
559 address := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
560 addresses = append(addresses, address)
562 sw.dialPeers(addresses)
// ensureOutboundPeersRoutine runs the keep-dial and outbound-peer maintenance
// once immediately and then again around a 10-second ticker. It is started as a
// goroutine by OnStart.
// NOTE(review): the loop structure, the exit condition and whether the ticker
// is stopped are not visible here — confirm the ticker is released on exit.
565 func (sw *Switch) ensureOutboundPeersRoutine() {
// First pass right away, without waiting for the first tick.
566 sw.ensureKeepConnectPeers()
567 sw.ensureOutboundPeers()
569 ticker := time.NewTicker(10 * time.Second)
575 sw.ensureKeepConnectPeers()
576 sw.ensureOutboundPeers()
// startInitPeer starts the peer and then announces it to every registered
// reactor through reactor.AddPeer. A reactor's AddPeer error is checked (the
// handling branch is not visible here).
583 func (sw *Switch) startInitPeer(peer *Peer) error {
584 // spawn send/recv routines
585 if _, err := peer.Start(); err != nil {
586 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
// Map iteration order is unspecified, so reactors are notified in no
// particular order.
589 for _, reactor := range sw.reactors {
590 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer removes peer from the peer set, notifies every reactor of
// the removal together with reason, and logs the peer's traffic statistics.
// (The call that actually stops the peer is presumably among the lines not
// visible here.)
597 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
// Remove from the set first, before the reactors are notified.
598 sw.peers.Remove(peer)
599 for _, reactor := range sw.reactors {
600 reactor.RemovePeer(peer, reason)
// Traffic counters for the disconnect log entry.
604 sentStatus, receivedStatus := peer.TrafficStatus()
605 log.WithFields(log.Fields{
607 "address": peer.Addr().String(),
// The connection duration is taken from the sent-side status.
609 "duration": sentStatus.Duration.String(),
610 "total_sent": sentStatus.Bytes,
611 "total_received": receivedStatus.Bytes,
612 "average_sent_rate": sentStatus.AvgRate,
613 "average_received_rate": receivedStatus.AvgRate,
// Peer count after the removal.
614 "peer num": sw.peers.Size(),
615 }).Info("disconnect with peer")