OSDN Git Service

add cpu log
[bytom/bytom.git] / p2p / switch.go
index 121302b..4e747a4 100644 (file)
@@ -8,19 +8,23 @@ import (
        "time"
 
        log "github.com/sirupsen/logrus"
-       crypto "github.com/tendermint/go-crypto"
+       "github.com/tendermint/go-crypto"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/errors"
        "github.com/bytom/p2p/connection"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/p2p/trust"
+       "github.com/bytom/version"
 )
 
 const (
-       bannedPeerKey      = "BannedPeer"
-       defaultBanDuration = time.Hour * 1
+       bannedPeerKey       = "BannedPeer"
+       defaultBanDuration  = time.Hour * 1
+       minNumOutboundPeers = 3
 )
 
 //pre-define errors for connecting fail
@@ -28,19 +32,9 @@ var (
        ErrDuplicatePeer     = errors.New("Duplicate peer")
        ErrConnectSelf       = errors.New("Connect self")
        ErrConnectBannedPeer = errors.New("Connect banned peer")
+       ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
 )
 
-// An AddrBook represents an address book from the pex package, which is used to store peer addresses.
-type AddrBook interface {
-       AddAddress(*NetAddress, *NetAddress) error
-       AddOurAddress(*NetAddress)
-       MarkGood(*NetAddress)
-       RemoveAddress(*NetAddress)
-       SaveToFile() error
-}
-
-//-----------------------------------------------------------------------------
-
 // Switch handles peer connections and exposes an API to receive incoming messages
 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
@@ -48,7 +42,7 @@ type AddrBook interface {
 type Switch struct {
        cmn.BaseService
 
-       Config       *cfg.P2PConfig
+       Config       *cfg.Config
        peerConfig   *PeerConfig
        listeners    []Listener
        reactors     map[string]Reactor
@@ -58,25 +52,24 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
-       addrBook     AddrBook
+       discv        *discover.Network
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
 }
 
 // NewSwitch creates a new Switch with the given config.
-func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
+func NewSwitch(config *cfg.Config) *Switch {
        sw := &Switch{
                Config:       config,
-               peerConfig:   DefaultPeerConfig(config),
+               peerConfig:   DefaultPeerConfig(config.P2P),
                reactors:     make(map[string]Reactor),
                chDescs:      make([]*connection.ChannelDescriptor, 0),
                reactorsByCh: make(map[byte]Reactor),
                peers:        NewPeerSet(),
                dialing:      cmn.NewCMap(),
                nodeInfo:     nil,
-               addrBook:     addrBook,
-               db:           trustHistoryDB,
+               db:           dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
        }
        sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
        sw.bannedPeer = make(map[string]time.Time)
@@ -89,234 +82,133 @@ func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB)
        return sw
 }
 
-// AddReactor adds the given reactor to the switch.
-// NOTE: Not goroutine safe.
-func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
-       // Validate the reactor.
-       // No two reactors can share the same channel.
-       reactorChannels := reactor.GetChannels()
-       for _, chDesc := range reactorChannels {
-               chID := chDesc.ID
-               if sw.reactorsByCh[chID] != nil {
-                       cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
-               }
-               sw.chDescs = append(sw.chDescs, chDesc)
-               sw.reactorsByCh[chID] = reactor
-       }
-       sw.reactors[name] = reactor
-       reactor.SetSwitch(sw)
-       return reactor
-}
-
-// Reactors returns a map of reactors registered on the switch.
-// NOTE: Not goroutine safe.
-func (sw *Switch) Reactors() map[string]Reactor {
-       return sw.reactors
-}
-
-// Reactor returns the reactor with the given name.
-// NOTE: Not goroutine safe.
-func (sw *Switch) Reactor(name string) Reactor {
-       return sw.reactors[name]
-}
-
-// AddListener adds the given listener to the switch for listening to incoming peer connections.
-// NOTE: Not goroutine safe.
-func (sw *Switch) AddListener(l Listener) {
-       sw.listeners = append(sw.listeners, l)
-}
-
-// Listeners returns the list of listeners the switch listens on.
-// NOTE: Not goroutine safe.
-func (sw *Switch) Listeners() []Listener {
-       return sw.listeners
-}
-
-// IsListening returns true if the switch has at least one listener.
-// NOTE: Not goroutine safe.
-func (sw *Switch) IsListening() bool {
-       return len(sw.listeners) > 0
-}
-
-// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
-// NOTE: Not goroutine safe.
-func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
-       sw.nodeInfo = nodeInfo
-}
-
-// NodeInfo returns the switch's NodeInfo.
-// NOTE: Not goroutine safe.
-func (sw *Switch) NodeInfo() *NodeInfo {
-       return sw.nodeInfo
-}
-
-// SetNodePrivKey sets the switch's private key for authenticated encryption.
-// NOTE: Not goroutine safe.
-func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
-       sw.nodePrivKey = nodePrivKey
-       if sw.nodeInfo != nil {
-               sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
-       }
-}
-
 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
 func (sw *Switch) OnStart() error {
-       // Start reactors
        for _, reactor := range sw.reactors {
-               _, err := reactor.Start()
-               if err != nil {
+               if _, err := reactor.Start(); err != nil {
                        return err
                }
        }
-       // Start listeners
        for _, listener := range sw.listeners {
                go sw.listenerRoutine(listener)
        }
+       go sw.ensureOutboundPeersRoutine()
        return nil
 }
 
 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
 func (sw *Switch) OnStop() {
-       // Stop listeners
        for _, listener := range sw.listeners {
                listener.Stop()
        }
        sw.listeners = nil
-       // Stop peers
+
        for _, peer := range sw.peers.List() {
                peer.Stop()
                sw.peers.Remove(peer)
        }
-       // Stop reactors
+
        for _, reactor := range sw.reactors {
                reactor.Stop()
        }
 }
 
+//AddBannedPeer add peer to blacklist
+func (sw *Switch) AddBannedPeer(ip string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
+       datajson, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
+               return err
+       }
+
+       sw.db.Set([]byte(bannedPeerKey), datajson)
+       return nil
+}
+
 // AddPeer performs the P2P handshake with a peer
 // that already has a SecretConnection. If all goes well,
 // it starts the peer and adds it to the switch.
 // NOTE: This performs a blocking handshake before the peer is added.
 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
 func (sw *Switch) AddPeer(pc *peerConn) error {
-       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout))
        if err != nil {
                return err
        }
-       // Check version, chain id
+
+       if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
+               return err
+       }
        if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
                return err
        }
 
        peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
-
-       //filter peer
        if err := sw.filterConnByPeer(peer); err != nil {
                return err
        }
 
+       if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
+               return ErrConnectSpvPeer
+       }
+
        // Start peer
        if sw.IsRunning() {
                if err := sw.startInitPeer(peer); err != nil {
                        return err
                }
        }
-
-       // Add the peer to .peers.
-       // We start it first so that a peer in the list is safe to Stop.
-       // It should not err since we already checked peers.Has()
-       if err := sw.peers.Add(peer); err != nil {
-               return err
-       }
-
-       log.Info("Added peer:", peer)
-       return nil
+       return sw.peers.Add(peer)
 }
 
-func (sw *Switch) startInitPeer(peer *Peer) error {
-       peer.Start() // spawn send/recv routines
-       for _, reactor := range sw.reactors {
-               if err := reactor.AddPeer(peer); err != nil {
-                       return err
+// AddReactor adds the given reactor to the switch.
+// NOTE: Not goroutine safe.
+func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
+       // Validate the reactor.
+       // No two reactors can share the same channel.
+       for _, chDesc := range reactor.GetChannels() {
+               chID := chDesc.ID
+               if sw.reactorsByCh[chID] != nil {
+                       cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
                }
+               sw.chDescs = append(sw.chDescs, chDesc)
+               sw.reactorsByCh[chID] = reactor
        }
-       return nil
-}
-
-func (sw *Switch) dialSeed(addr *NetAddress) {
-       err := sw.DialPeerWithAddress(addr)
-       if err != nil {
-               log.Info("Error dialing seed:", addr.String())
-       }
-}
-
-func (sw *Switch) addrBookDelSelf() error {
-       addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
-       if err != nil {
-               return err
-       }
-       // remove the given address from the address book if we're added it earlier
-       sw.addrBook.RemoveAddress(addr)
-       // add the given address to the address book to avoid dialing ourselves
-       // again this is our public address
-       sw.addrBook.AddOurAddress(addr)
-       return nil
-}
-
-func (sw *Switch) filterConnByIP(ip string) error {
-       if err := sw.checkBannedPeer(ip); err != nil {
-               return ErrConnectBannedPeer
-       }
-
-       if ip == sw.nodeInfo.ListenHost() {
-               sw.addrBookDelSelf()
-               return ErrConnectSelf
-       }
-
-       return nil
+       sw.reactors[name] = reactor
+       reactor.SetSwitch(sw)
+       return reactor
 }
 
-func (sw *Switch) filterConnByPeer(peer *Peer) error {
-       if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
-               return ErrConnectBannedPeer
-       }
-
-       if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
-               sw.addrBookDelSelf()
-               return ErrConnectSelf
-       }
-
-       // Check for duplicate peer
-       if sw.peers.Has(peer.Key) {
-               return ErrDuplicatePeer
-       }
-       return nil
+// AddListener adds the given listener to the switch for listening to incoming peer connections.
+// NOTE: Not goroutine safe.
+func (sw *Switch) AddListener(l Listener) {
+       sw.listeners = append(sw.listeners, l)
 }
 
 //DialPeerWithAddress dial node from net address
 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
        log.Debug("Dialing peer address:", addr)
-
+       sw.dialing.Set(addr.IP.String(), addr)
+       defer sw.dialing.Delete(addr.IP.String())
        if err := sw.filterConnByIP(addr.IP.String()); err != nil {
                return err
        }
 
-       sw.dialing.Set(addr.IP.String(), addr)
-       defer sw.dialing.Delete(addr.IP.String())
-
-       pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
+       pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
-               log.Debug("Failed to dial peer", " address:", addr, " error:", err)
+               log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
                return err
        }
 
-       err = sw.AddPeer(pc)
-       if err != nil {
-               log.Info("Failed to add peer:", addr, " err:", err)
+       if err = sw.AddPeer(pc); err != nil {
+               log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
                pc.CloseConn()
                return err
        }
-       log.Info("Dialed and added peer:", addr)
+       log.Debug("DialPeer added peer:", addr)
        return nil
 }
 
@@ -325,6 +217,18 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
        return sw.dialing.Has(addr.IP.String())
 }
 
+// IsListening returns true if the switch has at least one listener.
+// NOTE: Not goroutine safe.
+func (sw *Switch) IsListening() bool {
+       return len(sw.listeners) > 0
+}
+
+// Listeners returns the list of listeners the switch listens on.
+// NOTE: Not goroutine safe.
+func (sw *Switch) Listeners() []Listener {
+       return sw.listeners
+}
+
 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        peers := sw.peers.List()
@@ -339,29 +243,106 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        return
 }
 
+// NodeInfo returns the switch's NodeInfo.
+// NOTE: Not goroutine safe.
+func (sw *Switch) NodeInfo() *NodeInfo {
+       return sw.nodeInfo
+}
+
 //Peers return switch peerset
 func (sw *Switch) Peers() *PeerSet {
        return sw.peers
 }
 
+// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
+// NOTE: Not goroutine safe.
+func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
+       sw.nodeInfo = nodeInfo
+}
+
+// SetNodePrivKey sets the switch's private key for authenticated encryption.
+// NOTE: Not goroutine safe.
+func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
+       sw.nodePrivKey = nodePrivKey
+       if sw.nodeInfo != nil {
+               sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
+       }
+}
+
 // StopPeerForError disconnects from a peer due to external error.
 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
-       log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
+       log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
        sw.stopAndRemovePeer(peer, reason)
 }
 
 // StopPeerGracefully disconnect from a peer gracefully.
-func (sw *Switch) StopPeerGracefully(peer *Peer) {
-       log.Info("Stopping peer gracefully")
-       sw.stopAndRemovePeer(peer, nil)
+func (sw *Switch) StopPeerGracefully(peerID string) {
+       if peer := sw.peers.Get(peerID); peer != nil {
+               sw.stopAndRemovePeer(peer, nil)
+       }
 }
 
-func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
-       for _, reactor := range sw.reactors {
-               reactor.RemovePeer(peer, reason)
+func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
+       peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
+       if err != nil {
+               conn.Close()
+               return err
        }
-       sw.peers.Remove(peer)
-       peer.Stop()
+
+       if err = sw.AddPeer(peerConn); err != nil {
+               conn.Close()
+               return err
+       }
+       return nil
+}
+
+func (sw *Switch) checkBannedPeer(peer string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       if banEnd, ok := sw.bannedPeer[peer]; ok {
+               if time.Now().Before(banEnd) {
+                       return ErrConnectBannedPeer
+               }
+               sw.delBannedPeer(peer)
+       }
+       return nil
+}
+
+func (sw *Switch) delBannedPeer(addr string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       delete(sw.bannedPeer, addr)
+       datajson, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
+               return err
+       }
+
+       sw.db.Set([]byte(bannedPeerKey), datajson)
+       return nil
+}
+
+func (sw *Switch) filterConnByIP(ip string) error {
+       if ip == sw.nodeInfo.ListenHost() {
+               return ErrConnectSelf
+       }
+       return sw.checkBannedPeer(ip)
+}
+
+func (sw *Switch) filterConnByPeer(peer *Peer) error {
+       if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
+               return err
+       }
+
+       if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
+               return ErrConnectSelf
+       }
+
+       if sw.peers.Has(peer.Key) {
+               return ErrDuplicatePeer
+       }
+       return nil
 }
 
 func (sw *Switch) listenerRoutine(l Listener) {
@@ -371,71 +352,96 @@ func (sw *Switch) listenerRoutine(l Listener) {
                        break
                }
 
-               // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
-               // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
-               // be double of MaxNumPeers
-               if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
+               // disconnect if we alrady have MaxNumPeers
+               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
                        inConn.Close()
                        log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
                }
 
                // New inbound connection!
-               err := sw.addPeerWithConnection(inConn)
-               if err != nil {
+               if err := sw.addPeerWithConnection(inConn); err != nil {
                        log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
                        continue
                }
        }
 }
 
-func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
-       if err != nil {
-               conn.Close()
-               return err
+func (sw *Switch) SetDiscv(discv *discover.Network) {
+       sw.discv = discv
+}
+
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if err := sw.DialPeerWithAddress(a); err != nil {
+               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
        }
-       if err = sw.AddPeer(peerConn); err != nil {
-               conn.Close()
-               return err
+       wg.Done()
+}
+
+func (sw *Switch) ensureOutboundPeers() {
+       numOutPeers, _, numDialing := sw.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+       log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
        }
 
-       return nil
-}
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+       }
 
-//AddBannedPeer add peer to blacklist
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
-       sw.mtx.Lock()
-       defer sw.mtx.Unlock()
-       key := peer.NodeInfo.RemoteAddrHost()
-       sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
-       datajson, err := json.Marshal(sw.bannedPeer)
-       if err != nil {
-               return err
+       var wg sync.WaitGroup
+       nodes := make([]*discover.Node, numToDial)
+       n := sw.discv.ReadRandomNodes(nodes)
+       for i := 0; i < n; i++ {
+               try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
+               }
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
        }
-       sw.db.Set([]byte(bannedPeerKey), datajson)
-       return nil
+       wg.Wait()
 }
 
-func (sw *Switch) delBannedPeer(addr string) error {
-       delete(sw.bannedPeer, addr)
-       datajson, err := json.Marshal(sw.bannedPeer)
-       if err != nil {
-               return err
+func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureOutboundPeers()
+
+       ticker := time.NewTicker(10 * time.Second)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       sw.ensureOutboundPeers()
+               case <-sw.Quit:
+                       return
+               }
        }
-       sw.db.Set([]byte(bannedPeerKey), datajson)
-       return nil
 }
 
-func (sw *Switch) checkBannedPeer(peer string) error {
-       sw.mtx.Lock()
-       defer sw.mtx.Unlock()
-
-       if banEnd, ok := sw.bannedPeer[peer]; ok {
-               if time.Now().Before(banEnd) {
-                       return ErrConnectBannedPeer
+func (sw *Switch) startInitPeer(peer *Peer) error {
+       peer.Start() // spawn send/recv routines
+       for _, reactor := range sw.reactors {
+               if err := reactor.AddPeer(peer); err != nil {
+                       return err
                }
-               sw.delBannedPeer(peer)
        }
        return nil
 }
+
+func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
+       sw.peers.Remove(peer)
+       for _, reactor := range sw.reactors {
+               reactor.RemovePeer(peer, reason)
+       }
+       peer.Stop()
+}