"github.com/bytom/errors"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/connection"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
}
// GetChannels implements Reactor
-func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
- return []*p2p.ChannelDescriptor{
- &p2p.ChannelDescriptor{
+func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
+ return []*connection.ChannelDescriptor{
+ &connection.ChannelDescriptor{
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 100,
import (
cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/p2p/connection"
)
//Reactor is responsible for handling incoming messages of one or more `Channels`
SetSwitch(*Switch)
// GetChannels returns the list of channel descriptors.
- GetChannels() []*ChannelDescriptor
+ GetChannels() []*connection.ChannelDescriptor
// AddPeer is called by the switch when a new peer is added.
AddPeer(peer *Peer) error
}
//GetChannels returns the list of channel descriptors
-func (*BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
+func (*BaseReactor) GetChannels() []*connection.ChannelDescriptor { return nil }
//AddPeer is called by the switch when a new peer is added
-func (*BaseReactor) AddPeer(peer *Peer) {}
+func (*BaseReactor) AddPeer(peer *Peer) {}
//RemovePeer is called by the switch when the peer is stopped (due to error or other reason)
-func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
+func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
//Receive is called when msgBytes is received from peer
func (*BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
-package p2p
+package connection
import (
"bufio"
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *time.Ticker // send pings periodically
chStatsTimer *time.Ticker // update channel stats periodically
-
- LocalAddress *NetAddress
- RemoteAddress *NetAddress
}
// MConnConfig is a MConnection configuration.
pingTimer: time.NewTicker(pingTimeout),
chStatsTimer: time.NewTicker(updateState),
-
- LocalAddress: NewNetAddress(conn.LocalAddr()),
- RemoteAddress: NewNetAddress(conn.RemoteAddr()),
}
// Create channels
-// +build !network
-
-package p2p_test
+package connection
import (
"net"
"testing"
"time"
- p2p "github.com/bytom/p2p"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tmlibs/log"
)
-func createMConnection(conn net.Conn) *p2p.MConnection {
+func createMConnection(conn net.Conn) *MConnection {
onReceive := func(chID byte, msgBytes []byte) {
}
onError := func(r interface{}) {
return c
}
-func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
- chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
- c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
+func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
+ chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
+ c := NewMConnection(conn, chDescs, onReceive, onError)
c.SetLogger(log.TestingLogger())
return c
}
// is known ahead of time, and thus we are technically
// still vulnerable to MITM. (TODO!)
// See docs/sts-final.pdf for more info
-package p2p
+package connection
import (
"bytes"
-// +build !network
-
-package p2p
+package connection
import (
"bytes"
PubKey crypto.PubKeyEd25519 `json:"pub_key"`
Moniker string `json:"moniker"`
Network string `json:"network"`
- RemoteAddr string `json:"remote_addr"`
ListenAddr string `json:"listen_addr"`
Version string `json:"version"` // major.minor.revision
Other []string `json:"other"` // other application specific data
return portInt
}
-//RemoteAddrHost peer external ip address
-func (info *NodeInfo) RemoteAddrHost() string {
- host, _, _ := net.SplitHostPort(info.RemoteAddr)
- return host
-}
-
//String representation
func (info NodeInfo) String() string {
return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
cmn "github.com/tendermint/tmlibs/common"
cfg "github.com/bytom/config"
+ "github.com/bytom/p2p/connection"
)
// peerConn contains the raw connection and its config.
// raw peerConn and the multiplex connection
*peerConn
- mconn *MConnection // multiplex connection
+ mconn *connection.MConnection // multiplex connection
*NodeInfo
Key string
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MConfig *MConnConfig `mapstructure:"connection"`
+ MConfig *connection.MConnConfig `mapstructure:"connection"`
Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
AuthEnc: true,
HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
DialTimeout: time.Duration(config.DialTimeout), // * time.Second,
- MConfig: DefaultMConnConfig(),
+ MConfig: connection.DefaultMConnConfig(),
Fuzz: false,
FuzzConfig: DefaultFuzzConnConfig(),
}
}
-func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
// Key and NodeInfo are set after Handshake
p := &Peer{
peerConn: pc,
return p
}
-func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
}
-func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
conn, err := dial(addr, config)
if err != nil {
return nil, errors.Wrap(err, "Error dial peer")
return pc, nil
}
-func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
}
-func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
conn := rawConn
// Fuzz connection
conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
var err error
- conn, err = MakeSecretConnection(conn, ourNodePrivKey)
+ conn, err = connection.MakeSecretConnection(conn, ourNodePrivKey)
if err != nil {
return nil, errors.Wrap(err, "Error creating peer")
}
// PubKey returns peer's public key.
func (p *Peer) PubKey() crypto.PubKeyEd25519 {
if p.config.AuthEnc {
- return p.conn.(*SecretConnection).RemotePubKey()
+ return p.conn.(*connection.SecretConnection).RemotePubKey()
}
if p.NodeInfo == nil {
panic("Attempt to get peer's PubKey before calling Handshake")
}
// Connection returns underlying MConnection.
-func (p *Peer) Connection() *MConnection {
+func (p *Peer) Connection() *connection.MConnection {
return p.mconn
}
return conn, nil
}
-func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
+func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*connection.ChannelDescriptor, onPeerError func(*Peer, interface{}), config *connection.MConnConfig) *connection.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
onPeerError(p, r)
}
- return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
+ return connection.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}
cmn "github.com/tendermint/tmlibs/common"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/connection"
)
const (
}
// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
- return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
+func (r *PEXReactor) GetChannels() []*connection.ChannelDescriptor {
+ return []*connection.ChannelDescriptor{&connection.ChannelDescriptor{
ID: PexChannel,
Priority: 1,
SendQueueCapacity: 10,
// Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
- srcAddr := p.Connection().RemoteAddress
- srcAddrStr := srcAddr.String()
- r.incrementMsgCount(srcAddrStr)
- if r.reachedMaxMsgLimit(srcAddrStr) {
- log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+ addrStr := p.Addr().String()
+ r.incrementMsgCount(addrStr)
+ if r.reachedMaxMsgLimit(addrStr) {
+ log.WithField("peer", addrStr).Error("reached the max pex messages limit")
r.Switch.StopPeerGracefully(p)
return
}
}
case *pexAddrsMessage:
+ srcAddr, err := p2p.NewNetAddressString(addrStr)
+ if err != nil {
+ log.WithField("error", err).Error("pex fail on create src address")
+ return
+ }
+
for _, addr := range msg.Addrs {
if err := r.book.AddAddress(addr, srcAddr); err != nil {
log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
connectedPeers := make(map[string]struct{})
for _, peer := range r.Switch.Peers().List() {
- connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+ connectedPeers[peer.Addr().String()] = struct{}{}
}
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
if dialling := r.Switch.IsDialing(try); dialling {
continue
}
- if _, ok := connectedPeers[try.IP.String()]; ok {
+ if _, ok := connectedPeers[try.String()]; ok {
continue
}
cfg "github.com/bytom/config"
"github.com/bytom/errors"
+ "github.com/bytom/p2p/connection"
"github.com/bytom/p2p/trust"
)
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
- chDescs []*ChannelDescriptor
+ chDescs []*connection.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmn.CMap
Config: config,
peerConfig: DefaultPeerConfig(config),
reactors: make(map[string]Reactor),
- chDescs: make([]*ChannelDescriptor, 0),
+ chDescs: make([]*connection.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
}
func (sw *Switch) filterConnByPeer(peer *Peer) error {
- if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
+ if err := sw.checkBannedPeer(peer.Addr().String()); err != nil {
return ErrConnectBannedPeer
}
func (sw *Switch) AddBannedPeer(peer *Peer) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
- key := peer.NodeInfo.RemoteAddrHost()
+ key := peer.Addr().String()
sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)
if err != nil {
cmn "github.com/tendermint/tmlibs/common"
cfg "github.com/bytom/config"
+ "github.com/bytom/p2p/connection"
)
//PanicOnAddPeerErr add peer error
NodeInfo: &NodeInfo{
ListenAddr: netAddr.DialString(),
},
- mconn: &MConnection{},
+ mconn: &connection.MConnection{},
}
return p
}
Moniker: cmn.Fmt("switch%d", i),
Network: network,
Version: version,
- RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
})
s.SetNodePrivKey(privKey)