OSDN Git Service

move connection to it's folder
authorpaladz <453256728@qq.com>
Thu, 24 May 2018 08:45:10 +0000 (16:45 +0800)
committerpaladz <453256728@qq.com>
Thu, 24 May 2018 08:45:10 +0000 (16:45 +0800)
netsync/protocol_reactor.go
p2p/base_reactor.go
p2p/connection/connection.go [moved from p2p/connection.go with 99% similarity]
p2p/connection/connection_test.go [moved from p2p/connection_test.go with 90% similarity]
p2p/connection/secret_connection.go [moved from p2p/secret_connection.go with 99% similarity]
p2p/connection/secret_connection_test.go [moved from p2p/secret_connection_test.go with 99% similarity]
p2p/node_info.go
p2p/peer.go
p2p/pex/pex_reactor.go
p2p/switch.go
p2p/test_util.go

index 8c38f57..3698482 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        "github.com/bytom/errors"
        "github.com/bytom/p2p"
+       "github.com/bytom/p2p/connection"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -85,9 +86,9 @@ func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.
 }
 
 // GetChannels implements Reactor
-func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
-       return []*p2p.ChannelDescriptor{
-               &p2p.ChannelDescriptor{
+func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               &connection.ChannelDescriptor{
                        ID:                BlockchainChannel,
                        Priority:          5,
                        SendQueueCapacity: 100,
index 2162a33..7d0f28d 100644 (file)
@@ -2,6 +2,8 @@ package p2p
 
 import (
        cmn "github.com/tendermint/tmlibs/common"
+
+       "github.com/bytom/p2p/connection"
 )
 
 //Reactor is responsible for handling incoming messages of one or more `Channels`
@@ -12,7 +14,7 @@ type Reactor interface {
        SetSwitch(*Switch)
 
        // GetChannels returns the list of channel descriptors.
-       GetChannels() []*ChannelDescriptor
+       GetChannels() []*connection.ChannelDescriptor
 
        // AddPeer is called by the switch when a new peer is added.
        AddPeer(peer *Peer) error
@@ -50,13 +52,13 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
 }
 
 //GetChannels returns the list of channel descriptors
-func (*BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
+func (*BaseReactor) GetChannels() []*connection.ChannelDescriptor { return nil }
 
 //AddPeer is called by the switch when a new peer is added
-func (*BaseReactor) AddPeer(peer *Peer)                             {}
+func (*BaseReactor) AddPeer(peer *Peer) {}
 
 //RemovePeer is called by the switch when the peer is stopped (due to error or other reason)
-func (*BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
+func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
 
 //Receive is called when msgBytes is received from peer
 func (*BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
similarity index 99%
rename from p2p/connection.go
rename to p2p/connection/connection.go
index 7e16b15..77595d4 100644 (file)
@@ -1,4 +1,4 @@
-package p2p
+package connection
 
 import (
        "bufio"
@@ -81,9 +81,6 @@ type MConnection struct {
        flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
        pingTimer    *time.Ticker       // send pings periodically
        chStatsTimer *time.Ticker       // update channel stats periodically
-
-       LocalAddress  *NetAddress
-       RemoteAddress *NetAddress
 }
 
 // MConnConfig is a MConnection configuration.
@@ -126,9 +123,6 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
 
                pingTimer:    time.NewTicker(pingTimeout),
                chStatsTimer: time.NewTicker(updateState),
-
-               LocalAddress:  NewNetAddress(conn.LocalAddr()),
-               RemoteAddress: NewNetAddress(conn.RemoteAddr()),
        }
 
        // Create channels
similarity index 90%
rename from p2p/connection_test.go
rename to p2p/connection/connection_test.go
index c994f5f..0123a3e 100644 (file)
@@ -1,19 +1,16 @@
-// +build !network
-
-package p2p_test
+package connection
 
 import (
        "net"
        "testing"
        "time"
 
-       p2p "github.com/bytom/p2p"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "github.com/tendermint/tmlibs/log"
 )
 
-func createMConnection(conn net.Conn) *p2p.MConnection {
+func createMConnection(conn net.Conn) *MConnection {
        onReceive := func(chID byte, msgBytes []byte) {
        }
        onError := func(r interface{}) {
@@ -23,9 +20,9 @@ func createMConnection(conn net.Conn) *p2p.MConnection {
        return c
 }
 
-func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
-       chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
-       c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
+func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
+       chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
+       c := NewMConnection(conn, chDescs, onReceive, onError)
        c.SetLogger(log.TestingLogger())
        return c
 }
similarity index 99%
rename from p2p/secret_connection.go
rename to p2p/connection/secret_connection.go
index 24cae0f..5bd8f9c 100644 (file)
@@ -4,7 +4,7 @@
 // is known ahead of time, and thus we are technically
 // still vulnerable to MITM. (TODO!)
 // See docs/sts-final.pdf for more info
-package p2p
+package connection
 
 import (
        "bytes"
similarity index 99%
rename from p2p/secret_connection_test.go
rename to p2p/connection/secret_connection_test.go
index 28db58a..fbfdf92 100644 (file)
@@ -1,6 +1,4 @@
-// +build !network
-
-package p2p
+package connection
 
 import (
        "bytes"
index 25d8da5..5d586c3 100644 (file)
@@ -16,7 +16,6 @@ type NodeInfo struct {
        PubKey     crypto.PubKeyEd25519 `json:"pub_key"`
        Moniker    string               `json:"moniker"`
        Network    string               `json:"network"`
-       RemoteAddr string               `json:"remote_addr"`
        ListenAddr string               `json:"listen_addr"`
        Version    string               `json:"version"` // major.minor.revision
        Other      []string             `json:"other"`   // other application specific data
@@ -73,12 +72,6 @@ func (info *NodeInfo) ListenPort() int {
        return portInt
 }
 
-//RemoteAddrHost peer external ip address
-func (info *NodeInfo) RemoteAddrHost() string {
-       host, _, _ := net.SplitHostPort(info.RemoteAddr)
-       return host
-}
-
 //String representation
 func (info NodeInfo) String() string {
        return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
index 82f47e6..e445ad0 100644 (file)
@@ -12,6 +12,7 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/p2p/connection"
 )
 
 // peerConn contains the raw connection and its config.
@@ -27,7 +28,7 @@ type Peer struct {
 
        // raw peerConn and the multiplex connection
        *peerConn
-       mconn *MConnection // multiplex connection
+       mconn *connection.MConnection // multiplex connection
 
        *NodeInfo
        Key  string
@@ -42,7 +43,7 @@ type PeerConfig struct {
        HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
        DialTimeout      time.Duration `mapstructure:"dial_timeout"`
 
-       MConfig *MConnConfig `mapstructure:"connection"`
+       MConfig *connection.MConnConfig `mapstructure:"connection"`
 
        Fuzz       bool            `mapstructure:"fuzz"` // fuzz connection (for testing)
        FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
@@ -54,13 +55,13 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig {
                AuthEnc:          true,
                HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
                DialTimeout:      time.Duration(config.DialTimeout),      // * time.Second,
-               MConfig:          DefaultMConnConfig(),
+               MConfig:          connection.DefaultMConnConfig(),
                Fuzz:             false,
                FuzzConfig:       DefaultFuzzConnConfig(),
        }
 }
 
-func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
        // Key and NodeInfo are set after Handshake
        p := &Peer{
                peerConn: pc,
@@ -75,11 +76,11 @@ func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, ch
        return p
 }
 
-func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
        return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn, err := dial(addr, config)
        if err != nil {
                return nil, errors.Wrap(err, "Error dial peer")
@@ -94,11 +95,11 @@ func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDesc
        return pc, nil
 }
 
-func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
        return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
 }
 
-func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
        conn := rawConn
 
        // Fuzz connection
@@ -112,7 +113,7 @@ func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor,
                conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
 
                var err error
-               conn, err = MakeSecretConnection(conn, ourNodePrivKey)
+               conn, err = connection.MakeSecretConnection(conn, ourNodePrivKey)
                if err != nil {
                        return nil, errors.Wrap(err, "Error creating peer")
                }
@@ -171,7 +172,7 @@ func (p *Peer) Addr() net.Addr {
 // PubKey returns peer's public key.
 func (p *Peer) PubKey() crypto.PubKeyEd25519 {
        if p.config.AuthEnc {
-               return p.conn.(*SecretConnection).RemotePubKey()
+               return p.conn.(*connection.SecretConnection).RemotePubKey()
        }
        if p.NodeInfo == nil {
                panic("Attempt to get peer's PubKey before calling Handshake")
@@ -193,7 +194,7 @@ func (p *Peer) OnStop() {
 }
 
 // Connection returns underlying MConnection.
-func (p *Peer) Connection() *MConnection {
+func (p *Peer) Connection() *connection.MConnection {
        return p.mconn
 }
 
@@ -257,7 +258,7 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
        return conn, nil
 }
 
-func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
+func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
        onReceive := func(chID byte, msgBytes []byte) {
                reactor := reactorsByCh[chID]
                if reactor == nil {
@@ -270,5 +271,5 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch
                onPeerError(p, r)
        }
 
-       return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
+       return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
 }
index 89edc87..95a33df 100644 (file)
@@ -12,6 +12,7 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
 
        "github.com/bytom/p2p"
+       "github.com/bytom/p2p/connection"
 )
 
 const (
@@ -59,8 +60,8 @@ func (r *PEXReactor) OnStop() {
 }
 
 // GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
-       return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
+func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{
                ID:                PexChannel,
                Priority:          1,
                SendQueueCapacity: 10,
@@ -97,11 +98,10 @@ func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
 
 // Receive implements Reactor by handling incoming PEX messages.
 func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
-       srcAddr := p.Connection().RemoteAddress
-       srcAddrStr := srcAddr.String()
-       r.incrementMsgCount(srcAddrStr)
-       if r.reachedMaxMsgLimit(srcAddrStr) {
-               log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+       addrStr := p.Addr().String()
+       r.incrementMsgCount(addrStr)
+       if r.reachedMaxMsgLimit(addrStr) {
+               log.WithField("peer", addrStr).Error("reached the max pex messages limit")
                r.Switch.StopPeerGracefully(p)
                return
        }
@@ -120,6 +120,12 @@ func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
                }
 
        case *pexAddrsMessage:
+               srcAddr, err := p2p.NewNetAddressString(addrStr)
+               if err != nil {
+                       log.WithField("error", err).Error("pex fail on create src address")
+                       return
+               }
+
                for _, addr := range msg.Addrs {
                        if err := r.book.AddAddress(addr, srcAddr); err != nil {
                                log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
@@ -218,7 +224,7 @@ func (r *PEXReactor) ensurePeers() {
 
        connectedPeers := make(map[string]struct{})
        for _, peer := range r.Switch.Peers().List() {
-               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+               connectedPeers[peer.Addr().String()] = struct{}{}
        }
 
        for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
@@ -232,7 +238,7 @@ func (r *PEXReactor) ensurePeers() {
                if dialling := r.Switch.IsDialing(try); dialling {
                        continue
                }
-               if _, ok := connectedPeers[try.IP.String()]; ok {
+               if _, ok := connectedPeers[try.String()]; ok {
                        continue
                }
 
index ed8d017..816d37b 100644 (file)
@@ -14,6 +14,7 @@ import (
 
        cfg "github.com/bytom/config"
        "github.com/bytom/errors"
+       "github.com/bytom/p2p/connection"
        "github.com/bytom/p2p/trust"
 )
 
@@ -51,7 +52,7 @@ type Switch struct {
        peerConfig   *PeerConfig
        listeners    []Listener
        reactors     map[string]Reactor
-       chDescs      []*ChannelDescriptor
+       chDescs      []*connection.ChannelDescriptor
        reactorsByCh map[byte]Reactor
        peers        *PeerSet
        dialing      *cmn.CMap
@@ -69,7 +70,7 @@ func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB)
                Config:       config,
                peerConfig:   DefaultPeerConfig(config),
                reactors:     make(map[string]Reactor),
-               chDescs:      make([]*ChannelDescriptor, 0),
+               chDescs:      make([]*connection.ChannelDescriptor, 0),
                reactorsByCh: make(map[byte]Reactor),
                peers:        NewPeerSet(),
                dialing:      cmn.NewCMap(),
@@ -276,7 +277,7 @@ func (sw *Switch) filterConnByIP(ip string) error {
 }
 
 func (sw *Switch) filterConnByPeer(peer *Peer) error {
-       if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
+       if err := sw.checkBannedPeer(peer.Addr().String()); err != nil {
                return ErrConnectBannedPeer
        }
 
@@ -406,7 +407,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
 func (sw *Switch) AddBannedPeer(peer *Peer) error {
        sw.mtx.Lock()
        defer sw.mtx.Unlock()
-       key := peer.NodeInfo.RemoteAddrHost()
+       key := peer.Addr().String()
        sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
        datajson, err := json.Marshal(sw.bannedPeer)
        if err != nil {
index 88c088d..d01fa6a 100644 (file)
@@ -8,6 +8,7 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/p2p/connection"
 )
 
 //PanicOnAddPeerErr add peer error
@@ -22,7 +23,7 @@ func CreateRandomPeer(outbound bool) *Peer {
                NodeInfo: &NodeInfo{
                        ListenAddr: netAddr.DialString(),
                },
-               mconn: &MConnection{},
+               mconn: &connection.MConnection{},
        }
        return p
 }
@@ -112,7 +113,6 @@ func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
                Moniker:    cmn.Fmt("switch%d", i),
                Network:    network,
                Version:    version,
-               RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
                ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
        })
        s.SetNodePrivKey(privKey)