OSDN Git Service

add cpu log
[bytom/bytom.git] / p2p / switch.go
index 5cbf59c..4e747a4 100644 (file)
@@ -13,14 +13,18 @@ import (
        dbm "github.com/tendermint/tmlibs/db"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/errors"
        "github.com/bytom/p2p/connection"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/p2p/trust"
+       "github.com/bytom/version"
 )
 
 const (
-       bannedPeerKey      = "BannedPeer"
-       defaultBanDuration = time.Hour * 1
+       bannedPeerKey       = "BannedPeer"
+       defaultBanDuration  = time.Hour * 1
+       minNumOutboundPeers = 3
 )
 
 //pre-define errors for connecting fail
@@ -28,6 +32,7 @@ var (
        ErrDuplicatePeer     = errors.New("Duplicate peer")
        ErrConnectSelf       = errors.New("Connect self")
        ErrConnectBannedPeer = errors.New("Connect banned peer")
+       ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
 )
 
 // Switch handles peer connections and exposes an API to receive incoming messages
@@ -47,6 +52,7 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       discv        *discover.Network
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
@@ -86,6 +92,7 @@ func (sw *Switch) OnStart() error {
        for _, listener := range sw.listeners {
                go sw.listenerRoutine(listener)
        }
+       go sw.ensureOutboundPeersRoutine()
        return nil
 }
 
@@ -127,11 +134,14 @@ func (sw *Switch) AddBannedPeer(ip string) error {
 // NOTE: This performs a blocking handshake before the peer is added.
 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
 func (sw *Switch) AddPeer(pc *peerConn) error {
-       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout))
        if err != nil {
                return err
        }
 
+       if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
+               return err
+       }
        if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
                return err
        }
@@ -141,6 +151,10 @@ func (sw *Switch) AddPeer(pc *peerConn) error {
                return err
        }
 
+       if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
+               return ErrConnectSpvPeer
+       }
+
        // Start peer
        if sw.IsRunning() {
                if err := sw.startInitPeer(peer); err != nil {
@@ -338,10 +352,8 @@ func (sw *Switch) listenerRoutine(l Listener) {
                        break
                }
 
-               // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
-               // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
-               // be double of MaxNumPeers
-               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers*2 {
+               // disconnect if we alrady have MaxNumPeers
+               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
                        inConn.Close()
                        log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
@@ -355,6 +367,67 @@ func (sw *Switch) listenerRoutine(l Listener) {
        }
 }
 
+func (sw *Switch) SetDiscv(discv *discover.Network) {
+       sw.discv = discv
+}
+
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if err := sw.DialPeerWithAddress(a); err != nil {
+               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
+       }
+       wg.Done()
+}
+
+func (sw *Switch) ensureOutboundPeers() {
+       numOutPeers, _, numDialing := sw.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+       log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
+       }
+
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+       }
+
+       var wg sync.WaitGroup
+       nodes := make([]*discover.Node, numToDial)
+       n := sw.discv.ReadRandomNodes(nodes)
+       for i := 0; i < n; i++ {
+               try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
+               }
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
+       }
+       wg.Wait()
+}
+
+func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureOutboundPeers()
+
+       ticker := time.NewTicker(10 * time.Second)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       sw.ensureOutboundPeers()
+               case <-sw.Quit:
+                       return
+               }
+       }
+}
+
 func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start() // spawn send/recv routines
        for _, reactor := range sw.reactors {