From 0af2e399160a671896126ab3eb682ef183c0ca09 Mon Sep 17 00:00:00 2001 From: wz Date: Sat, 14 Apr 2018 10:37:33 +0800 Subject: [PATCH] fix conn close --- p2p/connection.go | 22 +++++++++++----------- p2p/pex_reactor.go | 8 +++++--- p2p/switch.go | 9 ++++++--- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/p2p/connection.go b/p2p/connection.go index 5eecebc7..32ef469e 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -409,19 +409,19 @@ FOR_LOOP: // Block until .recvMonitor says we can read. c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) -/* - // Peek into bufReader for debugging - if numBytes := c.bufReader.Buffered(); numBytes > 0 { - log.Infof("Peek connection buffer numBytes:", numBytes) - bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100)) - if err == nil { - log.Infof("bytes:", bytes) + /* + // Peek into bufReader for debugging + if numBytes := c.bufReader.Buffered(); numBytes > 0 { + log.Infof("Peek connection buffer numBytes:", numBytes) + bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100)) + if err == nil { + log.Infof("bytes:", bytes) + } else { + log.Warning("Error peeking connection buffer err:", err) + } } else { - log.Warning("Error peeking connection buffer err:", err) + log.Warning("Received bytes number is:", numBytes) } - } else { - log.Warning("Received bytes number is:", numBytes) - } */ // Read packet type diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 3db22e36..bc2d3247 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -5,13 +5,13 @@ import ( "fmt" "math/rand" "reflect" - "time" "strings" + "time" + "github.com/bytom/errors" log "github.com/sirupsen/logrus" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/bytom/errors" ) const ( @@ -246,6 +246,7 @@ func (r *PEXReactor) ensurePeers() { return } + newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 toDial := make(map[string]*NetAddress) // Try to pick numToDial addresses to dial. @@ -255,7 +256,7 @@ func (r *PEXReactor) ensurePeers() { // if we already have many connections. This algorithm isn't perfect, but // it somewhat ensures that we prioritize connecting to more-vetted // peers. - newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 + var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. @@ -267,6 +268,7 @@ func (r *PEXReactor) ensurePeers() { _, alreadySelected := toDial[try.IP.String()] alreadyDialing := r.Switch.IsDialing(try) var alreadyConnected bool + for _, v := range r.Switch.Peers().list { if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 { alreadyConnected = true diff --git a/p2p/switch.go b/p2p/switch.go index 37bf4e48..e8207b98 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -13,10 +13,11 @@ import ( cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" + "strings" + cfg "github.com/bytom/config" "github.com/bytom/errors" "github.com/bytom/p2p/trust" - "strings" ) const ( @@ -516,6 +517,8 @@ func (sw *Switch) listenerRoutine(l Listener) { // ignore connection if we already have enough maxPeers := sw.config.MaxNumPeers if maxPeers <= sw.peers.Size() { + // close inConn + inConn.Close() log.WithFields(log.Fields{ "address": inConn.RemoteAddr().String(), "numPeers": sw.peers.Size(), @@ -527,6 +530,8 @@ func (sw *Switch) listenerRoutine(l Listener) { // New inbound connection! err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) if err != nil { + // conn close for returing err + inConn.Close() log.WithFields(log.Fields{ "address": inConn.RemoteAddr().String(), "error": err, @@ -662,12 +667,10 @@ func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConf peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) if err != nil { - conn.Close() return err } peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) if err = sw.AddPeer(peer); err != nil { - conn.Close() return err } -- 2.11.0