12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/tendermint/tmlibs/flowrate"
19 packetTypePing = byte(0x01)
20 packetTypePong = byte(0x02)
21 packetTypeMsg = byte(0x03)
22 maxMsgPacketPayloadSize = 1024
23 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
24 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
26 numBatchMsgPackets = 10
27 minReadBufferSize = 1024
28 minWriteBufferSize = 65536
29 updateState = 2 * time.Second
30 pingTimeout = 40 * time.Second
31 flushThrottle = 100 * time.Millisecond
33 defaultSendQueueCapacity = 1
34 defaultSendRate = int64(512000) // 500KB/s
35 defaultRecvBufferCapacity = 4096
36 defaultRecvMessageCapacity = 22020096 // 21MB
37 defaultRecvRate = int64(512000) // 500KB/s
38 defaultSendTimeout = 10 * time.Second
39 logModule = "p2p/conn"
42 type receiveCbFunc func(chID byte, msgBytes []byte)
43 type errorCbFunc func(interface{})
45 // Messages in channels are chopped into smaller msgPackets for multiplexing.
46 type msgPacket struct {
48 EOF byte // 1 means message ends here.
52 func (p msgPacket) String() string {
53 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
57 MConnection handles message transmission on multiple abstract communication
58 `Channel`s. Each channel has a globally unique byte id.
59 The byte id and the relative priorities of each `Channel` are configured upon
60 initialization of the connection.
62 There are two methods for sending messages:
63 func (m MConnection) Send(chID byte, msg interface{}) bool {}
64 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
66 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
67 for the channel with the given id byte `chID`, or until the request times out.
68 The message `msg` is serialized using the `tendermint/wire` submodule's
69 `WriteBinary()` reflection routine.
71 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
74 Inbound message bytes are handled with an onReceive callback function.
76 type MConnection struct {
80 bufReader *bufio.Reader
81 bufWriter *bufio.Writer
82 sendMonitor *flowrate.Monitor
83 recvMonitor *flowrate.Monitor
87 channelsIdx map[byte]*channel
88 onReceive receiveCbFunc
94 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
95 pingTimer *time.Ticker // send pings periodically
96 chStatsTimer *time.Ticker // update channel stats periodically
99 // MConnConfig is a MConnection configuration.
100 type MConnConfig struct {
101 SendRate int64 `mapstructure:"send_rate"`
102 RecvRate int64 `mapstructure:"recv_rate"`
105 // DefaultMConnConfig returns the default config.
106 func DefaultMConnConfig() *MConnConfig {
108 SendRate: defaultSendRate,
109 RecvRate: defaultRecvRate,
113 // NewMConnectionWithConfig wraps a net.Conn and creates a multiplexed connection using the given config.
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115 mconn := &MConnection{
117 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
118 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
119 sendMonitor: flowrate.New(0, 0),
120 recvMonitor: flowrate.New(0, 0),
121 send: make(chan struct{}, 1),
122 pong: make(chan struct{}, 1),
123 channelsIdx: map[byte]*channel{},
124 channels: []*channel{},
125 onReceive: onReceive,
129 pingTimer: time.NewTicker(pingTimeout),
130 chStatsTimer: time.NewTicker(updateState),
133 for _, desc := range chDescs {
134 descCopy := *desc // copy the desc else unsafe access across connections
135 channel := newChannel(mconn, &descCopy)
136 mconn.channelsIdx[channel.id] = channel
137 mconn.channels = append(mconn.channels, channel)
139 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
143 // OnStart implements BaseService
144 func (c *MConnection) OnStart() error {
145 c.BaseService.OnStart()
146 c.quit = make(chan struct{})
147 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
153 // OnStop implements BaseService
154 func (c *MConnection) OnStop() {
155 c.BaseService.OnStop()
158 c.chStatsTimer.Stop()
163 // We can't close pong safely here because recvRoutine may write to it after we've
164 // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
167 // CanSend returns true if you can send more data onto the chID, false otherwise
168 func (c *MConnection) CanSend(chID byte) bool {
173 channel, ok := c.channelsIdx[chID]
177 return channel.canSend()
180 // Send queues a message to be sent on the given channel (blocking).
181 func (c *MConnection) Send(chID byte, msg interface{}) bool {
186 channel, ok := c.channelsIdx[chID]
188 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
192 if !channel.sendBytes(wire.BinaryBytes(msg)) {
193 log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
198 case c.send <- struct{}{}:
204 // TrafficStatus returns the sent (outbound) and received (inbound) traffic status, in that order.
205 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
206 sentStatus := c.sendMonitor.Status()
207 receivedStatus := c.recvMonitor.Status()
208 return &sentStatus, &receivedStatus
211 // TrySend queues a message to be sent on the given channel (non-blocking).
212 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
217 channel, ok := c.channelsIdx[chID]
219 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
223 ok = channel.trySendBytes(wire.BinaryBytes(msg))
226 case c.send <- struct{}{}:
233 func (c *MConnection) String() string {
234 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
237 func (c *MConnection) flush() {
238 if err := c.bufWriter.Flush(); err != nil {
239 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
243 // Catch panics, usually caused by remote disconnects.
244 func (c *MConnection) _recover() {
245 if r := recover(); r != nil {
246 stack := debug.Stack()
247 err := cmn.StackError{r, stack}
252 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
253 // After a whole message has been assembled, it's pushed to onReceive().
254 // Blocks depending on how the connection is throttled.
255 func (c *MConnection) recvRoutine() {
260 // Block until .recvMonitor says we can read.
261 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
266 pktType := wire.ReadByte(c.bufReader, &n, &err)
267 c.recvMonitor.Update(int(n))
270 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
277 // Read more depending on packet type.
280 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
282 case c.pong <- struct{}{}:
287 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
290 pkt, n, err := msgPacket{}, int(0), error(nil)
291 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
292 c.recvMonitor.Update(int(n))
295 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
301 channel, ok := c.channelsIdx[pkt.ChannelID]
302 if !ok || channel == nil {
303 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
306 msgBytes, err := channel.recvMsgPacket(pkt)
309 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
316 c.onReceive(pkt.ChannelID, msgBytes)
320 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
325 // Returns true if messages from channels were exhausted.
326 func (c *MConnection) sendMsgPacket() bool {
327 var leastRatio float32 = math.MaxFloat32
328 var leastChannel *channel
329 for _, channel := range c.channels {
330 if !channel.isSendPending() {
333 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
335 leastChannel = channel
338 if leastChannel == nil {
342 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
344 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
348 c.sendMonitor.Update(int(n))
353 // sendRoutine polls for packets to send from channels.
354 func (c *MConnection) sendRoutine() {
361 case <-c.flushTimer.Ch:
363 case <-c.chStatsTimer.C:
364 for _, channel := range c.channels {
365 channel.updateStats()
367 case <-c.pingTimer.C:
368 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
369 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
370 c.sendMonitor.Update(int(n))
373 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
374 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
375 c.sendMonitor.Update(int(n))
380 if eof := c.sendSomeMsgPackets(); !eof {
382 case c.send <- struct{}{}:
392 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
399 // Returns true if messages from channels were exhausted.
400 func (c *MConnection) sendSomeMsgPackets() bool {
401 // Block until .sendMonitor says we can write.
402 // Once we're ready we send more than we asked for,
403 // but amortized it should even out.
404 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
405 for i := 0; i < numBatchMsgPackets; i++ {
406 if c.sendMsgPacket() {
413 func (c *MConnection) stopForError(r interface{}) {
415 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {