OSDN Git Service

Optimize p2p peer code
authorYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 21 May 2018 02:52:23 +0000 (10:52 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 21 May 2018 02:54:19 +0000 (10:54 +0800)
p2p/peer.go
p2p/pex_reactor.go
p2p/switch.go

index 9937271..2e14598 100644 (file)
@@ -19,18 +19,22 @@ import (
 // Redial function to reconnect. Note that inbound peers can't be
 // made persistent. They should be made persistent on the other end.
 //
+
+// peerConn contains the raw connection and its config.
+type peerConn struct {
+       outbound bool
+       config   *PeerConfig
+       conn     net.Conn // source connection
+}
+
 // Before using a peer, you will need to perform a handshake on connection.
 type Peer struct {
        cmn.BaseService
 
-       outbound bool
-
-       conn  net.Conn     // source connection
+       // raw peerConn and the multiplex connection
+       *peerConn
        mconn *MConnection // multiplex connection
 
-       persistent bool
-       config     *PeerConfig
-
        *NodeInfo
        Key  string
        Data *cmn.CMap // User data.
@@ -55,40 +59,52 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
        return &PeerConfig{
                AuthEnc:          true,
                HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
-               DialTimeout:      time.Duration(config.DialTimeout),  // * time.Second,
+               DialTimeout:      time.Duration(config.DialTimeout),      // * time.Second,
                MConfig:          DefaultMConnConfig(),
                Fuzz:             false,
                FuzzConfig:       DefaultFuzzConnConfig(),
        }
 }
 
-func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
-       return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+       // Key and NodeInfo are set after Handshake
+       p := &Peer{
+               peerConn: pc,
+               NodeInfo: nodeInfo,
+
+               Data: cmn.NewCMap(),
+       }
+       p.Key = nodeInfo.PubKey.KeyString()
+       p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
+
+       p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
+       return p
+}
+
+func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+       return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn, err := dial(addr, config)
        if err != nil {
-               return nil, errors.Wrap(err, "Error creating peer")
+               return nil, errors.Wrap(err, "Error dial peer")
        }
 
-       peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+       pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
        if err != nil {
                conn.Close()
                return nil, err
        }
-       return peer, nil
-}
 
-func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
-       return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+       return pc, nil
 }
 
-func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
-       return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+       return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn := rawConn
 
        // Fuzz connection
@@ -108,45 +124,24 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
                }
        }
 
-       // Key and NodeInfo are set after Handshake
-       p := &Peer{
+       // Only the information we already have
+       return &peerConn{
+               config:   config,
                outbound: outbound,
                conn:     conn,
-               config:   config,
-               Data:     cmn.NewCMap(),
-       }
-
-       p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
-
-       p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
-
-       return p, nil
+       }, nil
 }
 
 // CloseConn should be used when the peer was created, but never started.
-func (p *Peer) CloseConn() {
-       p.conn.Close()
-}
-
-// makePersistent marks the peer as persistent.
-func (p *Peer) makePersistent() {
-       if !p.outbound {
-               panic("inbound peers can't be made persistent")
-       }
-
-       p.persistent = true
-}
-
-// IsPersistent returns true if the peer is persitent, false otherwise.
-func (p *Peer) IsPersistent() bool {
-       return p.persistent
+func (pc *peerConn) CloseConn() {
+       pc.conn.Close()
 }
 
 // HandshakeTimeout performs a handshake between a given node and the peer.
 // NOTE: blocking
-func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
+func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
        // Set deadline for handshake so we don't block forever on conn.ReadFull
-       p.conn.SetDeadline(time.Now().Add(timeout))
+       pc.conn.SetDeadline(time.Now().Add(timeout))
 
        var peerNodeInfo = new(NodeInfo)
        var err1 error
@@ -154,37 +149,24 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
        cmn.Parallel(
                func() {
                        var n int
-                       wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
+                       wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
                },
                func() {
                        var n int
-                       wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
+                       wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
                        log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
                })
        if err1 != nil {
-               return errors.Wrap(err1, "Error during handshake/write")
+               return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
        }
        if err2 != nil {
-               return errors.Wrap(err2, "Error during handshake/read")
-       }
-
-       if p.config.AuthEnc {
-               // Check that the professed PubKey matches the sconn's.
-               if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
-                       return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
-                               peerNodeInfo.PubKey, p.PubKey())
-               }
+               return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
        }
 
        // Remove deadline
-       p.conn.SetDeadline(time.Time{})
-
-       peerNodeInfo.RemoteAddr = p.Addr().String()
-
-       p.NodeInfo = peerNodeInfo
-       p.Key = peerNodeInfo.PubKey.KeyString()
+       pc.conn.SetDeadline(time.Time{})
 
-       return nil
+       return peerNodeInfo, nil
 }
 
 // Addr returns peer's remote network address.
@@ -293,11 +275,7 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
        onReceive := func(chID byte, msgBytes []byte) {
                reactor := reactorsByCh[chID]
                if reactor == nil {
-                       if chID == PexChannel {
-                               return
-                       } else {
-                               cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
-                       }
+                       cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
                }
                reactor.Receive(chID, p, msgBytes)
        }
index f68c5d4..395f199 100644 (file)
@@ -351,7 +351,7 @@ func (r *PEXReactor) ensurePeers() {
 }
 
 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
-       if _, err := r.Switch.DialPeerWithAddress(a); err != nil {
+       if err := r.Switch.DialPeerWithAddress(a); err != nil {
                r.book.MarkAttempt(a)
        } else {
                r.book.MarkGood(a)
index 003ad3d..74f633d 100644 (file)
@@ -14,8 +14,8 @@ import (
        dbm "github.com/tendermint/tmlibs/db"
 
        cfg "github.com/bytom/config"
-       "github.com/bytom/p2p/trust"
        "github.com/bytom/errors"
+       "github.com/bytom/p2p/trust"
 )
 
 const (
@@ -195,24 +195,30 @@ func (sw *Switch) OnStop() {
 // it starts the peer and adds it to the switch.
 // NOTE: This performs a blocking handshake before the peer is added.
 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
-func (sw *Switch) AddPeer(peer *Peer) error {
-       if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
+func (sw *Switch) AddPeer(pc *peerConn) error {
+       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+       if err != nil {
                return ErrConnectBannedPeer
        }
-       //filter peer
-       if err := sw.filterConnByPeer(peer); err != nil {
+       // Check version, chain id
+       if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
                return err
        }
-       // Check version, chain id
-       if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
+
+       peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
+
+       //filter peer
+       if err := sw.filterConnByPeer(peer); err != nil {
                return err
        }
+
        // Start peer
        if sw.IsRunning() {
                if err := sw.startInitPeer(peer); err != nil {
                        return err
                }
        }
+
        // Add the peer to .peers.
        // We start it first so that a peer in the list is safe to Stop.
        // It should not err since we already checked peers.Has()
@@ -266,11 +272,9 @@ func (sw *Switch) DialSeeds(seeds []string) error {
 }
 
 func (sw *Switch) dialSeed(addr *NetAddress) {
-       peer, err := sw.DialPeerWithAddress(addr)
+       err := sw.DialPeerWithAddress(addr)
        if err != nil {
-               log.WithField("error", err).Error("Error dialing seed")
-       } else {
-               log.WithField("peer", peer).Info("Connected to seed")
+               log.Info("Error dialing seed:", addr.String())
        }
 }
 
@@ -317,30 +321,30 @@ func (sw *Switch) filterConnByPeer(peer *Peer) error {
        return nil
 }
 
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
        log.Debug("Dialing peer address:", addr)
 
        if err := sw.filterConnByIP(addr.IP.String()); err != nil {
-               return nil, err
+               return err
        }
 
        sw.dialing.Set(addr.IP.String(), addr)
        defer sw.dialing.Delete(addr.IP.String())
 
-       peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
+       pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
                log.Debug("Failed to dial peer", " address:", addr, " error:", err)
-               return nil, err
+               return err
        }
 
-       err = sw.AddPeer(peer)
+       err = sw.AddPeer(pc)
        if err != nil {
                log.Info("Failed to add peer:", addr, " err:", err)
-               peer.CloseConn()
-               return nil, err
+               pc.CloseConn()
+               return err
        }
        log.Info("Dialed and added peer:", addr)
-       return peer, nil
+       return nil
 }
 
 func (sw *Switch) IsDialing(addr *NetAddress) bool {
@@ -411,12 +415,12 @@ func (sw *Switch) listenerRoutine(l Listener) {
 }
 
 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peer, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+       peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
        if err != nil {
                conn.Close()
                return err
        }
-       if err = sw.AddPeer(peer); err != nil {
+       if err = sw.AddPeer(peerConn); err != nil {
                conn.Close()
                return err
        }
@@ -424,7 +428,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
        return nil
 }
 
-
 func (sw *Switch) AddBannedPeer(peer *Peer) error {
        sw.mtx.Lock()
        defer sw.mtx.Unlock()
@@ -460,5 +463,3 @@ func (sw *Switch) checkBannedPeer(peer string) error {
        }
        return nil
 }
-
-