12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/tendermint/tmlibs/flowrate"
19 packetTypePing = byte(0x01)
20 packetTypePong = byte(0x02)
21 packetTypeMsg = byte(0x03)
22 maxMsgPacketPayloadSize = 1024
23 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
24 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
26 numBatchMsgPackets = 10
27 minReadBufferSize = 1024
28 minWriteBufferSize = 65536
29 updateState = 2 * time.Second
30 pingTimeout = 40 * time.Second
31 flushThrottle = 100 * time.Millisecond
33 defaultSendQueueCapacity = 1
34 defaultSendRate = int64(512000) // 500KB/s
35 defaultRecvBufferCapacity = 4096
36 defaultRecvMessageCapacity = 22020096 // 21MB
37 defaultRecvRate = int64(512000) // 500KB/s
38 defaultSendTimeout = 10 * time.Second
41 type receiveCbFunc func(chID byte, msgBytes []byte)
42 type errorCbFunc func(interface{})
44 // Messages in channels are chopped into smaller msgPackets for multiplexing.
45 type msgPacket struct {
47 EOF byte // 1 means message ends here.
51 func (p msgPacket) String() string {
52 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
56 MConnection handles message transmission on multiple abstract communication
57 `Channel`s. Each channel has a globally unique byte id.
58 The byte id and the relative priorities of each `Channel` are configured upon
59 initialization of the connection.
61 There are two methods for sending messages:
62 func (m MConnection) Send(chID byte, msg interface{}) bool {}
63 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
65 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
66 for the channel with the given id byte `chID`, or until the request times out.
67 The message `msg` is serialized using the `tendermint/wire` submodule's
68 `WriteBinary()` reflection routine.
70 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
73 Inbound message bytes are handled with an onReceive callback function.
75 type MConnection struct {
79 bufReader *bufio.Reader
80 bufWriter *bufio.Writer
81 sendMonitor *flowrate.Monitor
82 recvMonitor *flowrate.Monitor
86 channelsIdx map[byte]*channel
87 onReceive receiveCbFunc
93 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
94 pingTimer *time.Ticker // send pings periodically
95 chStatsTimer *time.Ticker // update channel stats periodically
98 // MConnConfig is a MConnection configuration.
99 type MConnConfig struct {
100 SendRate int64 `mapstructure:"send_rate"`
101 RecvRate int64 `mapstructure:"recv_rate"`
104 // DefaultMConnConfig returns the default config.
105 func DefaultMConnConfig() *MConnConfig {
107 SendRate: defaultSendRate,
108 RecvRate: defaultRecvRate,
112 // NewMConnectionWithConfig wraps a net.Conn and creates a multiplexed connection using the given config.
113 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
114 mconn := &MConnection{
116 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
117 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
118 sendMonitor: flowrate.New(0, 0),
119 recvMonitor: flowrate.New(0, 0),
120 send: make(chan struct{}, 1),
121 pong: make(chan struct{}, 1),
122 channelsIdx: map[byte]*channel{},
123 channels: []*channel{},
124 onReceive: onReceive,
128 pingTimer: time.NewTicker(pingTimeout),
129 chStatsTimer: time.NewTicker(updateState),
132 for _, desc := range chDescs {
133 descCopy := *desc // copy the desc else unsafe access across connections
134 channel := newChannel(mconn, &descCopy)
135 mconn.channelsIdx[channel.id] = channel
136 mconn.channels = append(mconn.channels, channel)
138 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
142 // OnStart implements BaseService
143 func (c *MConnection) OnStart() error {
144 c.BaseService.OnStart()
145 c.quit = make(chan struct{})
146 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
152 // OnStop implements BaseService
153 func (c *MConnection) OnStop() {
154 c.BaseService.OnStop()
160 // We can't close pong safely here because recvRoutine may write to it after we've
161 // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
164 // CanSend returns true if you can send more data onto the chID, false otherwise
165 func (c *MConnection) CanSend(chID byte) bool {
170 channel, ok := c.channelsIdx[chID]
174 return channel.canSend()
177 // Send queues a message to be sent to the channel (blocking).
178 func (c *MConnection) Send(chID byte, msg interface{}) bool {
183 channel, ok := c.channelsIdx[chID]
185 log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
189 if !channel.sendBytes(wire.BinaryBytes(msg)) {
190 log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
195 case c.send <- struct{}{}:
201 // TrafficStatus returns the sent and received traffic status, in that order.
202 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
203 sentStatus := c.sendMonitor.Status()
204 receivedStatus := c.recvMonitor.Status()
205 return &sentStatus, &receivedStatus
208 // TrySend queues a message to be sent to the channel (non-blocking).
209 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
214 channel, ok := c.channelsIdx[chID]
216 log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
220 ok = channel.trySendBytes(wire.BinaryBytes(msg))
223 case c.send <- struct{}{}:
230 func (c *MConnection) String() string {
231 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
234 func (c *MConnection) flush() {
235 if err := c.bufWriter.Flush(); err != nil {
236 log.WithField("error", err).Error("MConnection flush failed")
240 // Catch panics, usually caused by remote disconnects.
241 func (c *MConnection) _recover() {
242 if r := recover(); r != nil {
243 stack := debug.Stack()
244 err := cmn.StackError{r, stack}
249 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
250 // After a whole message has been assembled, it's pushed to onReceive().
251 // Blocks depending on how the connection is throttled.
252 func (c *MConnection) recvRoutine() {
257 // Block until .recvMonitor says we can read.
258 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
263 pktType := wire.ReadByte(c.bufReader, &n, &err)
264 c.recvMonitor.Update(int(n))
267 log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
274 // Read more depending on packet type.
277 log.Debug("receive Ping")
279 case c.pong <- struct{}{}:
284 log.Debug("receive Pong")
287 pkt, n, err := msgPacket{}, int(0), error(nil)
288 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
289 c.recvMonitor.Update(int(n))
292 log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
298 channel, ok := c.channelsIdx[pkt.ChannelID]
299 if !ok || channel == nil {
300 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
303 msgBytes, err := channel.recvMsgPacket(pkt)
306 log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
313 c.onReceive(pkt.ChannelID, msgBytes)
317 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
322 // Returns true if messages from channels were exhausted.
323 func (c *MConnection) sendMsgPacket() bool {
324 var leastRatio float32 = math.MaxFloat32
325 var leastChannel *channel
326 for _, channel := range c.channels {
327 if !channel.isSendPending() {
330 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
332 leastChannel = channel
335 if leastChannel == nil {
339 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
341 log.WithField("error", err).Error("failed to write msgPacket")
345 c.sendMonitor.Update(int(n))
350 // sendRoutine polls for packets to send from channels.
351 func (c *MConnection) sendRoutine() {
358 case <-c.flushTimer.Ch:
360 case <-c.chStatsTimer.C:
361 for _, channel := range c.channels {
362 channel.updateStats()
364 case <-c.pingTimer.C:
365 log.Debug("send Ping")
366 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
367 c.sendMonitor.Update(int(n))
370 log.Debug("send Pong")
371 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
372 c.sendMonitor.Update(int(n))
377 if eof := c.sendSomeMsgPackets(); !eof {
379 case c.send <- struct{}{}:
389 log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
396 // Returns true if messages from channels were exhausted.
397 func (c *MConnection) sendSomeMsgPackets() bool {
398 // Block until .sendMonitor says we can write.
399 // Once we're ready we send more than we asked for,
400 // but amortized it should even out.
401 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
402 for i := 0; i < numBatchMsgPackets; i++ {
403 if c.sendMsgPacket() {
410 func (c *MConnection) stopForError(r interface{}) {
412 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {