From: paladz <453256728@qq.com> Date: Mon, 29 Oct 2018 14:14:21 +0000 (+0800) Subject: add net performance X-Git-Tag: 2.0.0-alpha~106^2~1^2 X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=9dc50254ef3a4c3e4f7a9fb3df52f43b872106f8;p=bytom%2Fbytom.git add net performance --- diff --git a/netsync/peer.go b/netsync/peer.go index a08458ee..0abfa3df 100644 --- a/netsync/peer.go +++ b/netsync/peer.go @@ -6,6 +6,7 @@ import ( "sync" log "github.com/sirupsen/logrus" + "github.com/tendermint/tmlibs/flowrate" "gopkg.in/fatih/set.v0" "github.com/bytom/consensus" @@ -26,6 +27,7 @@ type BasePeer interface { Addr() net.Addr ID() string ServiceFlag() consensus.ServiceFlag + TrafficStatus() (*flowrate.Status, *flowrate.Status) TrySend(byte, interface{}) bool } @@ -37,10 +39,17 @@ type BasePeerSet interface { // PeerInfo indicate peer status snap type PeerInfo struct { - ID string `json:"peer_id"` - RemoteAddr string `json:"remote_addr"` - Height uint64 `json:"height"` - Delay uint32 `json:"delay"` + ID string `json:"peer_id"` + RemoteAddr string `json:"remote_addr"` + Height uint64 `json:"height"` + Ping string `json:"ping"` + Duration string `json:"duration"` + TotalSent int64 `json:"total_sent"` + TotalReceive int64 `json:"total_receive"` + AverageSentRate int64 `json:"average_sent_rate"` + AverageReceiveRate int64 `json:"average_receive_rate"` + CurrentSentRate int64 `json:"current_sent_rate"` + CurrentReceiveRate int64 `json:"current_receive_rate"` } type peer struct { @@ -139,10 +148,25 @@ func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool { func (p *peer) getPeerInfo() *PeerInfo { p.mtx.RLock() defer p.mtx.RUnlock() + + sentStatus, receiveStatus := p.TrafficStatus() + ping := sentStatus.Idle - receiveStatus.Idle + if receiveStatus.Idle > sentStatus.Idle { + ping = -ping + } + return &PeerInfo{ - ID: p.ID(), - RemoteAddr: p.Addr().String(), - Height: p.height, + ID: p.ID(), + RemoteAddr: p.Addr().String(), + Height: p.height, + Ping: ping.String(), + Duration: sentStatus.Duration.String(), + TotalSent: sentStatus.Bytes, + TotalReceive: receiveStatus.Bytes, + AverageSentRate: sentStatus.AvgRate, + AverageReceiveRate: receiveStatus.AvgRate, + CurrentSentRate: sentStatus.CurRate, + CurrentReceiveRate: receiveStatus.CurRate, } } diff --git a/netsync/tool_test.go b/netsync/tool_test.go index 253e84e6..90575e66 100644 --- a/netsync/tool_test.go +++ b/netsync/tool_test.go @@ -7,6 +7,7 @@ import ( "time" wire "github.com/tendermint/go-wire" + "github.com/tendermint/tmlibs/flowrate" "github.com/bytom/consensus" "github.com/bytom/protocol/bc" @@ -52,6 +53,10 @@ func (p *P2PPeer) SetConnection(srcPeer *P2PPeer, node *SyncManager) { p.remoteNode = node } +func (p *P2PPeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) { + return nil, nil +} + func (p *P2PPeer) TrySend(b byte, msg interface{}) bool { msgBytes := wire.BinaryBytes(msg) if p.async { diff --git a/p2p/connection/connection.go b/p2p/connection/connection.go index 983fe221..282c8c73 100644 --- a/p2p/connection/connection.go +++ b/p2p/connection/connection.go @@ -12,7 +12,7 @@ import ( log "github.com/sirupsen/logrus" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - flow "github.com/tendermint/tmlibs/flowrate" + "github.com/tendermint/tmlibs/flowrate" ) const ( @@ -78,8 +78,8 @@ type MConnection struct { conn net.Conn bufReader *bufio.Reader bufWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor + sendMonitor *flowrate.Monitor + recvMonitor *flowrate.Monitor send chan struct{} pong chan struct{} channels []*channel @@ -115,8 +115,8 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flow.New(0, 0), - recvMonitor: flow.New(0, 0), + sendMonitor: flowrate.New(0, 0), + recvMonitor: flowrate.New(0, 0), send: make(chan struct{}, 1), pong: make(chan struct{}, 1), channelsIdx: map[byte]*channel{}, @@ -198,6 +198,13 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { return true } +// TrafficStatus return the in and out traffic status +func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) { + sentStatus := c.sendMonitor.Status() + receiveStatus := c.recvMonitor.Status() + return &sentStatus, &receiveStatus +} + // TrySend queues a message to be sent to channel(Nonblocking). func (c *MConnection) TrySend(chID byte, msg interface{}) bool { if !c.IsRunning() { diff --git a/p2p/node_info.go b/p2p/node_info.go index 1f8c95db..e826e3ef 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -31,11 +31,11 @@ func (info *NodeInfo) CompatibleWith(other *NodeInfo) error { return err } if !compatible { - return fmt.Errorf("Peer is on a different major version. Peer version: %v, node version: %v.", other.Version, info.Version) + return fmt.Errorf("Peer is on a different major version. Peer version: %v, node version: %v", other.Version, info.Version) } if info.Network != other.Network { - return fmt.Errorf("Peer is on a different network. Peer network: %v, node network: %v.", other.Network, info.Network) + return fmt.Errorf("Peer is on a different network. Peer network: %v, node network: %v", other.Network, info.Network) } return nil } diff --git a/p2p/peer.go b/p2p/peer.go index 1e3cc2e6..8c9efce4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -11,6 +11,7 @@ import ( crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/flowrate" cfg "github.com/bytom/config" "github.com/bytom/consensus" @@ -164,6 +165,7 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio return peerNodeInfo, nil } +// ID return the uuid of the peer func (p *Peer) ID() string { return p.Key } @@ -187,6 +189,7 @@ func (p *Peer) Send(chID byte, msg interface{}) bool { return p.mconn.Send(chID, msg) } +// ServiceFlag return the ServiceFlag of this peer func (p *Peer) ServiceFlag() consensus.ServiceFlag { services := consensus.SFFullNode if len(p.Other) == 0 { @@ -207,6 +210,12 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } +// TrafficStatus return the in and out traffic status +func (p *Peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) { + return p.mconn.TrafficStatus() + +} + // TrySend msg to the channel identified by chID byte. Immediately returns // false if the send queue is full. func (p *Peer) TrySend(chID byte, msg interface{}) bool { diff --git a/p2p/switch.go b/p2p/switch.go index 4e747a49..5d69f088 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -367,6 +367,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } } +// SetDiscv connect the discv model to the switch func (sw *Switch) SetDiscv(discv *discover.Network) { sw.discv = discv } @@ -444,4 +445,15 @@ func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { reactor.RemovePeer(peer, reason) } peer.Stop() + + sentStatus, receiveStatus := peer.TrafficStatus() + log.WithFields(log.Fields{ + "address": peer.Addr().String(), + "reason": reason, + "duration": sentStatus.Duration.String(), + "total_sent": sentStatus.Bytes, + "total_receive": receiveStatus.Bytes, + "average_sent_rate": sentStatus.AvgRate, + "average_receive_rate": receiveStatus.AvgRate, + }).Info("disconnect with peer") }