OSDN Git Service

add bytom logs into files
[bytom/bytom.git] / p2p / connection / connection.go
index 09b2810..67f3220 100644 (file)
@@ -12,7 +12,7 @@ import (
        log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
-       flow "github.com/tendermint/tmlibs/flowrate"
+       "github.com/tendermint/tmlibs/flowrate"
 )
 
 const (
@@ -36,6 +36,7 @@ const (
        defaultRecvMessageCapacity = 22020096      // 21MB
        defaultRecvRate            = int64(512000) // 500KB/s
        defaultSendTimeout         = 10 * time.Second
+       logModule                  = "p2pConn"
 )
 
 type receiveCbFunc func(chID byte, msgBytes []byte)
@@ -78,8 +79,8 @@ type MConnection struct {
        conn        net.Conn
        bufReader   *bufio.Reader
        bufWriter   *bufio.Writer
-       sendMonitor *flow.Monitor
-       recvMonitor *flow.Monitor
+       sendMonitor *flowrate.Monitor
+       recvMonitor *flowrate.Monitor
        send        chan struct{}
        pong        chan struct{}
        channels    []*channel
@@ -115,10 +116,10 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
                conn:        conn,
                bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
                bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
-               sendMonitor: flow.New(0, 0),
-               recvMonitor: flow.New(0, 0),
+               sendMonitor: flowrate.New(0, 0),
+               recvMonitor: flowrate.New(0, 0),
                send:        make(chan struct{}, 1),
-               pong:        make(chan struct{}),
+               pong:        make(chan struct{}, 1),
                channelsIdx: map[byte]*channel{},
                channels:    []*channel{},
                onReceive:   onReceive,
@@ -153,6 +154,8 @@ func (c *MConnection) OnStart() error {
 func (c *MConnection) OnStop() {
        c.BaseService.OnStop()
        c.flushTimer.Stop()
+       c.pingTimer.Stop()
+       c.chStatsTimer.Stop()
        if c.quit != nil {
                close(c.quit)
        }
@@ -182,12 +185,12 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
 
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
        if !channel.sendBytes(wire.BinaryBytes(msg)) {
-               log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
+               log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
                return false
        }
 
@@ -198,6 +201,13 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
        return true
 }
 
+// TrafficStatus return the in and out traffic status
+func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+       sentStatus := c.sendMonitor.Status()
+       receivedStatus := c.recvMonitor.Status()
+       return &sentStatus, &receivedStatus
+}
+
 // TrySend queues a message to be sent to channel(Nonblocking).
 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
        if !c.IsRunning() {
@@ -206,7 +216,7 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
 
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
@@ -226,7 +236,7 @@ func (c *MConnection) String() string {
 
 func (c *MConnection) flush() {
        if err := c.bufWriter.Flush(); err != nil {
-               log.WithField("error", err).Error("MConnection flush failed")
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
        }
 }
 
@@ -257,7 +267,7 @@ func (c *MConnection) recvRoutine() {
                c.recvMonitor.Update(int(n))
                if err != nil {
                        if c.IsRunning() {
-                               log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
+                               log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
                                c.conn.Close()
                                c.stopForError(err)
                        }
@@ -267,11 +277,14 @@ func (c *MConnection) recvRoutine() {
                // Read more depending on packet type.
                switch pktType {
                case packetTypePing:
-                       log.Debug("receive Ping")
-                       c.pong <- struct{}{}
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
+                       select {
+                       case c.pong <- struct{}{}:
+                       default:
+                       }
 
                case packetTypePong:
-                       log.Debug("receive Pong")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
 
                case packetTypeMsg:
                        pkt, n, err := msgPacket{}, int(0), error(nil)
@@ -279,7 +292,7 @@ func (c *MConnection) recvRoutine() {
                        c.recvMonitor.Update(int(n))
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
                                return
@@ -293,7 +306,7 @@ func (c *MConnection) recvRoutine() {
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
                                return
@@ -328,7 +341,7 @@ func (c *MConnection) sendMsgPacket() bool {
 
        n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
        if err != nil {
-               log.WithField("error", err).Error("failed to write msgPacket")
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
                c.stopForError(err)
                return true
        }
@@ -352,12 +365,12 @@ func (c *MConnection) sendRoutine() {
                                channel.updateStats()
                        }
                case <-c.pingTimer.C:
-                       log.Debug("send Ping")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
                        wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
                case <-c.pong:
-                       log.Debug("send Pong")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
                        wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
@@ -376,7 +389,7 @@ func (c *MConnection) sendRoutine() {
                        return
                }
                if err != nil {
-                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
                        c.stopForError(err)
                        return
                }