8 wire "github.com/tendermint/go-wire"
9 cmn "github.com/tendermint/tmlibs/common"
12 // ChannelDescriptor holds the configuration of a single channel.
// Zero-valued capacity fields are replaced with package defaults by FillDefaults.
13 type ChannelDescriptor struct {
17 RecvBufferCapacity int // capacity preallocated for the channel's receive buffer in newChannel
18 RecvMessageCapacity int // upper bound on buffered+incoming bytes enforced by recvMsgPacket
21 // FillDefaults sets each capacity field that is still zero to its package default.
// A zero SendQueueCapacity, RecvBufferCapacity or RecvMessageCapacity is treated
// as "not configured"; the receiver is modified in place.
22 func (chDesc *ChannelDescriptor) FillDefaults() {
23 if chDesc.SendQueueCapacity == 0 {
24 chDesc.SendQueueCapacity = defaultSendQueueCapacity
26 if chDesc.RecvBufferCapacity == 0 {
27 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
29 if chDesc.RecvMessageCapacity == 0 {
30 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
// desc is the channel's configuration; recvMsgPacket reads RecvMessageCapacity from it.
36 desc *ChannelDescriptor
// sendQueueSize is incremented after a successful enqueue in sendBytes/trySendBytes
// and decremented in nextMsgPacket; read it through loadSendQueueSize.
39 sendQueueSize int32 // atomic.
// recentlySent is increased by the written byte count in writeMsgPacketTo
// and multiplied by 0.8 on every updateStats call.
43 recentlySent int64 // exponential moving average
// newChannel builds the channel for conn described by desc.
// A non-positive desc.Priority is reported through cmn.PanicSanity
// (presumably a panic - confirm against tmlibs/common).
// NOTE(review): the message below misspells "positive" as "postive"; it is a
// runtime string, so it is left unchanged here.
46 func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel {
48 if desc.Priority <= 0 {
49 cmn.PanicSanity("Channel default priority must be a postive integer")
55 sendQueue: make(chan []byte, desc.SendQueueCapacity), // buffered queue of outgoing messages
56 recving: make([]byte, 0, desc.RecvBufferCapacity), // empty buffer with preallocated capacity
57 priority: desc.Priority,
62 // canSend reports whether the send-queue counter is below defaultSendQueueCapacity. Use only as a heuristic.
// NOTE(review): the bound is the package default, not this channel's
// SendQueueCapacity (the size sendQueue is created with in newChannel), so a
// channel configured with a different capacity is judged against the wrong
// limit - confirm whether that is intended.
63 func (ch *channel) canSend() bool {
64 return ch.loadSendQueueSize() < defaultSendQueueCapacity
67 // isSendPending returns true if any msgPackets are pending to be sent.
68 // Call before calling nextMsgPacket()
// Side effect: when ch.sending is empty and sendQueue holds a message, the next
// queued message is received from sendQueue into ch.sending.
70 func (ch *channel) isSendPending() bool {
71 if len(ch.sending) == 0 { // no partially sent message in progress
72 if len(ch.sendQueue) == 0 { // and nothing queued either
75 ch.sending = <-ch.sendQueue // take the next queued message as the one being sent
// loadSendQueueSize atomically loads sendQueueSize and returns it as an int.
81 func (ch *channel) loadSendQueueSize() (size int) {
82 return int(atomic.LoadInt32(&ch.sendQueueSize))
85 // nextMsgPacket creates a new msgPacket to send from the front of ch.sending.
// The payload is capped at maxMsgPacketPayloadSize bytes (cmn.MinInt is
// presumably the integer minimum - confirm against tmlibs/common).
// EOF is 0x01 on the packet that carries the rest of the message and 0x00
// otherwise; recvMsgPacket treats EOF == 0x01 as "message complete".
87 func (ch *channel) nextMsgPacket() msgPacket {
89 ChannelID: byte(ch.id),
90 Bytes: ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))],
92 if len(ch.sending) <= maxMsgPacketPayloadSize { // everything left fits in this packet
93 packet.EOF = byte(0x01)
95 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
97 packet.EOF = byte(0x00)
98 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] // drop the bytes placed in this packet
103 // recvMsgPacket handles incoming msgPackets. Returns the msg bytes if the msg is complete.
104 // Not goroutine-safe
// Returns wire.ErrBinaryReadOverflow when the bytes already buffered plus
// packet.Bytes would exceed desc.RecvMessageCapacity.
105 func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
106 if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
107 return nil, wire.ErrBinaryReadOverflow
110 ch.recving = append(ch.recving, packet.Bytes...)
111 if packet.EOF == byte(0x01) { // last packet of the message
112 msgBytes := ch.recving
// NOTE(review): re-slicing to [:0] keeps the same backing array, so msgBytes
// aliases the buffer that the next append in this method writes into. If
// msgBytes is handed to the caller, it is only valid until the next call
// unless the caller copies it - confirm callers respect that.
113 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
119 // sendBytes queues a message to send to this channel.
121 // Times out (and returns false) after defaultSendTimeout
122 func (ch *channel) sendBytes(bytes []byte) bool {
124 case ch.sendQueue <- bytes:
125 atomic.AddInt32(&ch.sendQueueSize, 1) // counter is bumped only after the enqueue succeeded
// NOTE(review): time.After allocates a new timer on every call; if this path is
// hot, a reusable time.Timer may be preferable.
127 case <-time.After(defaultSendTimeout):
132 // trySendBytes queues a message to send to this channel.
133 // Nonblocking, returns true if successful.
135 func (ch *channel) trySendBytes(bytes []byte) bool {
137 case ch.sendQueue <- bytes:
138 atomic.AddInt32(&ch.sendQueueSize, 1) // counter is bumped only after the enqueue succeeded
145 // writeMsgPacketTo writes the next msgPacket to w.
146 // Not goroutine-safe
// The packet comes from nextMsgPacket, so call isSendPending first (see its comment).
// n and err are the named results; both wire calls receive pointers to them
// (presumably accumulating bytes written and the first error - confirm against go-wire).
147 func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
148 packet := ch.nextMsgPacket()
149 wire.WriteByte(packetTypeMsg, w, &n, &err) // packet-type byte first
150 wire.WriteBinary(packet, w, &n, &err) // then the packet itself
152 ch.recentlySent += int64(n) // feed the throttling statistic decayed by updateStats
157 // updateStats decays recentlySent. Call this periodically to update stats for throttling purposes.
158 // Not goroutine-safe
159 func (ch *channel) updateStats() {
160 ch.recentlySent = int64(float64(ch.recentlySent) * 0.8) // scale by 0.8; the int64 conversion drops the fractional part