package connection import ( "io" "sync/atomic" "time" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" ) // ChannelDescriptor is the setting of channel type ChannelDescriptor struct { ID byte Priority int SendQueueCapacity int RecvBufferCapacity int RecvMessageCapacity int } // FillDefaults set the channel config if empty func (chDesc *ChannelDescriptor) FillDefaults() { if chDesc.SendQueueCapacity == 0 { chDesc.SendQueueCapacity = defaultSendQueueCapacity } if chDesc.RecvBufferCapacity == 0 { chDesc.RecvBufferCapacity = defaultRecvBufferCapacity } if chDesc.RecvMessageCapacity == 0 { chDesc.RecvMessageCapacity = defaultRecvMessageCapacity } } type channel struct { conn *MConnection desc *ChannelDescriptor id byte sendQueue chan []byte sendQueueSize int32 // atomic. recving []byte sending []byte priority int recentlySent int64 // exponential moving average } func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel { desc.FillDefaults() if desc.Priority <= 0 { cmn.PanicSanity("Channel default priority must be a postive integer") } return &channel{ conn: conn, desc: desc, id: desc.ID, sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), priority: desc.Priority, } } // Goroutine-safe // Use only as a heuristic. func (ch *channel) canSend() bool { return ch.loadSendQueueSize() < defaultSendQueueCapacity } // Returns true if any msgPackets are pending to be sent. // Call before calling nextMsgPacket() // Goroutine-safe func (ch *channel) isSendPending() bool { if len(ch.sending) == 0 { if len(ch.sendQueue) == 0 { return false } ch.sending = <-ch.sendQueue } return true } // Goroutine-safe func (ch *channel) loadSendQueueSize() (size int) { return int(atomic.LoadInt32(&ch.sendQueueSize)) } // Creates a new msgPacket to send. // Not goroutine-safe func (ch *channel) nextMsgPacket() msgPacket { packet := msgPacket{ ChannelID: byte(ch.id), Bytes: ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))], } if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet } // Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Not goroutine-safe func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) { if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { return nil, wire.ErrBinaryReadOverflow } ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { msgBytes := ch.recving ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) return msgBytes, nil } return nil, nil } // Queues message to send to this channel. // Goroutine-safe // Times out (and returns false) after defaultSendTimeout func (ch *channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true case <-time.After(defaultSendTimeout): return false } } // Queues message to send to this channel. // Nonblocking, returns true if successful. // Goroutine-safe func (ch *channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true default: return false } } // Writes next msgPacket to w. // Not goroutine-safe func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) } return } // Call this periodically to update stats for throttling purposes. // Not goroutine-safe func (ch *channel) updateStats() { ch.recentlySent = int64(float64(ch.recentlySent) * 0.8) }