wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/flowrate"
+
+ "github.com/vapor/common/compression"
)
const (
flushThrottle = 100 * time.Millisecond
defaultSendQueueCapacity = 1
- defaultSendRate = int64(512000) // 500KB/s
+ defaultSendRate = int64(104857600) // 100MB/s
defaultRecvBufferCapacity = 4096
- defaultRecvMessageCapacity = 22020096 // 21MB
- defaultRecvRate = int64(512000) // 500KB/s
+ defaultRecvMessageCapacity = 22020096 // 21MB
+ defaultRecvRate = int64(104857600) // 100MB/s
defaultSendTimeout = 10 * time.Second
+ logModule = "p2pConn"
)
type receiveCbFunc func(chID byte, msgBytes []byte)
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *time.Ticker // send pings periodically
chStatsTimer *time.Ticker // update channel stats periodically
+
+ compression compression.Compression
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
- SendRate int64 `mapstructure:"send_rate"`
- RecvRate int64 `mapstructure:"recv_rate"`
+ SendRate int64 `mapstructure:"send_rate"`
+ RecvRate int64 `mapstructure:"recv_rate"`
+ Compression string `mapstructure:"compression_backend"`
}
// DefaultMConnConfig returns the default config.
-func DefaultMConnConfig() *MConnConfig {
+func DefaultMConnConfig(compression string) *MConnConfig {
return &MConnConfig{
- SendRate: defaultSendRate,
- RecvRate: defaultRecvRate,
+ SendRate: defaultSendRate,
+ RecvRate: defaultRecvRate,
+ Compression: compression,
}
}
pingTimer: time.NewTicker(pingTimeout),
chStatsTimer: time.NewTicker(updateState),
+ compression: compression.NewCompression(config.Compression),
}
for _, desc := range chDescs {
func (c *MConnection) OnStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
+ c.pingTimer.Stop()
+ c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
}
channel, ok := c.channelsIdx[chID]
if !ok {
- log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+ log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
return false
}
- if !channel.sendBytes(wire.BinaryBytes(msg)) {
- log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
+ compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+ if !channel.sendBytes(compressData) {
+ log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
return false
}
channel, ok := c.channelsIdx[chID]
if !ok {
- log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+ log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
return false
}
- ok = channel.trySendBytes(wire.BinaryBytes(msg))
+ compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+ ok = channel.trySendBytes(compressData)
if ok {
select {
case c.send <- struct{}{}:
func (c *MConnection) flush() {
if err := c.bufWriter.Flush(); err != nil {
- log.WithField("error", err).Error("MConnection flush failed")
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
}
}
c.recvMonitor.Update(int(n))
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
c.conn.Close()
c.stopForError(err)
}
// Read more depending on packet type.
switch pktType {
case packetTypePing:
- log.Debug("receive Ping")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
select {
case c.pong <- struct{}{}:
default:
}
case packetTypePong:
- log.Debug("receive Pong")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil)
c.recvMonitor.Update(int(n))
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
c.stopForError(err)
}
return
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
if c.IsRunning() {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
c.stopForError(err)
}
return
}
if msgBytes != nil {
- c.onReceive(pkt.ChannelID, msgBytes)
+ data, err := c.compression.DecompressBytes(msgBytes)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
+ return
+ }
+ c.onReceive(pkt.ChannelID, data)
}
default:
n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
if err != nil {
- log.WithField("error", err).Error("failed to write msgPacket")
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
c.stopForError(err)
return true
}
channel.updateStats()
}
case <-c.pingTimer.C:
- log.Debug("send Ping")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
c.flush()
case <-c.pong:
- log.Debug("send Pong")
+ log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
c.flush()
return
}
if err != nil {
- log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
c.stopForError(err)
return
}