OSDN Git Service

add package
[bytom/vapor.git] / vendor / github.com / hashicorp / yamux / session.go
1 package yamux
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "math"
10         "net"
11         "strings"
12         "sync"
13         "sync/atomic"
14         "time"
15 )
16
17 // Session is used to wrap a reliable ordered connection and to
18 // multiplex it into multiple streams.
19 type Session struct {
20         // remoteGoAway indicates the remote side does
21         // not want futher connections. Must be first for alignment.
22         remoteGoAway int32
23
24         // localGoAway indicates that we should stop
25         // accepting futher connections. Must be first for alignment.
26         localGoAway int32
27
28         // nextStreamID is the next stream we should
29         // send. This depends if we are a client/server.
30         nextStreamID uint32
31
32         // config holds our configuration
33         config *Config
34
35         // logger is used for our logs
36         logger *log.Logger
37
38         // conn is the underlying connection
39         conn io.ReadWriteCloser
40
41         // bufRead is a buffered reader
42         bufRead *bufio.Reader
43
44         // pings is used to track inflight pings
45         pings    map[uint32]chan struct{}
46         pingID   uint32
47         pingLock sync.Mutex
48
49         // streams maps a stream id to a stream, and inflight has an entry
50         // for any outgoing stream that has not yet been established. Both are
51         // protected by streamLock.
52         streams    map[uint32]*Stream
53         inflight   map[uint32]struct{}
54         streamLock sync.Mutex
55
56         // synCh acts like a semaphore. It is sized to the AcceptBacklog which
57         // is assumed to be symmetric between the client and server. This allows
58         // the client to avoid exceeding the backlog and instead blocks the open.
59         synCh chan struct{}
60
61         // acceptCh is used to pass ready streams to the client
62         acceptCh chan *Stream
63
64         // sendCh is used to mark a stream as ready to send,
65         // or to send a header out directly.
66         sendCh chan sendReady
67
68         // recvDoneCh is closed when recv() exits to avoid a race
69         // between stream registration and stream shutdown
70         recvDoneCh chan struct{}
71
72         // shutdown is used to safely close a session
73         shutdown     bool
74         shutdownErr  error
75         shutdownCh   chan struct{}
76         shutdownLock sync.Mutex
77 }
78
79 // sendReady is used to either mark a stream as ready
80 // or to directly send a header
81 type sendReady struct {
82         Hdr  []byte
83         Body io.Reader
84         Err  chan error
85 }
86
87 // newSession is used to construct a new session
88 func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
89         s := &Session{
90                 config:     config,
91                 logger:     log.New(config.LogOutput, "", log.LstdFlags),
92                 conn:       conn,
93                 bufRead:    bufio.NewReader(conn),
94                 pings:      make(map[uint32]chan struct{}),
95                 streams:    make(map[uint32]*Stream),
96                 inflight:   make(map[uint32]struct{}),
97                 synCh:      make(chan struct{}, config.AcceptBacklog),
98                 acceptCh:   make(chan *Stream, config.AcceptBacklog),
99                 sendCh:     make(chan sendReady, 64),
100                 recvDoneCh: make(chan struct{}),
101                 shutdownCh: make(chan struct{}),
102         }
103         if client {
104                 s.nextStreamID = 1
105         } else {
106                 s.nextStreamID = 2
107         }
108         go s.recv()
109         go s.send()
110         if config.EnableKeepAlive {
111                 go s.keepalive()
112         }
113         return s
114 }
115
116 // IsClosed does a safe check to see if we have shutdown
117 func (s *Session) IsClosed() bool {
118         select {
119         case <-s.shutdownCh:
120                 return true
121         default:
122                 return false
123         }
124 }
125
126 // CloseChan returns a read-only channel which is closed as
127 // soon as the session is closed.
128 func (s *Session) CloseChan() <-chan struct{} {
129         return s.shutdownCh
130 }
131
132 // NumStreams returns the number of currently open streams
133 func (s *Session) NumStreams() int {
134         s.streamLock.Lock()
135         num := len(s.streams)
136         s.streamLock.Unlock()
137         return num
138 }
139
140 // Open is used to create a new stream as a net.Conn
141 func (s *Session) Open() (net.Conn, error) {
142         conn, err := s.OpenStream()
143         if err != nil {
144                 return nil, err
145         }
146         return conn, nil
147 }
148
149 // OpenStream is used to create a new stream
150 func (s *Session) OpenStream() (*Stream, error) {
151         if s.IsClosed() {
152                 return nil, ErrSessionShutdown
153         }
154         if atomic.LoadInt32(&s.remoteGoAway) == 1 {
155                 return nil, ErrRemoteGoAway
156         }
157
158         // Block if we have too many inflight SYNs
159         select {
160         case s.synCh <- struct{}{}:
161         case <-s.shutdownCh:
162                 return nil, ErrSessionShutdown
163         }
164
165 GET_ID:
166         // Get an ID, and check for stream exhaustion
167         id := atomic.LoadUint32(&s.nextStreamID)
168         if id >= math.MaxUint32-1 {
169                 return nil, ErrStreamsExhausted
170         }
171         if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
172                 goto GET_ID
173         }
174
175         // Register the stream
176         stream := newStream(s, id, streamInit)
177         s.streamLock.Lock()
178         s.streams[id] = stream
179         s.inflight[id] = struct{}{}
180         s.streamLock.Unlock()
181
182         // Send the window update to create
183         if err := stream.sendWindowUpdate(); err != nil {
184                 select {
185                 case <-s.synCh:
186                 default:
187                         s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
188                 }
189                 return nil, err
190         }
191         return stream, nil
192 }
193
194 // Accept is used to block until the next available stream
195 // is ready to be accepted.
196 func (s *Session) Accept() (net.Conn, error) {
197         conn, err := s.AcceptStream()
198         if err != nil {
199                 return nil, err
200         }
201         return conn, err
202 }
203
204 // AcceptStream is used to block until the next available stream
205 // is ready to be accepted.
206 func (s *Session) AcceptStream() (*Stream, error) {
207         select {
208         case stream := <-s.acceptCh:
209                 if err := stream.sendWindowUpdate(); err != nil {
210                         return nil, err
211                 }
212                 return stream, nil
213         case <-s.shutdownCh:
214                 return nil, s.shutdownErr
215         }
216 }
217
218 // Close is used to close the session and all streams.
219 // Attempts to send a GoAway before closing the connection.
220 func (s *Session) Close() error {
221         s.shutdownLock.Lock()
222         defer s.shutdownLock.Unlock()
223
224         if s.shutdown {
225                 return nil
226         }
227         s.shutdown = true
228         if s.shutdownErr == nil {
229                 s.shutdownErr = ErrSessionShutdown
230         }
231         close(s.shutdownCh)
232         s.conn.Close()
233         <-s.recvDoneCh
234
235         s.streamLock.Lock()
236         defer s.streamLock.Unlock()
237         for _, stream := range s.streams {
238                 stream.forceClose()
239         }
240         return nil
241 }
242
243 // exitErr is used to handle an error that is causing the
244 // session to terminate.
245 func (s *Session) exitErr(err error) {
246         s.shutdownLock.Lock()
247         if s.shutdownErr == nil {
248                 s.shutdownErr = err
249         }
250         s.shutdownLock.Unlock()
251         s.Close()
252 }
253
254 // GoAway can be used to prevent accepting further
255 // connections. It does not close the underlying conn.
256 func (s *Session) GoAway() error {
257         return s.waitForSend(s.goAway(goAwayNormal), nil)
258 }
259
260 // goAway is used to send a goAway message
261 func (s *Session) goAway(reason uint32) header {
262         atomic.SwapInt32(&s.localGoAway, 1)
263         hdr := header(make([]byte, headerSize))
264         hdr.encode(typeGoAway, 0, 0, reason)
265         return hdr
266 }
267
268 // Ping is used to measure the RTT response time
269 func (s *Session) Ping() (time.Duration, error) {
270         // Get a channel for the ping
271         ch := make(chan struct{})
272
273         // Get a new ping id, mark as pending
274         s.pingLock.Lock()
275         id := s.pingID
276         s.pingID++
277         s.pings[id] = ch
278         s.pingLock.Unlock()
279
280         // Send the ping request
281         hdr := header(make([]byte, headerSize))
282         hdr.encode(typePing, flagSYN, 0, id)
283         if err := s.waitForSend(hdr, nil); err != nil {
284                 return 0, err
285         }
286
287         // Wait for a response
288         start := time.Now()
289         select {
290         case <-ch:
291         case <-time.After(s.config.ConnectionWriteTimeout):
292                 s.pingLock.Lock()
293                 delete(s.pings, id) // Ignore it if a response comes later.
294                 s.pingLock.Unlock()
295                 return 0, ErrTimeout
296         case <-s.shutdownCh:
297                 return 0, ErrSessionShutdown
298         }
299
300         // Compute the RTT
301         return time.Now().Sub(start), nil
302 }
303
304 // keepalive is a long running goroutine that periodically does
305 // a ping to keep the connection alive.
306 func (s *Session) keepalive() {
307         for {
308                 select {
309                 case <-time.After(s.config.KeepAliveInterval):
310                         _, err := s.Ping()
311                         if err != nil {
312                                 if err != ErrSessionShutdown {
313                                         s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
314                                         s.exitErr(ErrKeepAliveTimeout)
315                                 }
316                                 return
317                         }
318                 case <-s.shutdownCh:
319                         return
320                 }
321         }
322 }
323
324 // waitForSendErr waits to send a header, checking for a potential shutdown
325 func (s *Session) waitForSend(hdr header, body io.Reader) error {
326         errCh := make(chan error, 1)
327         return s.waitForSendErr(hdr, body, errCh)
328 }
329
330 // waitForSendErr waits to send a header with optional data, checking for a
331 // potential shutdown. Since there's the expectation that sends can happen
332 // in a timely manner, we enforce the connection write timeout here.
333 func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
334         t := timerPool.Get()
335         timer := t.(*time.Timer)
336         timer.Reset(s.config.ConnectionWriteTimeout)
337         defer func() {
338                 timer.Stop()
339                 select {
340                 case <-timer.C:
341                 default:
342                 }
343                 timerPool.Put(t)
344         }()
345
346         ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
347         select {
348         case s.sendCh <- ready:
349         case <-s.shutdownCh:
350                 return ErrSessionShutdown
351         case <-timer.C:
352                 return ErrConnectionWriteTimeout
353         }
354
355         select {
356         case err := <-errCh:
357                 return err
358         case <-s.shutdownCh:
359                 return ErrSessionShutdown
360         case <-timer.C:
361                 return ErrConnectionWriteTimeout
362         }
363 }
364
365 // sendNoWait does a send without waiting. Since there's the expectation that
366 // the send happens right here, we enforce the connection write timeout if we
367 // can't queue the header to be sent.
368 func (s *Session) sendNoWait(hdr header) error {
369         t := timerPool.Get()
370         timer := t.(*time.Timer)
371         timer.Reset(s.config.ConnectionWriteTimeout)
372         defer func() {
373                 timer.Stop()
374                 select {
375                 case <-timer.C:
376                 default:
377                 }
378                 timerPool.Put(t)
379         }()
380
381         select {
382         case s.sendCh <- sendReady{Hdr: hdr}:
383                 return nil
384         case <-s.shutdownCh:
385                 return ErrSessionShutdown
386         case <-timer.C:
387                 return ErrConnectionWriteTimeout
388         }
389 }
390
391 // send is a long running goroutine that sends data
392 func (s *Session) send() {
393         for {
394                 select {
395                 case ready := <-s.sendCh:
396                         // Send a header if ready
397                         if ready.Hdr != nil {
398                                 sent := 0
399                                 for sent < len(ready.Hdr) {
400                                         n, err := s.conn.Write(ready.Hdr[sent:])
401                                         if err != nil {
402                                                 s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
403                                                 asyncSendErr(ready.Err, err)
404                                                 s.exitErr(err)
405                                                 return
406                                         }
407                                         sent += n
408                                 }
409                         }
410
411                         // Send data from a body if given
412                         if ready.Body != nil {
413                                 _, err := io.Copy(s.conn, ready.Body)
414                                 if err != nil {
415                                         s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
416                                         asyncSendErr(ready.Err, err)
417                                         s.exitErr(err)
418                                         return
419                                 }
420                         }
421
422                         // No error, successful send
423                         asyncSendErr(ready.Err, nil)
424                 case <-s.shutdownCh:
425                         return
426                 }
427         }
428 }
429
430 // recv is a long running goroutine that accepts new data
431 func (s *Session) recv() {
432         if err := s.recvLoop(); err != nil {
433                 s.exitErr(err)
434         }
435 }
436
437 // Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
438 var (
439         handlers = []func(*Session, header) error{
440                 typeData:         (*Session).handleStreamMessage,
441                 typeWindowUpdate: (*Session).handleStreamMessage,
442                 typePing:         (*Session).handlePing,
443                 typeGoAway:       (*Session).handleGoAway,
444         }
445 )
446
447 // recvLoop continues to receive data until a fatal error is encountered
448 func (s *Session) recvLoop() error {
449         defer close(s.recvDoneCh)
450         hdr := header(make([]byte, headerSize))
451         for {
452                 // Read the header
453                 if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
454                         if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
455                                 s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
456                         }
457                         return err
458                 }
459
460                 // Verify the version
461                 if hdr.Version() != protoVersion {
462                         s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
463                         return ErrInvalidVersion
464                 }
465
466                 mt := hdr.MsgType()
467                 if mt < typeData || mt > typeGoAway {
468                         return ErrInvalidMsgType
469                 }
470
471                 if err := handlers[mt](s, hdr); err != nil {
472                         return err
473                 }
474         }
475 }
476
477 // handleStreamMessage handles either a data or window update frame
478 func (s *Session) handleStreamMessage(hdr header) error {
479         // Check for a new stream creation
480         id := hdr.StreamID()
481         flags := hdr.Flags()
482         if flags&flagSYN == flagSYN {
483                 if err := s.incomingStream(id); err != nil {
484                         return err
485                 }
486         }
487
488         // Get the stream
489         s.streamLock.Lock()
490         stream := s.streams[id]
491         s.streamLock.Unlock()
492
493         // If we do not have a stream, likely we sent a RST
494         if stream == nil {
495                 // Drain any data on the wire
496                 if hdr.MsgType() == typeData && hdr.Length() > 0 {
497                         s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
498                         if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil {
499                                 s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
500                                 return nil
501                         }
502                 } else {
503                         s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
504                 }
505                 return nil
506         }
507
508         // Check if this is a window update
509         if hdr.MsgType() == typeWindowUpdate {
510                 if err := stream.incrSendWindow(hdr, flags); err != nil {
511                         if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
512                                 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
513                         }
514                         return err
515                 }
516                 return nil
517         }
518
519         // Read the new data
520         if err := stream.readData(hdr, flags, s.bufRead); err != nil {
521                 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
522                         s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
523                 }
524                 return err
525         }
526         return nil
527 }
528
529 // handlePing is invokde for a typePing frame
530 func (s *Session) handlePing(hdr header) error {
531         flags := hdr.Flags()
532         pingID := hdr.Length()
533
534         // Check if this is a query, respond back in a separate context so we
535         // don't interfere with the receiving thread blocking for the write.
536         if flags&flagSYN == flagSYN {
537                 go func() {
538                         hdr := header(make([]byte, headerSize))
539                         hdr.encode(typePing, flagACK, 0, pingID)
540                         if err := s.sendNoWait(hdr); err != nil {
541                                 s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
542                         }
543                 }()
544                 return nil
545         }
546
547         // Handle a response
548         s.pingLock.Lock()
549         ch := s.pings[pingID]
550         if ch != nil {
551                 delete(s.pings, pingID)
552                 close(ch)
553         }
554         s.pingLock.Unlock()
555         return nil
556 }
557
558 // handleGoAway is invokde for a typeGoAway frame
559 func (s *Session) handleGoAway(hdr header) error {
560         code := hdr.Length()
561         switch code {
562         case goAwayNormal:
563                 atomic.SwapInt32(&s.remoteGoAway, 1)
564         case goAwayProtoErr:
565                 s.logger.Printf("[ERR] yamux: received protocol error go away")
566                 return fmt.Errorf("yamux protocol error")
567         case goAwayInternalErr:
568                 s.logger.Printf("[ERR] yamux: received internal error go away")
569                 return fmt.Errorf("remote yamux internal error")
570         default:
571                 s.logger.Printf("[ERR] yamux: received unexpected go away")
572                 return fmt.Errorf("unexpected go away received")
573         }
574         return nil
575 }
576
577 // incomingStream is used to create a new incoming stream
578 func (s *Session) incomingStream(id uint32) error {
579         // Reject immediately if we are doing a go away
580         if atomic.LoadInt32(&s.localGoAway) == 1 {
581                 hdr := header(make([]byte, headerSize))
582                 hdr.encode(typeWindowUpdate, flagRST, id, 0)
583                 return s.sendNoWait(hdr)
584         }
585
586         // Allocate a new stream
587         stream := newStream(s, id, streamSYNReceived)
588
589         s.streamLock.Lock()
590         defer s.streamLock.Unlock()
591
592         // Check if stream already exists
593         if _, ok := s.streams[id]; ok {
594                 s.logger.Printf("[ERR] yamux: duplicate stream declared")
595                 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
596                         s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
597                 }
598                 return ErrDuplicateStream
599         }
600
601         // Register the stream
602         s.streams[id] = stream
603
604         // Check if we've exceeded the backlog
605         select {
606         case s.acceptCh <- stream:
607                 return nil
608         default:
609                 // Backlog exceeded! RST the stream
610                 s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
611                 delete(s.streams, id)
612                 stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
613                 return s.sendNoWait(stream.sendHdr)
614         }
615 }
616
617 // closeStream is used to close a stream once both sides have
618 // issued a close. If there was an in-flight SYN and the stream
619 // was not yet established, then this will give the credit back.
620 func (s *Session) closeStream(id uint32) {
621         s.streamLock.Lock()
622         if _, ok := s.inflight[id]; ok {
623                 select {
624                 case <-s.synCh:
625                 default:
626                         s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
627                 }
628         }
629         delete(s.streams, id)
630         s.streamLock.Unlock()
631 }
632
633 // establishStream is used to mark a stream that was in the
634 // SYN Sent state as established.
635 func (s *Session) establishStream(id uint32) {
636         s.streamLock.Lock()
637         if _, ok := s.inflight[id]; ok {
638                 delete(s.inflight, id)
639         } else {
640                 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
641         }
642         select {
643         case <-s.synCh:
644         default:
645                 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
646         }
647         s.streamLock.Unlock()
648 }