quit chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
- pingTimer *cmn.RepeatTimer // send pings periodically
- chStatsTimer *cmn.RepeatTimer // update channel stats periodically
+ pingTimer *time.Ticker // send pings periodically
+ chStatsTimer *time.Ticker // update channel stats periodically
LocalAddress *NetAddress
RemoteAddress *NetAddress
onError: onError,
config: config,
+ pingTimer: time.NewTicker(pingTimeout),
+ chStatsTimer: time.NewTicker(updateState),
+
LocalAddress: NewNetAddress(conn.LocalAddr()),
RemoteAddress: NewNetAddress(conn.RemoteAddr()),
}
c.BaseService.OnStart()
c.quit = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
- c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
- c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
go c.sendRoutine()
go c.recvRoutine()
return nil
func (c *MConnection) OnStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
- c.pingTimer.Stop()
- c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
}
// NOTE: flushTimer.Set() must be called every time
// something is written to .bufWriter.
c.flush()
- case <-c.chStatsTimer.Ch:
+ case <-c.chStatsTimer.C:
for _, channel := range c.channels {
channel.updateStats()
}
- case <-c.pingTimer.Ch:
+ case <-c.pingTimer.C:
log.Debug("Send Ping")
wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
/*
// Peek into bufReader for debugging
if numBytes := c.bufReader.Buffered(); numBytes > 0 {
- log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
- bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
- if err == nil {
- return bytes
- } else {
- log.Warn("Error peeking connection buffer", "error", err)
- return nil
- }
- }})
+ log.Infof("Peek connection buffer numBytes:", numBytes)
+ bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
+ if err == nil {
+ log.Infof("bytes:", bytes)
+ } else {
+ log.Warning("Error peeking connection buffer err:", err)
+ }
+ } else {
+ log.Warning("Received bytes number is:", numBytes)
}
*/
"conn": c,
"error": err,
}).Error("Connection failed @ recvRoutine (reading byte)")
+ c.conn.Close()
c.stopForError(err)
}
break FOR_LOOP
}
channel, ok := c.channelsIdx[pkt.ChannelID]
if !ok || channel == nil {
- cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
+ if pkt.ChannelID == PexChannel {
+ continue
+ } else {
+ cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
+ }
}
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
default:
cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
}
-
- // TODO: shouldn't this go in the sendRoutine?
- // Better to send a ping packet when *we* haven't sent anything for a while.
- c.pingTimer.Reset()
}
// Cleanup