"fmt"
"math/rand"
"net"
+ "strings"
"sync"
"time"
- "strings"
log "github.com/sirupsen/logrus"
crypto "github.com/tendermint/go-crypto"
)
const (
- reconnectAttempts = 10
+ reconnectAttempts = 5
reconnectInterval = 10 * time.Second
bannedPeerKey = "BannedPeer"
- defaultBanDuration = time.Hour * 24
- peerBannedTM = 20
+ defaultBanDuration = time.Hour * 1
)
var ErrConnectBannedPeer = errors.New("Connect banned peer")
type Switch struct {
cmn.BaseService
- config *cfg.P2PConfig
- peerConfig *PeerConfig
- listeners []Listener
- reactors map[string]Reactor
- chDescs []*ChannelDescriptor
- reactorsByCh map[byte]Reactor
- peers *PeerSet
- dialing *cmn.CMap
- nodeInfo *NodeInfo // our node info
- nodePrivKey crypto.PrivKeyEd25519 // our node privkey
- bannedPeer map[string]time.Time
- db dbm.DB
- TrustMetricStore *trust.TrustMetricStore
- ScamPeerCh chan *Peer
- mtx sync.Mutex
+ config *cfg.P2PConfig
+ peerConfig *PeerConfig
+ listeners []Listener
+ reactors map[string]Reactor
+ chDescs []*ChannelDescriptor
+ reactorsByCh map[byte]Reactor
+ peers *PeerSet
+ dialing *cmn.CMap
+ nodeInfo *NodeInfo // our node info
+ nodePrivKey crypto.PrivKeyEd25519 // our node privkey
+ bannedPeer map[string]time.Time
+ db dbm.DB
+ mtx sync.Mutex
filterConnByAddr func(net.Addr) error
filterConnByPubKey func(crypto.PubKeyEd25519) error
dialing: cmn.NewCMap(),
nodeInfo: nil,
db: trustHistoryDB,
- ScamPeerCh: make(chan *Peer),
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
- sw.TrustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig())
- sw.TrustMetricStore.Start()
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
return nil
}
}
- go sw.scamPeerHandler()
+ trust.Init()
return sw
}
return err
}
- tm := trust.NewMetric()
-
- tm.Start()
- sw.TrustMetricStore.AddPeerTrustMetric(peer.mconn.RemoteAddress.IP.String(), tm)
-
log.WithField("peer", peer).Info("Added peer")
return nil
}
}
addrBook.AddAddress(netAddr, ourAddr)
}
+
addrBook.Save()
}
-
- // permute the list, dial them in random order.
+ //permute the list, dial them in random order.
perm := rand.Perm(len(netAddrs))
- for i := 0; i < len(perm); i++ {
+ for i := 0; i < len(perm); i += 2 {
j := perm[i]
sw.dialSeed(netAddrs[j])
}
+
return nil
}
func (sw *Switch) dialSeed(addr *NetAddress) {
- peer, err := sw.DialPeerWithAddress(addr, true)
+ peer, err := sw.DialPeerWithAddress(addr, false)
if err != nil {
log.WithField("error", err).Error("Error dialing seed")
} else {
sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String())
- log.WithField("address", addr).Info("Dialing peer")
+ log.Debug("Dialing peer address:", addr)
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil {
- log.WithFields(log.Fields{
- "address": addr,
- "error": err,
- }).Info("Failed to dial peer")
+ log.Debug("Failed to dial peer", " address:", addr, " error:", err)
return nil, err
}
peer.SetLogger(sw.Logger.With("peer", addr))
return
}
- peer, err := sw.DialPeerWithAddress(addr, true)
+ peer, err := sw.DialPeerWithAddress(addr, false)
if err != nil {
if i == reconnectAttempts {
log.WithFields(log.Fields{
}
func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
- sw.peers.Remove(peer)
- peer.Stop()
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
+ sw.peers.Remove(peer)
+ log.Info("Del peer from switch.")
+ peer.Stop()
+ log.Info("Peer connection is closed.")
}
func (sw *Switch) listenerRoutine(l Listener) {
break
}
- // ignore connection if we already have enough
- maxPeers := sw.config.MaxNumPeers
- if maxPeers <= sw.peers.Size() {
+ // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
+ // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
+ // be double of MaxNumPeers
+ if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
// close inConn
inConn.Close()
log.WithFields(log.Fields{
"address": inConn.RemoteAddr().String(),
"numPeers": sw.peers.Size(),
- "max": maxPeers,
}).Info("Ignoring inbound connection: already have enough peers")
continue
}
func (sw *Switch) AddBannedPeer(peer *Peer) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
-
+ if peer == nil {
+ return nil
+ }
key := peer.mconn.RemoteAddress.IP.String()
sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)
return nil
}
-func (sw *Switch) DelBannedPeer(addr string) error {
- sw.mtx.Lock()
- defer sw.mtx.Unlock()
-
+func (sw *Switch) delBannedPeer(addr string) error {
delete(sw.bannedPeer, addr)
datajson, err := json.Marshal(sw.bannedPeer)
if err != nil {
return nil
}
-func (sw *Switch) scamPeerHandler() {
- for src := range sw.ScamPeerCh {
- var tm *trust.TrustMetric
- key := src.Connection().RemoteAddress.IP.String()
- if tm = sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
- log.Errorf("Can't get peer trust metric")
- continue
- }
- sw.delTrustMetric(tm, src)
- }
-}
-
-func (sw *Switch) AddScamPeer(src *Peer) {
- sw.ScamPeerCh <- src
-}
-
-func (sw *Switch) delTrustMetric(tm *trust.TrustMetric, src *Peer) {
- key := src.Connection().RemoteAddress.IP.String()
- tm.BadEvents(1)
- if tm.TrustScore() < peerBannedTM {
- sw.AddBannedPeer(src)
- sw.TrustMetricStore.PeerDisconnected(key)
- sw.StopPeerGracefully(src)
- }
-}
-
func (sw *Switch) checkBannedPeer(peer string) error {
+ sw.mtx.Lock()
+ defer sw.mtx.Unlock()
+
if banEnd, ok := sw.bannedPeer[peer]; ok {
if time.Now().Before(banEnd) {
return ErrConnectBannedPeer
}
- sw.DelBannedPeer(peer)
+ sw.delBannedPeer(peer)
}
return nil
}