OSDN Git Service

Merge pull request #57 from liuchengxu/log
[bytom/bytom.git] / p2p / connection.go
1 package p2p
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "math"
8         "net"
9         "runtime/debug"
10         "sync/atomic"
11         "time"
12
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15         flow "github.com/tendermint/tmlibs/flowrate"
16 )
17
// Tuning constants for MConnection and its channels.
const (
	// Upper bound on msgPackets written per sendSomeMsgPackets call.
	numBatchMsgPackets = 10
	// Buffer sizes handed to bufio.NewReaderSize / bufio.NewWriterSize.
	minReadBufferSize  = 1024
	minWriteBufferSize = 65536
	// Period of the chStats repeat timer (drives Channel.updateStats).
	updateState        = 2 * time.Second
	// Period of the ping repeat timer; despite the name it is used as the
	// interval between pings, and is reset whenever a packet is received.
	pingTimeout        = 40 * time.Second
	// Throttle interval for flushing the buffered writer.
	flushThrottle      = 100 * time.Millisecond

	// Defaults applied by ChannelDescriptor.FillDefaults and
	// DefaultMConnConfig when a value is left at zero / unspecified.
	defaultSendQueueCapacity   = 1
	defaultSendRate            = int64(512000) // 500KB/s
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 22020096      // 21MB
	defaultRecvRate            = int64(512000) // 500KB/s
	// How long Channel.sendBytes waits for queue space before giving up.
	defaultSendTimeout         = 10 * time.Second
)
33
// receiveCbFunc is called by recvRoutine with the channel id and the bytes of
// each completely reassembled message.
type receiveCbFunc func(chID byte, msgBytes []byte)

// errorCbFunc is called by stopForError with the error (or recovered panic
// value) that caused the connection to be stopped.
type errorCbFunc func(interface{})
36
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s.  Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:
	func (c *MConnection) Send(chID byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chID byte, msg interface{}) bool {}

`Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chID`, or until the request times out.
The message `msg` is serialized using the `tendermint/wire` submodule's
`WriteBinary()` reflection routine.

`TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	cmn.BaseService

	conn        net.Conn      // underlying connection; closed by OnStop
	bufReader   *bufio.Reader // buffered reader over conn, used by recvRoutine
	bufWriter   *bufio.Writer // buffered writer over conn, used by sendRoutine
	sendMonitor *flow.Monitor // tracks/limits outbound bytes (config.SendRate)
	recvMonitor *flow.Monitor // tracks/limits inbound bytes (config.RecvRate)
	send        chan struct{} // capacity-1 wake-up signal for sendRoutine
	pong        chan struct{} // unbuffered; recvRoutine asks sendRoutine to reply to a ping
	channels    []*Channel    // all channels, in descriptor order
	channelsIdx map[byte]*Channel // channels keyed by channel id
	onReceive   receiveCbFunc // callback for completed inbound messages
	onError     errorCbFunc   // callback invoked once by stopForError (may be nil)
	errored     uint32        // flipped 0 -> 1 atomically by the first stopForError
	config      *MConnConfig

	quit         chan struct{}      // created in OnStart, closed in OnStop
	flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer    *cmn.RepeatTimer   // send pings periodically
	chStatsTimer *cmn.RepeatTimer   // update channel stats periodically

	LocalAddress  *NetAddress // built from conn.LocalAddr() at construction
	RemoteAddress *NetAddress // built from conn.RemoteAddr() at construction
}
87
// MConnConfig is a MConnection configuration.
type MConnConfig struct {
	// SendRate is the rate passed to sendMonitor.Limit before each batch of
	// outgoing packets (the defaults are annotated in bytes per second).
	SendRate int64 `mapstructure:"send_rate"`
	// RecvRate is the rate passed to recvMonitor.Limit before each packet
	// read (the defaults are annotated in bytes per second).
	RecvRate int64 `mapstructure:"recv_rate"`
}
93
94 // DefaultMConnConfig returns the default config.
95 func DefaultMConnConfig() *MConnConfig {
96         return &MConnConfig{
97                 SendRate: defaultSendRate,
98                 RecvRate: defaultRecvRate,
99         }
100 }
101
102 // NewMConnection wraps net.Conn and creates multiplex connection
103 func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
104         return NewMConnectionWithConfig(
105                 conn,
106                 chDescs,
107                 onReceive,
108                 onError,
109                 DefaultMConnConfig())
110 }
111
112 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
113 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
114         mconn := &MConnection{
115                 conn:        conn,
116                 bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
117                 bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
118                 sendMonitor: flow.New(0, 0),
119                 recvMonitor: flow.New(0, 0),
120                 send:        make(chan struct{}, 1),
121                 pong:        make(chan struct{}),
122                 onReceive:   onReceive,
123                 onError:     onError,
124                 config:      config,
125
126                 LocalAddress:  NewNetAddress(conn.LocalAddr()),
127                 RemoteAddress: NewNetAddress(conn.RemoteAddr()),
128         }
129
130         // Create channels
131         var channelsIdx = map[byte]*Channel{}
132         var channels = []*Channel{}
133
134         for _, desc := range chDescs {
135                 descCopy := *desc // copy the desc else unsafe access across connections
136                 channel := newChannel(mconn, &descCopy)
137                 channelsIdx[channel.id] = channel
138                 channels = append(channels, channel)
139         }
140         mconn.channels = channels
141         mconn.channelsIdx = channelsIdx
142
143         mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
144
145         return mconn
146 }
147
148 func (c *MConnection) OnStart() error {
149         c.BaseService.OnStart()
150         c.quit = make(chan struct{})
151         c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
152         c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
153         c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
154         go c.sendRoutine()
155         go c.recvRoutine()
156         return nil
157 }
158
159 func (c *MConnection) OnStop() {
160         c.BaseService.OnStop()
161         c.flushTimer.Stop()
162         c.pingTimer.Stop()
163         c.chStatsTimer.Stop()
164         if c.quit != nil {
165                 close(c.quit)
166         }
167         c.conn.Close()
168         // We can't close pong safely here because
169         // recvRoutine may write to it after we've stopped.
170         // Though it doesn't need to get closed at all,
171         // we close it @ recvRoutine.
172         // close(c.pong)
173 }
174
175 func (c *MConnection) String() string {
176         return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
177 }
178
179 func (c *MConnection) flush() {
180         c.Logger.Debug("Flush", "conn", c)
181         err := c.bufWriter.Flush()
182         if err != nil {
183                 c.Logger.Error("MConnection flush failed", "error", err)
184         }
185 }
186
187 // Catch panics, usually caused by remote disconnects.
188 func (c *MConnection) _recover() {
189         if r := recover(); r != nil {
190                 stack := debug.Stack()
191                 err := cmn.StackError{r, stack}
192                 c.stopForError(err)
193         }
194 }
195
196 func (c *MConnection) stopForError(r interface{}) {
197         c.Stop()
198         if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
199                 if c.onError != nil {
200                         c.onError(r)
201                 }
202         }
203 }
204
205 // Queues a message to be sent to channel.
206 func (c *MConnection) Send(chID byte, msg interface{}) bool {
207         if !c.IsRunning() {
208                 return false
209         }
210
211         c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
212
213         // Send message to channel.
214         channel, ok := c.channelsIdx[chID]
215         if !ok {
216                 c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
217                 return false
218         }
219
220         success := channel.sendBytes(wire.BinaryBytes(msg))
221         if success {
222                 // Wake up sendRoutine if necessary
223                 select {
224                 case c.send <- struct{}{}:
225                 default:
226                 }
227         } else {
228                 c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
229         }
230         return success
231 }
232
233 // Queues a message to be sent to channel.
234 // Nonblocking, returns true if successful.
235 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
236         if !c.IsRunning() {
237                 return false
238         }
239
240         c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
241
242         // Send message to channel.
243         channel, ok := c.channelsIdx[chID]
244         if !ok {
245                 c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
246                 return false
247         }
248
249         ok = channel.trySendBytes(wire.BinaryBytes(msg))
250         if ok {
251                 // Wake up sendRoutine if necessary
252                 select {
253                 case c.send <- struct{}{}:
254                 default:
255                 }
256         }
257
258         return ok
259 }
260
261 // CanSend returns true if you can send more data onto the chID, false
262 // otherwise. Use only as a heuristic.
263 func (c *MConnection) CanSend(chID byte) bool {
264         if !c.IsRunning() {
265                 return false
266         }
267
268         channel, ok := c.channelsIdx[chID]
269         if !ok {
270                 c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
271                 return false
272         }
273         return channel.canSend()
274 }
275
// sendRoutine polls for packets to send from channels.
//
// It is started as a goroutine by OnStart and handles one event per loop
// iteration: a throttled flush, a periodic channel-stats update, a periodic
// ping, a pong requested by recvRoutine, a quit signal, or a wake-up telling it
// that channels have data queued. After every event it exits if the service is
// no longer running, or stops the connection if a write recorded an error.
// Panics are converted into stopForError by the deferred _recover.
func (c *MConnection) sendRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		// n and err are fresh on every iteration; only the ping and pong
		// cases below write to them.
		var n int
		var err error
		select {
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufWriter.
			c.flush()
		case <-c.chStatsTimer.Ch:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		case <-c.pingTimer.Ch:
			c.Logger.Debug("Send Ping")
			wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.pong:
			// recvRoutine closes c.pong when it exits, after which this
			// receive succeeds immediately with a zero value.
			c.Logger.Debug("Send Pong")
			wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
			c.sendMonitor.Update(int(n))
			c.flush()
		case <-c.quit:
			break FOR_LOOP
		case <-c.send:
			// Send some msgPackets
			eof := c.sendSomeMsgPackets()
			if !eof {
				// Keep sendRoutine awake.
				// Non-blocking: c.send has capacity 1, so a pending
				// wake-up is enough.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		// err can only be non-nil here after a failed ping/pong write.
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "error", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
}
329
330 // Returns true if messages from channels were exhausted.
331 // Blocks in accordance to .sendMonitor throttling.
332 func (c *MConnection) sendSomeMsgPackets() bool {
333         // Block until .sendMonitor says we can write.
334         // Once we're ready we send more than we asked for,
335         // but amortized it should even out.
336         c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
337
338         // Now send some msgPackets.
339         for i := 0; i < numBatchMsgPackets; i++ {
340                 if c.sendMsgPacket() {
341                         return true
342                 }
343         }
344         return false
345 }
346
347 // Returns true if messages from channels were exhausted.
348 func (c *MConnection) sendMsgPacket() bool {
349         // Choose a channel to create a msgPacket from.
350         // The chosen channel will be the one whose recentlySent/priority is the least.
351         var leastRatio float32 = math.MaxFloat32
352         var leastChannel *Channel
353         for _, channel := range c.channels {
354                 // If nothing to send, skip this channel
355                 if !channel.isSendPending() {
356                         continue
357                 }
358                 // Get ratio, and keep track of lowest ratio.
359                 ratio := float32(channel.recentlySent) / float32(channel.priority)
360                 if ratio < leastRatio {
361                         leastRatio = ratio
362                         leastChannel = channel
363                 }
364         }
365
366         // Nothing to send?
367         if leastChannel == nil {
368                 return true
369         } else {
370                 // c.Logger.Info("Found a msgPacket to send")
371         }
372
373         // Make & send a msgPacket from this channel
374         n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
375         if err != nil {
376                 c.Logger.Error("Failed to write msgPacket", "error", err)
377                 c.stopForError(err)
378                 return true
379         }
380         c.sendMonitor.Update(int(n))
381         c.flushTimer.Set()
382         return false
383 }
384
385 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
386 // After a whole message has been assembled, it's pushed to onReceive().
387 // Blocks depending on how the connection is throttled.
388 func (c *MConnection) recvRoutine() {
389         defer c._recover()
390
391 FOR_LOOP:
392         for {
393                 // Block until .recvMonitor says we can read.
394                 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
395
396                 /*
397                         // Peek into bufReader for debugging
398                         if numBytes := c.bufReader.Buffered(); numBytes > 0 {
399                                 log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
400                                         bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
401                                         if err == nil {
402                                                 return bytes
403                                         } else {
404                                                 log.Warn("Error peeking connection buffer", "error", err)
405                                                 return nil
406                                         }
407                                 }})
408                         }
409                 */
410
411                 // Read packet type
412                 var n int
413                 var err error
414                 pktType := wire.ReadByte(c.bufReader, &n, &err)
415                 c.recvMonitor.Update(int(n))
416                 if err != nil {
417                         if c.IsRunning() {
418                                 c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
419                                 c.stopForError(err)
420                         }
421                         break FOR_LOOP
422                 }
423
424                 // Read more depending on packet type.
425                 switch pktType {
426                 case packetTypePing:
427                         // TODO: prevent abuse, as they cause flush()'s.
428                         c.Logger.Debug("Receive Ping")
429                         c.pong <- struct{}{}
430                 case packetTypePong:
431                         // do nothing
432                         c.Logger.Debug("Receive Pong")
433                 case packetTypeMsg:
434                         pkt, n, err := msgPacket{}, int(0), error(nil)
435                         wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
436                         c.recvMonitor.Update(int(n))
437                         if err != nil {
438                                 if c.IsRunning() {
439                                         c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
440                                         c.stopForError(err)
441                                 }
442                                 break FOR_LOOP
443                         }
444                         channel, ok := c.channelsIdx[pkt.ChannelID]
445                         if !ok || channel == nil {
446                                 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
447                         }
448                         msgBytes, err := channel.recvMsgPacket(pkt)
449                         if err != nil {
450                                 if c.IsRunning() {
451                                         c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
452                                         c.stopForError(err)
453                                 }
454                                 break FOR_LOOP
455                         }
456                         if msgBytes != nil {
457                                 c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
458                                 c.onReceive(pkt.ChannelID, msgBytes)
459                         }
460                 default:
461                         cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
462                 }
463
464                 // TODO: shouldn't this go in the sendRoutine?
465                 // Better to send a ping packet when *we* haven't sent anything for a while.
466                 c.pingTimer.Reset()
467         }
468
469         // Cleanup
470         close(c.pong)
471         for _ = range c.pong {
472                 // Drain
473         }
474 }
475
// ConnectionStatus is the snapshot returned by MConnection.Status: the state
// of both flow monitors plus one entry per channel.
type ConnectionStatus struct {
	SendMonitor flow.Status
	RecvMonitor flow.Status
	Channels    []ChannelStatus
}
481
// ChannelStatus describes a single channel inside a ConnectionStatus.
type ChannelStatus struct {
	ID                byte
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's sendQueueSize counter
	Priority          int
	RecentlySent      int64 // decaying count of bytes recently written for the channel
}
489
490 func (c *MConnection) Status() ConnectionStatus {
491         var status ConnectionStatus
492         status.SendMonitor = c.sendMonitor.Status()
493         status.RecvMonitor = c.recvMonitor.Status()
494         status.Channels = make([]ChannelStatus, len(c.channels))
495         for i, channel := range c.channels {
496                 status.Channels[i] = ChannelStatus{
497                         ID:                channel.id,
498                         SendQueueCapacity: cap(channel.sendQueue),
499                         SendQueueSize:     int(channel.sendQueueSize), // TODO use atomic
500                         Priority:          channel.priority,
501                         RecentlySent:      channel.recentlySent,
502                 }
503         }
504         return status
505 }
506
507 //-----------------------------------------------------------------------------
508
// ChannelDescriptor configures one channel of an MConnection. Zero capacities
// are replaced with package defaults by FillDefaults; Priority must be
// positive (newChannel panics otherwise).
type ChannelDescriptor struct {
	ID                  byte
	Priority            int
	SendQueueCapacity   int // buffer size of the channel's send queue (messages)
	RecvBufferCapacity  int // initial capacity of the reassembly buffer (bytes)
	RecvMessageCapacity int // maximum size of a reassembled message (bytes)
}
516
517 func (chDesc *ChannelDescriptor) FillDefaults() {
518         if chDesc.SendQueueCapacity == 0 {
519                 chDesc.SendQueueCapacity = defaultSendQueueCapacity
520         }
521         if chDesc.RecvBufferCapacity == 0 {
522                 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
523         }
524         if chDesc.RecvMessageCapacity == 0 {
525                 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
526         }
527 }
528
// Channel is one logical stream multiplexed over an MConnection. It owns a
// queue of outgoing messages and the buffers used to split outgoing messages
// into msgPackets and to reassemble incoming ones.
//
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection
	desc          *ChannelDescriptor
	id            byte
	sendQueue     chan []byte // queued outgoing messages
	sendQueueSize int32 // atomic.
	recving       []byte // bytes of the incoming message reassembled so far
	sending       []byte // unsent remainder of the outgoing message in progress
	priority      int
	recentlySent  int64 // exponential moving average
}
542
543 func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
544         desc.FillDefaults()
545         if desc.Priority <= 0 {
546                 cmn.PanicSanity("Channel default priority must be a postive integer")
547         }
548         return &Channel{
549                 conn:      conn,
550                 desc:      desc,
551                 id:        desc.ID,
552                 sendQueue: make(chan []byte, desc.SendQueueCapacity),
553                 recving:   make([]byte, 0, desc.RecvBufferCapacity),
554                 priority:  desc.Priority,
555         }
556 }
557
558 // Queues message to send to this channel.
559 // Goroutine-safe
560 // Times out (and returns false) after defaultSendTimeout
561 func (ch *Channel) sendBytes(bytes []byte) bool {
562         select {
563         case ch.sendQueue <- bytes:
564                 atomic.AddInt32(&ch.sendQueueSize, 1)
565                 return true
566         case <-time.After(defaultSendTimeout):
567                 return false
568         }
569 }
570
571 // Queues message to send to this channel.
572 // Nonblocking, returns true if successful.
573 // Goroutine-safe
574 func (ch *Channel) trySendBytes(bytes []byte) bool {
575         select {
576         case ch.sendQueue <- bytes:
577                 atomic.AddInt32(&ch.sendQueueSize, 1)
578                 return true
579         default:
580                 return false
581         }
582 }
583
584 // Goroutine-safe
585 func (ch *Channel) loadSendQueueSize() (size int) {
586         return int(atomic.LoadInt32(&ch.sendQueueSize))
587 }
588
589 // Goroutine-safe
590 // Use only as a heuristic.
591 func (ch *Channel) canSend() bool {
592         return ch.loadSendQueueSize() < defaultSendQueueCapacity
593 }
594
595 // Returns true if any msgPackets are pending to be sent.
596 // Call before calling nextMsgPacket()
597 // Goroutine-safe
598 func (ch *Channel) isSendPending() bool {
599         if len(ch.sending) == 0 {
600                 if len(ch.sendQueue) == 0 {
601                         return false
602                 }
603                 ch.sending = <-ch.sendQueue
604         }
605         return true
606 }
607
608 // Creates a new msgPacket to send.
609 // Not goroutine-safe
610 func (ch *Channel) nextMsgPacket() msgPacket {
611         packet := msgPacket{}
612         packet.ChannelID = byte(ch.id)
613         packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
614         if len(ch.sending) <= maxMsgPacketPayloadSize {
615                 packet.EOF = byte(0x01)
616                 ch.sending = nil
617                 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
618         } else {
619                 packet.EOF = byte(0x00)
620                 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
621         }
622         return packet
623 }
624
625 // Writes next msgPacket to w.
626 // Not goroutine-safe
627 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
628         packet := ch.nextMsgPacket()
629         // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
630         wire.WriteByte(packetTypeMsg, w, &n, &err)
631         wire.WriteBinary(packet, w, &n, &err)
632         if err == nil {
633                 ch.recentlySent += int64(n)
634         }
635         return
636 }
637
638 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
639 // Not goroutine-safe
640 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
641         // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
642         if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
643                 return nil, wire.ErrBinaryReadOverflow
644         }
645         ch.recving = append(ch.recving, packet.Bytes...)
646         if packet.EOF == byte(0x01) {
647                 msgBytes := ch.recving
648                 // clear the slice without re-allocating.
649                 // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
650                 //   suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
651                 //      at which point the recving slice stops being used and should be garbage collected
652                 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
653                 return msgBytes, nil
654         }
655         return nil, nil
656 }
657
658 // Call this periodically to update stats for throttling purposes.
659 // Not goroutine-safe
660 func (ch *Channel) updateStats() {
661         // Exponential decay of stats.
662         // TODO: optimize.
663         ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
664 }
665
666 //-----------------------------------------------------------------------------
667
// Wire-level packet constants.
const (
	// Largest payload carried by a single msgPacket; longer messages are
	// split across several packets.
	maxMsgPacketPayloadSize  = 1024
	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
	// Upper bound used for rate limiting and for reading one packet.
	maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
	// Leading byte that identifies the kind of packet on the wire.
	packetTypePing           = byte(0x01)
	packetTypePong           = byte(0x02)
	packetTypeMsg            = byte(0x03)
)
676
// Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
	ChannelID byte   // id of the channel the payload belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // payload fragment, at most maxMsgPacketPayloadSize bytes when built by nextMsgPacket
}
683
684 func (p msgPacket) String() string {
685         return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
686 }