OSDN Git Service

Merge pull request #970 from Bytom/p2p_test
authorPaladz <yzhu101@uottawa.ca>
Mon, 21 May 2018 07:22:34 +0000 (15:22 +0800)
committerGitHub <noreply@github.com>
Mon, 21 May 2018 07:22:34 +0000 (15:22 +0800)
Optimize p2p code

netsync/handle.go
p2p/base_reactor.go [new file with mode: 0644]
p2p/listener.go
p2p/netaddress.go
p2p/node_info.go [moved from p2p/types.go with 82% similarity]
p2p/peer.go
p2p/peer_set.go
p2p/pex_reactor.go
p2p/switch.go
p2p/test_util.go [new file with mode: 0644]

index d7d6e98..2fdc03b 100644 (file)
@@ -20,7 +20,6 @@ import (
 type SyncManager struct {
        networkID uint64
        sw        *p2p.Switch
-       addrBook  *p2p.AddrBook // known peers
 
        privKey     crypto.PrivKeyEd25519 // local node's p2p key
        chain       *core.Chain
@@ -68,18 +67,11 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
        var l p2p.Listener
        if !config.VaultMode {
                p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
+               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
                manager.sw.AddListener(l)
        }
        manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
-       // Optionally, start the pex reactor
-       //var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
-               manager.sw.AddReactor("PEX", pexReactor)
-       }
 
        return manager, nil
 }
@@ -231,7 +223,7 @@ func (sm *SyncManager) Peers() *peerSet {
 
 //DialSeeds dial seed peers
 func (sm *SyncManager) DialSeeds(seeds []string) error {
-       return sm.sw.DialSeeds(sm.addrBook, seeds)
+       return sm.sw.DialSeeds(seeds)
 }
 
 //Switch get sync manager switch
diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go
new file mode 100644 (file)
index 0000000..66b3cf1
--- /dev/null
@@ -0,0 +1,52 @@
+package p2p
+
+import (
+       cmn "github.com/tendermint/tmlibs/common"
+)
+
+type Reactor interface {
+       cmn.Service // Start, Stop
+
+       // SetSwitch allows setting a switch.
+       SetSwitch(*Switch)
+
+       // GetChannels returns the list of channel descriptors.
+       GetChannels() []*ChannelDescriptor
+
+       // AddPeer is called by the switch when a new peer is added.
+       AddPeer(peer *Peer) error
+
+       // RemovePeer is called by the switch when the peer is stopped (due to error
+       // or other reason).
+       RemovePeer(peer *Peer, reason interface{})
+
+       // Receive is called when msgBytes is received from peer.
+       //
+       // NOTE reactor can not keep msgBytes around after Receive completes without
+       // copying.
+       //
+       // CONTRACT: msgBytes are not nil.
+       Receive(chID byte, peer *Peer, msgBytes []byte)
+}
+
+//--------------------------------------
+
+type BaseReactor struct {
+       cmn.BaseService // Provides Start, Stop, .Quit
+       Switch          *Switch
+}
+
+func NewBaseReactor(name string, impl Reactor) *BaseReactor {
+       return &BaseReactor{
+               BaseService: *cmn.NewBaseService(nil, name, impl),
+               Switch:      nil,
+       }
+}
+
+func (br *BaseReactor) SetSwitch(sw *Switch) {
+       br.Switch = sw
+}
+func (*BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
+func (*BaseReactor) AddPeer(peer *Peer)                             {}
+func (*BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
+func (*BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
index 7ea574e..e022c34 100644 (file)
@@ -9,7 +9,6 @@ import (
        "github.com/bytom/p2p/upnp"
        log "github.com/sirupsen/logrus"
        cmn "github.com/tendermint/tmlibs/common"
-       tlog "github.com/tendermint/tmlibs/log"
 )
 
 type Listener interface {
@@ -49,7 +48,7 @@ func splitHostPort(addr string) (host string, port int) {
 }
 
 // skipUPNP: If true, does not try getUPNPExternalAddress()
-func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool) {
+func NewDefaultListener(protocol string, lAddr string, skipUPNP bool) (Listener, bool) {
        // Local listen IP & port
        lAddrIP, lAddrPort := splitHostPort(lAddr)
 
@@ -72,10 +71,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlo
        }
        // Actual listener local IP & port
        listenerIP, listenerPort := splitHostPort(listener.Addr().String())
-       log.WithFields(log.Fields{
-               "ip":   listenerIP,
-               "port": listenerPort,
-       }).Info("Local listener")
+       log.Info("Local listener", " ip:", listenerIP, " port:", listenerPort)
 
        // Determine internal address...
        var intAddr *NetAddress
@@ -104,7 +100,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlo
        }
        // Otherwise just use the local address...
        if extAddr == nil {
-               extAddr = getNaiveExternalAddress(listenerPort)
+               extAddr = getNaiveExternalAddress(listenerPort, false)
        }
        if extAddr == nil {
                cmn.PanicCrisis("Could not determine external address!")
@@ -116,7 +112,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlo
                extAddr:     extAddr,
                connections: make(chan net.Conn, numBufferedConnections),
        }
-       dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
+       dl.BaseService = *cmn.NewBaseService(nil, "DefaultListener", dl)
        dl.Start() // Started upon construction
 
        if !listenerStatus && getExtIP {
@@ -203,13 +199,13 @@ func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress {
        log.Info("Getting UPNP external address")
        nat, err := upnp.Discover()
        if err != nil {
-               log.WithField("error", err).Error("Could not perform UPNP discover")
+               log.Info("Could not perform UPNP discover. error:", err)
                return nil
        }
 
        ext, err := nat.GetExternalAddress()
        if err != nil {
-               log.WithField("error", err).Error("Could not perform UPNP external address")
+               log.Info("Could not perform UPNP external address. error:", err)
                return nil
        }
 
@@ -220,16 +216,15 @@ func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress {
 
        externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "bytomd", 0)
        if err != nil {
-               log.WithField("error", err).Error("Could not add UPNP port mapping")
+               log.Info("Could not add UPNP port mapping. error:", err)
                return nil
        }
 
-       log.WithField("address", ext).Info("Got UPNP external address")
+       log.Info("Got UPNP external address ", ext)
        return NewNetAddressIPPort(ext, uint16(externalPort))
 }
 
-// TODO: use syscalls: http://pastebin.com/9exZG4rh
-func getNaiveExternalAddress(port int) *NetAddress {
+func getNaiveExternalAddress(port int, settleForLocal bool) *NetAddress {
        addrs, err := net.InterfaceAddrs()
        if err != nil {
                cmn.PanicCrisis(cmn.Fmt("Could not fetch interface addresses: %v", err))
@@ -241,10 +236,13 @@ func getNaiveExternalAddress(port int) *NetAddress {
                        continue
                }
                v4 := ipnet.IP.To4()
-               if v4 == nil || v4[0] == 127 {
+               if v4 == nil || (!settleForLocal && v4[0] == 127) {
                        continue
                } // loopback
                return NewNetAddressIPPort(ipnet.IP, uint16(port))
        }
-       return nil
+
+       // try again, but settle for local
+       log.Info("Node may not be connected to internet. Settling for local address")
+       return getNaiveExternalAddress(port, true)
 }
index 0978748..70fe3d2 100644 (file)
@@ -1,4 +1,4 @@
-// Modified for Tendermint
+// Modified for Bytom
 // Originally Copyright (c) 2013-2014 Conformal Systems LLC.
 // https://github.com/conformal/btcd/blob/master/LICENSE
 
@@ -45,7 +45,6 @@ func NewNetAddress(addr net.Addr) *NetAddress {
 // address in the form of "IP:Port". Also resolves the host if host
 // is not an IP.
 func NewNetAddressString(addr string) (*NetAddress, error) {
-
        host, portStr, err := net.SplitHostPort(addr)
        if err != nil {
                return nil, err
@@ -88,7 +87,7 @@ func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) {
 // NewNetAddressIPPort returns a new NetAddress using the provided IP
 // and port number.
 func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
-       na := &NetAddress{
+       return &NetAddress{
                IP:   ip,
                Port: port,
                str: net.JoinHostPort(
@@ -96,7 +95,6 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
                        strconv.FormatUint(uint64(port), 10),
                ),
        }
-       return na
 }
 
 // Equals reports whether na and other are the same addresses.
@@ -104,16 +102,6 @@ func (na *NetAddress) Equals(other interface{}) bool {
        if o, ok := other.(*NetAddress); ok {
                return na.String() == o.String()
        }
-
-       return false
-}
-
-func (na *NetAddress) Less(other interface{}) bool {
-       if o, ok := other.(*NetAddress); ok {
-               return na.String() < o.String()
-       }
-
-       cmn.PanicSanity("Cannot compare unequal types")
        return false
 }
 
@@ -128,9 +116,16 @@ func (na *NetAddress) String() string {
        return na.str
 }
 
+func (na *NetAddress) DialString() string {
+       return net.JoinHostPort(
+               na.IP.String(),
+               strconv.FormatUint(uint64(na.Port), 10),
+       )
+}
+
 // Dial calls net.Dial on the address.
 func (na *NetAddress) Dial() (net.Conn, error) {
-       conn, err := net.Dial("tcp", na.String())
+       conn, err := net.Dial("tcp", na.DialString())
        if err != nil {
                return nil, err
        }
@@ -139,7 +134,7 @@ func (na *NetAddress) Dial() (net.Conn, error) {
 
 // DialTimeout calls net.DialTimeout on the address.
 func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) {
-       conn, err := net.DialTimeout("tcp", na.String(), timeout)
+       conn, err := net.DialTimeout("tcp", na.DialString(), timeout)
        if err != nil {
                return nil, err
        }
@@ -174,7 +169,6 @@ func (na *NetAddress) ReachabilityTo(o *NetAddress) int {
                Ipv6_weak
                Ipv4
                Ipv6_strong
-               Private
        )
        if !na.Routable() {
                return Unreachable
similarity index 82%
rename from p2p/types.go
rename to p2p/node_info.go
index 1d3770b..02245f0 100644 (file)
@@ -21,7 +21,9 @@ type NodeInfo struct {
        Other      []string             `json:"other"`   // other application specific data
 }
 
-// CONTRACT: two nodes are compatible if the major/minor versions match and network match
+// CompatibleWith checks if two NodeInfo are compatible with eachother.
+// CONTRACT: two nodes are compatible if the major version matches and network match
+// and they have at least one channel in common.
 func (info *NodeInfo) CompatibleWith(other *NodeInfo) error {
        iMajor, iMinor, _, iErr := splitVersion(info.Version)
        oMajor, oMinor, _, oErr := splitVersion(other.Version)
@@ -68,8 +70,13 @@ func (info *NodeInfo) ListenPort() int {
        return port_i
 }
 
+func (info *NodeInfo) RemoteAddrHost() string {
+       host, _, _ := net.SplitHostPort(info.RemoteAddr)
+       return host
+}
+
 func (info NodeInfo) String() string {
-       return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [remote %v, listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.RemoteAddr, info.ListenAddr, info.Version, info.Other)
+       return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
 }
 
 func splitVersion(version string) (string, string, string, error) {
index 69861b9..2e14598 100644 (file)
@@ -19,18 +19,22 @@ import (
 // Redial function to reconnect. Note that inbound peers can't be
 // made persistent. They should be made persistent on the other end.
 //
+
+// peerConn contains the raw connection and its config.
+type peerConn struct {
+       outbound bool
+       config   *PeerConfig
+       conn     net.Conn // source connection
+}
+
 // Before using a peer, you will need to perform a handshake on connection.
 type Peer struct {
        cmn.BaseService
 
-       outbound bool
-
-       conn  net.Conn     // source connection
+       // raw peerConn and the multiplex connection
+       *peerConn
        mconn *MConnection // multiplex connection
 
-       persistent bool
-       config     *PeerConfig
-
        *NodeInfo
        Key  string
        Data *cmn.CMap // User data.
@@ -55,40 +59,52 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
        return &PeerConfig{
                AuthEnc:          true,
                HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
-               DialTimeout:      time.Duration(config.DialTimeout),  // * time.Second,
+               DialTimeout:      time.Duration(config.DialTimeout),      // * time.Second,
                MConfig:          DefaultMConnConfig(),
                Fuzz:             false,
                FuzzConfig:       DefaultFuzzConnConfig(),
        }
 }
 
-func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
-       return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+       // Key and NodeInfo are set after Handshake
+       p := &Peer{
+               peerConn: pc,
+               NodeInfo: nodeInfo,
+
+               Data: cmn.NewCMap(),
+       }
+       p.Key = nodeInfo.PubKey.KeyString()
+       p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
+
+       p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
+       return p
+}
+
+func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+       return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn, err := dial(addr, config)
        if err != nil {
-               return nil, errors.Wrap(err, "Error creating peer")
+               return nil, errors.Wrap(err, "Error dial peer")
        }
 
-       peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+       pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
        if err != nil {
                conn.Close()
                return nil, err
        }
-       return peer, nil
-}
 
-func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
-       return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+       return pc, nil
 }
 
-func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
-       return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+       return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn := rawConn
 
        // Fuzz connection
@@ -108,45 +124,24 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
                }
        }
 
-       // Key and NodeInfo are set after Handshake
-       p := &Peer{
+       // Only the information we already have
+       return &peerConn{
+               config:   config,
                outbound: outbound,
                conn:     conn,
-               config:   config,
-               Data:     cmn.NewCMap(),
-       }
-
-       p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
-
-       p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
-
-       return p, nil
+       }, nil
 }
 
 // CloseConn should be used when the peer was created, but never started.
-func (p *Peer) CloseConn() {
-       p.conn.Close()
-}
-
-// makePersistent marks the peer as persistent.
-func (p *Peer) makePersistent() {
-       if !p.outbound {
-               panic("inbound peers can't be made persistent")
-       }
-
-       p.persistent = true
-}
-
-// IsPersistent returns true if the peer is persitent, false otherwise.
-func (p *Peer) IsPersistent() bool {
-       return p.persistent
+func (pc *peerConn) CloseConn() {
+       pc.conn.Close()
 }
 
 // HandshakeTimeout performs a handshake between a given node and the peer.
 // NOTE: blocking
-func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
+func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
        // Set deadline for handshake so we don't block forever on conn.ReadFull
-       p.conn.SetDeadline(time.Now().Add(timeout))
+       pc.conn.SetDeadline(time.Now().Add(timeout))
 
        var peerNodeInfo = new(NodeInfo)
        var err1 error
@@ -154,37 +149,24 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
        cmn.Parallel(
                func() {
                        var n int
-                       wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
+                       wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
                },
                func() {
                        var n int
-                       wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
+                       wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
                        log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
                })
        if err1 != nil {
-               return errors.Wrap(err1, "Error during handshake/write")
+               return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
        }
        if err2 != nil {
-               return errors.Wrap(err2, "Error during handshake/read")
-       }
-
-       if p.config.AuthEnc {
-               // Check that the professed PubKey matches the sconn's.
-               if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
-                       return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
-                               peerNodeInfo.PubKey, p.PubKey())
-               }
+               return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
        }
 
        // Remove deadline
-       p.conn.SetDeadline(time.Time{})
-
-       peerNodeInfo.RemoteAddr = p.Addr().String()
-
-       p.NodeInfo = peerNodeInfo
-       p.Key = peerNodeInfo.PubKey.KeyString()
+       pc.conn.SetDeadline(time.Time{})
 
-       return nil
+       return peerNodeInfo, nil
 }
 
 // Addr returns peer's remote network address.
@@ -293,11 +275,7 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
        onReceive := func(chID byte, msgBytes []byte) {
                reactor := reactorsByCh[chID]
                if reactor == nil {
-                       if chID == PexChannel {
-                               return
-                       } else {
-                               cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
-                       }
+                       cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
                }
                reactor.Receive(chID, p, msgBytes)
        }
index c5206d2..8350c7a 100644 (file)
@@ -27,6 +27,7 @@ type peerSetItem struct {
        index int
 }
 
+// NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.
 func NewPeerSet() *PeerSet {
        return &PeerSet{
                lookup: make(map[string]*peerSetItem),
@@ -34,12 +35,13 @@ func NewPeerSet() *PeerSet {
        }
 }
 
+// Add adds the peer to the PeerSet.
 // Returns false if peer with key (PubKeyEd25519) is already set
 func (ps *PeerSet) Add(peer *Peer) error {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
        if ps.lookup[peer.Key] != nil {
-               return ErrSwitchDuplicatePeer
+               return ErrDuplicatePeer
        }
 
        index := len(ps.list)
@@ -50,6 +52,8 @@ func (ps *PeerSet) Add(peer *Peer) error {
        return nil
 }
 
+// Has returns true if the PeerSet contains
+// the peer referred to by this peerKey.
 func (ps *PeerSet) Has(peerKey string) bool {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
@@ -57,17 +61,18 @@ func (ps *PeerSet) Has(peerKey string) bool {
        return ok
 }
 
+// Get looks up a peer by the provided peerKey.
 func (ps *PeerSet) Get(peerKey string) *Peer {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
        item, ok := ps.lookup[peerKey]
        if ok {
                return item.peer
-       } else {
-               return nil
        }
+       return nil
 }
 
+// Remove discards peer if the peer was previously memoized.
 func (ps *PeerSet) Remove(peer *Peer) {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
@@ -99,6 +104,7 @@ func (ps *PeerSet) Remove(peer *Peer) {
 
 }
 
+// Size returns the number of unique items in the peerSet.
 func (ps *PeerSet) Size() int {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
index 8206e5f..395f199 100644 (file)
@@ -351,7 +351,7 @@ func (r *PEXReactor) ensurePeers() {
 }
 
 func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
-       if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
+       if err := r.Switch.DialPeerWithAddress(a); err != nil {
                r.book.MarkAttempt(a)
        } else {
                r.book.MarkGood(a)
index 7dfc58d..74f633d 100644 (file)
@@ -5,7 +5,6 @@ import (
        "fmt"
        "math/rand"
        "net"
-       "strings"
        "sync"
        "time"
 
@@ -20,55 +19,22 @@ import (
 )
 
 const (
-       reconnectAttempts = 5
-       reconnectInterval = 10 * time.Second
-
        bannedPeerKey      = "BannedPeer"
        defaultBanDuration = time.Hour * 1
 )
 
-var ErrConnectBannedPeer = errors.New("Connect banned peer")
-
-type Reactor interface {
-       cmn.Service // Start, Stop
-
-       SetSwitch(*Switch)
-       GetChannels() []*ChannelDescriptor
-       AddPeer(peer *Peer) error
-       RemovePeer(peer *Peer, reason interface{})
-       Receive(chID byte, peer *Peer, msgBytes []byte)
-}
-
-//--------------------------------------
-
-type BaseReactor struct {
-       cmn.BaseService // Provides Start, Stop, .Quit
-       Switch          *Switch
-}
-
-func NewBaseReactor(name string, impl Reactor) *BaseReactor {
-       return &BaseReactor{
-               BaseService: *cmn.NewBaseService(nil, name, impl),
-               Switch:      nil,
-       }
-}
-
-func (br *BaseReactor) SetSwitch(sw *Switch) {
-       br.Switch = sw
-}
-func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
-func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
-func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
-func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
+var (
+       ErrDuplicatePeer     = errors.New("Duplicate peer")
+       ErrConnectSelf       = errors.New("Connect self")
+       ErrConnectBannedPeer = errors.New("Connect banned peer")
+)
 
 //-----------------------------------------------------------------------------
 
-/*
-The `Switch` handles peer connections and exposes an API to receive incoming messages
-on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
-or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
-incoming messages are received on the reactor.
-*/
+// Switch handles peer connections and exposes an API to receive incoming messages
+// on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
+// or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
+// incoming messages are received on the reactor.
 type Switch struct {
        cmn.BaseService
 
@@ -82,20 +48,13 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       addrBook     *AddrBook
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
-
-       filterConnByAddr   func(net.Addr) error
-       filterConnByPubKey func(crypto.PubKeyEd25519) error
 }
 
-var (
-       ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
-       ErrConnectSelf         = errors.New("Connect self")
-       ErrPeerConnected       = errors.New("Peer is connected")
-)
-
+// NewSwitch creates a new Switch with the given config.
 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
        sw := &Switch{
                config:       config,
@@ -110,6 +69,13 @@ func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
        }
        sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
 
+       // Optionally, start the pex reactor
+       if config.PexReactor {
+               sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
+               pexReactor := NewPEXReactor(sw.addrBook, sw)
+               sw.AddReactor("PEX", pexReactor)
+       }
+
        sw.bannedPeer = make(map[string]time.Time)
        if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
                if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
@@ -120,7 +86,8 @@ func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
        return sw
 }
 
-// Not goroutine safe.
+// AddReactor adds the given reactor to the switch.
+// NOTE: Not goroutine safe.
 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
        // Validate the reactor.
        // No two reactors can share the same channel.
@@ -138,43 +105,50 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
        return reactor
 }
 
-// Not goroutine safe.
+// Reactors returns a map of reactors registered on the switch.
+// NOTE: Not goroutine safe.
 func (sw *Switch) Reactors() map[string]Reactor {
        return sw.reactors
 }
 
-// Not goroutine safe.
+// Reactor returns the reactor with the given name.
+// NOTE: Not goroutine safe.
 func (sw *Switch) Reactor(name string) Reactor {
        return sw.reactors[name]
 }
 
-// Not goroutine safe.
+// AddListener adds the given listener to the switch for listening to incoming peer connections.
+// NOTE: Not goroutine safe.
 func (sw *Switch) AddListener(l Listener) {
        sw.listeners = append(sw.listeners, l)
 }
 
-// Not goroutine safe.
+// Listeners returns the list of listeners the switch listens on.
+// NOTE: Not goroutine safe.
 func (sw *Switch) Listeners() []Listener {
        return sw.listeners
 }
 
-// Not goroutine safe.
+// IsListening returns true if the switch has at least one listener.
+// NOTE: Not goroutine safe.
 func (sw *Switch) IsListening() bool {
        return len(sw.listeners) > 0
 }
 
-// Not goroutine safe.
+// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
+// NOTE: Not goroutine safe.
 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
        sw.nodeInfo = nodeInfo
 }
 
-// Not goroutine safe.
+// NodeInfo returns the switch's NodeInfo.
+// NOTE: Not goroutine safe.
 func (sw *Switch) NodeInfo() *NodeInfo {
        return sw.nodeInfo
 }
 
-// Not goroutine safe.
-// NOTE: Overwrites sw.nodeInfo.PubKey
+// SetNodeKey sets the switch's private key for authenticated encryption.
+// NOTE: Not goroutine safe.
 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
        sw.nodePrivKey = nodePrivKey
        if sw.nodeInfo != nil {
@@ -182,9 +156,8 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
        }
 }
 
-// Switch.Start() starts all the reactors, peers, and listeners.
+// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
 func (sw *Switch) OnStart() error {
-       sw.BaseService.OnStart()
        // Start reactors
        for _, reactor := range sw.reactors {
                _, err := reactor.Start()
@@ -199,8 +172,8 @@ func (sw *Switch) OnStart() error {
        return nil
 }
 
+// OnStop implements BaseService. It stops all listeners, peers, and reactors.
 func (sw *Switch) OnStop() {
-       sw.BaseService.OnStop()
        // Stop listeners
        for _, listener := range sw.listeners {
                listener.Stop()
@@ -217,39 +190,26 @@ func (sw *Switch) OnStop() {
        }
 }
 
+// addPeer performs the P2P handshake with a peer
+// that already has a SecretConnection. If all goes well,
+// it starts the peer and adds it to the switch.
 // NOTE: This performs a blocking handshake before the peer is added.
 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
-func (sw *Switch) AddPeer(peer *Peer) error {
-       if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
-               return err
-       }
-
-       if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
-               return err
-       }
-
-       if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
-               return err
-       }
-
-       if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
-               return err
-       }
-
-       // Avoid self
-       if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
-               return errors.New("Ignoring connection from self")
+func (sw *Switch) AddPeer(pc *peerConn) error {
+       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+       if err != nil {
+               return ErrConnectBannedPeer
        }
-
        // Check version, chain id
-       if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
+       if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
                return err
        }
 
-       // Check for duplicate peer
-       if sw.peers.Has(peer.Key) {
-               return ErrSwitchDuplicatePeer
+       peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
 
+       //filter peer
+       if err := sw.filterConnByPeer(peer); err != nil {
+               return err
        }
 
        // Start peer
@@ -266,31 +226,8 @@ func (sw *Switch) AddPeer(peer *Peer) error {
                return err
        }
 
-       log.WithField("peer", peer).Info("Added peer")
-       return nil
-}
-
-func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
-       if sw.filterConnByAddr != nil {
-               return sw.filterConnByAddr(addr)
-       }
-       return nil
-}
-
-func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
-       if sw.filterConnByPubKey != nil {
-               return sw.filterConnByPubKey(pubkey)
-       }
+       log.Info("Added peer:", peer)
        return nil
-
-}
-
-func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
-       sw.filterConnByAddr = f
-}
-
-func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
-       sw.filterConnByPubKey = f
 }
 
 func (sw *Switch) startInitPeer(peer *Peer) error {
@@ -304,27 +241,26 @@ func (sw *Switch) startInitPeer(peer *Peer) error {
 }
 
 // Dial a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
-
+func (sw *Switch) DialSeeds(seeds []string) error {
        netAddrs, err := NewNetAddressStrings(seeds)
        if err != nil {
                return err
        }
 
-       if addrBook != nil {
+       if sw.addrBook != nil {
                // add seeds to `addrBook`
-               ourAddrS := sw.nodeInfo.ListenAddr
-               ourAddr, _ := NewNetAddressString(ourAddrS)
+               ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
                for _, netAddr := range netAddrs {
                        // do not add ourselves
                        if netAddr.Equals(ourAddr) {
                                continue
                        }
-                       addrBook.AddAddress(netAddr, ourAddr)
+                       sw.addrBook.AddAddress(netAddr, ourAddr)
                }
 
-               addrBook.Save()
+               sw.addrBook.Save()
        }
+
        //permute the list, dial them in random order.
        perm := rand.Perm(len(netAddrs))
        for i := 0; i < len(perm); i += 2 {
@@ -336,77 +272,85 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
 }
 
 func (sw *Switch) dialSeed(addr *NetAddress) {
-       peer, err := sw.DialPeerWithAddress(addr, false)
+       err := sw.DialPeerWithAddress(addr)
        if err != nil {
-               log.WithField("error", err).Error("Error dialing seed")
-       } else {
-               log.WithField("peer", peer).Info("Connected to seed")
+               log.Info("Error dialing seed:", addr.String())
        }
 }
 
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
-       if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
-               return nil, err
+func (sw *Switch) addrBookDelSelf() error {
+       addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
+       if err != nil {
+               return err
        }
-       if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
-               return nil, ErrConnectSelf
+       // remove the given address from the address book if we're added it earlier
+       sw.addrBook.RemoveAddress(addr)
+       // add the given address to the address book to avoid dialing ourselves
+       // again this is our public address
+       sw.addrBook.AddOurAddress(addr)
+       return nil
+}
+
+func (sw *Switch) filterConnByIP(ip string) error {
+       if err := sw.checkBannedPeer(ip); err != nil {
+               return ErrConnectBannedPeer
        }
-       for _, v := range sw.Peers().list {
-               if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
-                       return nil, ErrPeerConnected
-               }
+
+       if ip == sw.nodeInfo.ListenHost() {
+               sw.addrBookDelSelf()
+               return ErrConnectSelf
+       }
+
+       return nil
+}
+
+func (sw *Switch) filterConnByPeer(peer *Peer) error {
+       if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
+               return ErrConnectBannedPeer
+       }
+
+       if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
+               sw.addrBookDelSelf()
+               return ErrConnectSelf
+       }
+
+       // Check for duplicate peer
+       if sw.peers.Has(peer.Key) {
+               return ErrDuplicatePeer
+       }
+       return nil
+}
+
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
+       log.Debug("Dialing peer address:", addr)
+
+       if err := sw.filterConnByIP(addr.IP.String()); err != nil {
+               return err
        }
+
        sw.dialing.Set(addr.IP.String(), addr)
        defer sw.dialing.Delete(addr.IP.String())
 
-       log.Debug("Dialing peer address:", addr)
-       peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
+       pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
                log.Debug("Failed to dial peer", " address:", addr, " error:", err)
-               return nil, err
-       }
-       peer.SetLogger(sw.Logger.With("peer", addr))
-       if persistent {
-               peer.makePersistent()
+               return err
        }
-       err = sw.AddPeer(peer)
+
+       err = sw.AddPeer(pc)
        if err != nil {
-               log.WithFields(log.Fields{
-                       "address": addr,
-                       "error":   err,
-               }).Info("Failed to add peer")
-               peer.CloseConn()
-               return nil, err
+               log.Info("Failed to add peer:", addr, " err:", err)
+               pc.CloseConn()
+               return err
        }
-       log.WithFields(log.Fields{
-               "address": addr,
-       }).Info("Dialed and added peer")
-       return peer, nil
+       log.Info("Dialed and added peer:", addr)
+       return nil
 }
 
 func (sw *Switch) IsDialing(addr *NetAddress) bool {
        return sw.dialing.Has(addr.IP.String())
 }
 
-// Broadcast runs a go routine for each attempted send, which will block
-// trying to send for defaultSendTimeoutSeconds. Returns a channel
-// which receives success values for each attempted send (false if times out)
-// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
-func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
-       successChan := make(chan bool, len(sw.peers.List()))
-       log.WithFields(log.Fields{
-               "chID": chID,
-               "msg":  msg,
-       }).Debug("Broadcast")
-       for _, peer := range sw.peers.List() {
-               go func(peer *Peer) {
-                       success := peer.Send(chID, msg)
-                       successChan <- success
-               }(peer)
-       }
-       return successChan
-}
-
 // Returns the count of outbound/inbound and outbound-dialing peers.
 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        peers := sw.peers.List()
@@ -425,54 +369,13 @@ func (sw *Switch) Peers() *PeerSet {
        return sw.peers
 }
 
-// Disconnect from a peer due to external error, retry if it is a persistent peer.
-// TODO: make record depending on reason.
+// StopPeerForError disconnects from a peer due to external error.
 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
-       addr := NewNetAddress(peer.Addr())
-       log.WithFields(log.Fields{
-               "peer":  peer,
-               "error": reason,
-       }).Info("Stopping peer due to error")
+       log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
        sw.stopAndRemovePeer(peer, reason)
-
-       if peer.IsPersistent() {
-               log.WithField("peer", peer).Info("Reconnecting to peer")
-               for i := 1; i < reconnectAttempts; i++ {
-                       if !sw.IsRunning() {
-                               return
-                       }
-
-                       peer, err := sw.DialPeerWithAddress(addr, false)
-                       if err != nil {
-                               if i == reconnectAttempts {
-                                       log.WithFields(log.Fields{
-                                               "retries": i,
-                                               "error":   err,
-                                       }).Info("Error reconnecting to peer. Giving up")
-                                       return
-                               }
-
-                               if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
-                                       log.WithField("error", err).Info("Error reconnecting to peer. ")
-                                       return
-                               }
-
-                               log.WithFields(log.Fields{
-                                       "retries": i,
-                                       "error":   err,
-                               }).Info("Error reconnecting to peer. Trying again")
-                               time.Sleep(reconnectInterval)
-                               continue
-                       }
-
-                       log.WithField("peer", peer).Info("Reconnected to peer")
-                       return
-               }
-       }
 }
 
 // Disconnect from a peer gracefully.
-// TODO: handle graceful disconnects.
 func (sw *Switch) StopPeerGracefully(peer *Peer) {
        log.Info("Stopping peer gracefully")
        sw.stopAndRemovePeer(peer, nil)
@@ -483,9 +386,7 @@ func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
                reactor.RemovePeer(peer, reason)
        }
        sw.peers.Remove(peer)
-       log.Info("Del peer from switch.")
        peer.Stop()
-       log.Info("Peer connection is closed.")
 }
 
 func (sw *Switch) listenerRoutine(l Listener) {
@@ -498,136 +399,28 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
                // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
                // be double of MaxNumPeers
-               if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
-                       // close inConn
+               if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
                        inConn.Close()
-                       log.WithFields(log.Fields{
-                               "address":  inConn.RemoteAddr().String(),
-                               "numPeers": sw.peers.Size(),
-                       }).Info("Ignoring inbound connection: already have enough peers")
+                       log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
                }
 
                // New inbound connection!
-               err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
+               err := sw.addPeerWithConnection(inConn)
                if err != nil {
-                       // conn close for returing err
-                       inConn.Close()
-                       log.WithFields(log.Fields{
-                               "address": inConn.RemoteAddr().String(),
-                               "error":   err,
-                       }).Info("Ignoring inbound connection: error while adding peer")
+                       log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
                        continue
                }
-
-               // NOTE: We don't yet have the listening port of the
-               // remote (if they have a listener at all).
-               // The peerHandshake will handle that
-       }
-
-       // cleanup
-}
-
-//-----------------------------------------------------------------------------
-
-type SwitchEventNewPeer struct {
-       Peer *Peer
-}
-
-type SwitchEventDonePeer struct {
-       Peer  *Peer
-       Error interface{}
-}
-
-//------------------------------------------------------------------
-// Switches connected via arbitrary net.Conn; useful for testing
-
-// Returns n switches, connected according to the connect func.
-// If connect==Connect2Switches, the switches will be fully connected.
-// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
-// NOTE: panics if any switch fails to start.
-func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
-       switches := make([]*Switch, n)
-       for i := 0; i < n; i++ {
-               switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
-       }
-
-       if err := StartSwitches(switches); err != nil {
-               panic(err)
-       }
-
-       for i := 0; i < n; i++ {
-               for j := i; j < n; j++ {
-                       connect(switches, i, j)
-               }
        }
-
-       return switches
-}
-
-var PanicOnAddPeerErr = false
-
-// Will connect switches i and j via net.Pipe()
-// Blocks until a conection is established.
-// NOTE: caller ensures i and j are within bounds
-func Connect2Switches(switches []*Switch, i, j int) {
-       switchI := switches[i]
-       switchJ := switches[j]
-       c1, c2 := net.Pipe()
-       doneCh := make(chan struct{})
-       go func() {
-               err := switchI.addPeerWithConnection(c1)
-               if PanicOnAddPeerErr && err != nil {
-                       panic(err)
-               }
-               doneCh <- struct{}{}
-       }()
-       go func() {
-               err := switchJ.addPeerWithConnection(c2)
-               if PanicOnAddPeerErr && err != nil {
-                       panic(err)
-               }
-               doneCh <- struct{}{}
-       }()
-       <-doneCh
-       <-doneCh
-}
-
-func StartSwitches(switches []*Switch) error {
-       for _, s := range switches {
-               _, err := s.Start() // start switch and reactors
-               if err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
-       privKey := crypto.GenPrivKeyEd25519()
-       // new switch, add reactors
-       // TODO: let the config be passed in?
-       s := initSwitch(i, NewSwitch(cfg, nil))
-       s.SetNodeInfo(&NodeInfo{
-               PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker:    cmn.Fmt("switch%d", i),
-               Network:    network,
-               Version:    version,
-               RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
-               ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
-       })
-       s.SetNodePrivKey(privKey)
-       return s
 }
 
 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+       peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
        if err != nil {
                conn.Close()
                return err
        }
-       peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
-       if err = sw.AddPeer(peer); err != nil {
+       if err = sw.AddPeer(peerConn); err != nil {
                conn.Close()
                return err
        }
@@ -635,36 +428,10 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
        return nil
 }
 
-func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
-       fullAddr := conn.RemoteAddr().String()
-       host, _, err := net.SplitHostPort(fullAddr)
-       if err != nil {
-               return err
-       }
-
-       if err = sw.checkBannedPeer(host); err != nil {
-               return err
-       }
-
-       peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
-       if err != nil {
-               return err
-       }
-       peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
-       if err = sw.AddPeer(peer); err != nil {
-               return err
-       }
-
-       return nil
-}
-
 func (sw *Switch) AddBannedPeer(peer *Peer) error {
        sw.mtx.Lock()
        defer sw.mtx.Unlock()
-       if peer == nil {
-               return nil
-       }
-       key := peer.mconn.RemoteAddress.IP.String()
+       key := peer.NodeInfo.RemoteAddrHost()
        sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
        datajson, err := json.Marshal(sw.bannedPeer)
        if err != nil {
diff --git a/p2p/test_util.go b/p2p/test_util.go
new file mode 100644 (file)
index 0000000..62d8b2b
--- /dev/null
@@ -0,0 +1,90 @@
+package p2p
+
+import (
+       "math/rand"
+       "net"
+
+       "github.com/tendermint/go-crypto"
+       cmn "github.com/tendermint/tmlibs/common"
+
+       cfg "github.com/bytom/config"
+)
+
+var PanicOnAddPeerErr = false
+
+// Switches connected via arbitrary net.Conn; useful for testing
+// Returns n switches, connected according to the connect func.
+// If connect==Connect2Switches, the switches will be fully connected.
+// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
+// NOTE: panics if any switch fails to start.
+func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
+       switches := make([]*Switch, n)
+       for i := 0; i < n; i++ {
+               switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
+       }
+
+       if err := startSwitches(switches); err != nil {
+               panic(err)
+       }
+
+       for i := 0; i < n; i++ {
+               for j := i; j < n; j++ {
+                       connect(switches, i, j)
+               }
+       }
+
+       return switches
+}
+
+// Will connect switches i and j via net.Pipe()
+// Blocks until a conection is established.
+// NOTE: caller ensures i and j are within bounds
+func Connect2Switches(switches []*Switch, i, j int) {
+       switchI := switches[i]
+       switchJ := switches[j]
+       c1, c2 := net.Pipe()
+       doneCh := make(chan struct{})
+       go func() {
+               err := switchI.addPeerWithConnection(c1)
+               if PanicOnAddPeerErr && err != nil {
+                       panic(err)
+               }
+               doneCh <- struct{}{}
+       }()
+       go func() {
+               err := switchJ.addPeerWithConnection(c2)
+               if PanicOnAddPeerErr && err != nil {
+                       panic(err)
+               }
+               doneCh <- struct{}{}
+       }()
+       <-doneCh
+       <-doneCh
+}
+
+func startSwitches(switches []*Switch) error {
+       for _, s := range switches {
+               _, err := s.Start() // start switch and reactors
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
+       privKey := crypto.GenPrivKeyEd25519()
+       // new switch, add reactors
+       // TODO: let the config be passed in?
+       s := initSwitch(i, NewSwitch(cfg, nil))
+       s.SetNodeInfo(&NodeInfo{
+               PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
+               Moniker:    cmn.Fmt("switch%d", i),
+               Network:    network,
+               Version:    version,
+               RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
+               ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
+       })
+       s.SetNodePrivKey(privKey)
+       return s
+}