OSDN Git Service

try to fix ban peer bug (#273)
[bytom/vapor.git] / p2p / connection / channel.go
1 package connection
2
3 import (
4         "io"
5         "sync/atomic"
6         "time"
7
8         wire "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10 )
11
12 // ChannelDescriptor is the setting of channel
13 type ChannelDescriptor struct {
14         ID                  byte
15         Priority            int
16         SendQueueCapacity   int
17         RecvBufferCapacity  int
18         RecvMessageCapacity int
19 }
20
21 // FillDefaults set the channel config if empty
22 func (chDesc *ChannelDescriptor) FillDefaults() {
23         if chDesc.SendQueueCapacity == 0 {
24                 chDesc.SendQueueCapacity = defaultSendQueueCapacity
25         }
26         if chDesc.RecvBufferCapacity == 0 {
27                 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
28         }
29         if chDesc.RecvMessageCapacity == 0 {
30                 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
31         }
32 }
33
34 type channel struct {
35         conn          *MConnection
36         desc          *ChannelDescriptor
37         id            byte
38         sendQueue     chan []byte
39         sendQueueSize int32 // atomic.
40         recving       []byte
41         sending       []byte
42         priority      int
43         recentlySent  int64 // exponential moving average
44 }
45
46 func newChannel(conn *MConnection, desc *ChannelDescriptor) *channel {
47         desc.FillDefaults()
48         if desc.Priority <= 0 {
49                 cmn.PanicSanity("Channel default priority must be a postive integer")
50         }
51         return &channel{
52                 conn:      conn,
53                 desc:      desc,
54                 id:        desc.ID,
55                 sendQueue: make(chan []byte, desc.SendQueueCapacity),
56                 recving:   make([]byte, 0, desc.RecvBufferCapacity),
57                 priority:  desc.Priority,
58         }
59 }
60
61 // Goroutine-safe
62 // Use only as a heuristic.
63 func (ch *channel) canSend() bool {
64         return ch.loadSendQueueSize() < defaultSendQueueCapacity
65 }
66
67 // Returns true if any msgPackets are pending to be sent.
68 // Call before calling nextMsgPacket()
69 // Goroutine-safe
70 func (ch *channel) isSendPending() bool {
71         if len(ch.sending) == 0 {
72                 if len(ch.sendQueue) == 0 {
73                         return false
74                 }
75                 ch.sending = <-ch.sendQueue
76         }
77         return true
78 }
79
80 // Goroutine-safe
81 func (ch *channel) loadSendQueueSize() (size int) {
82         return int(atomic.LoadInt32(&ch.sendQueueSize))
83 }
84
85 // Creates a new msgPacket to send.
86 // Not goroutine-safe
87 func (ch *channel) nextMsgPacket() msgPacket {
88         packet := msgPacket{
89                 ChannelID: byte(ch.id),
90                 Bytes:     ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))],
91         }
92         if len(ch.sending) <= maxMsgPacketPayloadSize {
93                 packet.EOF = byte(0x01)
94                 ch.sending = nil
95                 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
96         } else {
97                 packet.EOF = byte(0x00)
98                 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
99         }
100         return packet
101 }
102
103 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
104 // Not goroutine-safe
105 func (ch *channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
106         if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
107                 return nil, wire.ErrBinaryReadOverflow
108         }
109
110         ch.recving = append(ch.recving, packet.Bytes...)
111         if packet.EOF == byte(0x01) {
112                 msgBytes := ch.recving
113                 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
114                 return msgBytes, nil
115         }
116         return nil, nil
117 }
118
119 // Queues message to send to this channel.
120 // Goroutine-safe
121 // Times out (and returns false) after defaultSendTimeout
122 func (ch *channel) sendBytes(bytes []byte) bool {
123         select {
124         case ch.sendQueue <- bytes:
125                 atomic.AddInt32(&ch.sendQueueSize, 1)
126                 return true
127         case <-time.After(defaultSendTimeout):
128                 return false
129         }
130 }
131
132 // Queues message to send to this channel.
133 // Nonblocking, returns true if successful.
134 // Goroutine-safe
135 func (ch *channel) trySendBytes(bytes []byte) bool {
136         select {
137         case ch.sendQueue <- bytes:
138                 atomic.AddInt32(&ch.sendQueueSize, 1)
139                 return true
140         default:
141                 return false
142         }
143 }
144
145 // Writes next msgPacket to w.
146 // Not goroutine-safe
147 func (ch *channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
148         packet := ch.nextMsgPacket()
149         wire.WriteByte(packetTypeMsg, w, &n, &err)
150         wire.WriteBinary(packet, w, &n, &err)
151         if err == nil {
152                 ch.recentlySent += int64(n)
153         }
154         return
155 }
156
157 // Call this periodically to update stats for throttling purposes.
158 // Not goroutine-safe
159 func (ch *channel) updateStats() {
160         ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
161 }