"fmt"
"math/rand"
"net"
- "strings"
"sync"
"time"
dbm "github.com/tendermint/tmlibs/db"
cfg "github.com/bytom/config"
- "github.com/bytom/errors"
"github.com/bytom/p2p/trust"
+ "github.com/bytom/errors"
)
const (
- reconnectAttempts = 5
- reconnectInterval = 10 * time.Second
-
bannedPeerKey = "BannedPeer"
defaultBanDuration = time.Hour * 1
)
-var ErrConnectBannedPeer = errors.New("Connect banned peer")
-
-type Reactor interface {
- cmn.Service // Start, Stop
-
- SetSwitch(*Switch)
- GetChannels() []*ChannelDescriptor
- AddPeer(peer *Peer) error
- RemovePeer(peer *Peer, reason interface{})
- Receive(chID byte, peer *Peer, msgBytes []byte)
-}
-
-//--------------------------------------
-
-type BaseReactor struct {
- cmn.BaseService // Provides Start, Stop, .Quit
- Switch *Switch
-}
-
-func NewBaseReactor(name string, impl Reactor) *BaseReactor {
- return &BaseReactor{
- BaseService: *cmn.NewBaseService(nil, name, impl),
- Switch: nil,
- }
-}
-
-func (br *BaseReactor) SetSwitch(sw *Switch) {
- br.Switch = sw
-}
-func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
-func (_ *BaseReactor) AddPeer(peer *Peer) {}
-func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
-func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
+var (
+ ErrDuplicatePeer = errors.New("Duplicate peer")
+ ErrConnectSelf = errors.New("Connect self")
+ ErrConnectBannedPeer = errors.New("Connect banned peer")
+)
//-----------------------------------------------------------------------------
-/*
-The `Switch` handles peer connections and exposes an API to receive incoming messages
-on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
-or more `Channels`. So while sending outgoing messages is typically performed on the peer,
-incoming messages are received on the reactor.
-*/
+// Switch handles peer connections and exposes an API to receive incoming messages
+// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
+// or more `Channels`. So while sending outgoing messages is typically performed on the peer,
+// incoming messages are received on the reactor.
type Switch struct {
cmn.BaseService
dialing *cmn.CMap
nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
+ addrBook *AddrBook
bannedPeer map[string]time.Time
db dbm.DB
mtx sync.Mutex
-
- filterConnByAddr func(net.Addr) error
- filterConnByPubKey func(crypto.PubKeyEd25519) error
}
-var (
- ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
- ErrConnectSelf = errors.New("Connect self")
- ErrPeerConnected = errors.New("Peer is connected")
-)
-
+// NewSwitch creates a new Switch with the given config.
func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
sw := &Switch{
config: config,
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
+ // Optionally, start the pex reactor
+ if config.PexReactor {
+ sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
+ pexReactor := NewPEXReactor(sw.addrBook, sw)
+ sw.AddReactor("PEX", pexReactor)
+ }
+
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
return sw
}
-// Not goroutine safe.
+// AddReactor adds the given reactor to the switch.
+// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor.
// No two reactors can share the same channel.
return reactor
}
-// Not goroutine safe.
+// Reactors returns a map of reactors registered on the switch.
+// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {
return sw.reactors
}
-// Not goroutine safe.
+// Reactor returns the reactor with the given name.
+// NOTE: Not goroutine safe.
func (sw *Switch) Reactor(name string) Reactor {
return sw.reactors[name]
}
-// Not goroutine safe.
+// AddListener adds the given listener to the switch for listening to incoming peer connections.
+// NOTE: Not goroutine safe.
func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l)
}
-// Not goroutine safe.
+// Listeners returns the list of listeners the switch listens on.
+// NOTE: Not goroutine safe.
func (sw *Switch) Listeners() []Listener {
return sw.listeners
}
-// Not goroutine safe.
+// IsListening returns true if the switch has at least one listener.
+// NOTE: Not goroutine safe.
func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0
}
-// Not goroutine safe.
+// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
+// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
sw.nodeInfo = nodeInfo
}
-// Not goroutine safe.
+// NodeInfo returns the switch's NodeInfo.
+// NOTE: Not goroutine safe.
func (sw *Switch) NodeInfo() *NodeInfo {
return sw.nodeInfo
}
-// Not goroutine safe.
-// NOTE: Overwrites sw.nodeInfo.PubKey
+// SetNodeKey sets the switch's private key for authenticated encryption.
+// NOTE: Not goroutine safe.
func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
sw.nodePrivKey = nodePrivKey
if sw.nodeInfo != nil {
}
}
-// Switch.Start() starts all the reactors, peers, and listeners.
+// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
- sw.BaseService.OnStart()
// Start reactors
for _, reactor := range sw.reactors {
_, err := reactor.Start()
return nil
}
+// OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() {
- sw.BaseService.OnStop()
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
}
+// addPeer performs the P2P handshake with a peer
+// that already has a SecretConnection. If all goes well,
+// it starts the peer and adds it to the switch.
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeer(peer *Peer) error {
- if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
- return err
- }
-
- if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
- return err
- }
-
if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
- return err
+ return ErrConnectBannedPeer
}
-
- if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
+ //filter peer
+ if err := sw.filterConnByPeer(peer); err != nil {
return err
}
-
- // Avoid self
- if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
- return errors.New("Ignoring connection from self")
- }
-
// Check version, chain id
if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
return err
}
-
- // Check for duplicate peer
- if sw.peers.Has(peer.Key) {
- return ErrSwitchDuplicatePeer
-
- }
-
// Start peer
if sw.IsRunning() {
if err := sw.startInitPeer(peer); err != nil {
return err
}
}
-
// Add the peer to .peers.
// We start it first so that a peer in the list is safe to Stop.
// It should not err since we already checked peers.Has()
return err
}
- log.WithField("peer", peer).Info("Added peer")
+ log.Info("Added peer:", peer)
return nil
}
-func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
- if sw.filterConnByAddr != nil {
- return sw.filterConnByAddr(addr)
- }
- return nil
-}
-
-func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
- if sw.filterConnByPubKey != nil {
- return sw.filterConnByPubKey(pubkey)
- }
- return nil
-
-}
-
-func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
- sw.filterConnByAddr = f
-}
-
-func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
- sw.filterConnByPubKey = f
-}
-
func (sw *Switch) startInitPeer(peer *Peer) error {
peer.Start() // spawn send/recv routines
for _, reactor := range sw.reactors {
}
// Dial a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
-
+func (sw *Switch) DialSeeds(seeds []string) error {
netAddrs, err := NewNetAddressStrings(seeds)
if err != nil {
return err
}
- if addrBook != nil {
+ if sw.addrBook != nil {
// add seeds to `addrBook`
- ourAddrS := sw.nodeInfo.ListenAddr
- ourAddr, _ := NewNetAddressString(ourAddrS)
+ ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
for _, netAddr := range netAddrs {
// do not add ourselves
if netAddr.Equals(ourAddr) {
continue
}
- addrBook.AddAddress(netAddr, ourAddr)
+ sw.addrBook.AddAddress(netAddr, ourAddr)
}
- addrBook.Save()
+ sw.addrBook.Save()
}
+
//permute the list, dial them in random order.
perm := rand.Perm(len(netAddrs))
for i := 0; i < len(perm); i += 2 {
}
func (sw *Switch) dialSeed(addr *NetAddress) {
- peer, err := sw.DialPeerWithAddress(addr, false)
+ peer, err := sw.DialPeerWithAddress(addr)
if err != nil {
log.WithField("error", err).Error("Error dialing seed")
} else {
}
}
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
- if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
- return nil, err
+func (sw *Switch) addrBookDelSelf() error {
+ addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
+ if err != nil {
+ return err
}
- if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
- return nil, ErrConnectSelf
+ // remove the given address from the address book if we're added it earlier
+ sw.addrBook.RemoveAddress(addr)
+ // add the given address to the address book to avoid dialing ourselves
+ // again this is our public address
+ sw.addrBook.AddOurAddress(addr)
+ return nil
+}
+
+func (sw *Switch) filterConnByIP(ip string) error {
+ if err := sw.checkBannedPeer(ip); err != nil {
+ return ErrConnectBannedPeer
}
- for _, v := range sw.Peers().list {
- if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
- return nil, ErrPeerConnected
- }
+
+ if ip == sw.nodeInfo.ListenHost() {
+ sw.addrBookDelSelf()
+ return ErrConnectSelf
}
+
+ return nil
+}
+
+func (sw *Switch) filterConnByPeer(peer *Peer) error {
+ if err := sw.checkBannedPeer(peer.mconn.RemoteAddress.IP.String()); err != nil {
+ return ErrConnectBannedPeer
+ }
+
+ if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
+ sw.addrBookDelSelf()
+ return ErrConnectSelf
+ }
+
+ // Check for duplicate peer
+ if sw.peers.Has(peer.Key) {
+ return ErrDuplicatePeer
+ }
+ return nil
+}
+
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
+ log.Debug("Dialing peer address:", addr)
+
+ if err := sw.filterConnByIP(addr.IP.String()); err != nil {
+ return nil, err
+ }
+
sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String())
- log.Debug("Dialing peer address:", addr)
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil {
log.Debug("Failed to dial peer", " address:", addr, " error:", err)
return nil, err
}
- peer.SetLogger(sw.Logger.With("peer", addr))
- if persistent {
- peer.makePersistent()
- }
+
err = sw.AddPeer(peer)
if err != nil {
- log.WithFields(log.Fields{
- "address": addr,
- "error": err,
- }).Info("Failed to add peer")
+ log.Info("Failed to add peer:", addr, " err:", err)
peer.CloseConn()
return nil, err
}
- log.WithFields(log.Fields{
- "address": addr,
- }).Info("Dialed and added peer")
+ log.Info("Dialed and added peer:", addr)
return peer, nil
}
return sw.dialing.Has(addr.IP.String())
}
-// Broadcast runs a go routine for each attempted send, which will block
-// trying to send for defaultSendTimeoutSeconds. Returns a channel
-// which receives success values for each attempted send (false if times out)
-// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
-func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
- successChan := make(chan bool, len(sw.peers.List()))
- log.WithFields(log.Fields{
- "chID": chID,
- "msg": msg,
- }).Debug("Broadcast")
- for _, peer := range sw.peers.List() {
- go func(peer *Peer) {
- success := peer.Send(chID, msg)
- successChan <- success
- }(peer)
- }
- return successChan
-}
-
// Returns the count of outbound/inbound and outbound-dialing peers.
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List()
return sw.peers
}
-// Disconnect from a peer due to external error, retry if it is a persistent peer.
-// TODO: make record depending on reason.
+// StopPeerForError disconnects from a peer due to external error.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
- addr := NewNetAddress(peer.Addr())
- log.WithFields(log.Fields{
- "peer": peer,
- "error": reason,
- }).Info("Stopping peer due to error")
+ log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
sw.stopAndRemovePeer(peer, reason)
-
- if peer.IsPersistent() {
- log.WithField("peer", peer).Info("Reconnecting to peer")
- for i := 1; i < reconnectAttempts; i++ {
- if !sw.IsRunning() {
- return
- }
-
- peer, err := sw.DialPeerWithAddress(addr, false)
- if err != nil {
- if i == reconnectAttempts {
- log.WithFields(log.Fields{
- "retries": i,
- "error": err,
- }).Info("Error reconnecting to peer. Giving up")
- return
- }
-
- if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
- log.WithField("error", err).Info("Error reconnecting to peer. ")
- return
- }
-
- log.WithFields(log.Fields{
- "retries": i,
- "error": err,
- }).Info("Error reconnecting to peer. Trying again")
- time.Sleep(reconnectInterval)
- continue
- }
-
- log.WithField("peer", peer).Info("Reconnected to peer")
- return
- }
- }
}
// Disconnect from a peer gracefully.
-// TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Info("Stopping peer gracefully")
sw.stopAndRemovePeer(peer, nil)
reactor.RemovePeer(peer, reason)
}
sw.peers.Remove(peer)
- log.Info("Del peer from switch.")
peer.Stop()
- log.Info("Peer connection is closed.")
}
func (sw *Switch) listenerRoutine(l Listener) {
// disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
// the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
// be double of MaxNumPeers
- if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
- // close inConn
+ if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
inConn.Close()
- log.WithFields(log.Fields{
- "address": inConn.RemoteAddr().String(),
- "numPeers": sw.peers.Size(),
- }).Info("Ignoring inbound connection: already have enough peers")
+ log.Info("Ignoring inbound connection: already have enough peers.")
continue
}
// New inbound connection!
- err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
+ err := sw.addPeerWithConnection(inConn)
if err != nil {
- // conn close for returing err
- inConn.Close()
- log.WithFields(log.Fields{
- "address": inConn.RemoteAddr().String(),
- "error": err,
- }).Info("Ignoring inbound connection: error while adding peer")
+ log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
continue
}
+ }
+}
- // NOTE: We don't yet have the listening port of the
- // remote (if they have a listener at all).
- // The peerHandshake will handle that
+func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
+ peer, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+ if err != nil {
+ conn.Close()
+ return err
+ }
+ if err = sw.AddPeer(peer); err != nil {
+ conn.Close()
+ return err
}
- // cleanup
+ return nil
}
-//-----------------------------------------------------------------------------
-type SwitchEventNewPeer struct {
- Peer *Peer
+func (sw *Switch) AddBannedPeer(peer *Peer) error {
+ sw.mtx.Lock()
+ defer sw.mtx.Unlock()
+ key := peer.mconn.RemoteAddress.IP.String()
+ sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
+ datajson, err := json.Marshal(sw.bannedPeer)
+ if err != nil {
+ return err
+ }
+ sw.db.Set([]byte(bannedPeerKey), datajson)
+ return nil
}
-type SwitchEventDonePeer struct {
- Peer *Peer
- Error interface{}
+func (sw *Switch) delBannedPeer(addr string) error {
+ delete(sw.bannedPeer, addr)
+ datajson, err := json.Marshal(sw.bannedPeer)
+ if err != nil {
+ return err
+ }
+ sw.db.Set([]byte(bannedPeerKey), datajson)
+ return nil
}
-//------------------------------------------------------------------
-// Switches connected via arbitrary net.Conn; useful for testing
+func (sw *Switch) checkBannedPeer(peer string) error {
+ sw.mtx.Lock()
+ defer sw.mtx.Unlock()
+ if banEnd, ok := sw.bannedPeer[peer]; ok {
+ if time.Now().Before(banEnd) {
+ return ErrConnectBannedPeer
+ }
+ sw.delBannedPeer(peer)
+ }
+ return nil
+}
+
+// Switches connected via arbitrary net.Conn; useful for testing
// Returns n switches, connected according to the connect func.
// If connect==Connect2Switches, the switches will be fully connected.
// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
return s
}
-func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
- if err != nil {
- conn.Close()
- return err
- }
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.AddPeer(peer); err != nil {
- conn.Close()
- return err
- }
-
- return nil
-}
-
-func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
- fullAddr := conn.RemoteAddr().String()
- host, _, err := net.SplitHostPort(fullAddr)
- if err != nil {
- return err
- }
-
- if err = sw.checkBannedPeer(host); err != nil {
- return err
- }
-
- peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
- if err != nil {
- return err
- }
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.AddPeer(peer); err != nil {
- return err
- }
-
- return nil
-}
-
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
- sw.mtx.Lock()
- defer sw.mtx.Unlock()
- if peer == nil {
- return nil
- }
- key := peer.mconn.RemoteAddress.IP.String()
- sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
- datajson, err := json.Marshal(sw.bannedPeer)
- if err != nil {
- return err
- }
- sw.db.Set([]byte(bannedPeerKey), datajson)
- return nil
-}
-
-func (sw *Switch) delBannedPeer(addr string) error {
- delete(sw.bannedPeer, addr)
- datajson, err := json.Marshal(sw.bannedPeer)
- if err != nil {
- return err
- }
- sw.db.Set([]byte(bannedPeerKey), datajson)
- return nil
-}
-
-func (sw *Switch) checkBannedPeer(peer string) error {
- sw.mtx.Lock()
- defer sw.mtx.Unlock()
-
- if banEnd, ok := sw.bannedPeer[peer]; ok {
- if time.Now().Before(banEnd) {
- return ErrConnectBannedPeer
- }
- sw.delBannedPeer(peer)
- }
- return nil
-}