12 log "github.com/sirupsen/logrus"
13 wire "github.com/tendermint/go-wire"
14 cmn "github.com/tendermint/tmlibs/common"
15 "github.com/tendermint/tmlibs/flowrate"
// Wire-level packet type tags — the first byte of every packet on the connection.
19 packetTypePing = byte(0x01)
20 packetTypePong = byte(0x02)
21 packetTypeMsg = byte(0x03)
// Size limits for one serialized msgPacket (payload + framing overhead).
22 maxMsgPacketPayloadSize = 1024
23 maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
24 maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
// How many packets sendSomeMsgPackets writes per batch before re-consulting the rate limiter.
26 numBatchMsgPackets = 10
// Buffer sizes for the bufio.Reader/Writer wrapping the underlying net.Conn.
27 minReadBufferSize = 1024
28 minWriteBufferSize = 65536
// Timer intervals: channel-stats refresh period, ping period, and write-flush throttle.
29 updateState = 2 * time.Second
30 pingTimeout = 40 * time.Second
31 flushThrottle = 100 * time.Millisecond
// Per-channel / per-connection defaults.
33 defaultSendQueueCapacity = 1
34 defaultSendRate = int64(512000) // 500KB/s
35 defaultRecvBufferCapacity = 4096
36 defaultRecvMessageCapacity = 22020096 // 21MB
37 defaultRecvRate = int64(512000) // 500KB/s
38 defaultSendTimeout = 10 * time.Second
// Module tag attached to every logrus entry emitted from this file.
39 logModule = "p2p/conn"
// receiveCbFunc is called with the channel id and the complete message bytes
// once a whole multiplexed message has been reassembled (see recvRoutine).
42 type receiveCbFunc func(chID byte, msgBytes []byte)
// errorCbFunc is called with the error (or recovered panic value) that caused
// the connection to stop.
43 type errorCbFunc func(interface{})
45 // Messages in channels are chopped into smaller msgPackets for multiplexing.
46 type msgPacket struct {
// NOTE(review): the ChannelID and Bytes fields are declared on lines elided
// from this view (String() below references p.ChannelID and p.Bytes).
48 EOF byte // 1 means message ends here.
// String renders the packet as "MsgPacket{ChannelID:Bytes T:EOF}" with all
// three values in hex, for use in log output.
52 func (p msgPacket) String() string {
53 return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
57 MConnection handles message transmission on multiple abstract communication
58 `Channel`s. Each channel has a globally unique byte id.
59 The byte id and the relative priorities of each `Channel` are configured upon
60 initialization of the connection.
62 There are two methods for sending messages:
63 func (m MConnection) Send(chID byte, msg interface{}) bool {}
64 func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
66 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
67 for the channel with the given id byte `chID`, or until the request times out.
68 The message `msg` is serialized using the `tendermint/wire` submodule's
69 `WriteBinary()` reflection routine.
71 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
74 Inbound message bytes are handled with an onReceive callback function.
76 type MConnection struct {
// Buffered views of the underlying net.Conn (sizes minReadBufferSize /
// minWriteBufferSize); bufWriter requires explicit flushing (see flush()).
80 bufReader *bufio.Reader
81 bufWriter *bufio.Writer
// Flow-rate monitors used both for rate limiting (Limit) and for reporting
// traffic statistics (see TrafficStatus).
82 sendMonitor *flowrate.Monitor
83 recvMonitor *flowrate.Monitor
// Lookup of channels by their byte id; populated in the constructor.
87 channelsIdx map[byte]*channel
88 onReceive receiveCbFunc
94 flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
95 pingTimer *time.Ticker // send pings periodically
96 chStatsTimer *time.Ticker // update channel stats periodically
99 // MConnConfig is a MConnection configuration.
100 type MConnConfig struct {
// Send/receive rate limits in bytes per second; read atomically in the
// send/recv routines, so they may be tuned at runtime.
101 SendRate int64 `mapstructure:"send_rate"`
102 RecvRate int64 `mapstructure:"recv_rate"`
105 // DefaultMConnConfig returns the default config: 500KB/s in each direction.
106 func DefaultMConnConfig() *MConnConfig {
108 SendRate: defaultSendRate,
109 RecvRate: defaultRecvRate,
113 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
// It builds one internal channel per descriptor (copying each descriptor so
// connections never share mutable descriptor state) and wires up the
// BaseService lifecycle. The connection does not start running until OnStart.
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115 mconn := &MConnection{
117 bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
118 bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
// Monitors start unlimited; actual limits are applied per-read/write via
// Limit() using the config's Send/RecvRate.
119 sendMonitor: flowrate.New(0, 0),
120 recvMonitor: flowrate.New(0, 0),
// Capacity-1 signal channels: a single pending signal is enough to wake
// the send routine / answer a ping.
121 send: make(chan struct{}, 1),
122 pong: make(chan struct{}, 1),
123 channelsIdx: map[byte]*channel{},
124 channels: []*channel{},
125 onReceive: onReceive,
129 pingTimer: time.NewTicker(pingTimeout),
130 chStatsTimer: time.NewTicker(updateState),
133 for _, desc := range chDescs {
134 descCopy := *desc // copy the desc else unsafe access across connections
135 channel := newChannel(mconn, &descCopy)
136 mconn.channelsIdx[channel.id] = channel
137 mconn.channels = append(mconn.channels, channel)
139 mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
143 // OnStart implements BaseService
// Initializes the quit channel and flush timer; the send/recv goroutines are
// presumably launched on lines elided from this view.
144 func (c *MConnection) OnStart() error {
// NOTE(review): the error returned by BaseService.OnStart() is discarded —
// confirm this is intentional or propagate it.
145 c.BaseService.OnStart()
146 c.quit = make(chan struct{})
147 c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
153 // OnStop implements BaseService
// Stops the base service; timer/channel teardown occurs on lines elided here.
154 func (c *MConnection) OnStop() {
155 c.BaseService.OnStop()
161 // We can't close pong safely here because recvRoutine may write to it after we've
162 // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
165 // CanSend returns true if you can send more data onto the chID, false otherwise
// Delegates to the channel's own canSend(); behavior for an unknown chID is on
// lines elided from this view.
166 func (c *MConnection) CanSend(chID byte) bool {
171 channel, ok := c.channelsIdx[chID]
175 return channel.canSend()
178 // Send queues a message to be sent to the given channel (blocking).
// The message is serialized with wire.BinaryBytes before queuing; on success a
// non-blocking signal wakes sendRoutine. Unknown channel ids and queue
// failures are logged and (per the elided lines) presumably return false.
179 func (c *MConnection) Send(chID byte, msg interface{}) bool {
184 channel, ok := c.channelsIdx[chID]
186 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
190 if !channel.sendBytes(wire.BinaryBytes(msg)) {
191 log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
// Wake the send routine; the capacity-1 channel means a pending wake is never duplicated.
196 case c.send <- struct{}{}:
202 // TrafficStatus return the in and out traffic status
// Returns pointers to snapshots of the send and receive flow-rate monitors,
// in that order (sent first, received second).
203 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
204 sentStatus := c.sendMonitor.Status()
205 receivedStatus := c.recvMonitor.Status()
206 return &sentStatus, &receivedStatus
209 // TrySend queues a message to be sent to the given channel (non-blocking).
// Unlike Send, this uses channel.trySendBytes and never blocks on a full
// queue; on success it signals sendRoutine just as Send does.
210 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
215 channel, ok := c.channelsIdx[chID]
217 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
221 ok = channel.trySendBytes(wire.BinaryBytes(msg))
224 case c.send <- struct{}{}:
// String identifies the connection by its remote address for logging.
231 func (c *MConnection) String() string {
232 return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
// flush writes any buffered output to the underlying conn, logging (but not
// otherwise handling) a flush error.
235 func (c *MConnection) flush() {
236 if err := c.bufWriter.Flush(); err != nil {
237 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
241 // Catch panics, usually caused by remote disconnects.
// Intended to be deferred by the send/recv goroutines; converts a recovered
// panic into a StackError (presumably passed to stopForError on elided lines).
242 func (c *MConnection) _recover() {
243 if r := recover(); r != nil {
244 stack := debug.Stack()
// NOTE(review): unkeyed struct literal — `go vet` flags this; prefer
// cmn.StackError{Err: r, Stack: stack} (field names unverified from here).
245 err := cmn.StackError{r, stack}
250 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
251 // After a whole message has been assembled, it's pushed to onReceive().
252 // Blocks depending on how the connection is throttled.
253 func (c *MConnection) recvRoutine() {
// Rate-limit reads: block until recvMonitor allows another packet's worth.
// RecvRate is loaded atomically so it can be tuned concurrently.
258 // Block until .recvMonitor says we can read.
259 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
// First byte of every packet is its type tag (ping/pong/msg).
264 pktType := wire.ReadByte(c.bufReader, &n, &err)
265 c.recvMonitor.Update(int(n))
268 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
275 // Read more depending on packet type.
278 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
// Non-blocking pong signal: if one is already pending, the response is not duplicated.
280 case c.pong <- struct{}{}:
285 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
// Msg packet: decode a full msgPacket (bounded by maxMsgPacketTotalSize).
288 pkt, n, err := msgPacket{}, int(0), error(nil)
289 wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
290 c.recvMonitor.Update(int(n))
293 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
// A packet for an unregistered channel is a protocol violation: panic.
299 channel, ok := c.channelsIdx[pkt.ChannelID]
300 if !ok || channel == nil {
301 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
// recvMsgPacket returns the full message bytes once EOF is seen, else nil/empty.
304 msgBytes, err := channel.recvMsgPacket(pkt)
307 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
314 c.onReceive(pkt.ChannelID, msgBytes)
// Unknown packet type tag is likewise a protocol violation.
318 cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
323 // Returns true if messages from channels were exhausted.
// Picks the channel with the lowest recentlySent/priority ratio (so higher
// priority and less-recently-served channels win) and writes one of its
// packets to the buffered writer.
324 func (c *MConnection) sendMsgPacket() bool {
325 var leastRatio float32 = math.MaxFloat32
326 var leastChannel *channel
327 for _, channel := range c.channels {
// Channels with nothing queued are skipped.
328 if !channel.isSendPending() {
331 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
333 leastChannel = channel
// No channel had pending data: everything is exhausted.
336 if leastChannel == nil {
340 n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
342 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
346 c.sendMonitor.Update(int(n))
351 // sendRoutine polls for packets to send from channels.
// Event loop multiplexing: throttled flushes, periodic channel-stats updates,
// periodic pings, pong replies, and send-signal-driven packet batches.
352 func (c *MConnection) sendRoutine() {
359 case <-c.flushTimer.Ch:
361 case <-c.chStatsTimer.C:
362 for _, channel := range c.channels {
363 channel.updateStats()
365 case <-c.pingTimer.C:
366 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
367 wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
368 c.sendMonitor.Update(int(n))
// Pong is sent in response to a signal queued by recvRoutine on c.pong.
371 log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
372 wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
373 c.sendMonitor.Update(int(n))
// If the batch did not exhaust the channels, re-signal ourselves so the
// next loop iteration keeps draining (non-blocking; capacity-1 channel).
378 if eof := c.sendSomeMsgPackets(); !eof {
380 case c.send <- struct{}{}:
390 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
397 // Returns true if messages from channels were exhausted.
// Waits on the rate limiter once, then sends up to numBatchMsgPackets packets
// so the limiter cost is amortized over the batch.
398 func (c *MConnection) sendSomeMsgPackets() bool {
399 // Block until .sendMonitor says we can write.
400 // Once we're ready we send more than we asked for,
401 // but amortized it should even out.
402 c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
403 for i := 0; i < numBatchMsgPackets; i++ {
// sendMsgPacket returns true when no channel has pending data left.
404 if c.sendMsgPacket() {
// stopForError stops the connection due to error r; the CAS on c.errored
// guarantees the onError callback fires at most once per connection.
411 func (c *MConnection) stopForError(r interface{}) {
413 if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {