type SyncManager struct {
networkID uint64
sw *p2p.Switch
- addrBook *p2p.AddrBook // known peers
privKey crypto.PrivKeyEd25519 // local node's p2p key
chain *core.Chain
var l p2p.Listener
if !config.VaultMode {
p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
- l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
+ l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
manager.sw.AddListener(l)
}
manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
manager.sw.SetNodePrivKey(manager.privKey)
- // Optionally, start the pex reactor
- //var addrBook *p2p.AddrBook
- if config.P2P.PexReactor {
- manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
- pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
- manager.sw.AddReactor("PEX", pexReactor)
- }
return manager, nil
}
//DialSeeds dial seed peers
func (sm *SyncManager) DialSeeds(seeds []string) error {
- return sm.sw.DialSeeds(sm.addrBook, seeds)
+ return sm.sw.DialSeeds(seeds)
}
//Switch get sync manager switch
--- /dev/null
+package p2p
+
+import (
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+type Reactor interface {
+ cmn.Service // Start, Stop
+
+ // SetSwitch allows setting a switch.
+ SetSwitch(*Switch)
+
+ // GetChannels returns the list of channel descriptors.
+ GetChannels() []*ChannelDescriptor
+
+ // AddPeer is called by the switch when a new peer is added.
+ AddPeer(peer *Peer) error
+
+ // RemovePeer is called by the switch when the peer is stopped (due to error
+ // or other reason).
+ RemovePeer(peer *Peer, reason interface{})
+
+ // Receive is called when msgBytes is received from peer.
+ //
+ // NOTE reactor can not keep msgBytes around after Receive completes without
+ // copying.
+ //
+ // CONTRACT: msgBytes are not nil.
+ Receive(chID byte, peer *Peer, msgBytes []byte)
+}
+
+//--------------------------------------
+
+type BaseReactor struct {
+ cmn.BaseService // Provides Start, Stop, .Quit
+ Switch *Switch
+}
+
+func NewBaseReactor(name string, impl Reactor) *BaseReactor {
+ return &BaseReactor{
+ BaseService: *cmn.NewBaseService(nil, name, impl),
+ Switch: nil,
+ }
+}
+
+func (br *BaseReactor) SetSwitch(sw *Switch) {
+ br.Switch = sw
+}
+func (*BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
+func (*BaseReactor) AddPeer(peer *Peer) {}
+func (*BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
+func (*BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
"github.com/bytom/p2p/upnp"
log "github.com/sirupsen/logrus"
cmn "github.com/tendermint/tmlibs/common"
- tlog "github.com/tendermint/tmlibs/log"
)
type Listener interface {
}
// skipUPNP: If true, does not try getUPNPExternalAddress()
-func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool) {
+func NewDefaultListener(protocol string, lAddr string, skipUPNP bool) (Listener, bool) {
// Local listen IP & port
lAddrIP, lAddrPort := splitHostPort(lAddr)
}
// Actual listener local IP & port
listenerIP, listenerPort := splitHostPort(listener.Addr().String())
- log.WithFields(log.Fields{
- "ip": listenerIP,
- "port": listenerPort,
- }).Info("Local listener")
+ log.Info("Local listener", " ip:", listenerIP, " port:", listenerPort)
// Determine internal address...
var intAddr *NetAddress
}
// Otherwise just use the local address...
if extAddr == nil {
- extAddr = getNaiveExternalAddress(listenerPort)
+ extAddr = getNaiveExternalAddress(listenerPort, false)
}
if extAddr == nil {
cmn.PanicCrisis("Could not determine external address!")
extAddr: extAddr,
connections: make(chan net.Conn, numBufferedConnections),
}
- dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
+ dl.BaseService = *cmn.NewBaseService(nil, "DefaultListener", dl)
dl.Start() // Started upon construction
if !listenerStatus && getExtIP {
log.Info("Getting UPNP external address")
nat, err := upnp.Discover()
if err != nil {
- log.WithField("error", err).Error("Could not perform UPNP discover")
+ log.Info("Could not perform UPNP discover. error:", err)
return nil
}
ext, err := nat.GetExternalAddress()
if err != nil {
- log.WithField("error", err).Error("Could not perform UPNP external address")
+ log.Info("Could not perform UPNP external address. error:", err)
return nil
}
externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "bytomd", 0)
if err != nil {
- log.WithField("error", err).Error("Could not add UPNP port mapping")
+ log.Info("Could not add UPNP port mapping. error:", err)
return nil
}
- log.WithField("address", ext).Info("Got UPNP external address")
+ log.Info("Got UPNP external address ", ext)
return NewNetAddressIPPort(ext, uint16(externalPort))
}
-// TODO: use syscalls: http://pastebin.com/9exZG4rh
-func getNaiveExternalAddress(port int) *NetAddress {
+func getNaiveExternalAddress(port int, settleForLocal bool) *NetAddress {
addrs, err := net.InterfaceAddrs()
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Could not fetch interface addresses: %v", err))
continue
}
v4 := ipnet.IP.To4()
- if v4 == nil || v4[0] == 127 {
+ if v4 == nil || (!settleForLocal && v4[0] == 127) {
continue
} // loopback
return NewNetAddressIPPort(ipnet.IP, uint16(port))
}
- return nil
+
+ // try again, but settle for local
+ log.Info("Node may not be connected to internet. Settling for local address")
+ return getNaiveExternalAddress(port, true)
}
-// Modified for Tendermint
+// Modified for Bytom
// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
// https://github.com/conformal/btcd/blob/master/LICENSE
// address in the form of "IP:Port". Also resolves the host if host
// is not an IP.
func NewNetAddressString(addr string) (*NetAddress, error) {
-
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
// NewNetAddressIPPort returns a new NetAddress using the provided IP
// and port number.
func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
- na := &NetAddress{
+ return &NetAddress{
IP: ip,
Port: port,
str: net.JoinHostPort(
strconv.FormatUint(uint64(port), 10),
),
}
- return na
}
// Equals reports whether na and other are the same addresses.
if o, ok := other.(*NetAddress); ok {
return na.String() == o.String()
}
-
- return false
-}
-
-func (na *NetAddress) Less(other interface{}) bool {
- if o, ok := other.(*NetAddress); ok {
- return na.String() < o.String()
- }
-
- cmn.PanicSanity("Cannot compare unequal types")
return false
}
return na.str
}
+func (na *NetAddress) DialString() string {
+ return net.JoinHostPort(
+ na.IP.String(),
+ strconv.FormatUint(uint64(na.Port), 10),
+ )
+}
+
// Dial calls net.Dial on the address.
func (na *NetAddress) Dial() (net.Conn, error) {
- conn, err := net.Dial("tcp", na.String())
+ conn, err := net.Dial("tcp", na.DialString())
if err != nil {
return nil, err
}
// DialTimeout calls net.DialTimeout on the address.
func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) {
- conn, err := net.DialTimeout("tcp", na.String(), timeout)
+ conn, err := net.DialTimeout("tcp", na.DialString(), timeout)
if err != nil {
return nil, err
}
Ipv6_weak
Ipv4
Ipv6_strong
- Private
)
if !na.Routable() {
return Unreachable
Other []string `json:"other"` // other application specific data
}
-// CONTRACT: two nodes are compatible if the major/minor versions match and network match
+// CompatibleWith checks if two NodeInfo are compatible with eachother.
+// CONTRACT: two nodes are compatible if the major version matches and network match
+// and they have at least one channel in common.
func (info *NodeInfo) CompatibleWith(other *NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(info.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version)
return port_i
}
+func (info *NodeInfo) RemoteAddrHost() string {
+ host, _, _ := net.SplitHostPort(info.RemoteAddr)
+ return host
+}
+
func (info NodeInfo) String() string {
- return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [remote %v, listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.RemoteAddr, info.ListenAddr, info.Version, info.Other)
+ return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
}
func splitVersion(version string) (string, string, string, error) {
// Redial function to reconnect. Note that inbound peers can't be
// made persistent. They should be made persistent on the other end.
//
+
+// peerConn contains the raw connection and its config.
+type peerConn struct {
+ outbound bool
+ config *PeerConfig
+ conn net.Conn // source connection
+}
+
// Before using a peer, you will need to perform a handshake on connection.
type Peer struct {
cmn.BaseService
- outbound bool
-
- conn net.Conn // source connection
+ // raw peerConn and the multiplex connection
+ *peerConn
mconn *MConnection // multiplex connection
- persistent bool
- config *PeerConfig
-
*NodeInfo
Key string
Data *cmn.CMap // User data.
return &PeerConfig{
AuthEnc: true,
HandshakeTimeout: time.Duration(config.HandshakeTimeout), // * time.Second,
- DialTimeout: time.Duration(config.DialTimeout), // * time.Second,
+ DialTimeout: time.Duration(config.DialTimeout), // * time.Second,
MConfig: DefaultMConnConfig(),
Fuzz: false,
FuzzConfig: DefaultFuzzConnConfig(),
}
}
-func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
- return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+func newPeer(pc *peerConn, nodeInfo *NodeInfo, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
+ // Key and NodeInfo are set after Handshake
+ p := &Peer{
+ peerConn: pc,
+ NodeInfo: nodeInfo,
+
+ Data: cmn.NewCMap(),
+ }
+ p.Key = nodeInfo.PubKey.KeyString()
+ p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
+
+ p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
+ return p
+}
+
+func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+ return newOutboundPeerConn(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
}
-func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newOutboundPeerConn(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
conn, err := dial(addr, config)
if err != nil {
- return nil, errors.Wrap(err, "Error creating peer")
+ return nil, errors.Wrap(err, "Error dial peer")
}
- peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+ pc, err := newPeerConn(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
if err != nil {
conn.Close()
return nil, err
}
- return peer, nil
-}
-func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*Peer, error) {
- return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
+ return pc, nil
}
-func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
- return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
+func newInboundPeerConn(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *cfg.P2PConfig) (*peerConn, error) {
+ return newPeerConn(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig(config))
}
-func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
+func newPeerConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peerConn, error) {
conn := rawConn
// Fuzz connection
}
}
- // Key and NodeInfo are set after Handshake
- p := &Peer{
+ // Only the information we already have
+ return &peerConn{
+ config: config,
outbound: outbound,
conn: conn,
- config: config,
- Data: cmn.NewCMap(),
- }
-
- p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)
-
- p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
-
- return p, nil
+ }, nil
}
// CloseConn should be used when the peer was created, but never started.
-func (p *Peer) CloseConn() {
- p.conn.Close()
-}
-
-// makePersistent marks the peer as persistent.
-func (p *Peer) makePersistent() {
- if !p.outbound {
- panic("inbound peers can't be made persistent")
- }
-
- p.persistent = true
-}
-
-// IsPersistent returns true if the peer is persitent, false otherwise.
-func (p *Peer) IsPersistent() bool {
- return p.persistent
+func (pc *peerConn) CloseConn() {
+ pc.conn.Close()
}
// HandshakeTimeout performs a handshake between a given node and the peer.
// NOTE: blocking
-func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
+func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) (*NodeInfo, error) {
// Set deadline for handshake so we don't block forever on conn.ReadFull
- p.conn.SetDeadline(time.Now().Add(timeout))
+ pc.conn.SetDeadline(time.Now().Add(timeout))
var peerNodeInfo = new(NodeInfo)
var err1 error
cmn.Parallel(
func() {
var n int
- wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
+ wire.WriteBinary(ourNodeInfo, pc.conn, &n, &err1)
},
func() {
var n int
- wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
+ wire.ReadBinary(peerNodeInfo, pc.conn, maxNodeInfoSize, &n, &err2)
log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
})
if err1 != nil {
- return errors.Wrap(err1, "Error during handshake/write")
+ return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
}
if err2 != nil {
- return errors.Wrap(err2, "Error during handshake/read")
- }
-
- if p.config.AuthEnc {
- // Check that the professed PubKey matches the sconn's.
- if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
- return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
- peerNodeInfo.PubKey, p.PubKey())
- }
+ return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
}
// Remove deadline
- p.conn.SetDeadline(time.Time{})
-
- peerNodeInfo.RemoteAddr = p.Addr().String()
-
- p.NodeInfo = peerNodeInfo
- p.Key = peerNodeInfo.PubKey.KeyString()
+ pc.conn.SetDeadline(time.Time{})
- return nil
+ return peerNodeInfo, nil
}
// Addr returns peer's remote network address.
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
- if chID == PexChannel {
- return
- } else {
- cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
- }
+ cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
}
reactor.Receive(chID, p, msgBytes)
}
index int
}
+// NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.
func NewPeerSet() *PeerSet {
return &PeerSet{
lookup: make(map[string]*peerSetItem),
}
}
+// Add adds the peer to the PeerSet.
// Returns false if peer with key (PubKeyEd25519) is already set
func (ps *PeerSet) Add(peer *Peer) error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.lookup[peer.Key] != nil {
- return ErrSwitchDuplicatePeer
+ return ErrDuplicatePeer
}
index := len(ps.list)
return nil
}
+// Has returns true if the PeerSet contains
+// the peer referred to by this peerKey.
func (ps *PeerSet) Has(peerKey string) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ok
}
+// Get looks up a peer by the provided peerKey.
func (ps *PeerSet) Get(peerKey string) *Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item, ok := ps.lookup[peerKey]
if ok {
return item.peer
- } else {
- return nil
}
+ return nil
}
+// Remove discards peer if the peer was previously memoized.
func (ps *PeerSet) Remove(peer *Peer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
}
+// Size returns the number of unique items in the peerSet.
func (ps *PeerSet) Size() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
}
func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
- if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
+ if err := r.Switch.DialPeerWithAddress(a); err != nil {
r.book.MarkAttempt(a)
} else {
r.book.MarkGood(a)
"fmt"
"math/rand"
"net"
- "strings"
"sync"
"time"
)
const (
- reconnectAttempts = 5
- reconnectInterval = 10 * time.Second
-
bannedPeerKey = "BannedPeer"
defaultBanDuration = time.Hour * 1
)
-var ErrConnectBannedPeer = errors.New("Connect banned peer")
-
-type Reactor interface {
- cmn.Service // Start, Stop
-
- SetSwitch(*Switch)
- GetChannels() []*ChannelDescriptor
- AddPeer(peer *Peer) error
- RemovePeer(peer *Peer, reason interface{})
- Receive(chID byte, peer *Peer, msgBytes []byte)
-}
-
-//--------------------------------------
-
-type BaseReactor struct {
- cmn.BaseService // Provides Start, Stop, .Quit
- Switch *Switch
-}
-
-func NewBaseReactor(name string, impl Reactor) *BaseReactor {
- return &BaseReactor{
- BaseService: *cmn.NewBaseService(nil, name, impl),
- Switch: nil,
- }
-}
-
-func (br *BaseReactor) SetSwitch(sw *Switch) {
- br.Switch = sw
-}
-func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
-func (_ *BaseReactor) AddPeer(peer *Peer) {}
-func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
-func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
+var (
+ ErrDuplicatePeer = errors.New("Duplicate peer")
+ ErrConnectSelf = errors.New("Connect self")
+ ErrConnectBannedPeer = errors.New("Connect banned peer")
+)
//-----------------------------------------------------------------------------
-/*
-The `Switch` handles peer connections and exposes an API to receive incoming messages
-on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
-or more `Channels`. So while sending outgoing messages is typically performed on the peer,
-incoming messages are received on the reactor.
-*/
+// Switch handles peer connections and exposes an API to receive incoming messages
+// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
+// or more `Channels`. So while sending outgoing messages is typically performed on the peer,
+// incoming messages are received on the reactor.
type Switch struct {
cmn.BaseService
dialing *cmn.CMap
nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
+ addrBook *AddrBook
bannedPeer map[string]time.Time
db dbm.DB
mtx sync.Mutex
-
- filterConnByAddr func(net.Addr) error
- filterConnByPubKey func(crypto.PubKeyEd25519) error
}
-var (
- ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
- ErrConnectSelf = errors.New("Connect self")
- ErrPeerConnected = errors.New("Peer is connected")
-)
-
+// NewSwitch creates a new Switch with the given config.
func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
sw := &Switch{
config: config,
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
+ // Optionally, start the pex reactor
+ if config.PexReactor {
+ sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
+ pexReactor := NewPEXReactor(sw.addrBook, sw)
+ sw.AddReactor("PEX", pexReactor)
+ }
+
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
return sw
}
-// Not goroutine safe.
+// AddReactor adds the given reactor to the switch.
+// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor.
// No two reactors can share the same channel.
return reactor
}
-// Not goroutine safe.
+// Reactors returns a map of reactors registered on the switch.
+// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {
return sw.reactors
}
-// Not goroutine safe.
+// Reactor returns the reactor with the given name.
+// NOTE: Not goroutine safe.
func (sw *Switch) Reactor(name string) Reactor {
return sw.reactors[name]
}
-// Not goroutine safe.
+// AddListener adds the given listener to the switch for listening to incoming peer connections.
+// NOTE: Not goroutine safe.
func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l)
}
-// Not goroutine safe.
+// Listeners returns the list of listeners the switch listens on.
+// NOTE: Not goroutine safe.
func (sw *Switch) Listeners() []Listener {
return sw.listeners
}
-// Not goroutine safe.
+// IsListening returns true if the switch has at least one listener.
+// NOTE: Not goroutine safe.
func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0
}
-// Not goroutine safe.
+// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
+// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
sw.nodeInfo = nodeInfo
}
-// Not goroutine safe.
+// NodeInfo returns the switch's NodeInfo.
+// NOTE: Not goroutine safe.
func (sw *Switch) NodeInfo() *NodeInfo {
return sw.nodeInfo
}
-// Not goroutine safe.
-// NOTE: Overwrites sw.nodeInfo.PubKey
+// SetNodeKey sets the switch's private key for authenticated encryption.
+// NOTE: Not goroutine safe.
func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
sw.nodePrivKey = nodePrivKey
if sw.nodeInfo != nil {
}
}
-// Switch.Start() starts all the reactors, peers, and listeners.
+// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
- sw.BaseService.OnStart()
// Start reactors
for _, reactor := range sw.reactors {
_, err := reactor.Start()
return nil
}
+// OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() {
- sw.BaseService.OnStop()
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
}
+// addPeer performs the P2P handshake with a peer
+// that already has a SecretConnection. If all goes well,
+// it starts the peer and adds it to the switch.
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
-func (sw *Switch) AddPeer(peer *Peer) error {
- if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
- return err
- }
-
- if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
- return err
- }
-
- if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
- return err
- }
-
- if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
- return err
- }
-
- // Avoid self
- if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
- return errors.New("Ignoring connection from self")
+func (sw *Switch) AddPeer(pc *peerConn) error {
+ peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+ if err != nil {
+ return ErrConnectBannedPeer
}
-
// Check version, chain id
- if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
+ if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
return err
}
- // Check for duplicate peer
- if sw.peers.Has(peer.Key) {
- return ErrSwitchDuplicatePeer
+ peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
+ //filter peer
+ if err := sw.filterConnByPeer(peer); err != nil {
+ return err
}
// Start peer
return err
}
- log.WithField("peer", peer).Info("Added peer")
- return nil
-}
-
-func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
- if sw.filterConnByAddr != nil {
- return sw.filterConnByAddr(addr)
- }
- return nil
-}
-
-func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
- if sw.filterConnByPubKey != nil {
- return sw.filterConnByPubKey(pubkey)
- }
+ log.Info("Added peer:", peer)
return nil
-
-}
-
-func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
- sw.filterConnByAddr = f
-}
-
-func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
- sw.filterConnByPubKey = f
}
func (sw *Switch) startInitPeer(peer *Peer) error {
}
// Dial a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
-
+func (sw *Switch) DialSeeds(seeds []string) error {
netAddrs, err := NewNetAddressStrings(seeds)
if err != nil {
return err
}
- if addrBook != nil {
+ if sw.addrBook != nil {
// add seeds to `addrBook`
- ourAddrS := sw.nodeInfo.ListenAddr
- ourAddr, _ := NewNetAddressString(ourAddrS)
+ ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
for _, netAddr := range netAddrs {
// do not add ourselves
if netAddr.Equals(ourAddr) {
continue
}
- addrBook.AddAddress(netAddr, ourAddr)
+ sw.addrBook.AddAddress(netAddr, ourAddr)
}
- addrBook.Save()
+ sw.addrBook.Save()
}
+
//permute the list, dial them in random order.
perm := rand.Perm(len(netAddrs))
for i := 0; i < len(perm); i += 2 {
}
func (sw *Switch) dialSeed(addr *NetAddress) {
- peer, err := sw.DialPeerWithAddress(addr, false)
+ err := sw.DialPeerWithAddress(addr)
if err != nil {
- log.WithField("error", err).Error("Error dialing seed")
- } else {
- log.WithField("peer", peer).Info("Connected to seed")
+ log.Info("Error dialing seed:", addr.String())
}
}
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
- if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
- return nil, err
+func (sw *Switch) addrBookDelSelf() error {
+ addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
+ if err != nil {
+ return err
}
- if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
- return nil, ErrConnectSelf
+ // remove the given address from the address book if we're added it earlier
+ sw.addrBook.RemoveAddress(addr)
+ // add the given address to the address book to avoid dialing ourselves
+ // again this is our public address
+ sw.addrBook.AddOurAddress(addr)
+ return nil
+}
+
+func (sw *Switch) filterConnByIP(ip string) error {
+ if err := sw.checkBannedPeer(ip); err != nil {
+ return ErrConnectBannedPeer
}
- for _, v := range sw.Peers().list {
- if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
- return nil, ErrPeerConnected
- }
+
+ if ip == sw.nodeInfo.ListenHost() {
+ sw.addrBookDelSelf()
+ return ErrConnectSelf
+ }
+
+ return nil
+}
+
+func (sw *Switch) filterConnByPeer(peer *Peer) error {
+ if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
+ return ErrConnectBannedPeer
+ }
+
+ if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
+ sw.addrBookDelSelf()
+ return ErrConnectSelf
+ }
+
+ // Check for duplicate peer
+ if sw.peers.Has(peer.Key) {
+ return ErrDuplicatePeer
+ }
+ return nil
+}
+
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
+ log.Debug("Dialing peer address:", addr)
+
+ if err := sw.filterConnByIP(addr.IP.String()); err != nil {
+ return err
}
+
sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String())
- log.Debug("Dialing peer address:", addr)
- peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
+ pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil {
log.Debug("Failed to dial peer", " address:", addr, " error:", err)
- return nil, err
- }
- peer.SetLogger(sw.Logger.With("peer", addr))
- if persistent {
- peer.makePersistent()
+ return err
}
- err = sw.AddPeer(peer)
+
+ err = sw.AddPeer(pc)
if err != nil {
- log.WithFields(log.Fields{
- "address": addr,
- "error": err,
- }).Info("Failed to add peer")
- peer.CloseConn()
- return nil, err
+ log.Info("Failed to add peer:", addr, " err:", err)
+ pc.CloseConn()
+ return err
}
- log.WithFields(log.Fields{
- "address": addr,
- }).Info("Dialed and added peer")
- return peer, nil
+ log.Info("Dialed and added peer:", addr)
+ return nil
}
func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.IP.String())
}
-// Broadcast runs a go routine for each attempted send, which will block
-// trying to send for defaultSendTimeoutSeconds. Returns a channel
-// which receives success values for each attempted send (false if times out)
-// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
-func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
- successChan := make(chan bool, len(sw.peers.List()))
- log.WithFields(log.Fields{
- "chID": chID,
- "msg": msg,
- }).Debug("Broadcast")
- for _, peer := range sw.peers.List() {
- go func(peer *Peer) {
- success := peer.Send(chID, msg)
- successChan <- success
- }(peer)
- }
- return successChan
-}
-
// Returns the count of outbound/inbound and outbound-dialing peers.
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List()
return sw.peers
}
-// Disconnect from a peer due to external error, retry if it is a persistent peer.
-// TODO: make record depending on reason.
+// StopPeerForError disconnects from a peer due to external error.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
- addr := NewNetAddress(peer.Addr())
- log.WithFields(log.Fields{
- "peer": peer,
- "error": reason,
- }).Info("Stopping peer due to error")
+ log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
sw.stopAndRemovePeer(peer, reason)
-
- if peer.IsPersistent() {
- log.WithField("peer", peer).Info("Reconnecting to peer")
- for i := 1; i < reconnectAttempts; i++ {
- if !sw.IsRunning() {
- return
- }
-
- peer, err := sw.DialPeerWithAddress(addr, false)
- if err != nil {
- if i == reconnectAttempts {
- log.WithFields(log.Fields{
- "retries": i,
- "error": err,
- }).Info("Error reconnecting to peer. Giving up")
- return
- }
-
- if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
- log.WithField("error", err).Info("Error reconnecting to peer. ")
- return
- }
-
- log.WithFields(log.Fields{
- "retries": i,
- "error": err,
- }).Info("Error reconnecting to peer. Trying again")
- time.Sleep(reconnectInterval)
- continue
- }
-
- log.WithField("peer", peer).Info("Reconnected to peer")
- return
- }
- }
}
// Disconnect from a peer gracefully.
-// TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Info("Stopping peer gracefully")
sw.stopAndRemovePeer(peer, nil)
reactor.RemovePeer(peer, reason)
}
sw.peers.Remove(peer)
- log.Info("Del peer from switch.")
peer.Stop()
- log.Info("Peer connection is closed.")
}
func (sw *Switch) listenerRoutine(l Listener) {
// disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
// the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
// be double of MaxNumPeers
- if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
- // close inConn
+ if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
inConn.Close()
- log.WithFields(log.Fields{
- "address": inConn.RemoteAddr().String(),
- "numPeers": sw.peers.Size(),
- }).Info("Ignoring inbound connection: already have enough peers")
+ log.Info("Ignoring inbound connection: already have enough peers.")
continue
}
// New inbound connection!
- err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
+ err := sw.addPeerWithConnection(inConn)
if err != nil {
- // conn close for returing err
- inConn.Close()
- log.WithFields(log.Fields{
- "address": inConn.RemoteAddr().String(),
- "error": err,
- }).Info("Ignoring inbound connection: error while adding peer")
+ log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
continue
}
-
- // NOTE: We don't yet have the listening port of the
- // remote (if they have a listener at all).
- // The peerHandshake will handle that
- }
-
- // cleanup
-}
-
-//-----------------------------------------------------------------------------
-
-type SwitchEventNewPeer struct {
- Peer *Peer
-}
-
-type SwitchEventDonePeer struct {
- Peer *Peer
- Error interface{}
-}
-
-//------------------------------------------------------------------
-// Switches connected via arbitrary net.Conn; useful for testing
-
-// Returns n switches, connected according to the connect func.
-// If connect==Connect2Switches, the switches will be fully connected.
-// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
-// NOTE: panics if any switch fails to start.
-func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
- switches := make([]*Switch, n)
- for i := 0; i < n; i++ {
- switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
- }
-
- if err := StartSwitches(switches); err != nil {
- panic(err)
- }
-
- for i := 0; i < n; i++ {
- for j := i; j < n; j++ {
- connect(switches, i, j)
- }
}
-
- return switches
-}
-
-var PanicOnAddPeerErr = false
-
-// Will connect switches i and j via net.Pipe()
-// Blocks until a conection is established.
-// NOTE: caller ensures i and j are within bounds
-func Connect2Switches(switches []*Switch, i, j int) {
- switchI := switches[i]
- switchJ := switches[j]
- c1, c2 := net.Pipe()
- doneCh := make(chan struct{})
- go func() {
- err := switchI.addPeerWithConnection(c1)
- if PanicOnAddPeerErr && err != nil {
- panic(err)
- }
- doneCh <- struct{}{}
- }()
- go func() {
- err := switchJ.addPeerWithConnection(c2)
- if PanicOnAddPeerErr && err != nil {
- panic(err)
- }
- doneCh <- struct{}{}
- }()
- <-doneCh
- <-doneCh
-}
-
-func StartSwitches(switches []*Switch) error {
- for _, s := range switches {
- _, err := s.Start() // start switch and reactors
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
- privKey := crypto.GenPrivKeyEd25519()
- // new switch, add reactors
- // TODO: let the config be passed in?
- s := initSwitch(i, NewSwitch(cfg, nil))
- s.SetNodeInfo(&NodeInfo{
- PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
- Moniker: cmn.Fmt("switch%d", i),
- Network: network,
- Version: version,
- RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
- ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
- })
- s.SetNodePrivKey(privKey)
- return s
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+ peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
if err != nil {
conn.Close()
return err
}
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.AddPeer(peer); err != nil {
+ if err = sw.AddPeer(peerConn); err != nil {
conn.Close()
return err
}
return nil
}
-func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
- fullAddr := conn.RemoteAddr().String()
- host, _, err := net.SplitHostPort(fullAddr)
- if err != nil {
- return err
- }
-
- if err = sw.checkBannedPeer(host); err != nil {
- return err
- }
-
- peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
- if err != nil {
- return err
- }
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.AddPeer(peer); err != nil {
- return err
- }
-
- return nil
-}
-
func (sw *Switch) AddBannedPeer(peer *Peer) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
- if peer == nil {
- return nil
- }
- key := peer.mconn.RemoteAddress.IP.String()
+ key := peer.NodeInfo.RemoteAddrHost()
sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)
if err != nil {
--- /dev/null
+package p2p
+
+import (
+ "math/rand"
+ "net"
+
+ "github.com/tendermint/go-crypto"
+ cmn "github.com/tendermint/tmlibs/common"
+
+ cfg "github.com/bytom/config"
+)
+
+var PanicOnAddPeerErr = false
+
+// Switches connected via arbitrary net.Conn; useful for testing
+// Returns n switches, connected according to the connect func.
+// If connect==Connect2Switches, the switches will be fully connected.
+// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
+// NOTE: panics if any switch fails to start.
+func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
+ switches := make([]*Switch, n)
+ for i := 0; i < n; i++ {
+ switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
+ }
+
+ if err := startSwitches(switches); err != nil {
+ panic(err)
+ }
+
+ for i := 0; i < n; i++ {
+ for j := i; j < n; j++ {
+ connect(switches, i, j)
+ }
+ }
+
+ return switches
+}
+
+// Will connect switches i and j via net.Pipe()
+// Blocks until a conection is established.
+// NOTE: caller ensures i and j are within bounds
+func Connect2Switches(switches []*Switch, i, j int) {
+ switchI := switches[i]
+ switchJ := switches[j]
+ c1, c2 := net.Pipe()
+ doneCh := make(chan struct{})
+ go func() {
+ err := switchI.addPeerWithConnection(c1)
+ if PanicOnAddPeerErr && err != nil {
+ panic(err)
+ }
+ doneCh <- struct{}{}
+ }()
+ go func() {
+ err := switchJ.addPeerWithConnection(c2)
+ if PanicOnAddPeerErr && err != nil {
+ panic(err)
+ }
+ doneCh <- struct{}{}
+ }()
+ <-doneCh
+ <-doneCh
+}
+
+func startSwitches(switches []*Switch) error {
+ for _, s := range switches {
+ _, err := s.Start() // start switch and reactors
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
+ privKey := crypto.GenPrivKeyEd25519()
+ // new switch, add reactors
+ // TODO: let the config be passed in?
+ s := initSwitch(i, NewSwitch(cfg, nil))
+ s.SetNodeInfo(&NodeInfo{
+ PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
+ Moniker: cmn.Fmt("switch%d", i),
+ Network: network,
+ Version: version,
+ RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
+ ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
+ })
+ s.SetNodePrivKey(privKey)
+ return s
+}