OSDN Git Service

add compression (#240)
[bytom/vapor.git] / p2p / connection / connection.go
index ba2bf1b..576be24 100644 (file)
@@ -13,6 +13,8 @@ import (
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        "github.com/tendermint/tmlibs/flowrate"
+
+       "github.com/vapor/common/compression"
 )
 
 const (
@@ -33,7 +35,7 @@ const (
        defaultSendQueueCapacity   = 1
        defaultSendRate            = int64(104857600) // 100MB/s
        defaultRecvBufferCapacity  = 4096
-       defaultRecvMessageCapacity = 22020096      // 21MB
+       defaultRecvMessageCapacity = 22020096         // 21MB
        defaultRecvRate            = int64(104857600) // 100MB/s
        defaultSendTimeout         = 10 * time.Second
        logModule                  = "p2p/conn"
@@ -94,19 +96,23 @@ type MConnection struct {
        flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
        pingTimer    *time.Ticker       // send pings periodically
        chStatsTimer *time.Ticker       // update channel stats periodically
+
+       compression compression.Compression
 }
 
 // MConnConfig is a MConnection configuration.
 type MConnConfig struct {
-       SendRate int64 `mapstructure:"send_rate"`
-       RecvRate int64 `mapstructure:"recv_rate"`
+       SendRate    int64  `mapstructure:"send_rate"`
+       RecvRate    int64  `mapstructure:"recv_rate"`
+       Compression string `mapstructure:"compression_backend"`
 }
 
 // DefaultMConnConfig returns the default config.
-func DefaultMConnConfig() *MConnConfig {
+func DefaultMConnConfig(compression string) *MConnConfig {
        return &MConnConfig{
-               SendRate: defaultSendRate,
-               RecvRate: defaultRecvRate,
+               SendRate:    defaultSendRate,
+               RecvRate:    defaultRecvRate,
+               Compression: compression,
        }
 }
 
@@ -128,6 +134,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
 
                pingTimer:    time.NewTicker(pingTimeout),
                chStatsTimer: time.NewTicker(updateState),
+               compression:  compression.NewCompression(config.Compression),
        }
 
        for _, desc := range chDescs {
@@ -189,7 +196,9 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
                return false
        }
 
-       if !channel.sendBytes(wire.BinaryBytes(msg)) {
+       compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+       if !channel.sendBytes(compressData) {
                log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
                return false
        }
@@ -220,7 +229,9 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
                return false
        }
 
-       ok = channel.trySendBytes(wire.BinaryBytes(msg))
+       compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+       ok = channel.trySendBytes(compressData)
        if ok {
                select {
                case c.send <- struct{}{}:
@@ -313,7 +324,12 @@ func (c *MConnection) recvRoutine() {
                        }
 
                        if msgBytes != nil {
-                               c.onReceive(pkt.ChannelID, msgBytes)
+                               data, err := c.compression.DecompressBytes(msgBytes)
+                               if err != nil {
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
+                                       return
+                               }
+                               c.onReceive(pkt.ChannelID, data)
                        }
 
                default: