13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
15 flow "github.com/tendermint/tmlibs/flowrate"
19 numBatchMsgPackets = 10
20 minReadBufferSize = 1024
21 minWriteBufferSize = 65536
22 updateState = 2 * time.Second
23 pingTimeout = 40 * time.Second
24 flushThrottle = 100 * time.Millisecond
26 defaultSendQueueCapacity = 1
27 defaultSendRate = int64(512000) // 500KB/s
28 defaultRecvBufferCapacity = 4096
29 defaultRecvMessageCapacity = 22020096 // 21MB
30 defaultRecvRate = int64(512000) // 500KB/s
31 defaultSendTimeout = 10 * time.Second
34 type receiveCbFunc func(chID byte, msgBytes []byte)
35 type errorCbFunc func(interface{})
38 Each peer has one `MConnection` (multiplex connection) instance.
40 __multiplex__ *noun* a system or signal involving simultaneous transmission of
41 several messages along a single channel of communication.
43 Each `MConnection` handles message transmission on multiple abstract communication
44 `Channel`s. Each channel has a globally unique byte id.
45 The byte id and the relative priorities of each `Channel` are configured upon
46 initialization of the connection.
48 There are two methods for sending messages:
49 func (m MConnection) Send(chID byte, msg interface{}) bool {}
50 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
52 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
53 for the channel with the given id byte `chID`, or until the request times out.
54 The message `msg` is serialized using the `tendermint/wire` submodule's
55 `WriteBinary()` reflection routine.
57 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's send queue is full.
60 Inbound message bytes are handled with an onReceive callback function.
62 type MConnection struct {
66 bufReader *bufio.Reader
67 bufWriter *bufio.Writer
68 sendMonitor *flow.Monitor
69 recvMonitor *flow.Monitor
73 channelsIdx map[byte]*Channel
74 onReceive receiveCbFunc
80 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
81 pingTimer *cmn.RepeatTimer // send pings periodically
82 chStatsTimer *cmn.RepeatTimer // update channel stats periodically
84 LocalAddress *NetAddress
85 RemoteAddress *NetAddress
88 // MConnConfig is a MConnection configuration.
89 type MConnConfig struct {
90 SendRate int64 `mapstructure:"send_rate"`
91 RecvRate int64 `mapstructure:"recv_rate"`
94 // DefaultMConnConfig returns the default config.
95 func DefaultMConnConfig() *MConnConfig {
97 SendRate: defaultSendRate,
98 RecvRate: defaultRecvRate,
102 // NewMConnection wraps net.Conn and creates multiplex connection
103 func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
104 return NewMConnectionWithConfig(
109 DefaultMConnConfig())
112 // NewMConnectionWithConfig wraps net.Conn and creates a multiplex connection using the given config.
113 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
114 mconn := &MConnection{
116 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
117 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
118 sendMonitor: flow.New(0, 0),
119 recvMonitor: flow.New(0, 0),
120 send: make(chan struct{}, 1),
121 pong: make(chan struct{}),
122 onReceive: onReceive,
126 LocalAddress: NewNetAddress(conn.LocalAddr()),
127 RemoteAddress: NewNetAddress(conn.RemoteAddr()),
131 var channelsIdx = map[byte]*Channel{}
132 var channels = []*Channel{}
134 for _, desc := range chDescs {
135 descCopy := *desc // copy the desc else unsafe access across connections
136 channel := newChannel(mconn, &descCopy)
137 channelsIdx[channel.id] = channel
138 channels = append(channels, channel)
140 mconn.channels = channels
141 mconn.channelsIdx = channelsIdx
143 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
148 func (c *MConnection) OnStart() error {
149 c.BaseService.OnStart()
150 c.quit = make(chan struct{})
151 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
152 c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
153 c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
159 func (c *MConnection) OnStop() {
160 c.BaseService.OnStop()
163 c.chStatsTimer.Stop()
168 // We can't close pong safely here because
169 // recvRoutine may write to it after we've stopped.
170 // Though it doesn't need to get closed at all,
171 // we close it @ recvRoutine.
175 func (c *MConnection) String() string {
176 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
179 func (c *MConnection) flush() {
180 c.Logger.Debug("Flush", "conn", c)
181 err := c.bufWriter.Flush()
183 c.Logger.Error("MConnection flush failed", "error", err)
187 // Catch panics, usually caused by remote disconnects.
188 func (c *MConnection) _recover() {
189 if r := recover(); r != nil {
190 stack := debug.Stack()
191 err := cmn.StackError{r, stack}
196 func (c *MConnection) stopForError(r interface{}) {
198 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
199 if c.onError != nil {
205 // Queues a message to be sent to channel.
206 func (c *MConnection) Send(chID byte, msg interface{}) bool {
211 c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
213 // Send message to channel.
214 channel, ok := c.channelsIdx[chID]
216 c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
220 success := channel.sendBytes(wire.BinaryBytes(msg))
222 // Wake up sendRoutine if necessary
224 case c.send <- struct{}{}:
228 c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
233 // Queues a message to be sent to channel.
234 // Nonblocking, returns true if successful.
235 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
240 c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
242 // Send message to channel.
243 channel, ok := c.channelsIdx[chID]
245 c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
249 ok = channel.trySendBytes(wire.BinaryBytes(msg))
251 // Wake up sendRoutine if necessary
253 case c.send <- struct{}{}:
261 // CanSend returns true if you can send more data onto the chID, false
262 // otherwise. Use only as a heuristic.
263 func (c *MConnection) CanSend(chID byte) bool {
268 channel, ok := c.channelsIdx[chID]
270 c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
273 return channel.canSend()
276 // sendRoutine polls for packets to send from channels.
277 func (c *MConnection) sendRoutine() {
285 case <-c.flushTimer.Ch:
286 // NOTE: flushTimer.Set() must be called every time
287 // something is written to .bufWriter.
289 case <-c.chStatsTimer.Ch:
290 for _, channel := range c.channels {
291 channel.updateStats()
293 case <-c.pingTimer.Ch:
294 c.Logger.Debug("Send Ping")
295 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
296 c.sendMonitor.Update(int(n))
299 c.Logger.Debug("Send Pong")
300 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
301 c.sendMonitor.Update(int(n))
306 // Send some msgPackets
307 eof := c.sendSomeMsgPackets()
309 // Keep sendRoutine awake.
311 case c.send <- struct{}{}:
321 c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "error", err)
330 // Returns true if messages from channels were exhausted.
331 // Blocks in accordance to .sendMonitor throttling.
332 func (c *MConnection) sendSomeMsgPackets() bool {
333 // Block until .sendMonitor says we can write.
334 // Once we're ready we send more than we asked for,
335 // but amortized it should even out.
336 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
338 // Now send some msgPackets.
339 for i := 0; i < numBatchMsgPackets; i++ {
340 if c.sendMsgPacket() {
347 // Returns true if messages from channels were exhausted.
348 func (c *MConnection) sendMsgPacket() bool {
349 // Choose a channel to create a msgPacket from.
350 // The chosen channel will be the one whose recentlySent/priority is the least.
351 var leastRatio float32 = math.MaxFloat32
352 var leastChannel *Channel
353 for _, channel := range c.channels {
354 // If nothing to send, skip this channel
355 if !channel.isSendPending() {
358 // Get ratio, and keep track of lowest ratio.
359 ratio := float32(channel.recentlySent) / float32(channel.priority)
360 if ratio < leastRatio {
362 leastChannel = channel
367 if leastChannel == nil {
370 // c.Logger.Info("Found a msgPacket to send")
373 // Make & send a msgPacket from this channel
374 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
376 c.Logger.Error("Failed to write msgPacket", "error", err)
380 c.sendMonitor.Update(int(n))
385 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
386 // After a whole message has been assembled, it's pushed to onReceive().
387 // Blocks depending on how the connection is throttled.
388 func (c *MConnection) recvRoutine() {
393 // Block until .recvMonitor says we can read.
394 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
397 // Peek into bufReader for debugging
398 if numBytes := c.bufReader.Buffered(); numBytes > 0 {
399 log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
400 bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
404 log.Warn("Error peeking connection buffer", "error", err)
414 pktType := wire.ReadByte(c.bufReader, &n, &err)
415 c.recvMonitor.Update(int(n))
418 c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
424 // Read more depending on packet type.
427 // TODO: prevent abuse, as they cause flush()'s.
428 c.Logger.Debug("Receive Ping")
432 c.Logger.Debug("Receive Pong")
434 pkt, n, err := msgPacket{}, int(0), error(nil)
435 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
436 c.recvMonitor.Update(int(n))
439 c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
444 channel, ok := c.channelsIdx[pkt.ChannelID]
445 if !ok || channel == nil {
446 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
448 msgBytes, err := channel.recvMsgPacket(pkt)
451 c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
457 c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
458 c.onReceive(pkt.ChannelID, msgBytes)
461 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
464 // TODO: shouldn't this go in the sendRoutine?
465 // Better to send a ping packet when *we* haven't sent anything for a while.
471 for _ = range c.pong {
476 type ConnectionStatus struct {
477 SendMonitor flow.Status
478 RecvMonitor flow.Status
479 Channels []ChannelStatus
482 type ChannelStatus struct {
484 SendQueueCapacity int
490 func (c *MConnection) Status() ConnectionStatus {
491 var status ConnectionStatus
492 status.SendMonitor = c.sendMonitor.Status()
493 status.RecvMonitor = c.recvMonitor.Status()
494 status.Channels = make([]ChannelStatus, len(c.channels))
495 for i, channel := range c.channels {
496 status.Channels[i] = ChannelStatus{
498 SendQueueCapacity: cap(channel.sendQueue),
499 SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
500 Priority: channel.priority,
501 RecentlySent: channel.recentlySent,
507 //-----------------------------------------------------------------------------
509 type ChannelDescriptor struct {
512 SendQueueCapacity int
513 RecvBufferCapacity int
514 RecvMessageCapacity int
517 func (chDesc *ChannelDescriptor) FillDefaults() {
518 if chDesc.SendQueueCapacity == 0 {
519 chDesc.SendQueueCapacity = defaultSendQueueCapacity
521 if chDesc.RecvBufferCapacity == 0 {
522 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
524 if chDesc.RecvMessageCapacity == 0 {
525 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
530 // NOTE: not goroutine-safe.
531 type Channel struct {
533 desc *ChannelDescriptor
535 sendQueue chan []byte
536 sendQueueSize int32 // atomic.
540 recentlySent int64 // exponential moving average
543 func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
545 if desc.Priority <= 0 {
546 cmn.PanicSanity("Channel default priority must be a postive integer")
552 sendQueue: make(chan []byte, desc.SendQueueCapacity),
553 recving: make([]byte, 0, desc.RecvBufferCapacity),
554 priority: desc.Priority,
558 // Queues message to send to this channel.
560 // Times out (and returns false) after defaultSendTimeout
561 func (ch *Channel) sendBytes(bytes []byte) bool {
563 case ch.sendQueue <- bytes:
564 atomic.AddInt32(&ch.sendQueueSize, 1)
566 case <-time.After(defaultSendTimeout):
571 // Queues message to send to this channel.
572 // Nonblocking, returns true if successful.
574 func (ch *Channel) trySendBytes(bytes []byte) bool {
576 case ch.sendQueue <- bytes:
577 atomic.AddInt32(&ch.sendQueueSize, 1)
585 func (ch *Channel) loadSendQueueSize() (size int) {
586 return int(atomic.LoadInt32(&ch.sendQueueSize))
590 // Use only as a heuristic.
591 func (ch *Channel) canSend() bool {
592 return ch.loadSendQueueSize() < defaultSendQueueCapacity
595 // Returns true if any msgPackets are pending to be sent.
596 // Call before calling nextMsgPacket()
598 func (ch *Channel) isSendPending() bool {
599 if len(ch.sending) == 0 {
600 if len(ch.sendQueue) == 0 {
603 ch.sending = <-ch.sendQueue
608 // Creates a new msgPacket to send.
609 // Not goroutine-safe
610 func (ch *Channel) nextMsgPacket() msgPacket {
611 packet := msgPacket{}
612 packet.ChannelID = byte(ch.id)
613 packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
614 if len(ch.sending) <= maxMsgPacketPayloadSize {
615 packet.EOF = byte(0x01)
617 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
619 packet.EOF = byte(0x00)
620 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
625 // Writes next msgPacket to w.
626 // Not goroutine-safe
627 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
628 packet := ch.nextMsgPacket()
629 // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
630 wire.WriteByte(packetTypeMsg, w, &n, &err)
631 wire.WriteBinary(packet, w, &n, &err)
633 ch.recentlySent += int64(n)
638 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
639 // Not goroutine-safe
640 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
641 // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
642 if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
643 return nil, wire.ErrBinaryReadOverflow
645 ch.recving = append(ch.recving, packet.Bytes...)
646 if packet.EOF == byte(0x01) {
647 msgBytes := ch.recving
648 // clear the slice without re-allocating.
649 // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
650 // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
651 // at which point the recving slice stops being used and should be garbage collected
652 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
658 // Call this periodically to update stats for throttling purposes.
659 // Not goroutine-safe
660 func (ch *Channel) updateStats() {
661 // Exponential decay of stats.
663 ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
666 //-----------------------------------------------------------------------------
669 maxMsgPacketPayloadSize = 1024
670 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
671 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
672 packetTypePing = byte(0x01)
673 packetTypePong = byte(0x02)
674 packetTypeMsg = byte(0x03)
677 // Messages in channels are chopped into smaller msgPackets for multiplexing.
678 type msgPacket struct {
680 EOF byte // 1 means message ends here.
684 func (p msgPacket) String() string {
685 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)