OSDN Git Service

mv dockerfiles (#377)
[bytom/vapor.git] / p2p / connection / connection.go
index ee4e6a7..af5f645 100644 (file)
@@ -13,6 +13,8 @@ import (
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        "github.com/tendermint/tmlibs/flowrate"
+
+       "github.com/vapor/common/compression"
 )
 
 const (
@@ -31,11 +33,12 @@ const (
        flushThrottle      = 100 * time.Millisecond
 
        defaultSendQueueCapacity   = 1
-       defaultSendRate            = int64(512000) // 500KB/s
+       defaultSendRate            = int64(104857600) // 100MB/s
        defaultRecvBufferCapacity  = 4096
-       defaultRecvMessageCapacity = 22020096      // 21MB
-       defaultRecvRate            = int64(512000) // 500KB/s
+       defaultRecvMessageCapacity = 22020096         // 21MB
+       defaultRecvRate            = int64(104857600) // 100MB/s
        defaultSendTimeout         = 10 * time.Second
+       logModule                  = "p2pConn"
 )
 
 type receiveCbFunc func(chID byte, msgBytes []byte)
@@ -93,19 +96,23 @@ type MConnection struct {
        flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
        pingTimer    *time.Ticker       // send pings periodically
        chStatsTimer *time.Ticker       // update channel stats periodically
+
+       compression compression.Compression
 }
 
 // MConnConfig is a MConnection configuration.
 type MConnConfig struct {
-       SendRate int64 `mapstructure:"send_rate"`
-       RecvRate int64 `mapstructure:"recv_rate"`
+       SendRate    int64  `mapstructure:"send_rate"`
+       RecvRate    int64  `mapstructure:"recv_rate"`
+       Compression string `mapstructure:"compression_backend"`
 }
 
 // DefaultMConnConfig returns the default config.
-func DefaultMConnConfig() *MConnConfig {
+func DefaultMConnConfig(compression string) *MConnConfig {
        return &MConnConfig{
-               SendRate: defaultSendRate,
-               RecvRate: defaultRecvRate,
+               SendRate:    defaultSendRate,
+               RecvRate:    defaultRecvRate,
+               Compression: compression,
        }
 }
 
@@ -127,6 +134,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
 
                pingTimer:    time.NewTicker(pingTimeout),
                chStatsTimer: time.NewTicker(updateState),
+               compression:  compression.NewCompression(config.Compression),
        }
 
        for _, desc := range chDescs {
@@ -153,6 +161,8 @@ func (c *MConnection) OnStart() error {
 func (c *MConnection) OnStop() {
        c.BaseService.OnStop()
        c.flushTimer.Stop()
+       c.pingTimer.Stop()
+       c.chStatsTimer.Stop()
        if c.quit != nil {
                close(c.quit)
        }
@@ -182,12 +192,14 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
 
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
-       if !channel.sendBytes(wire.BinaryBytes(msg)) {
-               log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
+       compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+       if !channel.sendBytes(compressData) {
+               log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
                return false
        }
 
@@ -213,11 +225,13 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
 
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
-       ok = channel.trySendBytes(wire.BinaryBytes(msg))
+       compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
+
+       ok = channel.trySendBytes(compressData)
        if ok {
                select {
                case c.send <- struct{}{}:
@@ -233,7 +247,7 @@ func (c *MConnection) String() string {
 
 func (c *MConnection) flush() {
        if err := c.bufWriter.Flush(); err != nil {
-               log.WithField("error", err).Error("MConnection flush failed")
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
        }
 }
 
@@ -264,7 +278,7 @@ func (c *MConnection) recvRoutine() {
                c.recvMonitor.Update(int(n))
                if err != nil {
                        if c.IsRunning() {
-                               log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
+                               log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
                                c.conn.Close()
                                c.stopForError(err)
                        }
@@ -274,14 +288,14 @@ func (c *MConnection) recvRoutine() {
                // Read more depending on packet type.
                switch pktType {
                case packetTypePing:
-                       log.Debug("receive Ping")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
                        select {
                        case c.pong <- struct{}{}:
                        default:
                        }
 
                case packetTypePong:
-                       log.Debug("receive Pong")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
 
                case packetTypeMsg:
                        pkt, n, err := msgPacket{}, int(0), error(nil)
@@ -289,7 +303,7 @@ func (c *MConnection) recvRoutine() {
                        c.recvMonitor.Update(int(n))
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
                                return
@@ -303,14 +317,19 @@ func (c *MConnection) recvRoutine() {
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
                                return
                        }
 
                        if msgBytes != nil {
-                               c.onReceive(pkt.ChannelID, msgBytes)
+                               data, err := c.compression.DecompressBytes(msgBytes)
+                               if err != nil {
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
+                                       return
+                               }
+                               c.onReceive(pkt.ChannelID, data)
                        }
 
                default:
@@ -338,7 +357,7 @@ func (c *MConnection) sendMsgPacket() bool {
 
        n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
        if err != nil {
-               log.WithField("error", err).Error("failed to write msgPacket")
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
                c.stopForError(err)
                return true
        }
@@ -362,12 +381,12 @@ func (c *MConnection) sendRoutine() {
                                channel.updateStats()
                        }
                case <-c.pingTimer.C:
-                       log.Debug("send Ping")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
                        wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
                case <-c.pong:
-                       log.Debug("send Pong")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
                        wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
@@ -386,7 +405,7 @@ func (c *MConnection) sendRoutine() {
                        return
                }
                if err != nil {
-                       log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
                        c.stopForError(err)
                        return
                }