log "github.com/sirupsen/logrus"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
- flow "github.com/tendermint/tmlibs/flowrate"
+ "github.com/tendermint/tmlibs/flowrate"
)
const (
defaultRecvMessageCapacity = 22020096 // 21MB
defaultRecvRate = int64(512000) // 500KB/s
defaultSendTimeout = 10 * time.Second
+ logModule = "p2pConn"
)
type receiveCbFunc func(chID byte, msgBytes []byte)
conn net.Conn
bufReader *bufio.Reader
bufWriter *bufio.Writer
- sendMonitor *flow.Monitor
- recvMonitor *flow.Monitor
+ sendMonitor *flowrate.Monitor
+ recvMonitor *flowrate.Monitor
send chan struct{}
pong chan struct{}
channels []*channel
conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
- sendMonitor: flow.New(0, 0),
- recvMonitor: flow.New(0, 0),
+ sendMonitor: flowrate.New(0, 0),
+ recvMonitor: flowrate.New(0, 0),
send: make(chan struct{}, 1),
- pong: make(chan struct{}),
+ pong: make(chan struct{}, 1),
channelsIdx: map[byte]*channel{},
channels: []*channel{},
onReceive: onReceive,
func (c *MConnection) OnStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
+ c.pingTimer.Stop()
+ c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
}
channel, ok := c.channelsIdx[chID]
if !ok {
- log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+ log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
return false
}
if !channel.sendBytes(wire.BinaryBytes(msg)) {
- log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
+ log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
return false
}
return true
}
+// TrafficStatus return the in and out traffic status
+func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+ sentStatus := c.sendMonitor.Status()
+ receivedStatus := c.recvMonitor.Status()
+ return &sentStatus, &receivedStatus
+}
+
// TrySend queues a message to be sent to channel(Nonblocking).
func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
if !c.IsRunning() {
channel, ok := c.channelsIdx[chID]
if !ok {
- log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+ log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
return false
}
func (c *MConnection) flush() {
if err := c.bufWriter.Flush(); err != nil {
- log.WithField("error", err).Error("MConnection flush failed")
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
}
}
c.recvMonitor.Update(int(n))
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
c.conn.Close()
c.stopForError(err)
}
// Read more depending on packet type.
switch pktType {
case packetTypePing:
- log.Debug("receive Ping")
- c.pong <- struct{}{}
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
+ select {
+ case c.pong <- struct{}{}:
+ default:
+ }
case packetTypePong:
- log.Debug("receive Pong")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil)
c.recvMonitor.Update(int(n))
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
c.stopForError(err)
}
return
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
c.stopForError(err)
}
return
n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
if err != nil {
- log.WithField("error", err).Error("failed to write msgPacket")
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
c.stopForError(err)
return true
}
channel.updateStats()
}
case <-c.pingTimer.C:
- log.Debug("send Ping")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
c.flush()
case <-c.pong:
- log.Debug("send Pong")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
c.flush()
return
}
if err != nil {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
c.stopForError(err)
return
}