OSDN Git Service

Hulk did something
[bytom/vapor.git] / p2p / connection / connection.go
1 package connection
2
3 import (
4         "bufio"
5         "fmt"
6         "math"
7         "net"
8         "runtime/debug"
9         "sync/atomic"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         wire "github.com/tendermint/go-wire"
14         cmn "github.com/tendermint/tmlibs/common"
15         "github.com/tendermint/tmlibs/flowrate"
16 )
17
18 const (
19         packetTypePing           = byte(0x01)
20         packetTypePong           = byte(0x02)
21         packetTypeMsg            = byte(0x03)
22         maxMsgPacketPayloadSize  = 1024
23         maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
24         maxMsgPacketTotalSize    = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
25
26         numBatchMsgPackets = 10
27         minReadBufferSize  = 1024
28         minWriteBufferSize = 65536
29         updateState        = 2 * time.Second
30         pingTimeout        = 40 * time.Second
31         flushThrottle      = 100 * time.Millisecond
32
33         defaultSendQueueCapacity   = 1
34         defaultSendRate            = int64(512000) // 500KB/s
35         defaultRecvBufferCapacity  = 4096
36         defaultRecvMessageCapacity = 22020096      // 21MB
37         defaultRecvRate            = int64(512000) // 500KB/s
38         defaultSendTimeout         = 10 * time.Second
39         logModule                  = "p2p/conn"
40 )
41
42 type receiveCbFunc func(chID byte, msgBytes []byte)
43 type errorCbFunc func(interface{})
44
45 // Messages in channels are chopped into smaller msgPackets for multiplexing.
46 type msgPacket struct {
47         ChannelID byte
48         EOF       byte // 1 means message ends here.
49         Bytes     []byte
50 }
51
52 func (p msgPacket) String() string {
53         return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
54 }
55
56 /*
57 MConnection handles message transmission on multiple abstract communication
58 `Channel`s.  Each channel has a globally unique byte id.
59 The byte id and the relative priorities of each `Channel` are configured upon
60 initialization of the connection.
61
62 There are two methods for sending messages:
63         func (m MConnection) Send(chID byte, msg interface{}) bool {}
64         func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
65
66 `Send(chID, msg)` is a blocking call that waits until `msg` is successfully queued
67 for the channel with the given id byte `chID`, or until the request times out.
68 The message `msg` is serialized using the `tendermint/wire` submodule's
69 `WriteBinary()` reflection routine.
70
71 `TrySend(chID, msg)` is a nonblocking call that returns false if the channel's
72 queue is full.
73
74 Inbound message bytes are handled with an onReceive callback function.
75 */
76 type MConnection struct {
77         cmn.BaseService
78
79         conn        net.Conn
80         bufReader   *bufio.Reader
81         bufWriter   *bufio.Writer
82         sendMonitor *flowrate.Monitor
83         recvMonitor *flowrate.Monitor
84         send        chan struct{}
85         pong        chan struct{}
86         channels    []*channel
87         channelsIdx map[byte]*channel
88         onReceive   receiveCbFunc
89         onError     errorCbFunc
90         errored     uint32
91         config      *MConnConfig
92
93         quit         chan struct{}
94         flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
95         pingTimer    *time.Ticker       // send pings periodically
96         chStatsTimer *time.Ticker       // update channel stats periodically
97 }
98
99 // MConnConfig is a MConnection configuration.
100 type MConnConfig struct {
101         SendRate int64 `mapstructure:"send_rate"`
102         RecvRate int64 `mapstructure:"recv_rate"`
103 }
104
105 // DefaultMConnConfig returns the default config.
106 func DefaultMConnConfig() *MConnConfig {
107         return &MConnConfig{
108                 SendRate: defaultSendRate,
109                 RecvRate: defaultRecvRate,
110         }
111 }
112
113 // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
114 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
115         mconn := &MConnection{
116                 conn:        conn,
117                 bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
118                 bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
119                 sendMonitor: flowrate.New(0, 0),
120                 recvMonitor: flowrate.New(0, 0),
121                 send:        make(chan struct{}, 1),
122                 pong:        make(chan struct{}, 1),
123                 channelsIdx: map[byte]*channel{},
124                 channels:    []*channel{},
125                 onReceive:   onReceive,
126                 onError:     onError,
127                 config:      config,
128
129                 pingTimer:    time.NewTicker(pingTimeout),
130                 chStatsTimer: time.NewTicker(updateState),
131         }
132
133         for _, desc := range chDescs {
134                 descCopy := *desc // copy the desc else unsafe access across connections
135                 channel := newChannel(mconn, &descCopy)
136                 mconn.channelsIdx[channel.id] = channel
137                 mconn.channels = append(mconn.channels, channel)
138         }
139         mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)
140         return mconn
141 }
142
143 // OnStart implements BaseService
144 func (c *MConnection) OnStart() error {
145         c.BaseService.OnStart()
146         c.quit = make(chan struct{})
147         c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
148         go c.sendRoutine()
149         go c.recvRoutine()
150         return nil
151 }
152
153 // OnStop implements BaseService
154 func (c *MConnection) OnStop() {
155         c.BaseService.OnStop()
156         c.flushTimer.Stop()
157         c.pingTimer.Stop()
158         c.chStatsTimer.Stop()
159         if c.quit != nil {
160                 close(c.quit)
161         }
162         c.conn.Close()
163         // We can't close pong safely here because recvRoutine may write to it after we've
164         // stopped. Though it doesn't need to get closed at all, we close it @ recvRoutine.
165 }
166
167 // CanSend returns true if you can send more data onto the chID, false otherwise
168 func (c *MConnection) CanSend(chID byte) bool {
169         if !c.IsRunning() {
170                 return false
171         }
172
173         channel, ok := c.channelsIdx[chID]
174         if !ok {
175                 return false
176         }
177         return channel.canSend()
178 }
179
180 // Send will queues a message to be sent to channel(blocking).
181 func (c *MConnection) Send(chID byte, msg interface{}) bool {
182         if !c.IsRunning() {
183                 return false
184         }
185
186         channel, ok := c.channelsIdx[chID]
187         if !ok {
188                 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
189                 return false
190         }
191
192         if !channel.sendBytes(wire.BinaryBytes(msg)) {
193                 log.WithFields(log.Fields{"module": logModule, "chID": chID, "conn": c, "msg": msg}).Error("MConnection send failed")
194                 return false
195         }
196
197         select {
198         case c.send <- struct{}{}:
199         default:
200         }
201         return true
202 }
203
204 // TrafficStatus return the in and out traffic status
205 func (c *MConnection) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
206         sentStatus := c.sendMonitor.Status()
207         receivedStatus := c.recvMonitor.Status()
208         return &sentStatus, &receivedStatus
209 }
210
211 // TrySend queues a message to be sent to channel(Nonblocking).
212 func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
213         if !c.IsRunning() {
214                 return false
215         }
216
217         channel, ok := c.channelsIdx[chID]
218         if !ok {
219                 log.WithFields(log.Fields{"module": logModule, "chID": chID}).Error("cannot send bytes due to unknown channel")
220                 return false
221         }
222
223         ok = channel.trySendBytes(wire.BinaryBytes(msg))
224         if ok {
225                 select {
226                 case c.send <- struct{}{}:
227                 default:
228                 }
229         }
230         return ok
231 }
232
233 func (c *MConnection) String() string {
234         return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
235 }
236
237 func (c *MConnection) flush() {
238         if err := c.bufWriter.Flush(); err != nil {
239                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("MConnection flush failed")
240         }
241 }
242
243 // Catch panics, usually caused by remote disconnects.
244 func (c *MConnection) _recover() {
245         if r := recover(); r != nil {
246                 stack := debug.Stack()
247                 err := cmn.StackError{r, stack}
248                 c.stopForError(err)
249         }
250 }
251
252 // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
253 // After a whole message has been assembled, it's pushed to onReceive().
254 // Blocks depending on how the connection is throttled.
255 func (c *MConnection) recvRoutine() {
256         defer c._recover()
257         defer close(c.pong)
258
259         for {
260                 // Block until .recvMonitor says we can read.
261                 c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
262
263                 // Read packet type
264                 var n int
265                 var err error
266                 pktType := wire.ReadByte(c.bufReader, &n, &err)
267                 c.recvMonitor.Update(int(n))
268                 if err != nil {
269                         if c.IsRunning() {
270                                 log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ recvRoutine (reading byte)")
271                                 c.conn.Close()
272                                 c.stopForError(err)
273                         }
274                         return
275                 }
276
277                 // Read more depending on packet type.
278                 switch pktType {
279                 case packetTypePing:
280                         log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Ping")
281                         select {
282                         case c.pong <- struct{}{}:
283                         default:
284                         }
285
286                 case packetTypePong:
287                         log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("receive Pong")
288
289                 case packetTypeMsg:
290                         pkt, n, err := msgPacket{}, int(0), error(nil)
291                         wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
292                         c.recvMonitor.Update(int(n))
293                         if err != nil {
294                                 if c.IsRunning() {
295                                         log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
296                                         c.stopForError(err)
297                                 }
298                                 return
299                         }
300
301                         channel, ok := c.channelsIdx[pkt.ChannelID]
302                         if !ok || channel == nil {
303                                 cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
304                         }
305
306                         msgBytes, err := channel.recvMsgPacket(pkt)
307                         if err != nil {
308                                 if c.IsRunning() {
309                                         log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("failed on recvRoutine")
310                                         c.stopForError(err)
311                                 }
312                                 return
313                         }
314
315                         if msgBytes != nil {
316                                 c.onReceive(pkt.ChannelID, msgBytes)
317                         }
318
319                 default:
320                         cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
321                 }
322         }
323 }
324
325 // Returns true if messages from channels were exhausted.
326 func (c *MConnection) sendMsgPacket() bool {
327         var leastRatio float32 = math.MaxFloat32
328         var leastChannel *channel
329         for _, channel := range c.channels {
330                 if !channel.isSendPending() {
331                         continue
332                 }
333                 if ratio := float32(channel.recentlySent) / float32(channel.priority); ratio < leastRatio {
334                         leastRatio = ratio
335                         leastChannel = channel
336                 }
337         }
338         if leastChannel == nil {
339                 return true
340         }
341
342         n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
343         if err != nil {
344                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed to write msgPacket")
345                 c.stopForError(err)
346                 return true
347         }
348         c.sendMonitor.Update(int(n))
349         c.flushTimer.Set()
350         return false
351 }
352
353 // sendRoutine polls for packets to send from channels.
354 func (c *MConnection) sendRoutine() {
355         defer c._recover()
356
357         for {
358                 var n int
359                 var err error
360                 select {
361                 case <-c.flushTimer.Ch:
362                         c.flush()
363                 case <-c.chStatsTimer.C:
364                         for _, channel := range c.channels {
365                                 channel.updateStats()
366                         }
367                 case <-c.pingTimer.C:
368                         log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Ping")
369                         wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
370                         c.sendMonitor.Update(int(n))
371                         c.flush()
372                 case <-c.pong:
373                         log.WithFields(log.Fields{"module": logModule, "conn": c}).Debug("send Pong")
374                         wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
375                         c.sendMonitor.Update(int(n))
376                         c.flush()
377                 case <-c.quit:
378                         return
379                 case <-c.send:
380                         if eof := c.sendSomeMsgPackets(); !eof {
381                                 select {
382                                 case c.send <- struct{}{}:
383                                 default:
384                                 }
385                         }
386                 }
387
388                 if !c.IsRunning() {
389                         return
390                 }
391                 if err != nil {
392                         log.WithFields(log.Fields{"module": logModule, "conn": c, "error": err}).Error("Connection failed @ sendRoutine")
393                         c.stopForError(err)
394                         return
395                 }
396         }
397 }
398
399 // Returns true if messages from channels were exhausted.
400 func (c *MConnection) sendSomeMsgPackets() bool {
401         // Block until .sendMonitor says we can write.
402         // Once we're ready we send more than we asked for,
403         // but amortized it should even out.
404         c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
405         for i := 0; i < numBatchMsgPackets; i++ {
406                 if c.sendMsgPacket() {
407                         return true
408                 }
409         }
410         return false
411 }
412
413 func (c *MConnection) stopForError(r interface{}) {
414         c.Stop()
415         if atomic.CompareAndSwapUint32(&c.errored, 0, 1) && c.onError != nil {
416                 c.onError(r)
417         }
418 }