OSDN Git Service

Merge pull request #674 from Bytom/dev_addr_pre_for_net
[bytom/bytom.git] / p2p / connection.go
1 package p2p
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "math"
8         "net"
9         "runtime/debug"
10         "sync/atomic"
11         "time"
12
13         log "github.com/sirupsen/logrus"
14         wire "github.com/tendermint/go-wire"
15         cmn "github.com/tendermint/tmlibs/common"
16         flow "github.com/tendermint/tmlibs/flowrate"
17 )
18
// Timing and buffering parameters of a multiplex connection.
const (
	// numBatchMsgPackets caps how many msgPackets one call to
	// sendSomeMsgPackets writes.
	numBatchMsgPackets = 10
	// minReadBufferSize and minWriteBufferSize size the bufio reader and
	// writer that wrap the underlying net.Conn.
	minReadBufferSize  = 1024
	minWriteBufferSize = 64 * 1024
	// updateState is the period of the channel statistics timer.
	updateState = 2 * time.Second
	// pingTimeout is the period of the ping timer.
	pingTimeout = 40 * time.Second
	// flushThrottle is the throttle interval of the flush timer.
	flushThrottle = 100 * time.Millisecond
)

// Defaults used when a config or channel descriptor leaves a value unset.
const (
	defaultSendQueueCapacity   = 1
	defaultSendRate            = int64(500 * 1024) // 500KB/s
	defaultRecvBufferCapacity  = 4096
	defaultRecvMessageCapacity = 21 * 1024 * 1024  // 21MB
	defaultRecvRate            = int64(500 * 1024) // 500KB/s
	defaultSendTimeout         = 10 * time.Second
)
34
type (
	// receiveCbFunc is the callback handed a fully reassembled message
	// together with the id of the channel it arrived on.
	receiveCbFunc func(chID byte, msgBytes []byte)

	// errorCbFunc is the callback handed the value that caused the
	// connection to be stopped.
	errorCbFunc func(interface{})
)
37
38 /*
39 Each peer has one `MConnection` (multiplex connection) instance.
40
41 __multiplex__ *noun* a system or signal involving simultaneous transmission of
42 several messages along a single channel of communication.
43
44 Each `MConnection` handles message transmission on multiple abstract communication
45 `Channel`s.  Each channel has a globally unique byte id.
46 The byte id and the relative priorities of each `Channel` are configured upon
47 initialization of the connection.
48
49 There are two methods for sending messages:
50         func (m MConnection) Send(chID byte, msg interface{}) bool {}
51         func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
52
53 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
54 for the channel with the given id byte `chID`, or until the request times out.
55 The message `msg` is serialized using the `tendermint/wire` submodule's
56 `WriteBinary()` reflection routine.
57
58 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
59 queue is full.
60
61 Inbound message bytes are handled with an onReceive callback function.
62 */
63 type MConnection struct {
64         cmn.BaseService
65
66         conn        net.Conn
67         bufReader   *bufio.Reader
68         bufWriter   *bufio.Writer
69         sendMonitor *flow.Monitor
70         recvMonitor *flow.Monitor
71         send        chan struct{}
72         pong        chan struct{}
73         channels    []*Channel
74         channelsIdx map[byte]*Channel
75         onReceive   receiveCbFunc
76         onError     errorCbFunc
77         errored     uint32
78         config      *MConnConfig
79
80         quit         chan struct{}
81         flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
82         pingTimer    *cmn.RepeatTimer   // send pings periodically
83         chStatsTimer *cmn.RepeatTimer   // update channel stats periodically
84
85         LocalAddress  *NetAddress
86         RemoteAddress *NetAddress
87 }
88
// MConnConfig holds the tunable parameters of an MConnection.
type MConnConfig struct {
	// SendRate is the rate handed to the send flow monitor when throttling
	// outbound packets.
	SendRate int64 `mapstructure:"send_rate"`
	// RecvRate is the rate handed to the receive flow monitor when
	// throttling inbound packets.
	RecvRate int64 `mapstructure:"recv_rate"`
}
94
95 // DefaultMConnConfig returns the default config.
96 func DefaultMConnConfig() *MConnConfig {
97         return &MConnConfig{
98                 SendRate: defaultSendRate,
99                 RecvRate: defaultRecvRate,
100         }
101 }
102
103 // NewMConnection wraps net.Conn and creates multiplex connection
104 func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
105         return NewMConnectionWithConfig(
106                 conn,
107                 chDescs,
108                 onReceive,
109                 onError,
110                 DefaultMConnConfig())
111 }
112
113 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115         mconn := &MConnection{
116                 conn:        conn,
117                 bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
118                 bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
119                 sendMonitor: flow.New(0, 0),
120                 recvMonitor: flow.New(0, 0),
121                 send:        make(chan struct{}, 1),
122                 pong:        make(chan struct{}),
123                 onReceive:   onReceive,
124                 onError:     onError,
125                 config:      config,
126
127                 LocalAddress:  NewNetAddress(conn.LocalAddr()),
128                 RemoteAddress: NewNetAddress(conn.RemoteAddr()),
129         }
130
131         // Create channels
132         var channelsIdx = map[byte]*Channel{}
133         var channels = []*Channel{}
134
135         for _, desc := range chDescs {
136                 descCopy := *desc // copy the desc else unsafe access across connections
137                 channel := newChannel(mconn, &descCopy)
138                 channelsIdx[channel.id] = channel
139                 channels = append(channels, channel)
140         }
141         mconn.channels = channels
142         mconn.channelsIdx = channelsIdx
143
144         mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
145
146         return mconn
147 }
148
149 func (c *MConnection) OnStart() error {
150         c.BaseService.OnStart()
151         c.quit = make(chan struct{})
152         c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
153         c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
154         c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
155         go c.sendRoutine()
156         go c.recvRoutine()
157         return nil
158 }
159
160 func (c *MConnection) OnStop() {
161         c.BaseService.OnStop()
162         c.flushTimer.Stop()
163         c.pingTimer.Stop()
164         c.chStatsTimer.Stop()
165         if c.quit != nil {
166                 close(c.quit)
167         }
168         c.conn.Close()
169         // We can't close pong safely here because
170         // recvRoutine may write to it after we've stopped.
171         // Though it doesn't need to get closed at all,
172         // we close it @ recvRoutine.
173         // close(c.pong)
174 }
175
176 func (c *MConnection) String() string {
177         return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
178 }
179
180 func (c *MConnection) flush() {
181         log.WithField("conn", c).Debug("Flush")
182         err := c.bufWriter.Flush()
183         if err != nil {
184                 log.WithField("error", err).Error("MConnection flush failed")
185         }
186 }
187
188 // Catch panics, usually caused by remote disconnects.
189 func (c *MConnection) _recover() {
190         if r := recover(); r != nil {
191                 stack := debug.Stack()
192                 err := cmn.StackError{r, stack}
193                 c.stopForError(err)
194         }
195 }
196
197 func (c *MConnection) stopForError(r interface{}) {
198         c.Stop()
199         if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
200                 if c.onError != nil {
201                         c.onError(r)
202                 }
203         }
204 }
205
206 // Queues a message to be sent to channel.
207 func (c *MConnection) Send(chID byte, msg interface{}) bool {
208         if !c.IsRunning() {
209                 return false
210         }
211
212         log.WithFields(log.Fields{
213                 "chID": chID,
214                 "conn": c,
215                 "msg":  msg,
216         }).Debug("Send")
217
218         // Send message to channel.
219         channel, ok := c.channelsIdx[chID]
220         if !ok {
221                 log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel"))
222                 return false
223         }
224
225         success := channel.sendBytes(wire.BinaryBytes(msg))
226         if success {
227                 // Wake up sendRoutine if necessary
228                 select {
229                 case c.send <- struct{}{}:
230                 default:
231                 }
232         } else {
233                 log.WithFields(log.Fields{
234                         "chID": chID,
235                         "conn": c,
236                         "msg":  msg,
237                 }).Error("Send failed")
238         }
239         return success
240 }
241
242 // Queues a message to be sent to channel.
243 // Nonblocking, returns true if successful.
244 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
245         if !c.IsRunning() {
246                 return false
247         }
248
249         log.WithFields(log.Fields{
250                 "chID": chID,
251                 "conn": c,
252                 "msg":  msg,
253         }).Debug("TrySend")
254
255         // Send message to channel.
256         channel, ok := c.channelsIdx[chID]
257         if !ok {
258                 log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel"))
259                 return false
260         }
261
262         ok = channel.trySendBytes(wire.BinaryBytes(msg))
263         if ok {
264                 // Wake up sendRoutine if necessary
265                 select {
266                 case c.send <- struct{}{}:
267                 default:
268                 }
269         }
270
271         return ok
272 }
273
274 // CanSend returns true if you can send more data onto the chID, false
275 // otherwise. Use only as a heuristic.
276 func (c *MConnection) CanSend(chID byte) bool {
277         if !c.IsRunning() {
278                 return false
279         }
280
281         channel, ok := c.channelsIdx[chID]
282         if !ok {
283                 log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel"))
284                 return false
285         }
286         return channel.canSend()
287 }
288
289 // sendRoutine polls for packets to send from channels.
290 func (c *MConnection) sendRoutine() {
291         defer c._recover()
292
293 FOR_LOOP:
294         for {
295                 var n int
296                 var err error
297                 select {
298                 case <-c.flushTimer.Ch:
299                         // NOTE: flushTimer.Set() must be called every time
300                         // something is written to .bufWriter.
301                         c.flush()
302                 case <-c.chStatsTimer.Ch:
303                         for _, channel := range c.channels {
304                                 channel.updateStats()
305                         }
306                 case <-c.pingTimer.Ch:
307                         log.Debug("Send Ping")
308                         wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
309                         c.sendMonitor.Update(int(n))
310                         c.flush()
311                 case <-c.pong:
312                         log.Debug("Send Pong")
313                         wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
314                         c.sendMonitor.Update(int(n))
315                         c.flush()
316                 case <-c.quit:
317                         break FOR_LOOP
318                 case <-c.send:
319                         // Send some msgPackets
320                         eof := c.sendSomeMsgPackets()
321                         if !eof {
322                                 // Keep sendRoutine awake.
323                                 select {
324                                 case c.send <- struct{}{}:
325                                 default:
326                                 }
327                         }
328                 }
329
330                 if !c.IsRunning() {
331                         break FOR_LOOP
332                 }
333                 if err != nil {
334                         log.WithFields(log.Fields{
335                                 "conn":  c,
336                                 "error": err,
337                         }).Error("Connection failed @ sendRoutine")
338                         c.stopForError(err)
339                         break FOR_LOOP
340                 }
341         }
342
343         // Cleanup
344 }
345
346 // Returns true if messages from channels were exhausted.
347 // Blocks in accordance to .sendMonitor throttling.
348 func (c *MConnection) sendSomeMsgPackets() bool {
349         // Block until .sendMonitor says we can write.
350         // Once we're ready we send more than we asked for,
351         // but amortized it should even out.
352         c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
353
354         // Now send some msgPackets.
355         for i := 0; i < numBatchMsgPackets; i++ {
356                 if c.sendMsgPacket() {
357                         return true
358                 }
359         }
360         return false
361 }
362
363 // Returns true if messages from channels were exhausted.
364 func (c *MConnection) sendMsgPacket() bool {
365         // Choose a channel to create a msgPacket from.
366         // The chosen channel will be the one whose recentlySent/priority is the least.
367         var leastRatio float32 = math.MaxFloat32
368         var leastChannel *Channel
369         for _, channel := range c.channels {
370                 // If nothing to send, skip this channel
371                 if !channel.isSendPending() {
372                         continue
373                 }
374                 // Get ratio, and keep track of lowest ratio.
375                 ratio := float32(channel.recentlySent) / float32(channel.priority)
376                 if ratio < leastRatio {
377                         leastRatio = ratio
378                         leastChannel = channel
379                 }
380         }
381
382         // Nothing to send?
383         if leastChannel == nil {
384                 return true
385         } else {
386                 // c.Logger.Info("Found a msgPacket to send")
387         }
388
389         // Make & send a msgPacket from this channel
390         n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
391         if err != nil {
392                 log.WithField("error", err).Error("Failed to write msgPacket")
393                 c.stopForError(err)
394                 return true
395         }
396         c.sendMonitor.Update(int(n))
397         c.flushTimer.Set()
398         return false
399 }
400
401 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
402 // After a whole message has been assembled, it's pushed to onReceive().
403 // Blocks depending on how the connection is throttled.
404 func (c *MConnection) recvRoutine() {
405         defer c._recover()
406
407 FOR_LOOP:
408         for {
409                 // Block until .recvMonitor says we can read.
410                 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
411
412                 /*
413                         // Peek into bufReader for debugging
414                         if numBytes := c.bufReader.Buffered(); numBytes > 0 {
415                                 log.Infof("Peek connection buffer numBytes:", numBytes)
416                                 bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
417                                 if err == nil {
418                                         log.Infof("bytes:", bytes)
419                                 } else {
420                                         log.Warning("Error peeking connection buffer err:", err)
421                                 }
422                         } else {
423                                 log.Warning("Received bytes number is:", numBytes)
424                         }
425                 */
426
427                 // Read packet type
428                 var n int
429                 var err error
430                 pktType := wire.ReadByte(c.bufReader, &n, &err)
431                 c.recvMonitor.Update(int(n))
432                 if err != nil {
433                         if c.IsRunning() {
434                                 log.WithFields(log.Fields{
435                                         "conn":  c,
436                                         "error": err,
437                                 }).Error("Connection failed @ recvRoutine (reading byte)")
438                                 c.conn.Close()
439                                 c.stopForError(err)
440                         }
441                         break FOR_LOOP
442                 }
443
444                 // Read more depending on packet type.
445                 switch pktType {
446                 case packetTypePing:
447                         // TODO: prevent abuse, as they cause flush()'s.
448                         log.Debug("Receive Ping")
449                         c.pong <- struct{}{}
450                 case packetTypePong:
451                         // do nothing
452                         log.Debug("Receive Pong")
453                 case packetTypeMsg:
454                         pkt, n, err := msgPacket{}, int(0), error(nil)
455                         wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
456                         c.recvMonitor.Update(int(n))
457                         if err != nil {
458                                 if c.IsRunning() {
459                                         log.WithFields(log.Fields{
460                                                 "conn":  c,
461                                                 "error": err,
462                                         }).Error("Connection failed @ recvRoutine")
463                                         c.stopForError(err)
464                                 }
465                                 break FOR_LOOP
466                         }
467                         channel, ok := c.channelsIdx[pkt.ChannelID]
468                         if !ok || channel == nil {
469                                 if pkt.ChannelID == PexChannel {
470                                         continue
471                                 } else {
472                                         cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
473                                 }
474                         }
475                         msgBytes, err := channel.recvMsgPacket(pkt)
476                         if err != nil {
477                                 if c.IsRunning() {
478                                         log.WithFields(log.Fields{
479                                                 "conn":  c,
480                                                 "error": err,
481                                         }).Error("Connection failed @ recvRoutine")
482                                         c.stopForError(err)
483                                 }
484                                 break FOR_LOOP
485                         }
486                         if msgBytes != nil {
487                                 log.WithFields(log.Fields{
488                                         "channelID": pkt.ChannelID,
489                                         "msgBytes":  msgBytes,
490                                 }).Debug("Received bytes")
491                                 c.onReceive(pkt.ChannelID, msgBytes)
492                         }
493                 default:
494                         cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
495                 }
496
497                 // TODO: shouldn't this go in the sendRoutine?
498                 // Better to send a ping packet when *we* haven't sent anything for a while.
499                 c.pingTimer.Reset()
500         }
501
502         // Cleanup
503         close(c.pong)
504         for _ = range c.pong {
505                 // Drain
506         }
507 }
508
509 type ConnectionStatus struct {
510         SendMonitor flow.Status
511         RecvMonitor flow.Status
512         Channels    []ChannelStatus
513 }
514
// ChannelStatus is the per-channel part of a ConnectionStatus.
type ChannelStatus struct {
	ID                byte  // channel id
	SendQueueCapacity int   // capacity of the channel's send queue
	SendQueueSize     int   // value of the channel's send queue counter
	Priority          int   // channel priority
	RecentlySent      int64 // channel's recently-sent counter
}
522
523 func (c *MConnection) Status() ConnectionStatus {
524         var status ConnectionStatus
525         status.SendMonitor = c.sendMonitor.Status()
526         status.RecvMonitor = c.recvMonitor.Status()
527         status.Channels = make([]ChannelStatus, len(c.channels))
528         for i, channel := range c.channels {
529                 status.Channels[i] = ChannelStatus{
530                         ID:                channel.id,
531                         SendQueueCapacity: cap(channel.sendQueue),
532                         SendQueueSize:     int(channel.sendQueueSize), // TODO use atomic
533                         Priority:          channel.priority,
534                         RecentlySent:      channel.recentlySent,
535                 }
536         }
537         return status
538 }
539
540 //-----------------------------------------------------------------------------
541
// ChannelDescriptor describes one channel of a multiplex connection. The
// three capacity fields may be left at zero, in which case FillDefaults
// substitutes the package defaults.
type ChannelDescriptor struct {
	ID                  byte // channel id
	Priority            int  // relative priority; newChannel requires it to be > 0
	SendQueueCapacity   int  // capacity of the channel's send queue
	RecvBufferCapacity  int  // initial capacity of the reassembly buffer
	RecvMessageCapacity int  // maximum size of a reassembled message
}
549
550 func (chDesc *ChannelDescriptor) FillDefaults() {
551         if chDesc.SendQueueCapacity == 0 {
552                 chDesc.SendQueueCapacity = defaultSendQueueCapacity
553         }
554         if chDesc.RecvBufferCapacity == 0 {
555                 chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
556         }
557         if chDesc.RecvMessageCapacity == 0 {
558                 chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
559         }
560 }
561
562 // TODO: lowercase.
563 // NOTE: not goroutine-safe.
564 type Channel struct {
565         conn          *MConnection
566         desc          *ChannelDescriptor
567         id            byte
568         sendQueue     chan []byte
569         sendQueueSize int32 // atomic.
570         recving       []byte
571         sending       []byte
572         priority      int
573         recentlySent  int64 // exponential moving average
574 }
575
576 func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
577         desc.FillDefaults()
578         if desc.Priority <= 0 {
579                 cmn.PanicSanity("Channel default priority must be a postive integer")
580         }
581         return &Channel{
582                 conn:      conn,
583                 desc:      desc,
584                 id:        desc.ID,
585                 sendQueue: make(chan []byte, desc.SendQueueCapacity),
586                 recving:   make([]byte, 0, desc.RecvBufferCapacity),
587                 priority:  desc.Priority,
588         }
589 }
590
591 // Queues message to send to this channel.
592 // Goroutine-safe
593 // Times out (and returns false) after defaultSendTimeout
594 func (ch *Channel) sendBytes(bytes []byte) bool {
595         select {
596         case ch.sendQueue <- bytes:
597                 atomic.AddInt32(&ch.sendQueueSize, 1)
598                 return true
599         case <-time.After(defaultSendTimeout):
600                 return false
601         }
602 }
603
604 // Queues message to send to this channel.
605 // Nonblocking, returns true if successful.
606 // Goroutine-safe
607 func (ch *Channel) trySendBytes(bytes []byte) bool {
608         select {
609         case ch.sendQueue <- bytes:
610                 atomic.AddInt32(&ch.sendQueueSize, 1)
611                 return true
612         default:
613                 return false
614         }
615 }
616
617 // Goroutine-safe
618 func (ch *Channel) loadSendQueueSize() (size int) {
619         return int(atomic.LoadInt32(&ch.sendQueueSize))
620 }
621
622 // Goroutine-safe
623 // Use only as a heuristic.
624 func (ch *Channel) canSend() bool {
625         return ch.loadSendQueueSize() < defaultSendQueueCapacity
626 }
627
628 // Returns true if any msgPackets are pending to be sent.
629 // Call before calling nextMsgPacket()
630 // Goroutine-safe
631 func (ch *Channel) isSendPending() bool {
632         if len(ch.sending) == 0 {
633                 if len(ch.sendQueue) == 0 {
634                         return false
635                 }
636                 ch.sending = <-ch.sendQueue
637         }
638         return true
639 }
640
641 // Creates a new msgPacket to send.
642 // Not goroutine-safe
643 func (ch *Channel) nextMsgPacket() msgPacket {
644         packet := msgPacket{}
645         packet.ChannelID = byte(ch.id)
646         packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
647         if len(ch.sending) <= maxMsgPacketPayloadSize {
648                 packet.EOF = byte(0x01)
649                 ch.sending = nil
650                 atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
651         } else {
652                 packet.EOF = byte(0x00)
653                 ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
654         }
655         return packet
656 }
657
658 // Writes next msgPacket to w.
659 // Not goroutine-safe
660 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
661         packet := ch.nextMsgPacket()
662         wire.WriteByte(packetTypeMsg, w, &n, &err)
663         wire.WriteBinary(packet, w, &n, &err)
664         if err == nil {
665                 ch.recentlySent += int64(n)
666         }
667         return
668 }
669
670 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
671 // Not goroutine-safe
672 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
673         if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
674                 return nil, wire.ErrBinaryReadOverflow
675         }
676         ch.recving = append(ch.recving, packet.Bytes...)
677         if packet.EOF == byte(0x01) {
678                 msgBytes := ch.recving
679                 // clear the slice without re-allocating.
680                 // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
681                 //   suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
682                 //      at which point the recving slice stops being used and should be garbage collected
683                 ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
684                 return msgBytes, nil
685         }
686         return nil, nil
687 }
688
689 // Call this periodically to update stats for throttling purposes.
690 // Not goroutine-safe
691 func (ch *Channel) updateStats() {
692         // Exponential decay of stats.
693         // TODO: optimize.
694         ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
695 }
696
697 //-----------------------------------------------------------------------------
698
// Size limits of a single msgPacket on the wire.
const (
	maxMsgPacketPayloadSize  = 1024
	maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
	maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
)

// Packet-type byte written ahead of every packet.
const (
	packetTypePing byte = 0x01
	packetTypePong byte = 0x02
	packetTypeMsg  byte = 0x03
)
707
// msgPacket is the unit of multiplexing: messages queued on a channel are
// chopped into msgPackets before they go on the wire.
type msgPacket struct {
	ChannelID byte   // id of the channel the payload belongs to
	EOF       byte   // 1 means message ends here.
	Bytes     []byte // payload fragment
}

// String renders the packet as "MsgPacket{<channel>:<payload> T:<eof>}" with
// all three parts in upper-case hexadecimal.
func (p msgPacket) String() string {
	const layout = "MsgPacket{%X:%X T:%X}"
	return fmt.Sprintf(layout, p.ChannelID, p.Bytes, p.EOF)
}