+++ /dev/null
-package connection
-
-import (
- "io"
- "sync/atomic"
- "time"
-
- wire "github.com/tendermint/go-wire"
- cmn "github.com/tendermint/tmlibs/common"
-)
-
-// ChannelDescriptor is the setting of channel
-type ChannelDescriptor struct {
- ID byte
- Priority int
- SendQueueCapacity int
- RecvBufferCapacity int
- RecvMessageCapacity int
-}
-
-// FillDefaults set the channel config if empty
-func (chDesc *ChannelDescriptor) FillDefaults() {
- if chDesc.SendQueueCapacity == 0 {
- chDesc.SendQueueCapacity = defaultSendQueueCapacity
- }
- if chDesc.RecvBufferCapacity == 0 {
- chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
- }
- if chDesc.RecvMessageCapacity == 0 {
- chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
- }
-}
-
-type channel struct {
- conn *MConnection
- desc *ChannelDescriptor
- id byte
- sendQueue chan []byte
- sendQueueSize int32 // atomic.
- recving []byte
- sending []byte
- priority int
- recentlySent int64 // exponential moving average
-}
-
-func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel {
- desc.FillDefaults()
- if desc.Priority <= 0 {
- cmn.PanicSanity("Channel default priority must be a postive integer")
- }
- return &channel{
- conn: conn,
- desc: desc,
- id: desc.ID,
- sendQueue: make(chan []byte, desc.SendQueueCapacity),
- recving: make([]byte, 0, desc.RecvBufferCapacity),
- priority: desc.Priority,
- }
-}
-
-// Goroutine-safe
-// Use only as a heuristic.
-func (ch *channel) canSend() bool {
- return ch.loadSendQueueSize() < defaultSendQueueCapacity
-}
-
-// Returns true if any msgPackets are pending to be sent.
-// Call before calling nextMsgPacket()
-// Goroutine-safe
-func (ch *channel) isSendPending() bool {
- if len(ch.sending) == 0 {
- if len(ch.sendQueue) == 0 {
- return false
- }
- ch.sending = <-ch.sendQueue
- }
- return true
-}
-
-// Goroutine-safe
-func (ch *channel) loadSendQueueSize() (size int) {
- return int(atomic.LoadInt32(&ch.sendQueueSize))
-}
-
-// Creates a new msgPacket to send.
-// Not goroutine-safe
-func (ch *channel) nextMsgPacket() msgPacket {
- packet := msgPacket{
- ChannelID: byte(ch.id),
- Bytes: ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))],
- }
- if len(ch.sending) <= maxMsgPacketPayloadSize {
- packet.EOF = byte(0x01)
- ch.sending = nil
- atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
- } else {
- packet.EOF = byte(0x00)
- ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
- }
- return packet
-}
-
-// Handles incoming msgPackets. Returns a msg bytes if msg is complete.
-// Not goroutine-safe
-func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
- if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
- return nil, wire.ErrBinaryReadOverflow
- }
-
- ch.recving = append(ch.recving, packet.Bytes...)
- if packet.EOF == byte(0x01) {
- msgBytes := ch.recving
- ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
- return msgBytes, nil
- }
- return nil, nil
-}
-
-// Queues message to send to this channel.
-// Goroutine-safe
-// Times out (and returns false) after defaultSendTimeout
-func (ch *channel) sendBytes(bytes []byte) bool {
- select {
- case ch.sendQueue <- bytes:
- atomic.AddInt32(&ch.sendQueueSize, 1)
- return true
- case <-time.After(defaultSendTimeout):
- return false
- }
-}
-
-// Queues message to send to this channel.
-// Nonblocking, returns true if successful.
-// Goroutine-safe
-func (ch *channel) trySendBytes(bytes []byte) bool {
- select {
- case ch.sendQueue <- bytes:
- atomic.AddInt32(&ch.sendQueueSize, 1)
- return true
- default:
- return false
- }
-}
-
-// Writes next msgPacket to w.
-// Not goroutine-safe
-func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
- packet := ch.nextMsgPacket()
- wire.WriteByte(packetTypeMsg, w, &n, &err)
- wire.WriteBinary(packet, w, &n, &err)
- if err == nil {
- ch.recentlySent += int64(n)
- }
- return
-}
-
-// Call this periodically to update stats for throttling purposes.
-// Not goroutine-safe
-func (ch *channel) updateStats() {
- ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
-}