13 log "github.com/sirupsen/logrus"
14 wire "github.com/tendermint/go-wire"
15 cmn "github.com/tendermint/tmlibs/common"
16 flow "github.com/tendermint/tmlibs/flowrate"
20 numBatchMsgPackets = 10
21 minReadBufferSize = 1024
22 minWriteBufferSize = 65536
23 updateState = 2 * time.Second
24 pingTimeout = 40 * time.Second
25 flushThrottle = 100 * time.Millisecond
27 defaultSendQueueCapacity = 1
28 defaultSendRate = int64(512000) // 500KB/s
29 defaultRecvBufferCapacity = 4096
30 defaultRecvMessageCapacity = 22020096 // 21MB
31 defaultRecvRate = int64(512000) // 500KB/s
32 defaultSendTimeout = 10 * time.Second
35 type receiveCbFunc func(chID byte, msgBytes []byte)
36 type errorCbFunc func(interface{})
39 Each peer has one `MConnection` (multiplex connection) instance.
41 __multiplex__ *noun* a system or signal involving simultaneous transmission of
42 several messages along a single channel of communication.
44 Each `MConnection` handles message transmission on multiple abstract communication
45 `Channel`s. Each channel has a globally unique byte id.
46 The byte id and the relative priorities of each `Channel` are configured upon
47 initialization of the connection.
49 There are two methods for sending messages:
50 func (m MConnection) Send(chID byte, msg interface{}) bool {}
51 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
53 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
54 for the channel with the given id byte `chID`, or until the request times out.
55 The message `msg` is serialized using the `tendermint/wire` submodule's
56 `WriteBinary()` reflection routine.
58 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
61 Inbound message bytes are handled with an onReceive callback function.
63 type MConnection struct {
67 bufReader *bufio.Reader
68 bufWriter *bufio.Writer
69 sendMonitor *flow.Monitor
70 recvMonitor *flow.Monitor
74 channelsIdx map[byte]*Channel
75 onReceive receiveCbFunc
81 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
82 pingTimer *cmn.RepeatTimer // send pings periodically
83 chStatsTimer *cmn.RepeatTimer // update channel stats periodically
85 LocalAddress *NetAddress
86 RemoteAddress *NetAddress
89 // MConnConfig is a MConnection configuration.
90 type MConnConfig struct {
91 SendRate int64 `mapstructure:"send_rate"`
92 RecvRate int64 `mapstructure:"recv_rate"`
95 // DefaultMConnConfig returns the default config.
96 func DefaultMConnConfig() *MConnConfig {
98 SendRate: defaultSendRate,
99 RecvRate: defaultRecvRate,
103 // NewMConnection wraps net.Conn and creates multiplex connection
104 func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
105 return NewMConnectionWithConfig(
110 DefaultMConnConfig())
113 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115 mconn := &MConnection{
117 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
118 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
119 sendMonitor: flow.New(0, 0),
120 recvMonitor: flow.New(0, 0),
121 send: make(chan struct{}, 1),
122 pong: make(chan struct{}),
123 onReceive: onReceive,
127 LocalAddress: NewNetAddress(conn.LocalAddr()),
128 RemoteAddress: NewNetAddress(conn.RemoteAddr()),
132 var channelsIdx = map[byte]*Channel{}
133 var channels = []*Channel{}
135 for _, desc := range chDescs {
136 descCopy := *desc // copy the desc else unsafe access across connections
137 channel := newChannel(mconn, &descCopy)
138 channelsIdx[channel.id] = channel
139 channels = append(channels, channel)
141 mconn.channels = channels
142 mconn.channelsIdx = channelsIdx
144 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
149 func (c *MConnection) OnStart() error {
150 c.BaseService.OnStart()
151 c.quit = make(chan struct{})
152 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
153 c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
154 c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
160 func (c *MConnection) OnStop() {
161 c.BaseService.OnStop()
164 c.chStatsTimer.Stop()
169 // We can't close pong safely here because
170 // recvRoutine may write to it after we've stopped.
171 // Though it doesn't need to get closed at all,
172 // we close it @ recvRoutine.
176 func (c *MConnection) String() string {
177 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
180 func (c *MConnection) flush() {
181 log.WithField("conn", c).Debug("Flush")
182 err := c.bufWriter.Flush()
184 log.WithField("error", err).Error("MConnection flush failed")
188 // Catch panics, usually caused by remote disconnects.
189 func (c *MConnection) _recover() {
190 if r := recover(); r != nil {
191 stack := debug.Stack()
192 err := cmn.StackError{r, stack}
197 func (c *MConnection) stopForError(r interface{}) {
199 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
200 if c.onError != nil {
206 // Queues a message to be sent to channel.
207 func (c *MConnection) Send(chID byte, msg interface{}) bool {
212 log.WithFields(log.Fields{
218 // Send message to channel.
219 channel, ok := c.channelsIdx[chID]
221 log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel"))
225 success := channel.sendBytes(wire.BinaryBytes(msg))
227 // Wake up sendRoutine if necessary
229 case c.send <- struct{}{}:
233 log.WithFields(log.Fields{
237 }).Error("Send failed")
242 // Queues a message to be sent to channel.
243 // Nonblocking, returns true if successful.
244 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
249 log.WithFields(log.Fields{
255 // Send message to channel.
256 channel, ok := c.channelsIdx[chID]
258 log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel"))
262 ok = channel.trySendBytes(wire.BinaryBytes(msg))
264 // Wake up sendRoutine if necessary
266 case c.send <- struct{}{}:
274 // CanSend returns true if you can send more data onto the chID, false
275 // otherwise. Use only as a heuristic.
276 func (c *MConnection) CanSend(chID byte) bool {
281 channel, ok := c.channelsIdx[chID]
283 log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel"))
286 return channel.canSend()
289 // sendRoutine polls for packets to send from channels.
290 func (c *MConnection) sendRoutine() {
298 case <-c.flushTimer.Ch:
299 // NOTE: flushTimer.Set() must be called every time
300 // something is written to .bufWriter.
302 case <-c.chStatsTimer.Ch:
303 for _, channel := range c.channels {
304 channel.updateStats()
306 case <-c.pingTimer.Ch:
307 log.Debug("Send Ping")
308 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
309 c.sendMonitor.Update(int(n))
312 log.Debug("Send Pong")
313 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
314 c.sendMonitor.Update(int(n))
319 // Send some msgPackets
320 eof := c.sendSomeMsgPackets()
322 // Keep sendRoutine awake.
324 case c.send <- struct{}{}:
334 log.WithFields(log.Fields{
337 }).Error("Connection failed @ sendRoutine")
346 // Returns true if messages from channels were exhausted.
347 // Blocks in accordance to .sendMonitor throttling.
348 func (c *MConnection) sendSomeMsgPackets() bool {
349 // Block until .sendMonitor says we can write.
350 // Once we're ready we send more than we asked for,
351 // but amortized it should even out.
352 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
354 // Now send some msgPackets.
355 for i := 0; i < numBatchMsgPackets; i++ {
356 if c.sendMsgPacket() {
363 // Returns true if messages from channels were exhausted.
364 func (c *MConnection) sendMsgPacket() bool {
365 // Choose a channel to create a msgPacket from.
366 // The chosen channel will be the one whose recentlySent/priority is the least.
367 var leastRatio float32 = math.MaxFloat32
368 var leastChannel *Channel
369 for _, channel := range c.channels {
370 // If nothing to send, skip this channel
371 if !channel.isSendPending() {
374 // Get ratio, and keep track of lowest ratio.
375 ratio := float32(channel.recentlySent) / float32(channel.priority)
376 if ratio < leastRatio {
378 leastChannel = channel
383 if leastChannel == nil {
386 // c.Logger.Info("Found a msgPacket to send")
389 // Make & send a msgPacket from this channel
390 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
392 log.WithField("error", err).Error("Failed to write msgPacket")
396 c.sendMonitor.Update(int(n))
401 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
402 // After a whole message has been assembled, it's pushed to onReceive().
403 // Blocks depending on how the connection is throttled.
404 func (c *MConnection) recvRoutine() {
409 // Block until .recvMonitor says we can read.
410 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
413 // Peek into bufReader for debugging
414 if numBytes := c.bufReader.Buffered(); numBytes > 0 {
415 log.Infof("Peek connection buffer numBytes:", numBytes)
416 bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
418 log.Infof("bytes:", bytes)
420 log.Warning("Error peeking connection buffer err:", err)
423 log.Warning("Received bytes number is:", numBytes)
430 pktType := wire.ReadByte(c.bufReader, &n, &err)
431 c.recvMonitor.Update(int(n))
434 log.WithFields(log.Fields{
437 }).Error("Connection failed @ recvRoutine (reading byte)")
444 // Read more depending on packet type.
447 // TODO: prevent abuse, as they cause flush()'s.
448 log.Debug("Receive Ping")
452 log.Debug("Receive Pong")
454 pkt, n, err := msgPacket{}, int(0), error(nil)
455 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
456 c.recvMonitor.Update(int(n))
459 log.WithFields(log.Fields{
462 }).Error("Connection failed @ recvRoutine")
467 channel, ok := c.channelsIdx[pkt.ChannelID]
468 if !ok || channel == nil {
469 if pkt.ChannelID == PexChannel {
472 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
475 msgBytes, err := channel.recvMsgPacket(pkt)
478 log.WithFields(log.Fields{
481 }).Error("Connection failed @ recvRoutine")
487 log.WithFields(log.Fields{
488 "channelID": pkt.ChannelID,
489 "msgBytes": msgBytes,
490 }).Debug("Received bytes")
491 c.onReceive(pkt.ChannelID, msgBytes)
494 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
497 // TODO: shouldn't this go in the sendRoutine?
498 // Better to send a ping packet when *we* haven't sent anything for a while.
504 for _ = range c.pong {
509 type ConnectionStatus struct {
510 SendMonitor flow.Status
511 RecvMonitor flow.Status
512 Channels []ChannelStatus
515 type ChannelStatus struct {
517 SendQueueCapacity int
523 func (c *MConnection) Status() ConnectionStatus {
524 var status ConnectionStatus
525 status.SendMonitor = c.sendMonitor.Status()
526 status.RecvMonitor = c.recvMonitor.Status()
527 status.Channels = make([]ChannelStatus, len(c.channels))
528 for i, channel := range c.channels {
529 status.Channels[i] = ChannelStatus{
531 SendQueueCapacity: cap(channel.sendQueue),
532 SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
533 Priority: channel.priority,
534 RecentlySent: channel.recentlySent,
540 //-----------------------------------------------------------------------------
542 type ChannelDescriptor struct {
545 SendQueueCapacity int
546 RecvBufferCapacity int
547 RecvMessageCapacity int
550 func (chDesc *ChannelDescriptor) FillDefaults() {
551 if chDesc.SendQueueCapacity == 0 {
552 chDesc.SendQueueCapacity = defaultSendQueueCapacity
554 if chDesc.RecvBufferCapacity == 0 {
555 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
557 if chDesc.RecvMessageCapacity == 0 {
558 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
563 // NOTE: not goroutine-safe.
564 type Channel struct {
566 desc *ChannelDescriptor
568 sendQueue chan []byte
569 sendQueueSize int32 // atomic.
573 recentlySent int64 // exponential moving average
576 func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
578 if desc.Priority <= 0 {
579 cmn.PanicSanity("Channel default priority must be a postive integer")
585 sendQueue: make(chan []byte, desc.SendQueueCapacity),
586 recving: make([]byte, 0, desc.RecvBufferCapacity),
587 priority: desc.Priority,
591 // Queues message to send to this channel.
593 // Times out (and returns false) after defaultSendTimeout
594 func (ch *Channel) sendBytes(bytes []byte) bool {
596 case ch.sendQueue <- bytes:
597 atomic.AddInt32(&ch.sendQueueSize, 1)
599 case <-time.After(defaultSendTimeout):
604 // Queues message to send to this channel.
605 // Nonblocking, returns true if successful.
607 func (ch *Channel) trySendBytes(bytes []byte) bool {
609 case ch.sendQueue <- bytes:
610 atomic.AddInt32(&ch.sendQueueSize, 1)
618 func (ch *Channel) loadSendQueueSize() (size int) {
619 return int(atomic.LoadInt32(&ch.sendQueueSize))
623 // Use only as a heuristic.
624 func (ch *Channel) canSend() bool {
625 return ch.loadSendQueueSize() < defaultSendQueueCapacity
628 // Returns true if any msgPackets are pending to be sent.
629 // Call before calling nextMsgPacket()
631 func (ch *Channel) isSendPending() bool {
632 if len(ch.sending) == 0 {
633 if len(ch.sendQueue) == 0 {
636 ch.sending = <-ch.sendQueue
641 // Creates a new msgPacket to send.
642 // Not goroutine-safe
643 func (ch *Channel) nextMsgPacket() msgPacket {
644 packet := msgPacket{}
645 packet.ChannelID = byte(ch.id)
646 packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
647 if len(ch.sending) <= maxMsgPacketPayloadSize {
648 packet.EOF = byte(0x01)
650 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
652 packet.EOF = byte(0x00)
653 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
658 // Writes next msgPacket to w.
659 // Not goroutine-safe
660 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
661 packet := ch.nextMsgPacket()
662 wire.WriteByte(packetTypeMsg, w, &n, &err)
663 wire.WriteBinary(packet, w, &n, &err)
665 ch.recentlySent += int64(n)
670 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
671 // Not goroutine-safe
672 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
673 if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
674 return nil, wire.ErrBinaryReadOverflow
676 ch.recving = append(ch.recving, packet.Bytes...)
677 if packet.EOF == byte(0x01) {
678 msgBytes := ch.recving
679 // clear the slice without re-allocating.
680 // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
681 // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
682 // at which point the recving slice stops being used and should be garbage collected
683 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
689 // Call this periodically to update stats for throttling purposes.
690 // Not goroutine-safe
691 func (ch *Channel) updateStats() {
692 // Exponential decay of stats.
694 ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
697 //-----------------------------------------------------------------------------
700 maxMsgPacketPayloadSize = 1024
701 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
702 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
703 packetTypePing = byte(0x01)
704 packetTypePong = byte(0x02)
705 packetTypeMsg = byte(0x03)
708 // Messages in channels are chopped into smaller msgPackets for multiplexing.
709 type msgPacket struct {
711 EOF byte // 1 means message ends here.
715 func (p msgPacket) String() string {
716 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)