"time"
log "github.com/sirupsen/logrus"
- crypto "github.com/tendermint/go-crypto"
+ "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
cfg "github.com/bytom/config"
+ "github.com/bytom/consensus"
"github.com/bytom/errors"
"github.com/bytom/p2p/connection"
+ "github.com/bytom/p2p/discover"
"github.com/bytom/p2p/trust"
+ "github.com/bytom/version"
)
const (
- bannedPeerKey = "BannedPeer"
- defaultBanDuration = time.Hour * 1
+ bannedPeerKey = "BannedPeer"
+ defaultBanDuration = time.Hour * 1
+ minNumOutboundPeers = 3
)
//pre-define errors for connecting fail
ErrDuplicatePeer = errors.New("Duplicate peer")
ErrConnectSelf = errors.New("Connect self")
ErrConnectBannedPeer = errors.New("Connect banned peer")
+ ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
)
-// An AddrBook represents an address book from the pex package, which is used to store peer addresses.
-type AddrBook interface {
- AddAddress(*NetAddress, *NetAddress) error
- AddOurAddress(*NetAddress)
- MarkGood(*NetAddress)
- RemoveAddress(*NetAddress)
- SaveToFile() error
-}
-
-//-----------------------------------------------------------------------------
-
// Switch handles peer connections and exposes an API to receive incoming messages
// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
// or more `Channels`. So while sending outgoing messages is typically performed on the peer,
type Switch struct {
cmn.BaseService
- Config *cfg.P2PConfig
+ Config *cfg.Config
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
dialing *cmn.CMap
nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
- addrBook AddrBook
+ discv *discover.Network
bannedPeer map[string]time.Time
db dbm.DB
mtx sync.Mutex
}
// NewSwitch creates a new Switch with the given config.
-func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
+func NewSwitch(config *cfg.Config) *Switch {
sw := &Switch{
Config: config,
- peerConfig: DefaultPeerConfig(config),
+ peerConfig: DefaultPeerConfig(config.P2P),
reactors: make(map[string]Reactor),
chDescs: make([]*connection.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
nodeInfo: nil,
- addrBook: addrBook,
- db: trustHistoryDB,
+ db: dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
sw.bannedPeer = make(map[string]time.Time)
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
+ go sw.ensureOutboundPeersRoutine()
return nil
}
}
//AddBannedPeer add peer to blacklist
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
+func (sw *Switch) AddBannedPeer(ip string) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
- key := peer.NodeInfo.RemoteAddrHost()
- sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
+ sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)
if err != nil {
return err
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeer(pc *peerConn) error {
- peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
+ peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout))
if err != nil {
return err
}
+ if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
+ return err
+ }
if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
return err
}
return err
}
+ if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
+ return ErrConnectSpvPeer
+ }
+
// Start peer
if sw.IsRunning() {
if err := sw.startInitPeer(peer); err != nil {
}
// StopPeerGracefully disconnect from a peer gracefully.
-func (sw *Switch) StopPeerGracefully(peer *Peer) {
- sw.stopAndRemovePeer(peer, nil)
+func (sw *Switch) StopPeerGracefully(peerID string) {
+ if peer := sw.peers.Get(peerID); peer != nil {
+ sw.stopAndRemovePeer(peer, nil)
+ }
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config)
+ peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
if err != nil {
conn.Close()
return err
return nil
}
-func (sw *Switch) addrBookDelSelf() error {
- addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
- if err != nil {
- return err
- }
-
- sw.addrBook.RemoveAddress(addr)
- sw.addrBook.AddOurAddress(addr)
- return nil
-}
-
func (sw *Switch) checkBannedPeer(peer string) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
func (sw *Switch) filterConnByIP(ip string) error {
if ip == sw.nodeInfo.ListenHost() {
- sw.addrBookDelSelf()
return ErrConnectSelf
}
return sw.checkBannedPeer(ip)
}
if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
- sw.addrBookDelSelf()
return ErrConnectSelf
}
break
}
- // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
- // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
- // be double of MaxNumPeers
- if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
+ // disconnect if we alrady have MaxNumPeers
+ if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
inConn.Close()
log.Info("Ignoring inbound connection: already have enough peers.")
continue
}
}
+func (sw *Switch) SetDiscv(discv *discover.Network) {
+ sw.discv = discv
+}
+
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+ if err := sw.DialPeerWithAddress(a); err != nil {
+ log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
+ }
+ wg.Done()
+}
+
+func (sw *Switch) ensureOutboundPeers() {
+ numOutPeers, _, numDialing := sw.NumPeers()
+ numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+ log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+ if numToDial <= 0 {
+ return
+ }
+
+ connectedPeers := make(map[string]struct{})
+ for _, peer := range sw.Peers().List() {
+ connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+ }
+
+ var wg sync.WaitGroup
+ nodes := make([]*discover.Node, numToDial)
+ n := sw.discv.ReadRandomNodes(nodes)
+ for i := 0; i < n; i++ {
+ try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+ if sw.NodeInfo().ListenAddr == try.String() {
+ continue
+ }
+ if dialling := sw.IsDialing(try); dialling {
+ continue
+ }
+ if _, ok := connectedPeers[try.IP.String()]; ok {
+ continue
+ }
+
+ wg.Add(1)
+ go sw.dialPeerWorker(try, &wg)
+ }
+ wg.Wait()
+}
+
+func (sw *Switch) ensureOutboundPeersRoutine() {
+ sw.ensureOutboundPeers()
+
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ sw.ensureOutboundPeers()
+ case <-sw.Quit:
+ return
+ }
+ }
+}
+
func (sw *Switch) startInitPeer(peer *Peer) error {
peer.Start() // spawn send/recv routines
for _, reactor := range sw.reactors {
}
func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
+ sw.peers.Remove(peer)
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
- sw.peers.Remove(peer)
peer.Stop()
}