OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / transport / http2_client.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "bytes"
23         "io"
24         "math"
25         "net"
26         "strings"
27         "sync"
28         "sync/atomic"
29         "time"
30
31         "golang.org/x/net/context"
32         "golang.org/x/net/http2"
33         "golang.org/x/net/http2/hpack"
34         "google.golang.org/grpc/codes"
35         "google.golang.org/grpc/credentials"
36         "google.golang.org/grpc/keepalive"
37         "google.golang.org/grpc/metadata"
38         "google.golang.org/grpc/peer"
39         "google.golang.org/grpc/stats"
40         "google.golang.org/grpc/status"
41 )
42
43 // http2Client implements the ClientTransport interface with HTTP2.
44 type http2Client struct {
45         ctx        context.Context
46         cancel     context.CancelFunc
47         userAgent  string
48         md         interface{}
49         conn       net.Conn // underlying communication channel
50         remoteAddr net.Addr
51         localAddr  net.Addr
52         authInfo   credentials.AuthInfo // auth info about the connection
53         nextID     uint32               // the next stream ID to be used
54
55         // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
56         // that the server sent GoAway on this transport.
57         goAway chan struct{}
58         // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
59         awakenKeepalive chan struct{}
60
61         framer *framer
62         hBuf   *bytes.Buffer  // the buffer for HPACK encoding
63         hEnc   *hpack.Encoder // HPACK encoder
64
65         // controlBuf delivers all the control related tasks (e.g., window
66         // updates, reset streams, and various settings) to the controller.
67         controlBuf *controlBuffer
68         fc         *inFlow
69         // sendQuotaPool provides flow control to outbound message.
70         sendQuotaPool *quotaPool
71         // streamsQuota limits the max number of concurrent streams.
72         streamsQuota *quotaPool
73
74         // The scheme used: https if TLS is on, http otherwise.
75         scheme string
76
77         isSecure bool
78
79         creds []credentials.PerRPCCredentials
80
81         // Boolean to keep track of reading activity on transport.
82         // 1 is true and 0 is false.
83         activity uint32 // Accessed atomically.
84         kp       keepalive.ClientParameters
85
86         statsHandler stats.Handler
87
88         initialWindowSize int32
89
90         bdpEst          *bdpEstimator
91         outQuotaVersion uint32
92
93         mu            sync.Mutex     // guard the following variables
94         state         transportState // the state of underlying connection
95         activeStreams map[uint32]*Stream
96         // The max number of concurrent streams
97         maxStreams int
98         // the per-stream outbound flow control window size set by the peer.
99         streamSendQuota uint32
100         // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
101         prevGoAwayID uint32
102         // goAwayReason records the http2.ErrCode and debug data received with the
103         // GoAway frame.
104         goAwayReason GoAwayReason
105 }
106
107 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
108         if fn != nil {
109                 return fn(ctx, addr)
110         }
111         return dialContext(ctx, "tcp", addr)
112 }
113
114 func isTemporary(err error) bool {
115         switch err {
116         case io.EOF:
117                 // Connection closures may be resolved upon retry, and are thus
118                 // treated as temporary.
119                 return true
120         case context.DeadlineExceeded:
121                 // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
122                 // special case is not needed. Until then, we need to keep this
123                 // clause.
124                 return true
125         }
126
127         switch err := err.(type) {
128         case interface {
129                 Temporary() bool
130         }:
131                 return err.Temporary()
132         case interface {
133                 Timeout() bool
134         }:
135                 // Timeouts may be resolved upon retry, and are thus treated as
136                 // temporary.
137                 return err.Timeout()
138         }
139         return false
140 }
141
142 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
143 // and starts to receive messages on it. Non-nil error returns if construction
144 // fails.
145 func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
146         scheme := "http"
147         ctx, cancel := context.WithCancel(ctx)
148         connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
149         defer func() {
150                 if err != nil {
151                         cancel()
152                         // Don't call connectCancel in success path due to a race in Go 1.6:
153                         // https://github.com/golang/go/issues/15078.
154                         connectCancel()
155                 }
156         }()
157
158         conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
159         if err != nil {
160                 if opts.FailOnNonTempDialError {
161                         return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
162                 }
163                 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
164         }
165         // Any further errors will close the underlying connection
166         defer func(conn net.Conn) {
167                 if err != nil {
168                         conn.Close()
169                 }
170         }(conn)
171         var (
172                 isSecure bool
173                 authInfo credentials.AuthInfo
174         )
175         if creds := opts.TransportCredentials; creds != nil {
176                 scheme = "https"
177                 conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
178                 if err != nil {
179                         // Credentials handshake errors are typically considered permanent
180                         // to avoid retrying on e.g. bad certificates.
181                         temp := isTemporary(err)
182                         return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
183                 }
184                 isSecure = true
185         }
186         kp := opts.KeepaliveParams
187         // Validate keepalive parameters.
188         if kp.Time == 0 {
189                 kp.Time = defaultClientKeepaliveTime
190         }
191         if kp.Timeout == 0 {
192                 kp.Timeout = defaultClientKeepaliveTimeout
193         }
194         dynamicWindow := true
195         icwz := int32(initialWindowSize)
196         if opts.InitialConnWindowSize >= defaultWindowSize {
197                 icwz = opts.InitialConnWindowSize
198                 dynamicWindow = false
199         }
200         var buf bytes.Buffer
201         writeBufSize := defaultWriteBufSize
202         if opts.WriteBufferSize > 0 {
203                 writeBufSize = opts.WriteBufferSize
204         }
205         readBufSize := defaultReadBufSize
206         if opts.ReadBufferSize > 0 {
207                 readBufSize = opts.ReadBufferSize
208         }
209         t := &http2Client{
210                 ctx:        ctx,
211                 cancel:     cancel,
212                 userAgent:  opts.UserAgent,
213                 md:         addr.Metadata,
214                 conn:       conn,
215                 remoteAddr: conn.RemoteAddr(),
216                 localAddr:  conn.LocalAddr(),
217                 authInfo:   authInfo,
218                 // The client initiated stream id is odd starting from 1.
219                 nextID:            1,
220                 goAway:            make(chan struct{}),
221                 awakenKeepalive:   make(chan struct{}, 1),
222                 hBuf:              &buf,
223                 hEnc:              hpack.NewEncoder(&buf),
224                 framer:            newFramer(conn, writeBufSize, readBufSize),
225                 controlBuf:        newControlBuffer(),
226                 fc:                &inFlow{limit: uint32(icwz)},
227                 sendQuotaPool:     newQuotaPool(defaultWindowSize),
228                 scheme:            scheme,
229                 state:             reachable,
230                 activeStreams:     make(map[uint32]*Stream),
231                 isSecure:          isSecure,
232                 creds:             opts.PerRPCCredentials,
233                 maxStreams:        defaultMaxStreamsClient,
234                 streamsQuota:      newQuotaPool(defaultMaxStreamsClient),
235                 streamSendQuota:   defaultWindowSize,
236                 kp:                kp,
237                 statsHandler:      opts.StatsHandler,
238                 initialWindowSize: initialWindowSize,
239         }
240         if opts.InitialWindowSize >= defaultWindowSize {
241                 t.initialWindowSize = opts.InitialWindowSize
242                 dynamicWindow = false
243         }
244         if dynamicWindow {
245                 t.bdpEst = &bdpEstimator{
246                         bdp:               initialWindowSize,
247                         updateFlowControl: t.updateFlowControl,
248                 }
249         }
250         // Make sure awakenKeepalive can't be written upon.
251         // keepalive routine will make it writable, if need be.
252         t.awakenKeepalive <- struct{}{}
253         if t.statsHandler != nil {
254                 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
255                         RemoteAddr: t.remoteAddr,
256                         LocalAddr:  t.localAddr,
257                 })
258                 connBegin := &stats.ConnBegin{
259                         Client: true,
260                 }
261                 t.statsHandler.HandleConn(t.ctx, connBegin)
262         }
263         // Start the reader goroutine for incoming message. Each transport has
264         // a dedicated goroutine which reads HTTP2 frame from network. Then it
265         // dispatches the frame to the corresponding stream entity.
266         go t.reader()
267         // Send connection preface to server.
268         n, err := t.conn.Write(clientPreface)
269         if err != nil {
270                 t.Close()
271                 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
272         }
273         if n != len(clientPreface) {
274                 t.Close()
275                 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
276         }
277         if t.initialWindowSize != defaultWindowSize {
278                 err = t.framer.fr.WriteSettings(http2.Setting{
279                         ID:  http2.SettingInitialWindowSize,
280                         Val: uint32(t.initialWindowSize),
281                 })
282         } else {
283                 err = t.framer.fr.WriteSettings()
284         }
285         if err != nil {
286                 t.Close()
287                 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
288         }
289         // Adjust the connection flow control window if needed.
290         if delta := uint32(icwz - defaultWindowSize); delta > 0 {
291                 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
292                         t.Close()
293                         return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
294                 }
295         }
296         t.framer.writer.Flush()
297         go func() {
298                 loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
299                 t.Close()
300         }()
301         if t.kp.Time != infinity {
302                 go t.keepalive()
303         }
304         return t, nil
305 }
306
307 func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
308         // TODO(zhaoq): Handle uint32 overflow of Stream.id.
309         s := &Stream{
310                 id:             t.nextID,
311                 done:           make(chan struct{}),
312                 goAway:         make(chan struct{}),
313                 method:         callHdr.Method,
314                 sendCompress:   callHdr.SendCompress,
315                 buf:            newRecvBuffer(),
316                 fc:             &inFlow{limit: uint32(t.initialWindowSize)},
317                 sendQuotaPool:  newQuotaPool(int(t.streamSendQuota)),
318                 localSendQuota: newQuotaPool(defaultLocalSendQuota),
319                 headerChan:     make(chan struct{}),
320         }
321         t.nextID += 2
322         s.requestRead = func(n int) {
323                 t.adjustWindow(s, uint32(n))
324         }
325         // The client side stream context should have exactly the same life cycle with the user provided context.
326         // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
327         // So we use the original context here instead of creating a copy.
328         s.ctx = ctx
329         s.trReader = &transportReader{
330                 reader: &recvBufferReader{
331                         ctx:    s.ctx,
332                         goAway: s.goAway,
333                         recv:   s.buf,
334                 },
335                 windowHandler: func(n int) {
336                         t.updateWindow(s, uint32(n))
337                 },
338         }
339
340         return s
341 }
342
343 // NewStream creates a stream and registers it into the transport as "active"
344 // streams.
345 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
346         pr := &peer.Peer{
347                 Addr: t.remoteAddr,
348         }
349         // Attach Auth info if there is any.
350         if t.authInfo != nil {
351                 pr.AuthInfo = t.authInfo
352         }
353         ctx = peer.NewContext(ctx, pr)
354         var (
355                 authData = make(map[string]string)
356                 audience string
357         )
358         // Create an audience string only if needed.
359         if len(t.creds) > 0 || callHdr.Creds != nil {
360                 // Construct URI required to get auth request metadata.
361                 // Omit port if it is the default one.
362                 host := strings.TrimSuffix(callHdr.Host, ":443")
363                 pos := strings.LastIndex(callHdr.Method, "/")
364                 if pos == -1 {
365                         pos = len(callHdr.Method)
366                 }
367                 audience = "https://" + host + callHdr.Method[:pos]
368         }
369         for _, c := range t.creds {
370                 data, err := c.GetRequestMetadata(ctx, audience)
371                 if err != nil {
372                         return nil, streamErrorf(codes.Internal, "transport: %v", err)
373                 }
374                 for k, v := range data {
375                         // Capital header names are illegal in HTTP/2.
376                         k = strings.ToLower(k)
377                         authData[k] = v
378                 }
379         }
380         callAuthData := map[string]string{}
381         // Check if credentials.PerRPCCredentials were provided via call options.
382         // Note: if these credentials are provided both via dial options and call
383         // options, then both sets of credentials will be applied.
384         if callCreds := callHdr.Creds; callCreds != nil {
385                 if !t.isSecure && callCreds.RequireTransportSecurity() {
386                         return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
387                 }
388                 data, err := callCreds.GetRequestMetadata(ctx, audience)
389                 if err != nil {
390                         return nil, streamErrorf(codes.Internal, "transport: %v", err)
391                 }
392                 for k, v := range data {
393                         // Capital header names are illegal in HTTP/2
394                         k = strings.ToLower(k)
395                         callAuthData[k] = v
396                 }
397         }
398         t.mu.Lock()
399         if t.activeStreams == nil {
400                 t.mu.Unlock()
401                 return nil, ErrConnClosing
402         }
403         if t.state == draining {
404                 t.mu.Unlock()
405                 return nil, ErrStreamDrain
406         }
407         if t.state != reachable {
408                 t.mu.Unlock()
409                 return nil, ErrConnClosing
410         }
411         t.mu.Unlock()
412         sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire())
413         if err != nil {
414                 return nil, err
415         }
416         // Returns the quota balance back.
417         if sq > 1 {
418                 t.streamsQuota.add(sq - 1)
419         }
420         // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
421         // first and create a slice of that exact size.
422         // Make the slice of certain predictable size to reduce allocations made by append.
423         hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
424         hfLen += len(authData) + len(callAuthData)
425         headerFields := make([]hpack.HeaderField, 0, hfLen)
426         headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
427         headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
428         headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
429         headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
430         headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
431         headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
432         headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
433
434         if callHdr.SendCompress != "" {
435                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
436         }
437         if dl, ok := ctx.Deadline(); ok {
438                 // Send out timeout regardless its value. The server can detect timeout context by itself.
439                 // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
440                 timeout := dl.Sub(time.Now())
441                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
442         }
443         for k, v := range authData {
444                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
445         }
446         for k, v := range callAuthData {
447                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
448         }
449         if b := stats.OutgoingTags(ctx); b != nil {
450                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
451         }
452         if b := stats.OutgoingTrace(ctx); b != nil {
453                 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
454         }
455         if md, ok := metadata.FromOutgoingContext(ctx); ok {
456                 for k, vv := range md {
457                         // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
458                         if isReservedHeader(k) {
459                                 continue
460                         }
461                         for _, v := range vv {
462                                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
463                         }
464                 }
465         }
466         if md, ok := t.md.(*metadata.MD); ok {
467                 for k, vv := range *md {
468                         if isReservedHeader(k) {
469                                 continue
470                         }
471                         for _, v := range vv {
472                                 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
473                         }
474                 }
475         }
476         t.mu.Lock()
477         if t.state == draining {
478                 t.mu.Unlock()
479                 t.streamsQuota.add(1)
480                 return nil, ErrStreamDrain
481         }
482         if t.state != reachable {
483                 t.mu.Unlock()
484                 return nil, ErrConnClosing
485         }
486         s := t.newStream(ctx, callHdr)
487         t.activeStreams[s.id] = s
488         // If the number of active streams change from 0 to 1, then check if keepalive
489         // has gone dormant. If so, wake it up.
490         if len(t.activeStreams) == 1 {
491                 select {
492                 case t.awakenKeepalive <- struct{}{}:
493                         t.controlBuf.put(&ping{data: [8]byte{}})
494                         // Fill the awakenKeepalive channel again as this channel must be
495                         // kept non-writable except at the point that the keepalive()
496                         // goroutine is waiting either to be awaken or shutdown.
497                         t.awakenKeepalive <- struct{}{}
498                 default:
499                 }
500         }
501         t.controlBuf.put(&headerFrame{
502                 streamID:  s.id,
503                 hf:        headerFields,
504                 endStream: false,
505         })
506         t.mu.Unlock()
507
508         s.mu.Lock()
509         s.bytesSent = true
510         s.mu.Unlock()
511
512         if t.statsHandler != nil {
513                 outHeader := &stats.OutHeader{
514                         Client:      true,
515                         FullMethod:  callHdr.Method,
516                         RemoteAddr:  t.remoteAddr,
517                         LocalAddr:   t.localAddr,
518                         Compression: callHdr.SendCompress,
519                 }
520                 t.statsHandler.HandleRPC(s.ctx, outHeader)
521         }
522         return s, nil
523 }
524
525 // CloseStream clears the footprint of a stream when the stream is not needed any more.
526 // This must not be executed in reader's goroutine.
527 func (t *http2Client) CloseStream(s *Stream, err error) {
528         t.mu.Lock()
529         if t.activeStreams == nil {
530                 t.mu.Unlock()
531                 return
532         }
533         if err != nil {
534                 // notify in-flight streams, before the deletion
535                 s.write(recvMsg{err: err})
536         }
537         delete(t.activeStreams, s.id)
538         if t.state == draining && len(t.activeStreams) == 0 {
539                 // The transport is draining and s is the last live stream on t.
540                 t.mu.Unlock()
541                 t.Close()
542                 return
543         }
544         t.mu.Unlock()
545         // rstStream is true in case the stream is being closed at the client-side
546         // and the server needs to be intimated about it by sending a RST_STREAM
547         // frame.
548         // To make sure this frame is written to the wire before the headers of the
549         // next stream waiting for streamsQuota, we add to streamsQuota pool only
550         // after having acquired the writableChan to send RST_STREAM out (look at
551         // the controller() routine).
552         var rstStream bool
553         var rstError http2.ErrCode
554         defer func() {
555                 // In case, the client doesn't have to send RST_STREAM to server
556                 // we can safely add back to streamsQuota pool now.
557                 if !rstStream {
558                         t.streamsQuota.add(1)
559                         return
560                 }
561                 t.controlBuf.put(&resetStream{s.id, rstError})
562         }()
563         s.mu.Lock()
564         rstStream = s.rstStream
565         rstError = s.rstError
566         if s.state == streamDone {
567                 s.mu.Unlock()
568                 return
569         }
570         if !s.headerDone {
571                 close(s.headerChan)
572                 s.headerDone = true
573         }
574         s.state = streamDone
575         s.mu.Unlock()
576         if _, ok := err.(StreamError); ok {
577                 rstStream = true
578                 rstError = http2.ErrCodeCancel
579         }
580 }
581
582 // Close kicks off the shutdown process of the transport. This should be called
583 // only once on a transport. Once it is called, the transport should not be
584 // accessed any more.
585 func (t *http2Client) Close() (err error) {
586         t.mu.Lock()
587         if t.state == closing {
588                 t.mu.Unlock()
589                 return
590         }
591         t.state = closing
592         t.mu.Unlock()
593         t.cancel()
594         err = t.conn.Close()
595         t.mu.Lock()
596         streams := t.activeStreams
597         t.activeStreams = nil
598         t.mu.Unlock()
599         // Notify all active streams.
600         for _, s := range streams {
601                 s.mu.Lock()
602                 if !s.headerDone {
603                         close(s.headerChan)
604                         s.headerDone = true
605                 }
606                 s.mu.Unlock()
607                 s.write(recvMsg{err: ErrConnClosing})
608         }
609         if t.statsHandler != nil {
610                 connEnd := &stats.ConnEnd{
611                         Client: true,
612                 }
613                 t.statsHandler.HandleConn(t.ctx, connEnd)
614         }
615         return err
616 }
617
618 // GracefulClose sets the state to draining, which prevents new streams from
619 // being created and causes the transport to be closed when the last active
620 // stream is closed.  If there are no active streams, the transport is closed
621 // immediately.  This does nothing if the transport is already draining or
622 // closing.
623 func (t *http2Client) GracefulClose() error {
624         t.mu.Lock()
625         switch t.state {
626         case closing, draining:
627                 t.mu.Unlock()
628                 return nil
629         }
630         t.state = draining
631         active := len(t.activeStreams)
632         t.mu.Unlock()
633         if active == 0 {
634                 return t.Close()
635         }
636         return nil
637 }
638
639 // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
640 // should proceed only if Write returns nil.
641 func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
642         select {
643         case <-s.ctx.Done():
644                 return ContextErr(s.ctx.Err())
645         case <-t.ctx.Done():
646                 return ErrConnClosing
647         default:
648         }
649
650         if hdr == nil && data == nil && opts.Last {
651                 // stream.CloseSend uses this to send an empty frame with endStream=True
652                 t.controlBuf.put(&dataFrame{streamID: s.id, endStream: true, f: func() {}})
653                 return nil
654         }
655         // Add data to header frame so that we can equally distribute data across frames.
656         emptyLen := http2MaxFrameLen - len(hdr)
657         if emptyLen > len(data) {
658                 emptyLen = len(data)
659         }
660         hdr = append(hdr, data[:emptyLen]...)
661         data = data[emptyLen:]
662         var (
663                 streamQuota    int
664                 streamQuotaVer uint32
665                 localSendQuota int
666                 err            error
667                 sqChan         <-chan int
668         )
669         for idx, r := range [][]byte{hdr, data} {
670                 for len(r) > 0 {
671                         size := http2MaxFrameLen
672                         if size > len(r) {
673                                 size = len(r)
674                         }
675                         if streamQuota == 0 { // Used up all the locally cached stream quota.
676                                 sqChan, streamQuotaVer = s.sendQuotaPool.acquireWithVersion()
677                                 // Wait until the stream has some quota to send the data.
678                                 streamQuota, err = wait(s.ctx, t.ctx, s.done, s.goAway, sqChan)
679                                 if err != nil {
680                                         return err
681                                 }
682                         }
683                         if localSendQuota <= 0 { // Being a soft limit, it can go negative.
684                                 // Acquire local send quota to be able to write to the controlBuf.
685                                 localSendQuota, err = wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire())
686                                 if err != nil {
687                                         return err
688                                 }
689                         }
690                         if size > streamQuota {
691                                 size = streamQuota
692                         } // No need to do that for localSendQuota since that's only a soft limit.
693                         // Wait until the transport has some quota to send the data.
694                         tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire())
695                         if err != nil {
696                                 return err
697                         }
698                         if tq < size {
699                                 size = tq
700                         }
701                         if tq > size { // Overbooked transport quota. Return it back.
702                                 t.sendQuotaPool.add(tq - size)
703                         }
704                         streamQuota -= size
705                         localSendQuota -= size
706                         p := r[:size]
707                         var endStream bool
708                         // See if this is the last frame to be written.
709                         if opts.Last {
710                                 if len(r)-size == 0 { // No more data in r after this iteration.
711                                         if idx == 0 { // We're writing data header.
712                                                 if len(data) == 0 { // There's no data to follow.
713                                                         endStream = true
714                                                 }
715                                         } else { // We're writing data.
716                                                 endStream = true
717                                         }
718                                 }
719                         }
720                         success := func() {
721                                 sz := size
722                                 t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(sz) }})
723                                 r = r[size:]
724                         }
725                         failure := func() { // The stream quota version must have changed.
726                                 // Our streamQuota cache is invalidated now, so give it back.
727                                 s.sendQuotaPool.lockedAdd(streamQuota + size)
728                         }
729                         if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
730                                 // Couldn't send this chunk out.
731                                 t.sendQuotaPool.add(size)
732                                 localSendQuota += size
733                                 streamQuota = 0
734                         }
735                 }
736         }
737         if streamQuota > 0 { // Add the left over quota back to stream.
738                 s.sendQuotaPool.add(streamQuota)
739         }
740         if localSendQuota > 0 {
741                 s.localSendQuota.add(localSendQuota)
742         }
743         if !opts.Last {
744                 return nil
745         }
746         s.mu.Lock()
747         if s.state != streamDone {
748                 s.state = streamWriteDone
749         }
750         s.mu.Unlock()
751         return nil
752 }
753
754 func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
755         t.mu.Lock()
756         defer t.mu.Unlock()
757         s, ok := t.activeStreams[f.Header().StreamID]
758         return s, ok
759 }
760
761 // adjustWindow sends out extra window update over the initial window size
762 // of stream if the application is requesting data larger in size than
763 // the window.
764 func (t *http2Client) adjustWindow(s *Stream, n uint32) {
765         s.mu.Lock()
766         defer s.mu.Unlock()
767         if s.state == streamDone {
768                 return
769         }
770         if w := s.fc.maybeAdjust(n); w > 0 {
771                 // Piggyback connection's window update along.
772                 if cw := t.fc.resetPendingUpdate(); cw > 0 {
773                         t.controlBuf.put(&windowUpdate{0, cw})
774                 }
775                 t.controlBuf.put(&windowUpdate{s.id, w})
776         }
777 }
778
779 // updateWindow adjusts the inbound quota for the stream and the transport.
780 // Window updates will deliver to the controller for sending when
781 // the cumulative quota exceeds the corresponding threshold.
782 func (t *http2Client) updateWindow(s *Stream, n uint32) {
783         s.mu.Lock()
784         defer s.mu.Unlock()
785         if s.state == streamDone {
786                 return
787         }
788         if w := s.fc.onRead(n); w > 0 {
789                 if cw := t.fc.resetPendingUpdate(); cw > 0 {
790                         t.controlBuf.put(&windowUpdate{0, cw})
791                 }
792                 t.controlBuf.put(&windowUpdate{s.id, w})
793         }
794 }
795
796 // updateFlowControl updates the incoming flow control windows
797 // for the transport and the stream based on the current bdp
798 // estimation.
799 func (t *http2Client) updateFlowControl(n uint32) {
800         t.mu.Lock()
801         for _, s := range t.activeStreams {
802                 s.fc.newLimit(n)
803         }
804         t.initialWindowSize = int32(n)
805         t.mu.Unlock()
806         t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
807         t.controlBuf.put(&settings{
808                 ack: false,
809                 ss: []http2.Setting{
810                         {
811                                 ID:  http2.SettingInitialWindowSize,
812                                 Val: uint32(n),
813                         },
814                 },
815         })
816 }
817
818 func (t *http2Client) handleData(f *http2.DataFrame) {
819         size := f.Header().Length
820         var sendBDPPing bool
821         if t.bdpEst != nil {
822                 sendBDPPing = t.bdpEst.add(uint32(size))
823         }
824         // Decouple connection's flow control from application's read.
825         // An update on connection's flow control should not depend on
826         // whether user application has read the data or not. Such a
827         // restriction is already imposed on the stream's flow control,
828         // and therefore the sender will be blocked anyways.
829         // Decoupling the connection flow control will prevent other
830         // active(fast) streams from starving in presence of slow or
831         // inactive streams.
832         //
833         // Furthermore, if a bdpPing is being sent out we can piggyback
834         // connection's window update for the bytes we just received.
835         if sendBDPPing {
836                 if size != 0 { // Could've been an empty data frame.
837                         t.controlBuf.put(&windowUpdate{0, uint32(size)})
838                 }
839                 t.controlBuf.put(bdpPing)
840         } else {
841                 if err := t.fc.onData(uint32(size)); err != nil {
842                         t.Close()
843                         return
844                 }
845                 if w := t.fc.onRead(uint32(size)); w > 0 {
846                         t.controlBuf.put(&windowUpdate{0, w})
847                 }
848         }
849         // Select the right stream to dispatch.
850         s, ok := t.getStream(f)
851         if !ok {
852                 return
853         }
854         if size > 0 {
855                 s.mu.Lock()
856                 if s.state == streamDone {
857                         s.mu.Unlock()
858                         return
859                 }
860                 if err := s.fc.onData(uint32(size)); err != nil {
861                         s.rstStream = true
862                         s.rstError = http2.ErrCodeFlowControl
863                         s.finish(status.New(codes.Internal, err.Error()))
864                         s.mu.Unlock()
865                         s.write(recvMsg{err: io.EOF})
866                         return
867                 }
868                 if f.Header().Flags.Has(http2.FlagDataPadded) {
869                         if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
870                                 t.controlBuf.put(&windowUpdate{s.id, w})
871                         }
872                 }
873                 s.mu.Unlock()
874                 // TODO(bradfitz, zhaoq): A copy is required here because there is no
875                 // guarantee f.Data() is consumed before the arrival of next frame.
876                 // Can this copy be eliminated?
877                 if len(f.Data()) > 0 {
878                         data := make([]byte, len(f.Data()))
879                         copy(data, f.Data())
880                         s.write(recvMsg{data: data})
881                 }
882         }
883         // The server has closed the stream without sending trailers.  Record that
884         // the read direction is closed, and set the status appropriately.
885         if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
886                 s.mu.Lock()
887                 if s.state == streamDone {
888                         s.mu.Unlock()
889                         return
890                 }
891                 s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
892                 s.mu.Unlock()
893                 s.write(recvMsg{err: io.EOF})
894         }
895 }
896
897 func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
898         s, ok := t.getStream(f)
899         if !ok {
900                 return
901         }
902         s.mu.Lock()
903         if s.state == streamDone {
904                 s.mu.Unlock()
905                 return
906         }
907         if !s.headerDone {
908                 close(s.headerChan)
909                 s.headerDone = true
910         }
911         statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
912         if !ok {
913                 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
914                 statusCode = codes.Unknown
915         }
916         s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode))
917         s.mu.Unlock()
918         s.write(recvMsg{err: io.EOF})
919 }
920
921 func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
922         if f.IsAck() {
923                 return
924         }
925         var ss []http2.Setting
926         isMaxConcurrentStreamsMissing := true
927         f.ForeachSetting(func(s http2.Setting) error {
928                 if s.ID == http2.SettingMaxConcurrentStreams {
929                         isMaxConcurrentStreamsMissing = false
930                 }
931                 ss = append(ss, s)
932                 return nil
933         })
934         if isFirst && isMaxConcurrentStreamsMissing {
935                 // This means server is imposing no limits on
936                 // maximum number of concurrent streams initiated by client.
937                 // So we must remove our self-imposed limit.
938                 ss = append(ss, http2.Setting{
939                         ID:  http2.SettingMaxConcurrentStreams,
940                         Val: math.MaxUint32,
941                 })
942         }
943         // The settings will be applied once the ack is sent.
944         t.controlBuf.put(&settings{ack: true, ss: ss})
945 }
946
947 func (t *http2Client) handlePing(f *http2.PingFrame) {
948         if f.IsAck() {
949                 // Maybe it's a BDP ping.
950                 if t.bdpEst != nil {
951                         t.bdpEst.calculate(f.Data)
952                 }
953                 return
954         }
955         pingAck := &ping{ack: true}
956         copy(pingAck.data[:], f.Data[:])
957         t.controlBuf.put(pingAck)
958 }
959
960 func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
961         t.mu.Lock()
962         if t.state != reachable && t.state != draining {
963                 t.mu.Unlock()
964                 return
965         }
966         if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
967                 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
968         }
969         id := f.LastStreamID
970         if id > 0 && id%2 != 1 {
971                 t.mu.Unlock()
972                 t.Close()
973                 return
974         }
975         // A client can receive multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
976         // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
977         // with the ID of the last stream the server will process.
978         // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
979         // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
980         // was being sent don't get killed.
981         select {
982         case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
983                 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
984                 if id > t.prevGoAwayID {
985                         t.mu.Unlock()
986                         t.Close()
987                         return
988                 }
989         default:
990                 t.setGoAwayReason(f)
991                 close(t.goAway)
992                 t.state = draining
993         }
994         // All streams with IDs greater than the GoAwayId
995         // and smaller than the previous GoAway ID should be killed.
996         upperLimit := t.prevGoAwayID
997         if upperLimit == 0 { // This is the first GoAway Frame.
998                 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
999         }
1000         for streamID, stream := range t.activeStreams {
1001                 if streamID > id && streamID <= upperLimit {
1002                         close(stream.goAway)
1003                 }
1004         }
1005         t.prevGoAwayID = id
1006         active := len(t.activeStreams)
1007         t.mu.Unlock()
1008         if active == 0 {
1009                 t.Close()
1010         }
1011 }
1012
1013 // setGoAwayReason sets the value of t.goAwayReason based
1014 // on the GoAway frame received.
1015 // It expects a lock on transport's mutext to be held by
1016 // the caller.
1017 func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1018         t.goAwayReason = NoReason
1019         switch f.ErrCode {
1020         case http2.ErrCodeEnhanceYourCalm:
1021                 if string(f.DebugData()) == "too_many_pings" {
1022                         t.goAwayReason = TooManyPings
1023                 }
1024         }
1025 }
1026
1027 func (t *http2Client) GetGoAwayReason() GoAwayReason {
1028         t.mu.Lock()
1029         defer t.mu.Unlock()
1030         return t.goAwayReason
1031 }
1032
1033 func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1034         id := f.Header().StreamID
1035         incr := f.Increment
1036         if id == 0 {
1037                 t.sendQuotaPool.add(int(incr))
1038                 return
1039         }
1040         if s, ok := t.getStream(f); ok {
1041                 s.sendQuotaPool.add(int(incr))
1042         }
1043 }
1044
1045 // operateHeaders takes action on the decoded headers.
1046 func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1047         s, ok := t.getStream(frame)
1048         if !ok {
1049                 return
1050         }
1051         s.mu.Lock()
1052         s.bytesReceived = true
1053         s.mu.Unlock()
1054         var state decodeState
1055         if err := state.decodeResponseHeader(frame); err != nil {
1056                 s.mu.Lock()
1057                 if !s.headerDone {
1058                         close(s.headerChan)
1059                         s.headerDone = true
1060                 }
1061                 s.mu.Unlock()
1062                 s.write(recvMsg{err: err})
1063                 // Something wrong. Stops reading even when there is remaining.
1064                 return
1065         }
1066
1067         endStream := frame.StreamEnded()
1068         var isHeader bool
1069         defer func() {
1070                 if t.statsHandler != nil {
1071                         if isHeader {
1072                                 inHeader := &stats.InHeader{
1073                                         Client:     true,
1074                                         WireLength: int(frame.Header().Length),
1075                                 }
1076                                 t.statsHandler.HandleRPC(s.ctx, inHeader)
1077                         } else {
1078                                 inTrailer := &stats.InTrailer{
1079                                         Client:     true,
1080                                         WireLength: int(frame.Header().Length),
1081                                 }
1082                                 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1083                         }
1084                 }
1085         }()
1086
1087         s.mu.Lock()
1088         if !endStream {
1089                 s.recvCompress = state.encoding
1090         }
1091         if !s.headerDone {
1092                 if !endStream && len(state.mdata) > 0 {
1093                         s.header = state.mdata
1094                 }
1095                 close(s.headerChan)
1096                 s.headerDone = true
1097                 isHeader = true
1098         }
1099         if !endStream || s.state == streamDone {
1100                 s.mu.Unlock()
1101                 return
1102         }
1103
1104         if len(state.mdata) > 0 {
1105                 s.trailer = state.mdata
1106         }
1107         s.finish(state.status())
1108         s.mu.Unlock()
1109         s.write(recvMsg{err: io.EOF})
1110 }
1111
1112 func handleMalformedHTTP2(s *Stream, err error) {
1113         s.mu.Lock()
1114         if !s.headerDone {
1115                 close(s.headerChan)
1116                 s.headerDone = true
1117         }
1118         s.mu.Unlock()
1119         s.write(recvMsg{err: err})
1120 }
1121
1122 // reader runs as a separate goroutine in charge of reading data from network
1123 // connection.
1124 //
1125 // TODO(zhaoq): currently one reader per transport. Investigate whether this is
1126 // optimal.
1127 // TODO(zhaoq): Check the validity of the incoming frame sequence.
1128 func (t *http2Client) reader() {
1129         // Check the validity of server preface.
1130         frame, err := t.framer.fr.ReadFrame()
1131         if err != nil {
1132                 t.Close()
1133                 return
1134         }
1135         atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1136         sf, ok := frame.(*http2.SettingsFrame)
1137         if !ok {
1138                 t.Close()
1139                 return
1140         }
1141         t.handleSettings(sf, true)
1142
1143         // loop to keep reading incoming messages on this transport.
1144         for {
1145                 frame, err := t.framer.fr.ReadFrame()
1146                 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1147                 if err != nil {
1148                         // Abort an active stream if the http2.Framer returns a
1149                         // http2.StreamError. This can happen only if the server's response
1150                         // is malformed http2.
1151                         if se, ok := err.(http2.StreamError); ok {
1152                                 t.mu.Lock()
1153                                 s := t.activeStreams[se.StreamID]
1154                                 t.mu.Unlock()
1155                                 if s != nil {
1156                                         // use error detail to provide better err message
1157                                         handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()))
1158                                 }
1159                                 continue
1160                         } else {
1161                                 // Transport error.
1162                                 t.Close()
1163                                 return
1164                         }
1165                 }
1166                 switch frame := frame.(type) {
1167                 case *http2.MetaHeadersFrame:
1168                         t.operateHeaders(frame)
1169                 case *http2.DataFrame:
1170                         t.handleData(frame)
1171                 case *http2.RSTStreamFrame:
1172                         t.handleRSTStream(frame)
1173                 case *http2.SettingsFrame:
1174                         t.handleSettings(frame, false)
1175                 case *http2.PingFrame:
1176                         t.handlePing(frame)
1177                 case *http2.GoAwayFrame:
1178                         t.handleGoAway(frame)
1179                 case *http2.WindowUpdateFrame:
1180                         t.handleWindowUpdate(frame)
1181                 default:
1182                         errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1183                 }
1184         }
1185 }
1186
1187 func (t *http2Client) applySettings(ss []http2.Setting) {
1188         for _, s := range ss {
1189                 switch s.ID {
1190                 case http2.SettingMaxConcurrentStreams:
1191                         // TODO(zhaoq): This is a hack to avoid significant refactoring of the
1192                         // code to deal with the unrealistic int32 overflow. Probably will try
1193                         // to find a better way to handle this later.
1194                         if s.Val > math.MaxInt32 {
1195                                 s.Val = math.MaxInt32
1196                         }
1197                         t.mu.Lock()
1198                         ms := t.maxStreams
1199                         t.maxStreams = int(s.Val)
1200                         t.mu.Unlock()
1201                         t.streamsQuota.add(int(s.Val) - ms)
1202                 case http2.SettingInitialWindowSize:
1203                         t.mu.Lock()
1204                         for _, stream := range t.activeStreams {
1205                                 // Adjust the sending quota for each stream.
1206                                 stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
1207                         }
1208                         t.streamSendQuota = s.Val
1209                         t.mu.Unlock()
1210                 }
1211         }
1212 }
1213
1214 // TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
1215 // is duplicated between the client and the server.
1216 // The transport layer needs to be refactored to take care of this.
1217 func (t *http2Client) itemHandler(i item) error {
1218         var err error
1219         switch i := i.(type) {
1220         case *dataFrame:
1221                 err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
1222                 if err == nil {
1223                         i.f()
1224                 }
1225         case *headerFrame:
1226                 t.hBuf.Reset()
1227                 for _, f := range i.hf {
1228                         t.hEnc.WriteField(f)
1229                 }
1230                 endHeaders := false
1231                 first := true
1232                 for !endHeaders {
1233                         size := t.hBuf.Len()
1234                         if size > http2MaxFrameLen {
1235                                 size = http2MaxFrameLen
1236                         } else {
1237                                 endHeaders = true
1238                         }
1239                         if first {
1240                                 first = false
1241                                 err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
1242                                         StreamID:      i.streamID,
1243                                         BlockFragment: t.hBuf.Next(size),
1244                                         EndStream:     i.endStream,
1245                                         EndHeaders:    endHeaders,
1246                                 })
1247                         } else {
1248                                 err = t.framer.fr.WriteContinuation(
1249                                         i.streamID,
1250                                         endHeaders,
1251                                         t.hBuf.Next(size),
1252                                 )
1253                         }
1254                         if err != nil {
1255                                 return err
1256                         }
1257                 }
1258         case *windowUpdate:
1259                 err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
1260         case *settings:
1261                 if i.ack {
1262                         t.applySettings(i.ss)
1263                         err = t.framer.fr.WriteSettingsAck()
1264                 } else {
1265                         err = t.framer.fr.WriteSettings(i.ss...)
1266                 }
1267         case *resetStream:
1268                 // If the server needs to be to intimated about stream closing,
1269                 // then we need to make sure the RST_STREAM frame is written to
1270                 // the wire before the headers of the next stream waiting on
1271                 // streamQuota. We ensure this by adding to the streamsQuota pool
1272                 // only after having acquired the writableChan to send RST_STREAM.
1273                 err = t.framer.fr.WriteRSTStream(i.streamID, i.code)
1274                 t.streamsQuota.add(1)
1275         case *flushIO:
1276                 err = t.framer.writer.Flush()
1277         case *ping:
1278                 if !i.ack {
1279                         t.bdpEst.timesnap(i.data)
1280                 }
1281                 err = t.framer.fr.WritePing(i.ack, i.data)
1282         default:
1283                 errorf("transport: http2Client.controller got unexpected item type %v", i)
1284         }
1285         return err
1286 }
1287
1288 // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1289 func (t *http2Client) keepalive() {
1290         p := &ping{data: [8]byte{}}
1291         timer := time.NewTimer(t.kp.Time)
1292         for {
1293                 select {
1294                 case <-timer.C:
1295                         if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1296                                 timer.Reset(t.kp.Time)
1297                                 continue
1298                         }
1299                         // Check if keepalive should go dormant.
1300                         t.mu.Lock()
1301                         if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1302                                 // Make awakenKeepalive writable.
1303                                 <-t.awakenKeepalive
1304                                 t.mu.Unlock()
1305                                 select {
1306                                 case <-t.awakenKeepalive:
1307                                         // If the control gets here a ping has been sent
1308                                         // need to reset the timer with keepalive.Timeout.
1309                                 case <-t.ctx.Done():
1310                                         return
1311                                 }
1312                         } else {
1313                                 t.mu.Unlock()
1314                                 // Send ping.
1315                                 t.controlBuf.put(p)
1316                         }
1317
1318                         // By the time control gets here a ping has been sent one way or the other.
1319                         timer.Reset(t.kp.Timeout)
1320                         select {
1321                         case <-timer.C:
1322                                 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1323                                         timer.Reset(t.kp.Time)
1324                                         continue
1325                                 }
1326                                 t.Close()
1327                                 return
1328                         case <-t.ctx.Done():
1329                                 if !timer.Stop() {
1330                                         <-timer.C
1331                                 }
1332                                 return
1333                         }
1334                 case <-t.ctx.Done():
1335                         if !timer.Stop() {
1336                                 <-timer.C
1337                         }
1338                         return
1339                 }
1340         }
1341 }
1342
1343 func (t *http2Client) Error() <-chan struct{} {
1344         return t.ctx.Done()
1345 }
1346
1347 func (t *http2Client) GoAway() <-chan struct{} {
1348         return t.goAway
1349 }