12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/tendermint/tmlibs/flowrate"
17 "github.com/vapor/common/compression"
21 packetTypePing = byte(0x01)
22 packetTypePong = byte(0x02)
23 packetTypeMsg = byte(0x03)
24 maxMsgPacketPayloadSize = 1024
25 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
26 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
28 numBatchMsgPackets = 10
29 minReadBufferSize = 1024
30 minWriteBufferSize = 65536
31 updateState = 2 * time.Second
32 pingTimeout = 40 * time.Second
33 flushThrottle = 100 * time.Millisecond
35 defaultSendQueueCapacity = 1
36 defaultSendRate = int64(104857600) // 100MB/s
37 defaultRecvBufferCapacity = 4096
38 defaultRecvMessageCapacity = 22020096 // 21MB
39 defaultRecvRate = int64(104857600) // 100MB/s
40 defaultSendTimeout = 10 * time.Second
44 type receiveCbFunc func(chID byte, msgBytes []byte)
45 type errorCbFunc func(interface{})
47 // Messages in channels are chopped into smaller msgPackets for multiplexing.
48 type msgPacket struct {
50 EOF byte // 1 means message ends here.
54 func (p msgPacket) String() string {
55 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
59 MConnection handles message transmission on multiple abstract communication
60 `Channel`s. Each channel has a globally unique byte id.
61 The byte id and the relative priorities of each `Channel` are configured upon
62 initialization of the connection.
64 There are two methods for sending messages:
65 func (m MConnection) Send(chID byte, msg interface{}) bool {}
66 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
68 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
69 for the channel with the given id byte `chID`, or until the request times out.
70 The message `msg` is serialized with `wire.BinaryBytes()` from the `tendermint/go-wire`
71 package and then compressed with the connection's compression backend before it is queued.
73 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
76 Inbound message bytes are handled with an onReceive callback function.
78 type MConnection struct {
82 bufReader *bufio.Reader
83 bufWriter *bufio.Writer
84 sendMonitor *flowrate.Monitor
85 recvMonitor *flowrate.Monitor
89 channelsIdx map[byte]*channel
90 onReceive receiveCbFunc
96 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
97 pingTimer *time.Ticker // send pings periodically
98 chStatsTimer *time.Ticker // update channel stats periodically
100 compression compression.Compression
103 // MConnConfig is a MConnection configuration.
104 type MConnConfig struct {
105 SendRate int64 `mapstructure:"send_rate"`
106 RecvRate int64 `mapstructure:"recv_rate"`
107 Compression string `mapstructure:"compression_backend"`
110 // DefaultMConnConfig returns the default config.
111 func DefaultMConnConfig(compression string) *MConnConfig {
113 SendRate: defaultSendRate,
114 RecvRate: defaultRecvRate,
115 Compression: compression,
119 // NewMConnectionWithConfig wraps a net.Conn and creates a multiplexed connection using the given config.
120 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
121 mconn := &MConnection{
123 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
124 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
125 sendMonitor: flowrate.New(0, 0),
126 recvMonitor: flowrate.New(0, 0),
127 send: make(chan struct{}, 1),
128 pong: make(chan struct{}, 1),
129 channelsIdx: map[byte]*channel{},
130 channels: []*channel{},
131 onReceive: onReceive,
135 pingTimer: time.NewTicker(pingTimeout),
136 chStatsTimer: time.NewTicker(updateState),
137 compression: compression.NewCompression(config.Compression),
140 for _, desc := range chDescs {
141 descCopy := *desc // copy the desc else unsafe access across connections
142 channel := newChannel(mconn, &descCopy)
143 mconn.channelsIdx[channel.id] = channel
144 mconn.channels = append(mconn.channels, channel)
146 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
150 // OnStart implements BaseService
151 func (c *MConnection) OnStart() error {
152 c.BaseService.OnStart()
153 c.quit = make(chan struct{})
154 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
160 // OnStop implements BaseService
161 func (c *MConnection) OnStop() {
162 c.BaseService.OnStop()
165 c.chStatsTimer.Stop()
170 // We can't close pong safely here because recvRoutine may write to it after we've
171 // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
174 // CanSend returns true if you can send more data onto the chID, false otherwise
175 func (c *MConnection) CanSend(chID byte) bool {
180 channel, ok := c.channelsIdx[chID]
184 return channel.canSend()
187 // Send queues a message to be sent on the channel with the given chID (blocking).
188 func (c *MConnection) Send(chID byte, msg interface{}) bool {
193 channel, ok := c.channelsIdx[chID]
195 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
199 compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
201 if !channel.sendBytes(compressData) {
202 log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
207 case c.send <- struct{}{}:
213 // TrafficStatus returns the sent (outbound) and received (inbound) traffic status, in that order.
214 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
215 sentStatus := c.sendMonitor.Status()
216 receivedStatus := c.recvMonitor.Status()
217 return &sentStatus, &receivedStatus
220 // TrySend queues a message to be sent on the channel with the given chID (non-blocking).
221 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
226 channel, ok := c.channelsIdx[chID]
228 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
232 compressData := c.compression.CompressBytes(wire.BinaryBytes(msg))
234 ok = channel.trySendBytes(compressData)
237 case c.send <- struct{}{}:
244 func (c *MConnection) String() string {
245 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
248 func (c *MConnection) flush() {
249 if err := c.bufWriter.Flush(); err != nil {
250 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
254 // Catch panics, usually caused by remote disconnects.
255 func (c *MConnection) _recover() {
256 if r := recover(); r != nil {
257 stack := debug.Stack()
258 err := cmn.StackError{r, stack}
263 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
264 // After a whole message has been assembled, it's pushed to onReceive().
265 // Blocks depending on how the connection is throttled.
266 func (c *MConnection) recvRoutine() {
271 // Block until .recvMonitor says we can read.
272 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
277 pktType := wire.ReadByte(c.bufReader, &n, &err)
278 c.recvMonitor.Update(int(n))
281 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
288 // Read more depending on packet type.
291 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
293 case c.pong <- struct{}{}:
298 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
301 pkt, n, err := msgPacket{}, int(0), error(nil)
302 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
303 c.recvMonitor.Update(int(n))
306 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
312 channel, ok := c.channelsIdx[pkt.ChannelID]
313 if !ok || channel == nil {
314 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
317 msgBytes, err := channel.recvMsgPacket(pkt)
320 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
327 data, err := c.compression.DecompressBytes(msgBytes)
329 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed decompress bytes")
332 c.onReceive(pkt.ChannelID, data)
336 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
341 // sendMsgPacket returns true if messages from channels were exhausted.
342 func (c *MConnection) sendMsgPacket() bool {
343 var leastRatio float32 = math.MaxFloat32
344 var leastChannel *channel
345 for _, channel := range c.channels {
346 if !channel.isSendPending() {
349 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
351 leastChannel = channel
354 if leastChannel == nil {
358 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
360 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
364 c.sendMonitor.Update(int(n))
369 // sendRoutine polls for packets to send from channels.
370 func (c *MConnection) sendRoutine() {
377 case <-c.flushTimer.Ch:
379 case <-c.chStatsTimer.C:
380 for _, channel := range c.channels {
381 channel.updateStats()
383 case <-c.pingTimer.C:
384 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
385 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
386 c.sendMonitor.Update(int(n))
389 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
390 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
391 c.sendMonitor.Update(int(n))
396 if eof := c.sendSomeMsgPackets(); !eof {
398 case c.send <- struct{}{}:
408 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
415 // sendSomeMsgPackets returns true if messages from channels were exhausted.
416 func (c *MConnection) sendSomeMsgPackets() bool {
417 // Block until .sendMonitor says we can write.
418 // Once we're ready we send more than we asked for,
419 // but amortized it should even out.
420 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
421 for i := 0; i < numBatchMsgPackets; i++ {
422 if c.sendMsgPacket() {
429 func (c *MConnection) stopForError(r interface{}) {
431 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {