OSDN Git Service

Thanos did someting
[bytom/vapor.git] / p2p / connection / channel.go
diff --git a/p2p/connection/channel.go b/p2p/connection/channel.go
deleted file mode 100644 (file)
index 4834e56..0000000
+++ /dev/null
@@ -1,161 +0,0 @@
-package connection
-
-import (
-       "io"
-       "sync/atomic"
-       "time"
-
-       wire "github.com/tendermint/go-wire"
-       cmn "github.com/tendermint/tmlibs/common"
-)
-
-// ChannelDescriptor is the setting of channel
-type ChannelDescriptor struct {
-       ID                  byte
-       Priority            int
-       SendQueueCapacity   int
-       RecvBufferCapacity  int
-       RecvMessageCapacity int
-}
-
-// FillDefaults set the channel config if empty
-func (chDesc *ChannelDescriptor) FillDefaults() {
-       if chDesc.SendQueueCapacity == 0 {
-               chDesc.SendQueueCapacity = defaultSendQueueCapacity
-       }
-       if chDesc.RecvBufferCapacity == 0 {
-               chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
-       }
-       if chDesc.RecvMessageCapacity == 0 {
-               chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
-       }
-}
-
-type channel struct {
-       conn          *MConnection
-       desc          *ChannelDescriptor
-       id            byte
-       sendQueue     chan []byte
-       sendQueueSize int32 // atomic.
-       recving       []byte
-       sending       []byte
-       priority      int
-       recentlySent  int64 // exponential moving average
-}
-
-func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel {
-       desc.FillDefaults()
-       if desc.Priority <= 0 {
-               cmn.PanicSanity("Channel default priority must be a postive integer")
-       }
-       return &channel{
-               conn:      conn,
-               desc:      desc,
-               id:        desc.ID,
-               sendQueue: make(chan []byte, desc.SendQueueCapacity),
-               recving:   make([]byte, 0, desc.RecvBufferCapacity),
-               priority:  desc.Priority,
-       }
-}
-
-// Goroutine-safe
-// Use only as a heuristic.
-func (ch *channel) canSend() bool {
-       return ch.loadSendQueueSize() < defaultSendQueueCapacity
-}
-
-// Returns true if any msgPackets are pending to be sent.
-// Call before calling nextMsgPacket()
-// Goroutine-safe
-func (ch *channel) isSendPending() bool {
-       if len(ch.sending) == 0 {
-               if len(ch.sendQueue) == 0 {
-                       return false
-               }
-               ch.sending = <-ch.sendQueue
-       }
-       return true
-}
-
-// Goroutine-safe
-func (ch *channel) loadSendQueueSize() (size int) {
-       return int(atomic.LoadInt32(&ch.sendQueueSize))
-}
-
-// Creates a new msgPacket to send.
-// Not goroutine-safe
-func (ch *channel) nextMsgPacket() msgPacket {
-       packet := msgPacket{
-               ChannelID: byte(ch.id),
-               Bytes:     ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))],
-       }
-       if len(ch.sending) <= maxMsgPacketPayloadSize {
-               packet.EOF = byte(0x01)
-               ch.sending = nil
-               atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
-       } else {
-               packet.EOF = byte(0x00)
-               ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
-       }
-       return packet
-}
-
-// Handles incoming msgPackets. Returns a msg bytes if msg is complete.
-// Not goroutine-safe
-func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
-       if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
-               return nil, wire.ErrBinaryReadOverflow
-       }
-
-       ch.recving = append(ch.recving, packet.Bytes...)
-       if packet.EOF == byte(0x01) {
-               msgBytes := ch.recving
-               ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
-               return msgBytes, nil
-       }
-       return nil, nil
-}
-
-// Queues message to send to this channel.
-// Goroutine-safe
-// Times out (and returns false) after defaultSendTimeout
-func (ch *channel) sendBytes(bytes []byte) bool {
-       select {
-       case ch.sendQueue <- bytes:
-               atomic.AddInt32(&ch.sendQueueSize, 1)
-               return true
-       case <-time.After(defaultSendTimeout):
-               return false
-       }
-}
-
-// Queues message to send to this channel.
-// Nonblocking, returns true if successful.
-// Goroutine-safe
-func (ch *channel) trySendBytes(bytes []byte) bool {
-       select {
-       case ch.sendQueue <- bytes:
-               atomic.AddInt32(&ch.sendQueueSize, 1)
-               return true
-       default:
-               return false
-       }
-}
-
-// Writes next msgPacket to w.
-// Not goroutine-safe
-func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
-       packet := ch.nextMsgPacket()
-       wire.WriteByte(packetTypeMsg, w, &n, &err)
-       wire.WriteBinary(packet, w, &n, &err)
-       if err == nil {
-               ch.recentlySent += int64(n)
-       }
-       return
-}
-
-// Call this periodically to update stats for throttling purposes.
-// Not goroutine-safe
-func (ch *channel) updateStats() {
-       ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
-}