X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=p2p%2Fconnection%2Fconnection.go;h=576be24b482729f2ca0da328e7dfea9168918370;hb=101539186ad6a4422b703abf5f8739f26c7a21f3;hp=ba2bf1b11b2794acd9716193243494a9600c0dc8;hpb=4e50d5a8c598700df67e37117a9bd47ba42e5a17;p=bytom%2Fvapor.git diff --git a/p2p/connection/connection.go b/p2p/connection/connection.go index ba2bf1b1..576be24b 100644 --- a/p2p/connection/connection.go +++ b/p2p/connection/connection.go @@ -13,6 +13,8 @@ import ( wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/flowrate" + + "github.com/vapor/common/compression" ) const ( @@ -33,7 +35,7 @@ const ( defaultSendQueueCapacity = 1 defaultSendRate = int64(104857600) // 100MB/s defaultRecvBufferCapacity = 4096 - defaultRecvMessageCapacity = 22020096 // 21MB + defaultRecvMessageCapacity = 22020096 // 21MB defaultRecvRate = int64(104857600) // 100MB/s defaultSendTimeout = 10 * time.Second logModule = "p2p/conn" @@ -94,19 +96,23 @@ type MConnection struct { flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *time.Ticker // send pings periodically chStatsTimer *time.Ticker // update channel stats periodically + + compression compression.Compression } // MConnConfig is a MConnection configuration. type MConnConfig struct { - SendRate int64 `mapstructure:"send_rate"` - RecvRate int64 `mapstructure:"recv_rate"` + SendRate int64 `mapstructure:"send_rate"` + RecvRate int64 `mapstructure:"recv_rate"` + Compression string `mapstructure:"compression_backend"` } // DefaultMConnConfig returns the default config. -func DefaultMConnConfig() *MConnConfig { +func DefaultMConnConfig(compression string) *MConnConfig { return &MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + Compression: compression, } } @@ -128,6 +134,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec pingTimer: time.NewTicker(pingTimeout), chStatsTimer: time.NewTicker(updateState), + compression: compression.NewCompression(config.Compression), } for _, desc := range chDescs { @@ -189,7 +196,9 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { return false } - if !channel.sendBytes(wire.BinaryBytes(msg)) { + compressData := c.compression.CompressBytes(wire.BinaryBytes(msg)) + + if !channel.sendBytes(compressData) { log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed") return false } @@ -220,7 +229,9 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { return false } - ok = channel.trySendBytes(wire.BinaryBytes(msg)) + compressData := c.compression.CompressBytes(wire.BinaryBytes(msg)) + + ok = channel.trySendBytes(compressData) if ok { select { case c.send <- struct{}{}: @@ -313,7 +324,12 @@ func (c *MConnection) recvRoutine() { } if msgBytes != nil { - c.onReceive(pkt.ChannelID, msgBytes) + data, err := c.compression.DecompressBytes(msgBytes) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes") + return + } + c.onReceive(pkt.ChannelID, data) } default: