OSDN Git Service

new repo
[bytom/vapor.git] / p2p / connection / connection.go
1 package connection
2
3 import (
4         "bufio"
5         "fmt"
6         "math"
7         "net"
8         "runtime/debug"
9         "sync/atomic"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15         "github.com/tendermint/tmlibs/flowrate"
16 )
17
// Packet type tags. recvRoutine reads one of these as the first byte of every
// packet and dispatches on it.
const (
	packetTypePing byte = 0x01
	packetTypePong byte = 0x02
	packetTypeMsg  byte = 0x03
)

// Size limits for a single msgPacket on the wire.
const (
	maxMsgPacketPayloadSize  = 1 << 10 // 1024 bytes of payload per packet
	maxMsgPacketOverheadSize = 10      // It's actually lower but good enough
	maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
)

// Buffering, batching and timer periods used by the send/receive routines.
const (
	numBatchMsgPackets = 10       // packets written per sendSomeMsgPackets call
	minReadBufferSize  = 1 << 10  // bufio reader size (1024)
	minWriteBufferSize = 64 << 10 // bufio writer size (65536)

	updateState   = 2 * time.Second        // period of the channel-stats ticker
	pingTimeout   = 40 * time.Second       // period of the ping ticker
	flushThrottle = 100 * time.Millisecond // throttle interval of the flush timer
)

// Defaults. Only the two rates are referenced in this file (by
// DefaultMConnConfig); the others are presumably consumed by the channel
// implementation elsewhere in the package.
const (
	defaultSendQueueCapacity   = 1
	defaultSendRate            = int64(500 << 10) // 500KB/s (512000)
	defaultRecvBufferCapacity  = 4 << 10          // 4096
	defaultRecvMessageCapacity = 21 << 20         // 21MB (22020096)
	defaultRecvRate            = int64(500 << 10) // 500KB/s (512000)
	defaultSendTimeout         = 10 * time.Second
)
40
type (
	// receiveCbFunc is invoked by recvRoutine with the channel id and the
	// bytes of each fully reassembled inbound message.
	receiveCbFunc func(chID byte, msgBytes []byte)

	// errorCbFunc is invoked by stopForError with the value that caused the
	// connection to be stopped.
	errorCbFunc func(interface{})
)
43
// msgPacket is one fragment of a channel message. Messages in channels are
// chopped into smaller msgPackets so several channels can be multiplexed over
// a single connection.
type msgPacket struct {
	ChannelID byte   // id of the channel the fragment belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // payload of this fragment
}

// String renders the packet as "MsgPacket{<channel>:<payload> T:<eof>}" with
// every component printed in upper-case hexadecimal.
func (pkt msgPacket) String() string {
	const layout = "MsgPacket{%X:%X T:%X}"
	return fmt.Sprintf(layout, pkt.ChannelID, pkt.Bytes, pkt.EOF)
}
54
/*
MConnection handles message transmission on multiple abstract communication
`Channel`s.  Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:
	func (c *MConnection) Send(chID byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out
(the timeout is presumably enforced by the channel's sendBytes, which is not
defined in this file).
The message `msg` is serialized with `wire.BinaryBytes()` from the
`tendermint/go-wire` package.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	// Underlying connection and the buffered reader/writer wrapped around it.
	conn      net.Conn
	bufReader *bufio.Reader
	bufWriter *bufio.Writer
	// Flow-rate monitors used to throttle and report outbound/inbound traffic.
	sendMonitor *flowrate.Monitor
	recvMonitor *flowrate.Monitor
	// send is poked (non-blocking) by Send/TrySend to wake sendRoutine.
	send chan struct{}
	// pong is poked by recvRoutine when a Ping arrives so that sendRoutine
	// replies with a Pong; recvRoutine closes it on exit.
	pong chan struct{}
	// channels lists every channel; channelsIdx indexes the same channels by id.
	channels    []*channel
	channelsIdx map[byte]*channel
	// Callbacks for completed inbound messages and for fatal errors.
	onReceive receiveCbFunc
	onError   errorCbFunc
	// errored is flipped 0 -> 1 atomically by stopForError so that onError
	// fires at most once.
	errored uint32
	config  *MConnConfig

	// quit is created in OnStart and closed in OnStop to terminate sendRoutine.
	quit         chan struct{}
	flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer    *time.Ticker       // send pings periodically
	chStatsTimer *time.Ticker       // update channel stats periodically
}
97
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate is the rate passed to sendMonitor.Limit when throttling writes
	// (the default is documented as bytes per second).
	SendRate int64 `mapstructure:"send_rate"`
	// RecvRate is the rate passed to recvMonitor.Limit when throttling reads
	// (the default is documented as bytes per second).
	RecvRate int64 `mapstructure:"recv_rate"`
}
103
104 // DefaultMConnConfig returns the default config.
105 func DefaultMConnConfig() *MConnConfig {
106         return &MConnConfig{
107                 SendRate: defaultSendRate,
108                 RecvRate: defaultRecvRate,
109         }
110 }
111
112 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
113 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
114         mconn := &MConnection{
115                 conn:        conn,
116                 bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
117                 bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
118                 sendMonitor: flowrate.New(0, 0),
119                 recvMonitor: flowrate.New(0, 0),
120                 send:        make(chan struct{}, 1),
121                 pong:        make(chan struct{}, 1),
122                 channelsIdx: map[byte]*channel{},
123                 channels:    []*channel{},
124                 onReceive:   onReceive,
125                 onError:     onError,
126                 config:      config,
127
128                 pingTimer:    time.NewTicker(pingTimeout),
129                 chStatsTimer: time.NewTicker(updateState),
130         }
131
132         for _, desc := range chDescs {
133                 descCopy := *desc // copy the desc else unsafe access across connections
134                 channel := newChannel(mconn, &descCopy)
135                 mconn.channelsIdx[channel.id] = channel
136                 mconn.channels = append(mconn.channels, channel)
137         }
138         mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
139         return mconn
140 }
141
142 // OnStart implements BaseService
143 func (c *MConnection) OnStart() error {
144         c.BaseService.OnStart()
145         c.quit = make(chan struct{})
146         c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
147         go c.sendRoutine()
148         go c.recvRoutine()
149         return nil
150 }
151
152 // OnStop implements BaseService
153 func (c *MConnection) OnStop() {
154         c.BaseService.OnStop()
155         c.flushTimer.Stop()
156         if c.quit != nil {
157                 close(c.quit)
158         }
159         c.conn.Close()
160         // We can't close pong safely here because recvRoutine may write to it after we've
161         // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
162 }
163
164 // CanSend returns true if you can send more data onto the chID, false otherwise
165 func (c *MConnection) CanSend(chID byte) bool {
166         if !c.IsRunning() {
167                 return false
168         }
169
170         channel, ok := c.channelsIdx[chID]
171         if !ok {
172                 return false
173         }
174         return channel.canSend()
175 }
176
177 // Send will queues a message to be sent to channel(blocking).
178 func (c *MConnection) Send(chID byte, msg interface{}) bool {
179         if !c.IsRunning() {
180                 return false
181         }
182
183         channel, ok := c.channelsIdx[chID]
184         if !ok {
185                 log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
186                 return false
187         }
188
189         if !channel.sendBytes(wire.BinaryBytes(msg)) {
190                 log.WithFields(log.Fields{"chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
191                 return false
192         }
193
194         select {
195         case c.send <- struct{}{}:
196         default:
197         }
198         return true
199 }
200
201 // TrafficStatus return the in and out traffic status
202 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
203         sentStatus := c.sendMonitor.Status()
204         receivedStatus := c.recvMonitor.Status()
205         return &sentStatus, &receivedStatus
206 }
207
208 // TrySend queues a message to be sent to channel(Nonblocking).
209 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
210         if !c.IsRunning() {
211                 return false
212         }
213
214         channel, ok := c.channelsIdx[chID]
215         if !ok {
216                 log.WithField("chID", chID).Error("cannot send bytes due to unknown channel")
217                 return false
218         }
219
220         ok = channel.trySendBytes(wire.BinaryBytes(msg))
221         if ok {
222                 select {
223                 case c.send <- struct{}{}:
224                 default:
225                 }
226         }
227         return ok
228 }
229
230 func (c *MConnection) String() string {
231         return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
232 }
233
234 func (c *MConnection) flush() {
235         if err := c.bufWriter.Flush(); err != nil {
236                 log.WithField("error", err).Error("MConnection flush failed")
237         }
238 }
239
240 // Catch panics, usually caused by remote disconnects.
241 func (c *MConnection) _recover() {
242         if r := recover(); r != nil {
243                 stack := debug.Stack()
244                 err := cmn.StackError{r, stack}
245                 c.stopForError(err)
246         }
247 }
248
249 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
250 // After a whole message has been assembled, it's pushed to onReceive().
251 // Blocks depending on how the connection is throttled.
252 func (c *MConnection) recvRoutine() {
253         defer c._recover()
254         defer close(c.pong)
255
256         for {
257                 // Block until .recvMonitor says we can read.
258                 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
259
260                 // Read packet type
261                 var n int
262                 var err error
263                 pktType := wire.ReadByte(c.bufReader, &n, &err)
264                 c.recvMonitor.Update(int(n))
265                 if err != nil {
266                         if c.IsRunning() {
267                                 log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
268                                 c.conn.Close()
269                                 c.stopForError(err)
270                         }
271                         return
272                 }
273
274                 // Read more depending on packet type.
275                 switch pktType {
276                 case packetTypePing:
277                         log.Debug("receive Ping")
278                         select {
279                         case c.pong <- struct{}{}:
280                         default:
281                         }
282
283                 case packetTypePong:
284                         log.Debug("receive Pong")
285
286                 case packetTypeMsg:
287                         pkt, n, err := msgPacket{}, int(0), error(nil)
288                         wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
289                         c.recvMonitor.Update(int(n))
290                         if err != nil {
291                                 if c.IsRunning() {
292                                         log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
293                                         c.stopForError(err)
294                                 }
295                                 return
296                         }
297
298                         channel, ok := c.channelsIdx[pkt.ChannelID]
299                         if !ok || channel == nil {
300                                 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
301                         }
302
303                         msgBytes, err := channel.recvMsgPacket(pkt)
304                         if err != nil {
305                                 if c.IsRunning() {
306                                         log.WithFields(log.Fields{"conn": c, "error": err}).Error("failed on recvRoutine")
307                                         c.stopForError(err)
308                                 }
309                                 return
310                         }
311
312                         if msgBytes != nil {
313                                 c.onReceive(pkt.ChannelID, msgBytes)
314                         }
315
316                 default:
317                         cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
318                 }
319         }
320 }
321
322 // Returns true if messages from channels were exhausted.
323 func (c *MConnection) sendMsgPacket() bool {
324         var leastRatio float32 = math.MaxFloat32
325         var leastChannel *channel
326         for _, channel := range c.channels {
327                 if !channel.isSendPending() {
328                         continue
329                 }
330                 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
331                         leastRatio = ratio
332                         leastChannel = channel
333                 }
334         }
335         if leastChannel == nil {
336                 return true
337         }
338
339         n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
340         if err != nil {
341                 log.WithField("error", err).Error("failed to write msgPacket")
342                 c.stopForError(err)
343                 return true
344         }
345         c.sendMonitor.Update(int(n))
346         c.flushTimer.Set()
347         return false
348 }
349
350 // sendRoutine polls for packets to send from channels.
351 func (c *MConnection) sendRoutine() {
352         defer c._recover()
353
354         for {
355                 var n int
356                 var err error
357                 select {
358                 case <-c.flushTimer.Ch:
359                         c.flush()
360                 case <-c.chStatsTimer.C:
361                         for _, channel := range c.channels {
362                                 channel.updateStats()
363                         }
364                 case <-c.pingTimer.C:
365                         log.Debug("send Ping")
366                         wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
367                         c.sendMonitor.Update(int(n))
368                         c.flush()
369                 case <-c.pong:
370                         log.Debug("send Pong")
371                         wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
372                         c.sendMonitor.Update(int(n))
373                         c.flush()
374                 case <-c.quit:
375                         return
376                 case <-c.send:
377                         if eof := c.sendSomeMsgPackets(); !eof {
378                                 select {
379                                 case c.send <- struct{}{}:
380                                 default:
381                                 }
382                         }
383                 }
384
385                 if !c.IsRunning() {
386                         return
387                 }
388                 if err != nil {
389                         log.WithFields(log.Fields{"conn": c, "error": err}).Error("Connection failed @ sendRoutine")
390                         c.stopForError(err)
391                         return
392                 }
393         }
394 }
395
396 // Returns true if messages from channels were exhausted.
397 func (c *MConnection) sendSomeMsgPackets() bool {
398         // Block until .sendMonitor says we can write.
399         // Once we're ready we send more than we asked for,
400         // but amortized it should even out.
401         c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
402         for i := 0; i < numBatchMsgPackets; i++ {
403                 if c.sendMsgPacket() {
404                         return true
405                 }
406         }
407         return false
408 }
409
410 func (c *MConnection) stopForError(r interface{}) {
411         c.Stop()
412         if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {
413                 c.onError(r)
414         }
415 }