From a32d097d6e8f4e34c0168c50847bc6ccf5267236 Mon Sep 17 00:00:00 2001 From: paladz <453256728@qq.com> Date: Thu, 24 May 2018 16:45:10 +0800 Subject: [PATCH] move connection to it's folder --- netsync/protocol_reactor.go | 7 ++++--- p2p/base_reactor.go | 10 ++++++---- p2p/{ => connection}/connection.go | 8 +------- p2p/{ => connection}/connection_test.go | 13 +++++-------- p2p/{ => connection}/secret_connection.go | 2 +- p2p/{ => connection}/secret_connection_test.go | 4 +--- p2p/node_info.go | 7 ------- p2p/peer.go | 27 +++++++++++++------------- p2p/pex/pex_reactor.go | 24 ++++++++++++++--------- p2p/switch.go | 9 +++++---- p2p/test_util.go | 4 ++-- 11 files changed, 54 insertions(+), 61 deletions(-) rename p2p/{ => connection}/connection.go (99%) rename p2p/{ => connection}/connection_test.go (90%) rename p2p/{ => connection}/secret_connection.go (99%) rename p2p/{ => connection}/secret_connection_test.go (99%) diff --git a/netsync/protocol_reactor.go b/netsync/protocol_reactor.go index 8c38f57a..36984825 100644 --- a/netsync/protocol_reactor.go +++ b/netsync/protocol_reactor.go @@ -11,6 +11,7 @@ import ( "github.com/bytom/errors" "github.com/bytom/p2p" + "github.com/bytom/p2p/connection" "github.com/bytom/protocol" "github.com/bytom/protocol/bc" "github.com/bytom/protocol/bc/types" @@ -85,9 +86,9 @@ func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p. } // GetChannels implements Reactor -func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - &p2p.ChannelDescriptor{ +func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor { + return []*connection.ChannelDescriptor{ + &connection.ChannelDescriptor{ ID: BlockchainChannel, Priority: 5, SendQueueCapacity: 100, diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 2162a336..7d0f28d3 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -2,6 +2,8 @@ package p2p import ( cmn "github.com/tendermint/tmlibs/common" + + "github.com/bytom/p2p/connection" ) //Reactor is responsible for handling incoming messages of one or more `Channels` @@ -12,7 +14,7 @@ type Reactor interface { SetSwitch(*Switch) // GetChannels returns the list of channel descriptors. - GetChannels() []*ChannelDescriptor + GetChannels() []*connection.ChannelDescriptor // AddPeer is called by the switch when a new peer is added. AddPeer(peer *Peer) error @@ -50,13 +52,13 @@ func (br *BaseReactor) SetSwitch(sw *Switch) { } //GetChannels returns the list of channel descriptors -func (*BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (*BaseReactor) GetChannels() []*connection.ChannelDescriptor { return nil } //AddPeer is called by the switch when a new peer is added -func (*BaseReactor) AddPeer(peer *Peer) {} +func (*BaseReactor) AddPeer(peer *Peer) {} //RemovePeer is called by the switch when the peer is stopped (due to error or other reason) -func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} +func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} //Receive is called when msgBytes is received from peer func (*BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {} diff --git a/p2p/connection.go b/p2p/connection/connection.go similarity index 99% rename from p2p/connection.go rename to p2p/connection/connection.go index 7e16b159..77595d43 100644 --- a/p2p/connection.go +++ b/p2p/connection/connection.go @@ -1,4 +1,4 @@ -package p2p +package connection import ( "bufio" @@ -81,9 +81,6 @@ type MConnection struct { flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *time.Ticker // send pings periodically chStatsTimer *time.Ticker // update channel stats periodically - - LocalAddress *NetAddress - RemoteAddress *NetAddress } // MConnConfig is a MConnection configuration. @@ -126,9 +123,6 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec pingTimer: time.NewTicker(pingTimeout), chStatsTimer: time.NewTicker(updateState), - - LocalAddress: NewNetAddress(conn.LocalAddr()), - RemoteAddress: NewNetAddress(conn.RemoteAddr()), } // Create channels diff --git a/p2p/connection_test.go b/p2p/connection/connection_test.go similarity index 90% rename from p2p/connection_test.go rename to p2p/connection/connection_test.go index c994f5f2..0123a3e5 100644 --- a/p2p/connection_test.go +++ b/p2p/connection/connection_test.go @@ -1,19 +1,16 @@ -// +build !network - -package p2p_test +package connection import ( "net" "testing" "time" - p2p "github.com/bytom/p2p" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tmlibs/log" ) -func createMConnection(conn net.Conn) *p2p.MConnection { +func createMConnection(conn net.Conn) *MConnection { onReceive := func(chID byte, msgBytes []byte) { } onError := func(r interface{}) { @@ -23,9 +20,9 @@ func createMConnection(conn net.Conn) *p2p.MConnection { return c } -func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { - chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := p2p.NewMConnection(conn, chDescs, onReceive, onError) +func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { + chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} + c := NewMConnection(conn, chDescs, onReceive, onError) c.SetLogger(log.TestingLogger()) return c } diff --git a/p2p/secret_connection.go b/p2p/connection/secret_connection.go similarity index 99% rename from p2p/secret_connection.go rename to p2p/connection/secret_connection.go index 24cae0f6..5bd8f9ca 100644 --- a/p2p/secret_connection.go +++ b/p2p/connection/secret_connection.go @@ -4,7 +4,7 @@ // is known ahead of time, and thus we are technically // still vulnerable to MITM. (TODO!) // See docs/sts-final.pdf for more info -package p2p +package connection import ( "bytes" diff --git a/p2p/secret_connection_test.go b/p2p/connection/secret_connection_test.go similarity index 99% rename from p2p/secret_connection_test.go rename to p2p/connection/secret_connection_test.go index 28db58a2..fbfdf922 100644 --- a/p2p/secret_connection_test.go +++ b/p2p/connection/secret_connection_test.go @@ -1,6 +1,4 @@ -// +build !network - -package p2p +package connection import ( "bytes" diff --git a/p2p/node_info.go b/p2p/node_info.go index 25d8da5a..5d586c31 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -16,7 +16,6 @@ type NodeInfo struct { PubKey crypto.PubKeyEd25519 `json:"pub_key"` Moniker string `json:"moniker"` Network string `json:"network"` - RemoteAddr string `json:"remote_addr"` ListenAddr string `json:"listen_addr"` Version string `json:"version"` // major.minor.revision Other []string `json:"other"` // other application specific data @@ -73,12 +72,6 @@ func (info *NodeInfo) ListenPort() int { return portInt } -//RemoteAddrHost peer external ip address -func (info *NodeInfo) RemoteAddrHost() string { - host, _, _ := net.SplitHostPort(info.RemoteAddr) - return host -} - //String representation func (info NodeInfo) String() string { return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other) diff --git a/p2p/peer.go b/p2p/peer.go index 82f47e65..e445ad04 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,6 +12,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" cfg "github.com/bytom/config" + "github.com/bytom/p2p/connection" ) // peerConn contains the raw connection and its config. @@ -27,7 +28,7 @@ type Peer struct { // raw peerConn and the multiplex connection *peerConn - mconn *MConnection // multiplex connection + mconn *connection.MConnection // multiplex connection *NodeInfo Key string @@ -42,7 +43,7 @@ type PeerConfig struct { HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` DialTimeout time.Duration `mapstructure:"dial_timeout"` - MConfig *MConnConfig `mapstructure:"connection"` + MConfig *connection.MConnConfig `mapstructure:"connection"` Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing) FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"` @@ -54,13 +55,13 @@ func DefaultPeerConfig(config *cfg.P2PConfig) *PeerConfig { AuthEnc: true, HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second, DialTimeout: time.Duration(config.DialTimeout), // * time.Second, - MConfig: DefaultMConnConfig(), + MConfig: connection.DefaultMConnConfig(), Fuzz: false, FuzzConfig: DefaultFuzzConnConfig(), } } -func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { +func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { // Key and NodeInfo are set after Handshake p := &Peer{ peerConn: pc, @@ -75,11 +76,11 @@ func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, ch return p } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) { +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) { return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config)) } -func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { +func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { conn, err := dial(addr, config) if err != nil { return nil, errors.Wrap(err, "Error dial peer") @@ -94,11 +95,11 @@ func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDesc return pc, nil } -func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) { +func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) { return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config)) } -func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { +func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) { conn := rawConn // Fuzz connection @@ -112,7 +113,7 @@ func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second)) var err error - conn, err = MakeSecretConnection(conn, ourNodePrivKey) + conn, err = connection.MakeSecretConnection(conn, ourNodePrivKey) if err != nil { return nil, errors.Wrap(err, "Error creating peer") } @@ -171,7 +172,7 @@ func (p *Peer) Addr() net.Addr { // PubKey returns peer's public key. func (p *Peer) PubKey() crypto.PubKeyEd25519 { if p.config.AuthEnc { - return p.conn.(*SecretConnection).RemotePubKey() + return p.conn.(*connection.SecretConnection).RemotePubKey() } if p.NodeInfo == nil { panic("Attempt to get peer's PubKey before calling Handshake") @@ -193,7 +194,7 @@ func (p *Peer) OnStop() { } // Connection returns underlying MConnection. -func (p *Peer) Connection() *MConnection { +func (p *Peer) Connection() *connection.MConnection { return p.mconn } @@ -257,7 +258,7 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { @@ -270,5 +271,5 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) + return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 89edc87b..95a33df6 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -12,6 +12,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/bytom/p2p" + "github.com/bytom/p2p/connection" ) const ( @@ -59,8 +60,8 @@ func (r *PEXReactor) OnStop() { } // GetChannels implements Reactor -func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ +func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor { + return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{ ID: PexChannel, Priority: 1, SendQueueCapacity: 10, @@ -97,11 +98,10 @@ func (r *PEXReactor) AddPeer(p *p2p.Peer) error { // Receive implements Reactor by handling incoming PEX messages. func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) { - srcAddr := p.Connection().RemoteAddress - srcAddrStr := srcAddr.String() - r.incrementMsgCount(srcAddrStr) - if r.reachedMaxMsgLimit(srcAddrStr) { - log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit") + addrStr := p.Addr().String() + r.incrementMsgCount(addrStr) + if r.reachedMaxMsgLimit(addrStr) { + log.WithField("peer", addrStr).Error("reached the max pex messages limit") r.Switch.StopPeerGracefully(p) return } @@ -120,6 +120,12 @@ func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) { } case *pexAddrsMessage: + srcAddr, err := p2p.NewNetAddressString(addrStr) + if err != nil { + log.WithField("error", err).Error("pex fail on create src address") + return + } + for _, addr := range msg.Addrs { if err := r.book.AddAddress(addr, srcAddr); err != nil { log.WithField("error", err).Error("pex fail on process pexAddrsMessage") @@ -218,7 +224,7 @@ func (r *PEXReactor) ensurePeers() { connectedPeers := make(map[string]struct{}) for _, peer := range r.Switch.Peers().List() { - connectedPeers[peer.RemoteAddrHost()] = struct{}{} + connectedPeers[peer.Addr().String()] = struct{}{} } for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { @@ -232,7 +238,7 @@ func (r *PEXReactor) ensurePeers() { if dialling := r.Switch.IsDialing(try); dialling { continue } - if _, ok := connectedPeers[try.IP.String()]; ok { + if _, ok := connectedPeers[try.String()]; ok { continue } diff --git a/p2p/switch.go b/p2p/switch.go index ed8d0177..816d37bb 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -14,6 +14,7 @@ import ( cfg "github.com/bytom/config" "github.com/bytom/errors" + "github.com/bytom/p2p/connection" "github.com/bytom/p2p/trust" ) @@ -51,7 +52,7 @@ type Switch struct { peerConfig *PeerConfig listeners []Listener reactors map[string]Reactor - chDescs []*ChannelDescriptor + chDescs []*connection.ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap @@ -69,7 +70,7 @@ func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) Config: config, peerConfig: DefaultPeerConfig(config), reactors: make(map[string]Reactor), - chDescs: make([]*ChannelDescriptor, 0), + chDescs: make([]*connection.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), @@ -276,7 +277,7 @@ func (sw *Switch) filterConnByIP(ip string) error { } func (sw *Switch) filterConnByPeer(peer *Peer) error { - if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil { + if err := sw.checkBannedPeer(peer.Addr().String()); err != nil { return ErrConnectBannedPeer } @@ -406,7 +407,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { func (sw *Switch) AddBannedPeer(peer *Peer) error { sw.mtx.Lock() defer sw.mtx.Unlock() - key := peer.NodeInfo.RemoteAddrHost() + key := peer.Addr().String() sw.bannedPeer[key] = time.Now().Add(defaultBanDuration) datajson, err := json.Marshal(sw.bannedPeer) if err != nil { diff --git a/p2p/test_util.go b/p2p/test_util.go index 88c088db..d01fa6a4 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -8,6 +8,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" cfg "github.com/bytom/config" + "github.com/bytom/p2p/connection" ) //PanicOnAddPeerErr add peer error @@ -22,7 +23,7 @@ func CreateRandomPeer(outbound bool) *Peer { NodeInfo: &NodeInfo{ ListenAddr: netAddr.DialString(), }, - mconn: &MConnection{}, + mconn: &connection.MConnection{}, } return p } @@ -112,7 +113,6 @@ func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f Moniker: cmn.Fmt("switch%d", i), Network: network, Version: version, - RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), }) s.SetNodePrivKey(privKey) -- 2.11.0