OSDN Git Service

add bytom logs into files
[bytom/bytom.git] / p2p / connection / connection.go
index 77595d4..67f3220 100644 (file)
@@ -3,7 +3,6 @@ package connection
 import (
        "bufio"
        "fmt"
-       "io"
        "math"
        "net"
        "runtime/debug"
@@ -13,10 +12,17 @@ import (
        log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
-       flow "github.com/tendermint/tmlibs/flowrate"
+       "github.com/tendermint/tmlibs/flowrate"
 )
 
 const (
+       packetTypePing           = byte(0x01)
+       packetTypePong           = byte(0x02)
+       packetTypeMsg            = byte(0x03)
+       maxMsgPacketPayloadSize  = 1024
+       maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
+       maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
+
        numBatchMsgPackets = 10
        minReadBufferSize  = 1024
        minWriteBufferSize = 65536
@@ -30,18 +36,25 @@ const (
        defaultRecvMessageCapacity = 22020096      // 21MB
        defaultRecvRate            = int64(512000) // 500KB/s
        defaultSendTimeout         = 10 * time.Second
+       logModule                  = "p2pConn"
 )
 
 type receiveCbFunc func(chID byte, msgBytes []byte)
 type errorCbFunc func(interface{})
 
-/*
-Each peer has one `MConnection` (multiplex connection) instance.
+// Messages in channels are chopped into smaller msgPackets for multiplexing.
+type msgPacket struct {
+       ChannelID byte
+       EOF       byte // 1 means message ends here.
+       Bytes     []byte
+}
 
-__multiplex__ *noun* a system or signal involving simultaneous transmission of
-several messages along a single channel of communication.
+func (p msgPacket) String() string {
+       return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
+}
 
-Each `MConnection` handles message transmission on multiple abstract communication
+/*
+MConnection handles message transmission on multiple abstract communication
 `Channel`s.  Each channel has a globally unique byte id.
 The byte id and the relative priorities of each `Channel` are configured upon
 initialization of the connection.
@@ -66,12 +79,12 @@ type MConnection struct {
        conn        net.Conn
        bufReader   *bufio.Reader
        bufWriter   *bufio.Writer
-       sendMonitor *flow.Monitor
-       recvMonitor *flow.Monitor
+       sendMonitor *flowrate.Monitor
+       recvMonitor *flowrate.Monitor
        send        chan struct{}
        pong        chan struct{}
-       channels    []*Channel
-       channelsIdx map[byte]*Channel
+       channels    []*channel
+       channelsIdx map[byte]*channel
        onReceive   receiveCbFunc
        onError     errorCbFunc
        errored     uint32
@@ -97,26 +110,18 @@ func DefaultMConnConfig() *MConnConfig {
        }
 }
 
-// NewMConnection wraps net.Conn and creates multiplex connection
-func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
-       return NewMConnectionWithConfig(
-               conn,
-               chDescs,
-               onReceive,
-               onError,
-               DefaultMConnConfig())
-}
-
 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
        mconn := &MConnection{
                conn:        conn,
                bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
                bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
-               sendMonitor: flow.New(0, 0),
-               recvMonitor: flow.New(0, 0),
+               sendMonitor: flowrate.New(0, 0),
+               recvMonitor: flowrate.New(0, 0),
                send:        make(chan struct{}, 1),
-               pong:        make(chan struct{}),
+               pong:        make(chan struct{}, 1),
+               channelsIdx: map[byte]*channel{},
+               channels:    []*channel{},
                onReceive:   onReceive,
                onError:     onError,
                config:      config,
@@ -125,24 +130,17 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
                chStatsTimer: time.NewTicker(updateState),
        }
 
-       // Create channels
-       var channelsIdx = map[byte]*Channel{}
-       var channels = []*Channel{}
-
        for _, desc := range chDescs {
                descCopy := *desc // copy the desc else unsafe access across connections
                channel := newChannel(mconn, &descCopy)
-               channelsIdx[channel.id] = channel
-               channels = append(channels, channel)
+               mconn.channelsIdx[channel.id] = channel
+               mconn.channels = append(mconn.channels, channel)
        }
-       mconn.channels = channels
-       mconn.channelsIdx = channelsIdx
-
        mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
-
        return mconn
 }
 
+// OnStart implements BaseService
 func (c *MConnection) OnStart() error {
        c.BaseService.OnStart()
        c.quit = make(chan struct{})
@@ -152,243 +150,103 @@ func (c *MConnection) OnStart() error {
        return nil
 }
 
+// OnStop implements BaseService
 func (c *MConnection) OnStop() {
        c.BaseService.OnStop()
        c.flushTimer.Stop()
+       c.pingTimer.Stop()
+       c.chStatsTimer.Stop()
        if c.quit != nil {
                close(c.quit)
        }
        c.conn.Close()
-       // We can't close pong safely here because
-       // recvRoutine may write to it after we've stopped.
-       // Though it doesn't need to get closed at all,
-       // we close it @ recvRoutine.
-       // close(c.pong)
-}
-
-func (c *MConnection) String() string {
-       return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
+       // We can't close pong safely here because recvRoutine may write to it after we've
+       // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
 }
 
-func (c *MConnection) flush() {
-       log.WithField("conn", c).Debug("Flush")
-       err := c.bufWriter.Flush()
-       if err != nil {
-               log.WithField("error", err).Error("MConnection flush failed")
-       }
-}
-
-// Catch panics, usually caused by remote disconnects.
-func (c *MConnection) _recover() {
-       if r := recover(); r != nil {
-               stack := debug.Stack()
-               err := cmn.StackError{r, stack}
-               c.stopForError(err)
+// CanSend returns true if you can send more data onto the chID, false otherwise
+func (c *MConnection) CanSend(chID byte) bool {
+       if !c.IsRunning() {
+               return false
        }
-}
 
-func (c *MConnection) stopForError(r interface{}) {
-       c.Stop()
-       if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
-               if c.onError != nil {
-                       c.onError(r)
-               }
+       channel, ok := c.channelsIdx[chID]
+       if !ok {
+               return false
        }
+       return channel.canSend()
 }
 
-// Queues a message to be sent to channel.
+// Send will queues a message to be sent to channel(blocking).
 func (c *MConnection) Send(chID byte, msg interface{}) bool {
        if !c.IsRunning() {
                return false
        }
 
-       log.WithFields(log.Fields{
-               "chID": chID,
-               "conn": c,
-               "msg":  msg,
-       }).Debug("Send")
-
-       // Send message to channel.
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel"))
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
-       success := channel.sendBytes(wire.BinaryBytes(msg))
-       if success {
-               // Wake up sendRoutine if necessary
-               select {
-               case c.send <- struct{}{}:
-               default:
-               }
-       } else {
-               log.WithFields(log.Fields{
-                       "chID": chID,
-                       "conn": c,
-                       "msg":  msg,
-               }).Error("Send failed")
+       if !channel.sendBytes(wire.BinaryBytes(msg)) {
+               log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
+               return false
+       }
+
+       select {
+       case c.send <- struct{}{}:
+       default:
        }
-       return success
+       return true
+}
+
+// TrafficStatus return the in and out traffic status
+func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+       sentStatus := c.sendMonitor.Status()
+       receivedStatus := c.recvMonitor.Status()
+       return &sentStatus, &receivedStatus
 }
 
-// Queues a message to be sent to channel.
-// Nonblocking, returns true if successful.
+// TrySend queues a message to be sent to channel(Nonblocking).
 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
        if !c.IsRunning() {
                return false
        }
 
-       log.WithFields(log.Fields{
-               "chID": chID,
-               "conn": c,
-               "msg":  msg,
-       }).Debug("TrySend")
-
-       // Send message to channel.
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel"))
+               log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
                return false
        }
 
        ok = channel.trySendBytes(wire.BinaryBytes(msg))
        if ok {
-               // Wake up sendRoutine if necessary
                select {
                case c.send <- struct{}{}:
                default:
                }
        }
-
        return ok
 }
 
-// CanSend returns true if you can send more data onto the chID, false
-// otherwise. Use only as a heuristic.
-func (c *MConnection) CanSend(chID byte) bool {
-       if !c.IsRunning() {
-               return false
-       }
-
-       channel, ok := c.channelsIdx[chID]
-       if !ok {
-               log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel"))
-               return false
-       }
-       return channel.canSend()
-}
-
-// sendRoutine polls for packets to send from channels.
-func (c *MConnection) sendRoutine() {
-       defer c._recover()
-
-FOR_LOOP:
-       for {
-               var n int
-               var err error
-               select {
-               case <-c.flushTimer.Ch:
-                       // NOTE: flushTimer.Set() must be called every time
-                       // something is written to .bufWriter.
-                       c.flush()
-               case <-c.chStatsTimer.C:
-                       for _, channel := range c.channels {
-                               channel.updateStats()
-                       }
-               case <-c.pingTimer.C:
-                       log.Debug("Send Ping")
-                       wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
-                       c.sendMonitor.Update(int(n))
-                       c.flush()
-               case <-c.pong:
-                       log.Debug("Send Pong")
-                       wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
-                       c.sendMonitor.Update(int(n))
-                       c.flush()
-               case <-c.quit:
-                       break FOR_LOOP
-               case <-c.send:
-                       // Send some msgPackets
-                       eof := c.sendSomeMsgPackets()
-                       if !eof {
-                               // Keep sendRoutine awake.
-                               select {
-                               case c.send <- struct{}{}:
-                               default:
-                               }
-                       }
-               }
-
-               if !c.IsRunning() {
-                       break FOR_LOOP
-               }
-               if err != nil {
-                       log.WithFields(log.Fields{
-                               "conn":  c,
-                               "error": err,
-                       }).Error("Connection failed @ sendRoutine")
-                       c.stopForError(err)
-                       break FOR_LOOP
-               }
-       }
-
-       // Cleanup
+func (c *MConnection) String() string {
+       return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
 }
 
-// Returns true if messages from channels were exhausted.
-// Blocks in accordance to .sendMonitor throttling.
-func (c *MConnection) sendSomeMsgPackets() bool {
-       // Block until .sendMonitor says we can write.
-       // Once we're ready we send more than we asked for,
-       // but amortized it should even out.
-       c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
-
-       // Now send some msgPackets.
-       for i := 0; i < numBatchMsgPackets; i++ {
-               if c.sendMsgPacket() {
-                       return true
-               }
+func (c *MConnection) flush() {
+       if err := c.bufWriter.Flush(); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
        }
-       return false
 }
 
-// Returns true if messages from channels were exhausted.
-func (c *MConnection) sendMsgPacket() bool {
-       // Choose a channel to create a msgPacket from.
-       // The chosen channel will be the one whose recentlySent/priority is the least.
-       var leastRatio float32 = math.MaxFloat32
-       var leastChannel *Channel
-       for _, channel := range c.channels {
-               // If nothing to send, skip this channel
-               if !channel.isSendPending() {
-                       continue
-               }
-               // Get ratio, and keep track of lowest ratio.
-               ratio := float32(channel.recentlySent) / float32(channel.priority)
-               if ratio < leastRatio {
-                       leastRatio = ratio
-                       leastChannel = channel
-               }
-       }
-
-       // Nothing to send?
-       if leastChannel == nil {
-               return true
-       } else {
-               // c.Logger.Info("Found a msgPacket to send")
-       }
-
-       // Make & send a msgPacket from this channel
-       n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
-       if err != nil {
-               log.WithField("error", err).Error("Failed to write msgPacket")
+// Catch panics, usually caused by remote disconnects.
+func (c *MConnection) _recover() {
+       if r := recover(); r != nil {
+               stack := debug.Stack()
+               err := cmn.StackError{r, stack}
                c.stopForError(err)
-               return true
        }
-       c.sendMonitor.Update(int(n))
-       c.flushTimer.Set()
-       return false
 }
 
 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
@@ -396,27 +254,12 @@ func (c *MConnection) sendMsgPacket() bool {
 // Blocks depending on how the connection is throttled.
 func (c *MConnection) recvRoutine() {
        defer c._recover()
+       defer close(c.pong)
 
-FOR_LOOP:
        for {
                // Block until .recvMonitor says we can read.
                c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
 
-               /*
-                       // Peek into bufReader for debugging
-                       if numBytes := c.bufReader.Buffered(); numBytes > 0 {
-                               log.Infof("Peek connection buffer numBytes:", numBytes)
-                               bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
-                               if err == nil {
-                                       log.Infof("bytes:", bytes)
-                               } else {
-                                       log.Warning("Error peeking connection buffer err:", err)
-                               }
-                       } else {
-                               log.Warning("Received bytes number is:", numBytes)
-                       }
-               */
-
                // Read packet type
                var n int
                var err error
@@ -424,279 +267,152 @@ FOR_LOOP:
                c.recvMonitor.Update(int(n))
                if err != nil {
                        if c.IsRunning() {
-                               log.WithFields(log.Fields{
-                                       "conn":  c,
-                                       "error": err,
-                               }).Error("Connection failed @ recvRoutine (reading byte)")
+                               log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
                                c.conn.Close()
                                c.stopForError(err)
                        }
-                       break FOR_LOOP
+                       return
                }
 
                // Read more depending on packet type.
                switch pktType {
                case packetTypePing:
-                       // TODO: prevent abuse, as they cause flush()'s.
-                       log.Debug("Receive Ping")
-                       c.pong <- struct{}{}
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
+                       select {
+                       case c.pong <- struct{}{}:
+                       default:
+                       }
+
                case packetTypePong:
-                       // do nothing
-                       log.Debug("Receive Pong")
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
+
                case packetTypeMsg:
                        pkt, n, err := msgPacket{}, int(0), error(nil)
                        wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
                        c.recvMonitor.Update(int(n))
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{
-                                               "conn":  c,
-                                               "error": err,
-                                       }).Error("Connection failed @ recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
-                               break FOR_LOOP
+                               return
                        }
+
                        channel, ok := c.channelsIdx[pkt.ChannelID]
                        if !ok || channel == nil {
                                cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
                        }
+
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
                                if c.IsRunning() {
-                                       log.WithFields(log.Fields{
-                                               "conn":  c,
-                                               "error": err,
-                                       }).Error("Connection failed @ recvRoutine")
+                                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
                                        c.stopForError(err)
                                }
-                               break FOR_LOOP
+                               return
                        }
+
                        if msgBytes != nil {
-                               log.WithFields(log.Fields{
-                                       "channelID": pkt.ChannelID,
-                                       "msgBytes":  msgBytes,
-                               }).Debug("Received bytes")
                                c.onReceive(pkt.ChannelID, msgBytes)
                        }
+
                default:
                        cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
                }
        }
-
-       // Cleanup
-       close(c.pong)
-       for _ = range c.pong {
-               // Drain
-       }
-}
-
-type ConnectionStatus struct {
-       SendMonitor flow.Status
-       RecvMonitor flow.Status
-       Channels    []ChannelStatus
 }
 
-type ChannelStatus struct {
-       ID                byte
-       SendQueueCapacity int
-       SendQueueSize     int
-       Priority          int
-       RecentlySent      int64
-}
-
-func (c *MConnection) Status() ConnectionStatus {
-       var status ConnectionStatus
-       status.SendMonitor = c.sendMonitor.Status()
-       status.RecvMonitor = c.recvMonitor.Status()
-       status.Channels = make([]ChannelStatus, len(c.channels))
-       for i, channel := range c.channels {
-               status.Channels[i] = ChannelStatus{
-                       ID:                channel.id,
-                       SendQueueCapacity: cap(channel.sendQueue),
-                       SendQueueSize:     int(channel.sendQueueSize), // TODO use atomic
-                       Priority:          channel.priority,
-                       RecentlySent:      channel.recentlySent,
+// Returns true if messages from channels were exhausted.
+func (c *MConnection) sendMsgPacket() bool {
+       var leastRatio float32 = math.MaxFloat32
+       var leastChannel *channel
+       for _, channel := range c.channels {
+               if !channel.isSendPending() {
+                       continue
+               }
+               if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
+                       leastRatio = ratio
+                       leastChannel = channel
                }
        }
-       return status
-}
-
-//-----------------------------------------------------------------------------
-
-type ChannelDescriptor struct {
-       ID                  byte
-       Priority            int
-       SendQueueCapacity   int
-       RecvBufferCapacity  int
-       RecvMessageCapacity int
-}
-
-func (chDesc *ChannelDescriptor) FillDefaults() {
-       if chDesc.SendQueueCapacity == 0 {
-               chDesc.SendQueueCapacity = defaultSendQueueCapacity
-       }
-       if chDesc.RecvBufferCapacity == 0 {
-               chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
-       }
-       if chDesc.RecvMessageCapacity == 0 {
-               chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
-       }
-}
-
-// TODO: lowercase.
-// NOTE: not goroutine-safe.
-type Channel struct {
-       conn          *MConnection
-       desc          *ChannelDescriptor
-       id            byte
-       sendQueue     chan []byte
-       sendQueueSize int32 // atomic.
-       recving       []byte
-       sending       []byte
-       priority      int
-       recentlySent  int64 // exponential moving average
-}
-
-func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
-       desc.FillDefaults()
-       if desc.Priority <= 0 {
-               cmn.PanicSanity("Channel default priority must be a postive integer")
-       }
-       return &Channel{
-               conn:      conn,
-               desc:      desc,
-               id:        desc.ID,
-               sendQueue: make(chan []byte, desc.SendQueueCapacity),
-               recving:   make([]byte, 0, desc.RecvBufferCapacity),
-               priority:  desc.Priority,
-       }
-}
-
-// Queues message to send to this channel.
-// Goroutine-safe
-// Times out (and returns false) after defaultSendTimeout
-func (ch *Channel) sendBytes(bytes []byte) bool {
-       select {
-       case ch.sendQueue <- bytes:
-               atomic.AddInt32(&ch.sendQueueSize, 1)
+       if leastChannel == nil {
                return true
-       case <-time.After(defaultSendTimeout):
-               return false
        }
-}
 
-// Queues message to send to this channel.
-// Nonblocking, returns true if successful.
-// Goroutine-safe
-func (ch *Channel) trySendBytes(bytes []byte) bool {
-       select {
-       case ch.sendQueue <- bytes:
-               atomic.AddInt32(&ch.sendQueueSize, 1)
+       n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
+               c.stopForError(err)
                return true
-       default:
-               return false
        }
+       c.sendMonitor.Update(int(n))
+       c.flushTimer.Set()
+       return false
 }
 
-// Goroutine-safe
-func (ch *Channel) loadSendQueueSize() (size int) {
-       return int(atomic.LoadInt32(&ch.sendQueueSize))
-}
-
-// Goroutine-safe
-// Use only as a heuristic.
-func (ch *Channel) canSend() bool {
-       return ch.loadSendQueueSize() < defaultSendQueueCapacity
-}
+// sendRoutine polls for packets to send from channels.
+func (c *MConnection) sendRoutine() {
+       defer c._recover()
 
-// Returns true if any msgPackets are pending to be sent.
-// Call before calling nextMsgPacket()
-// Goroutine-safe
-func (ch *Channel) isSendPending() bool {
-       if len(ch.sending) == 0 {
-               if len(ch.sendQueue) == 0 {
-                       return false
+       for {
+               var n int
+               var err error
+               select {
+               case <-c.flushTimer.Ch:
+                       c.flush()
+               case <-c.chStatsTimer.C:
+                       for _, channel := range c.channels {
+                               channel.updateStats()
+                       }
+               case <-c.pingTimer.C:
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
+                       wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
+                       c.sendMonitor.Update(int(n))
+                       c.flush()
+               case <-c.pong:
+                       log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
+                       wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
+                       c.sendMonitor.Update(int(n))
+                       c.flush()
+               case <-c.quit:
+                       return
+               case <-c.send:
+                       if eof := c.sendSomeMsgPackets(); !eof {
+                               select {
+                               case c.send <- struct{}{}:
+                               default:
+                               }
+                       }
                }
-               ch.sending = <-ch.sendQueue
-       }
-       return true
-}
 
-// Creates a new msgPacket to send.
-// Not goroutine-safe
-func (ch *Channel) nextMsgPacket() msgPacket {
-       packet := msgPacket{}
-       packet.ChannelID = byte(ch.id)
-       packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
-       if len(ch.sending) <= maxMsgPacketPayloadSize {
-               packet.EOF = byte(0x01)
-               ch.sending = nil
-               atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
-       } else {
-               packet.EOF = byte(0x00)
-               ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
+               if !c.IsRunning() {
+                       return
+               }
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
+                       c.stopForError(err)
+                       return
+               }
        }
-       return packet
 }
 
-// Writes next msgPacket to w.
-// Not goroutine-safe
-func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
-       packet := ch.nextMsgPacket()
-       wire.WriteByte(packetTypeMsg, w, &n, &err)
-       wire.WriteBinary(packet, w, &n, &err)
-       if err == nil {
-               ch.recentlySent += int64(n)
+// Returns true if messages from channels were exhausted.
+func (c *MConnection) sendSomeMsgPackets() bool {
+       // Block until .sendMonitor says we can write.
+       // Once we're ready we send more than we asked for,
+       // but amortized it should even out.
+       c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
+       for i := 0; i < numBatchMsgPackets; i++ {
+               if c.sendMsgPacket() {
+                       return true
+               }
        }
-       return
+       return false
 }
 
-// Handles incoming msgPackets. Returns a msg bytes if msg is complete.
-// Not goroutine-safe
-func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
-       if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
-               return nil, wire.ErrBinaryReadOverflow
-       }
-       ch.recving = append(ch.recving, packet.Bytes...)
-       if packet.EOF == byte(0x01) {
-               msgBytes := ch.recving
-               // clear the slice without re-allocating.
-               // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
-               //   suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
-               //      at which point the recving slice stops being used and should be garbage collected
-               ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
-               return msgBytes, nil
+func (c *MConnection) stopForError(r interface{}) {
+       c.Stop()
+       if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {
+               c.onError(r)
        }
-       return nil, nil
-}
-
-// Call this periodically to update stats for throttling purposes.
-// Not goroutine-safe
-func (ch *Channel) updateStats() {
-       // Exponential decay of stats.
-       // TODO: optimize.
-       ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
-}
-
-//-----------------------------------------------------------------------------
-
-const (
-       maxMsgPacketPayloadSize  = 1024
-       maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
-       maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
-       packetTypePing           = byte(0x01)
-       packetTypePong           = byte(0x02)
-       packetTypeMsg            = byte(0x03)
-)
-
-// Messages in channels are chopped into smaller msgPackets for multiplexing.
-type msgPacket struct {
-       ChannelID byte
-       EOF       byte // 1 means message ends here.
-       Bytes     []byte
-}
-
-func (p msgPacket) String() string {
-       return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
 }