--- /dev/null
+package compression
+
+import "fmt"
+
+// Compression is intterface
+type Compression interface {
+ CompressBytes(data []byte) []byte
+ DecompressBytes(data []byte) ([]byte, error)
+}
+
+const (
+ SnappyBackendStr = "snappy" // legacy, defaults to SnappyBackendStr.
+)
+
+type compressionCreator func() Compression
+
+var backends = map[string]compressionCreator{}
+
+func registerCompressionCreator(backend string, creator compressionCreator, force bool) {
+ _, ok := backends[backend]
+ if !force && ok {
+ return
+ }
+ backends[backend] = creator
+}
+
+func NewCompression(backend string) Compression {
+ compression, ok := backends[backend]
+ if !ok {
+ panic(fmt.Sprintf("Cannot find compression algorithm:[%s]", backend))
+
+ }
+ return compression()
+}
--- /dev/null
+package compression
+
+import (
+ sny "github.com/golang/snappy"
+)
+
+func init() {
+ creator := func() Compression {
+ return NewSnappy()
+ }
+
+ registerCompressionCreator(SnappyBackendStr, creator, false)
+}
+
+type Snappy struct {
+}
+
+func NewSnappy() *Snappy {
+ return &Snappy{}
+}
+
+func (s *Snappy) CompressBytes(data []byte) []byte {
+ return sny.Encode(nil, data)
+}
+
+func (s *Snappy) DecompressBytes(data []byte) ([]byte, error) {
+ return sny.Decode(nil, data)
+}
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/flowrate"
+
+ "github.com/vapor/common/compression"
)
const (
defaultSendQueueCapacity = 1
defaultSendRate = int64(104857600) // 100MB/s
defaultRecvBufferCapacity = 4096
- defaultRecvMessageCapacity = 22020096 // 21MB
+ defaultRecvMessageCapacity = 22020096 // 21MB
defaultRecvRate = int64(104857600) // 100MB/s
defaultSendTimeout = 10 * time.Second
logModule = "p2p/conn"
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *time.Ticker // send pings periodically
chStatsTimer *time.Ticker // update channel stats periodically
+
+ compression compression.Compression
}
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
- SendRate int64 `mapstructure:"send_rate"`
- RecvRate int64 `mapstructure:"recv_rate"`
+ SendRate int64 `mapstructure:"send_rate"`
+ RecvRate int64 `mapstructure:"recv_rate"`
+ Compression string `mapstructure:"compression_backend"`
}
// DefaultMConnConfig returns the default config.
-func DefaultMConnConfig() *MConnConfig {
+func DefaultMConnConfig(compression string) *MConnConfig {
return &MConnConfig{
- SendRate: defaultSendRate,
- RecvRate: defaultRecvRate,
+ SendRate: defaultSendRate,
+ RecvRate: defaultRecvRate,
+ Compression: compression,
}
}
pingTimer: time.NewTicker(pingTimeout),
chStatsTimer: time.NewTicker(updateState),
+ compression: compression.NewCompression(config.Compression),
}
for _, desc := range chDescs {
return false
}
- if !channel.sendBytes(wire.BinaryBytes(msg)) {
+ compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+ if !channel.sendBytes(compressData) {
log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
return false
}
return false
}
- ok = channel.trySendBytes(wire.BinaryBytes(msg))
+ compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+ ok = channel.trySendBytes(compressData)
if ok {
select {
case c.send <- struct{}{}:
}
if msgBytes != nil {
- c.onReceive(pkt.ChannelID, msgBytes)
+ data, err := c.compression.DecompressBytes(msgBytes)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
+ return
+ }
+ c.onReceive(pkt.ChannelID, data)
}
default: