OSDN Git Service

Merge pull request #935 from Bytom/dev
[bytom/bytom.git] / p2p / connection.go
1 package p2p
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "math"
8         "net"
9         "runtime/debug"
10         "sync/atomic"
11         "time"
12
13         log "github.com/sirupsen/logrus"
14         wire "github.com/tendermint/go-wire"
15         cmn "github.com/tendermint/tmlibs/common"
16         flow "github.com/tendermint/tmlibs/flowrate"
17 )
18
// Tunables shared by MConnection and its channels.
const (
	// numBatchMsgPackets caps how many msgPackets one sendSomeMsgPackets call writes.
	numBatchMsgPackets = 10
	// Buffer sizes handed to bufio for the connection's reader and writer.
	minReadBufferSize  = 1 << 10 // 1KB
	minWriteBufferSize = 1 << 16 // 64KB
	// updateState is the period of the ticker that decays per-channel send stats.
	updateState = 2 * time.Second
	// pingTimeout is the period of the ticker that makes sendRoutine emit a ping.
	pingTimeout = 40 * time.Second
	// flushThrottle is the throttle interval of the write-flush timer.
	flushThrottle = 100 * time.Millisecond

	// Defaults applied by ChannelDescriptor.FillDefaults and DefaultMConnConfig.
	defaultSendQueueCapacity   = 1
	defaultSendRate            = int64(500 * 1024) // 500KB/s
	defaultRecvBufferCapacity  = 4 * 1024
	defaultRecvMessageCapacity = 21 * 1024 * 1024  // 21MB
	defaultRecvRate            = int64(500 * 1024) // 500KB/s
	// defaultSendTimeout bounds how long Channel.sendBytes waits for queue space.
	defaultSendTimeout = 10 * time.Second
)
34
type (
	// receiveCbFunc is called by recvRoutine with the channel id and the bytes
	// of each fully reassembled inbound message.
	receiveCbFunc func(chID byte, msgBytes []byte)

	// errorCbFunc is called (at most once per connection) by stopForError with
	// the value that caused the connection to stop.
	errorCbFunc func(interface{})
)
37
38 /*
39 Each peer has one `MConnection` (multiplex connection) instance.
40
41 __multiplex__ *noun* a system or signal involving simultaneous transmission of
42 several messages along a single channel of communication.
43
44 Each `MConnection` handles message transmission on multiple abstract communication
45 `Channel`s.  Each channel has a globally unique byte id.
46 The byte id and the relative priorities of each `Channel` are configured upon
47 initialization of the connection.
48
49 There are two methods for sending messages:
50         func (m MConnection) Send(chID byte, msg interface{}) bool {}
51         func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
52
53 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
54 for the channel with the given id byte `chID`, or until the request times out.
55 The message `msg` is serialized using the `tendermint/wire` submodule's
56 `WriteBinary()` reflection routine.
57
58 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
59 queue is full.
60
61 Inbound message bytes are handled with an onReceive callback function.
62 */
63 type MConnection struct {
64         cmn.BaseService
65
66         conn        net.Conn
67         bufReader   *bufio.Reader
68         bufWriter   *bufio.Writer
69         sendMonitor *flow.Monitor
70         recvMonitor *flow.Monitor
71         send        chan struct{}
72         pong        chan struct{}
73         channels    []*Channel
74         channelsIdx map[byte]*Channel
75         onReceive   receiveCbFunc
76         onError     errorCbFunc
77         errored     uint32
78         config      *MConnConfig
79
80         quit         chan struct{}
81         flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
82         pingTimer    *time.Ticker       // send pings periodically
83         chStatsTimer *time.Ticker       // update channel stats periodically
84
85         LocalAddress  *NetAddress
86         RemoteAddress *NetAddress
87 }
88
// MConnConfig holds the tunable settings of an MConnection.
type MConnConfig struct {
	// SendRate is the rate passed to the send monitor's limiter.
	SendRate int64 `mapstructure:"send_rate"`
	// RecvRate is the rate passed to the receive monitor's limiter.
	RecvRate int64 `mapstructure:"recv_rate"`
}
94
95 // DefaultMConnConfig returns the default config.
96 func DefaultMConnConfig() *MConnConfig {
97         return &MConnConfig{
98                 SendRate: defaultSendRate,
99                 RecvRate: defaultRecvRate,
100         }
101 }
102
103 // NewMConnection wraps net.Conn and creates multiplex connection
104 func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
105         return NewMConnectionWithConfig(
106                 conn,
107                 chDescs,
108                 onReceive,
109                 onError,
110                 DefaultMConnConfig())
111 }
112
113 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115         mconn := &MConnection{
116                 conn:        conn,
117                 bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
118                 bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
119                 sendMonitor: flow.New(0, 0),
120                 recvMonitor: flow.New(0, 0),
121                 send:        make(chan struct{}, 1),
122                 pong:        make(chan struct{}),
123                 onReceive:   onReceive,
124                 onError:     onError,
125                 config:      config,
126
127                 pingTimer:    time.NewTicker(pingTimeout),
128                 chStatsTimer: time.NewTicker(updateState),
129
130                 LocalAddress:  NewNetAddress(conn.LocalAddr()),
131                 RemoteAddress: NewNetAddress(conn.RemoteAddr()),
132         }
133
134         // Create channels
135         var channelsIdx = map[byte]*Channel{}
136         var channels = []*Channel{}
137
138         for _, desc := range chDescs {
139                 descCopy := *desc // copy the desc else unsafe access across connections
140                 channel := newChannel(mconn, &descCopy)
141                 channelsIdx[channel.id] = channel
142                 channels = append(channels, channel)
143         }
144         mconn.channels = channels
145         mconn.channelsIdx = channelsIdx
146
147         mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
148
149         return mconn
150 }
151
152 func (c *MConnection) OnStart() error {
153         c.BaseService.OnStart()
154         c.quit = make(chan struct{})
155         c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
156         go c.sendRoutine()
157         go c.recvRoutine()
158         return nil
159 }
160
161 func (c *MConnection) OnStop() {
162         c.BaseService.OnStop()
163         c.flushTimer.Stop()
164         if c.quit != nil {
165                 close(c.quit)
166         }
167         c.conn.Close()
168         // We can't close pong safely here because
169         // recvRoutine may write to it after we've stopped.
170         // Though it doesn't need to get closed at all,
171         // we close it @ recvRoutine.
172         // close(c.pong)
173 }
174
175 func (c *MConnection) String() string {
176         return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
177 }
178
179 func (c *MConnection) flush() {
180         log.WithField("conn", c).Debug("Flush")
181         err := c.bufWriter.Flush()
182         if err != nil {
183                 log.WithField("error", err).Error("MConnection flush failed")
184         }
185 }
186
187 // Catch panics, usually caused by remote disconnects.
188 func (c *MConnection) _recover() {
189         if r := recover(); r != nil {
190                 stack := debug.Stack()
191                 err := cmn.StackError{r, stack}
192                 c.stopForError(err)
193         }
194 }
195
196 func (c *MConnection) stopForError(r interface{}) {
197         c.Stop()
198         if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
199                 if c.onError != nil {
200                         c.onError(r)
201                 }
202         }
203 }
204
205 // Queues a message to be sent to channel.
206 func (c *MConnection) Send(chID byte, msg interface{}) bool {
207         if !c.IsRunning() {
208                 return false
209         }
210
211         log.WithFields(log.Fields{
212                 "chID": chID,
213                 "conn": c,
214                 "msg":  msg,
215         }).Debug("Send")
216
217         // Send message to channel.
218         channel, ok := c.channelsIdx[chID]
219         if !ok {
220                 log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel"))
221                 return false
222         }
223
224         success := channel.sendBytes(wire.BinaryBytes(msg))
225         if success {
226                 // Wake up sendRoutine if necessary
227                 select {
228                 case c.send <- struct{}{}:
229                 default:
230                 }
231         } else {
232                 log.WithFields(log.Fields{
233                         "chID": chID,
234                         "conn": c,
235                         "msg":  msg,
236                 }).Error("Send failed")
237         }
238         return success
239 }
240
241 // Queues a message to be sent to channel.
242 // Nonblocking, returns true if successful.
243 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
244         if !c.IsRunning() {
245                 return false
246         }
247
248         log.WithFields(log.Fields{
249                 "chID": chID,
250                 "conn": c,
251                 "msg":  msg,
252         }).Debug("TrySend")
253
254         // Send message to channel.
255         channel, ok := c.channelsIdx[chID]
256         if !ok {
257                 log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel"))
258                 return false
259         }
260
261         ok = channel.trySendBytes(wire.BinaryBytes(msg))
262         if ok {
263                 // Wake up sendRoutine if necessary
264                 select {
265                 case c.send <- struct{}{}:
266                 default:
267                 }
268         }
269
270         return ok
271 }
272
273 // CanSend returns true if you can send more data onto the chID, false
274 // otherwise. Use only as a heuristic.
275 func (c *MConnection) CanSend(chID byte) bool {
276         if !c.IsRunning() {
277                 return false
278         }
279
280         channel, ok := c.channelsIdx[chID]
281         if !ok {
282                 log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel"))
283                 return false
284         }
285         return channel.canSend()
286 }
287
288 // sendRoutine polls for packets to send from channels.
289 func (c *MConnection) sendRoutine() {
290         defer c._recover()
291
292 FOR_LOOP:
293         for {
294                 var n int
295                 var err error
296                 select {
297                 case <-c.flushTimer.Ch:
298                         // NOTE: flushTimer.Set() must be called every time
299                         // something is written to .bufWriter.
300                         c.flush()
301                 case <-c.chStatsTimer.C:
302                         for _, channel := range c.channels {
303                                 channel.updateStats()
304                         }
305                 case <-c.pingTimer.C:
306                         log.Debug("Send Ping")
307                         wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
308                         c.sendMonitor.Update(int(n))
309                         c.flush()
310                 case <-c.pong:
311                         log.Debug("Send Pong")
312                         wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
313                         c.sendMonitor.Update(int(n))
314                         c.flush()
315                 case <-c.quit:
316                         break FOR_LOOP
317                 case <-c.send:
318                         // Send some msgPackets
319                         eof := c.sendSomeMsgPackets()
320                         if !eof {
321                                 // Keep sendRoutine awake.
322                                 select {
323                                 case c.send <- struct{}{}:
324                                 default:
325                                 }
326                         }
327                 }
328
329                 if !c.IsRunning() {
330                         break FOR_LOOP
331                 }
332                 if err != nil {
333                         log.WithFields(log.Fields{
334                                 "conn":  c,
335                                 "error": err,
336                         }).Error("Connection failed @ sendRoutine")
337                         c.stopForError(err)
338                         break FOR_LOOP
339                 }
340         }
341
342         // Cleanup
343 }
344
345 // Returns true if messages from channels were exhausted.
346 // Blocks in accordance to .sendMonitor throttling.
347 func (c *MConnection) sendSomeMsgPackets() bool {
348         // Block until .sendMonitor says we can write.
349         // Once we're ready we send more than we asked for,
350         // but amortized it should even out.
351         c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
352
353         // Now send some msgPackets.
354         for i := 0; i < numBatchMsgPackets; i++ {
355                 if c.sendMsgPacket() {
356                         return true
357                 }
358         }
359         return false
360 }
361
362 // Returns true if messages from channels were exhausted.
363 func (c *MConnection) sendMsgPacket() bool {
364         // Choose a channel to create a msgPacket from.
365         // The chosen channel will be the one whose recentlySent/priority is the least.
366         var leastRatio float32 = math.MaxFloat32
367         var leastChannel *Channel
368         for _, channel := range c.channels {
369                 // If nothing to send, skip this channel
370                 if !channel.isSendPending() {
371                         continue
372                 }
373                 // Get ratio, and keep track of lowest ratio.
374                 ratio := float32(channel.recentlySent) / float32(channel.priority)
375                 if ratio < leastRatio {
376                         leastRatio = ratio
377                         leastChannel = channel
378                 }
379         }
380
381         // Nothing to send?
382         if leastChannel == nil {
383                 return true
384         } else {
385                 // c.Logger.Info("Found a msgPacket to send")
386         }
387
388         // Make & send a msgPacket from this channel
389         n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
390         if err != nil {
391                 log.WithField("error", err).Error("Failed to write msgPacket")
392                 c.stopForError(err)
393                 return true
394         }
395         c.sendMonitor.Update(int(n))
396         c.flushTimer.Set()
397         return false
398 }
399
400 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
401 // After a whole message has been assembled, it's pushed to onReceive().
402 // Blocks depending on how the connection is throttled.
403 func (c *MConnection) recvRoutine() {
404         defer c._recover()
405
406 FOR_LOOP:
407         for {
408                 // Block until .recvMonitor says we can read.
409                 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
410
411                 /*
412                         // Peek into bufReader for debugging
413                         if numBytes := c.bufReader.Buffered(); numBytes > 0 {
414                                 log.Infof("Peek connection buffer numBytes:", numBytes)
415                                 bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
416                                 if err == nil {
417                                         log.Infof("bytes:", bytes)
418                                 } else {
419                                         log.Warning("Error peeking connection buffer err:", err)
420                                 }
421                         } else {
422                                 log.Warning("Received bytes number is:", numBytes)
423                         }
424                 */
425
426                 // Read packet type
427                 var n int
428                 var err error
429                 pktType := wire.ReadByte(c.bufReader, &n, &err)
430                 c.recvMonitor.Update(int(n))
431                 if err != nil {
432                         if c.IsRunning() {
433                                 log.WithFields(log.Fields{
434                                         "conn":  c,
435                                         "error": err,
436                                 }).Error("Connection failed @ recvRoutine (reading byte)")
437                                 c.conn.Close()
438                                 c.stopForError(err)
439                         }
440                         break FOR_LOOP
441                 }
442
443                 // Read more depending on packet type.
444                 switch pktType {
445                 case packetTypePing:
446                         // TODO: prevent abuse, as they cause flush()'s.
447                         log.Debug("Receive Ping")
448                         c.pong <- struct{}{}
449                 case packetTypePong:
450                         // do nothing
451                         log.Debug("Receive Pong")
452                 case packetTypeMsg:
453                         pkt, n, err := msgPacket{}, int(0), error(nil)
454                         wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
455                         c.recvMonitor.Update(int(n))
456                         if err != nil {
457                                 if c.IsRunning() {
458                                         log.WithFields(log.Fields{
459                                                 "conn":  c,
460                                                 "error": err,
461                                         }).Error("Connection failed @ recvRoutine")
462                                         c.stopForError(err)
463                                 }
464                                 break FOR_LOOP
465                         }
466                         channel, ok := c.channelsIdx[pkt.ChannelID]
467                         if !ok || channel == nil {
468                                 if pkt.ChannelID == PexChannel {
469                                         continue
470                                 } else {
471                                         cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
472                                 }
473                         }
474                         msgBytes, err := channel.recvMsgPacket(pkt)
475                         if err != nil {
476                                 if c.IsRunning() {
477                                         log.WithFields(log.Fields{
478                                                 "conn":  c,
479                                                 "error": err,
480                                         }).Error("Connection failed @ recvRoutine")
481                                         c.stopForError(err)
482                                 }
483                                 break FOR_LOOP
484                         }
485                         if msgBytes != nil {
486                                 log.WithFields(log.Fields{
487                                         "channelID": pkt.ChannelID,
488                                         "msgBytes":  msgBytes,
489                                 }).Debug("Received bytes")
490                                 c.onReceive(pkt.ChannelID, msgBytes)
491                         }
492                 default:
493                         cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
494                 }
495         }
496
497         // Cleanup
498         close(c.pong)
499         for _ = range c.pong {
500                 // Drain
501         }
502 }
503
504 type ConnectionStatus struct {
505         SendMonitor flow.Status
506         RecvMonitor flow.Status
507         Channels    []ChannelStatus
508 }
509
// ChannelStatus describes a single channel inside a ConnectionStatus.
type ChannelStatus struct {
	ID                byte  // channel id
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's queued-message counter
	Priority          int   // channel priority
	RecentlySent      int64 // the channel's decaying count of recently sent bytes
}
517
518 func (c *MConnection) Status() ConnectionStatus {
519         var status ConnectionStatus
520         status.SendMonitor = c.sendMonitor.Status()
521         status.RecvMonitor = c.recvMonitor.Status()
522         status.Channels = make([]ChannelStatus, len(c.channels))
523         for i, channel := range c.channels {
524                 status.Channels[i] = ChannelStatus{
525                         ID:                channel.id,
526                         SendQueueCapacity: cap(channel.sendQueue),
527                         SendQueueSize:     int(channel.sendQueueSize), // TODO use atomic
528                         Priority:          channel.priority,
529                         RecentlySent:      channel.recentlySent,
530                 }
531         }
532         return status
533 }
534
535 //-----------------------------------------------------------------------------
536
// ChannelDescriptor configures one Channel of an MConnection. Capacities left
// at zero are replaced by package defaults in FillDefaults.
type ChannelDescriptor struct {
	ID                  byte // channel id
	Priority            int  // must be positive; newChannel panics otherwise
	SendQueueCapacity   int  // capacity of the channel's send queue
	RecvBufferCapacity  int  // initial capacity of the reassembly buffer
	RecvMessageCapacity int  // largest reassembled message accepted, in bytes
}
544
545 func (chDesc *ChannelDescriptor) FillDefaults() {
546         if chDesc.SendQueueCapacity == 0 {
547                 chDesc.SendQueueCapacity = defaultSendQueueCapacity
548         }
549         if chDesc.RecvBufferCapacity == 0 {
550                 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
551         }
552         if chDesc.RecvMessageCapacity == 0 {
553                 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
554         }
555 }
556
557 // TODO: lowercase.
558 // NOTE: not goroutine-safe.
559 type Channel struct {
560         conn          *MConnection
561         desc          *ChannelDescriptor
562         id            byte
563         sendQueue     chan []byte
564         sendQueueSize int32 // atomic.
565         recving       []byte
566         sending       []byte
567         priority      int
568         recentlySent  int64 // exponential moving average
569 }
570
571 func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
572         desc.FillDefaults()
573         if desc.Priority <= 0 {
574                 cmn.PanicSanity("Channel default priority must be a postive integer")
575         }
576         return &Channel{
577                 conn:      conn,
578                 desc:      desc,
579                 id:        desc.ID,
580                 sendQueue: make(chan []byte, desc.SendQueueCapacity),
581                 recving:   make([]byte, 0, desc.RecvBufferCapacity),
582                 priority:  desc.Priority,
583         }
584 }
585
586 // Queues message to send to this channel.
587 // Goroutine-safe
588 // Times out (and returns false) after defaultSendTimeout
589 func (ch *Channel) sendBytes(bytes []byte) bool {
590         select {
591         case ch.sendQueue <- bytes:
592                 atomic.AddInt32(&ch.sendQueueSize, 1)
593                 return true
594         case <-time.After(defaultSendTimeout):
595                 return false
596         }
597 }
598
599 // Queues message to send to this channel.
600 // Nonblocking, returns true if successful.
601 // Goroutine-safe
602 func (ch *Channel) trySendBytes(bytes []byte) bool {
603         select {
604         case ch.sendQueue <- bytes:
605                 atomic.AddInt32(&ch.sendQueueSize, 1)
606                 return true
607         default:
608                 return false
609         }
610 }
611
612 // Goroutine-safe
613 func (ch *Channel) loadSendQueueSize() (size int) {
614         return int(atomic.LoadInt32(&ch.sendQueueSize))
615 }
616
617 // Goroutine-safe
618 // Use only as a heuristic.
619 func (ch *Channel) canSend() bool {
620         return ch.loadSendQueueSize() < defaultSendQueueCapacity
621 }
622
623 // Returns true if any msgPackets are pending to be sent.
624 // Call before calling nextMsgPacket()
625 // Goroutine-safe
626 func (ch *Channel) isSendPending() bool {
627         if len(ch.sending) == 0 {
628                 if len(ch.sendQueue) == 0 {
629                         return false
630                 }
631                 ch.sending = <-ch.sendQueue
632         }
633         return true
634 }
635
636 // Creates a new msgPacket to send.
637 // Not goroutine-safe
638 func (ch *Channel) nextMsgPacket() msgPacket {
639         packet := msgPacket{}
640         packet.ChannelID = byte(ch.id)
641         packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
642         if len(ch.sending) <= maxMsgPacketPayloadSize {
643                 packet.EOF = byte(0x01)
644                 ch.sending = nil
645                 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
646         } else {
647                 packet.EOF = byte(0x00)
648                 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
649         }
650         return packet
651 }
652
653 // Writes next msgPacket to w.
654 // Not goroutine-safe
655 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
656         packet := ch.nextMsgPacket()
657         wire.WriteByte(packetTypeMsg, w, &n, &err)
658         wire.WriteBinary(packet, w, &n, &err)
659         if err == nil {
660                 ch.recentlySent += int64(n)
661         }
662         return
663 }
664
665 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
666 // Not goroutine-safe
667 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
668         if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
669                 return nil, wire.ErrBinaryReadOverflow
670         }
671         ch.recving = append(ch.recving, packet.Bytes...)
672         if packet.EOF == byte(0x01) {
673                 msgBytes := ch.recving
674                 // clear the slice without re-allocating.
675                 // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
676                 //   suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
677                 //      at which point the recving slice stops being used and should be garbage collected
678                 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
679                 return msgBytes, nil
680         }
681         return nil, nil
682 }
683
684 // Call this periodically to update stats for throttling purposes.
685 // Not goroutine-safe
686 func (ch *Channel) updateStats() {
687         // Exponential decay of stats.
688         // TODO: optimize.
689         ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
690 }
691
692 //-----------------------------------------------------------------------------
693
// Size limits of a single msgPacket.
const (
	maxMsgPacketPayloadSize  = 1 << 10 // 1024 payload bytes per packet
	maxMsgPacketOverheadSize = 10      // It's actually lower but good enough
	maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
)

// Type byte that precedes every packet on the wire.
const (
	packetTypePing byte = 0x01
	packetTypePong byte = 0x02
	packetTypeMsg  byte = 0x03
)
702
// msgPacket is the unit of multiplexing: messages queued on a channel are
// chopped into msgPackets before they are written to the connection.
type msgPacket struct {
	ChannelID byte   // id of the channel the payload belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // payload carried by this packet
}
709
710 func (p msgPacket) String() string {
711         return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
712 }