OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / transport / transport_test.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "bufio"
23         "bytes"
24         "encoding/binary"
25         "errors"
26         "fmt"
27         "io"
28         "math"
29         "net"
30         "net/http"
31         "reflect"
32         "strconv"
33         "strings"
34         "sync"
35         "testing"
36         "time"
37
38         "golang.org/x/net/context"
39         "golang.org/x/net/http2"
40         "golang.org/x/net/http2/hpack"
41         "google.golang.org/grpc/codes"
42         "google.golang.org/grpc/keepalive"
43         "google.golang.org/grpc/status"
44 )
45
46 type server struct {
47         lis        net.Listener
48         port       string
49         startedErr chan error // error (or nil) with server start value
50         mu         sync.Mutex
51         conns      map[ServerTransport]bool
52         h          *testStreamHandler
53 }
54
55 var (
56         expectedRequest            = []byte("ping")
57         expectedResponse           = []byte("pong")
58         expectedRequestLarge       = make([]byte, initialWindowSize*2)
59         expectedResponseLarge      = make([]byte, initialWindowSize*2)
60         expectedInvalidHeaderField = "invalid/content-type"
61 )
62
63 type testStreamHandler struct {
64         t      *http2Server
65         notify chan struct{}
66 }
67
68 type hType int
69
70 const (
71         normal hType = iota
72         suspended
73         notifyCall
74         misbehaved
75         encodingRequiredStatus
76         invalidHeaderField
77         delayRead
78         delayWrite
79         pingpong
80 )
81
82 func (h *testStreamHandler) handleStreamAndNotify(s *Stream) {
83         if h.notify == nil {
84                 return
85         }
86         go func() {
87                 select {
88                 case <-h.notify:
89                 default:
90                         close(h.notify)
91                 }
92         }()
93 }
94
95 func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
96         req := expectedRequest
97         resp := expectedResponse
98         if s.Method() == "foo.Large" {
99                 req = expectedRequestLarge
100                 resp = expectedResponseLarge
101         }
102         p := make([]byte, len(req))
103         _, err := s.Read(p)
104         if err != nil {
105                 return
106         }
107         if !bytes.Equal(p, req) {
108                 t.Fatalf("handleStream got %v, want %v", p, req)
109         }
110         // send a response back to the client.
111         h.t.Write(s, nil, resp, &Options{})
112         // send the trailer to end the stream.
113         h.t.WriteStatus(s, status.New(codes.OK, ""))
114 }
115
116 func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) {
117         header := make([]byte, 5)
118         for {
119                 if _, err := s.Read(header); err != nil {
120                         if err == io.EOF {
121                                 h.t.WriteStatus(s, status.New(codes.OK, ""))
122                                 return
123                         }
124                         t.Fatalf("Error on server while reading data header: %v", err)
125                 }
126                 sz := binary.BigEndian.Uint32(header[1:])
127                 msg := make([]byte, int(sz))
128                 if _, err := s.Read(msg); err != nil {
129                         t.Fatalf("Error on server while reading message: %v", err)
130                 }
131                 buf := make([]byte, sz+5)
132                 buf[0] = byte(0)
133                 binary.BigEndian.PutUint32(buf[1:], uint32(sz))
134                 copy(buf[5:], msg)
135                 h.t.Write(s, nil, buf, &Options{})
136         }
137 }
138
139 func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *Stream) {
140         conn, ok := s.ServerTransport().(*http2Server)
141         if !ok {
142                 t.Fatalf("Failed to convert %v to *http2Server", s.ServerTransport())
143         }
144         var sent int
145         p := make([]byte, http2MaxFrameLen)
146         for sent < initialWindowSize {
147                 n := initialWindowSize - sent
148                 // The last message may be smaller than http2MaxFrameLen
149                 if n <= http2MaxFrameLen {
150                         if s.Method() == "foo.Connection" {
151                                 // Violate connection level flow control window of client but do not
152                                 // violate any stream level windows.
153                                 p = make([]byte, n)
154                         } else {
155                                 // Violate stream level flow control window of client.
156                                 p = make([]byte, n+1)
157                         }
158                 }
159                 conn.controlBuf.put(&dataFrame{s.id, false, p, func() {}})
160                 sent += len(p)
161         }
162 }
163
164 func (h *testStreamHandler) handleStreamEncodingRequiredStatus(t *testing.T, s *Stream) {
165         // raw newline is not accepted by http2 framer so it must be encoded.
166         h.t.WriteStatus(s, encodingTestStatus)
167 }
168
169 func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stream) {
170         headerFields := []hpack.HeaderField{}
171         headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
172         h.t.controlBuf.put(&headerFrame{
173                 streamID:  s.id,
174                 hf:        headerFields,
175                 endStream: false,
176         })
177 }
178
179 func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
180         req := expectedRequest
181         resp := expectedResponse
182         if s.Method() == "foo.Large" {
183                 req = expectedRequestLarge
184                 resp = expectedResponseLarge
185         }
186         p := make([]byte, len(req))
187
188         // Wait before reading. Give time to client to start sending
189         // before server starts reading.
190         time.Sleep(2 * time.Second)
191         _, err := s.Read(p)
192         if err != nil {
193                 t.Fatalf("s.Read(_) = _, %v, want _, <nil>", err)
194                 return
195         }
196
197         if !bytes.Equal(p, req) {
198                 t.Fatalf("handleStream got %v, want %v", p, req)
199         }
200         // send a response back to the client.
201         h.t.Write(s, nil, resp, &Options{})
202         // send the trailer to end the stream.
203         h.t.WriteStatus(s, status.New(codes.OK, ""))
204 }
205
206 func (h *testStreamHandler) handleStreamDelayWrite(t *testing.T, s *Stream) {
207         req := expectedRequest
208         resp := expectedResponse
209         if s.Method() == "foo.Large" {
210                 req = expectedRequestLarge
211                 resp = expectedResponseLarge
212         }
213         p := make([]byte, len(req))
214         _, err := s.Read(p)
215         if err != nil {
216                 t.Fatalf("s.Read(_) = _, %v, want _, <nil>", err)
217                 return
218         }
219         if !bytes.Equal(p, req) {
220                 t.Fatalf("handleStream got %v, want %v", p, req)
221         }
222
223         // Wait before sending. Give time to client to start reading
224         // before server starts sending.
225         time.Sleep(2 * time.Second)
226         h.t.Write(s, nil, resp, &Options{})
227         // send the trailer to end the stream.
228         h.t.WriteStatus(s, status.New(codes.OK, ""))
229 }
230
231 // start starts server. Other goroutines should block on s.readyChan for further operations.
232 func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
233         var err error
234         if port == 0 {
235                 s.lis, err = net.Listen("tcp", "localhost:0")
236         } else {
237                 s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
238         }
239         if err != nil {
240                 s.startedErr <- fmt.Errorf("failed to listen: %v", err)
241                 return
242         }
243         _, p, err := net.SplitHostPort(s.lis.Addr().String())
244         if err != nil {
245                 s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
246                 return
247         }
248         s.port = p
249         s.conns = make(map[ServerTransport]bool)
250         s.startedErr <- nil
251         for {
252                 conn, err := s.lis.Accept()
253                 if err != nil {
254                         return
255                 }
256                 transport, err := NewServerTransport("http2", conn, serverConfig)
257                 if err != nil {
258                         return
259                 }
260                 s.mu.Lock()
261                 if s.conns == nil {
262                         s.mu.Unlock()
263                         transport.Close()
264                         return
265                 }
266                 s.conns[transport] = true
267                 h := &testStreamHandler{t: transport.(*http2Server)}
268                 s.h = h
269                 s.mu.Unlock()
270                 switch ht {
271                 case notifyCall:
272                         go transport.HandleStreams(h.handleStreamAndNotify,
273                                 func(ctx context.Context, _ string) context.Context {
274                                         return ctx
275                                 })
276                 case suspended:
277                         go transport.HandleStreams(func(*Stream) {}, // Do nothing to handle the stream.
278                                 func(ctx context.Context, method string) context.Context {
279                                         return ctx
280                                 })
281                 case misbehaved:
282                         go transport.HandleStreams(func(s *Stream) {
283                                 go h.handleStreamMisbehave(t, s)
284                         }, func(ctx context.Context, method string) context.Context {
285                                 return ctx
286                         })
287                 case encodingRequiredStatus:
288                         go transport.HandleStreams(func(s *Stream) {
289                                 go h.handleStreamEncodingRequiredStatus(t, s)
290                         }, func(ctx context.Context, method string) context.Context {
291                                 return ctx
292                         })
293                 case invalidHeaderField:
294                         go transport.HandleStreams(func(s *Stream) {
295                                 go h.handleStreamInvalidHeaderField(t, s)
296                         }, func(ctx context.Context, method string) context.Context {
297                                 return ctx
298                         })
299                 case delayRead:
300                         go transport.HandleStreams(func(s *Stream) {
301                                 go h.handleStreamDelayRead(t, s)
302                         }, func(ctx context.Context, method string) context.Context {
303                                 return ctx
304                         })
305                 case delayWrite:
306                         go transport.HandleStreams(func(s *Stream) {
307                                 go h.handleStreamDelayWrite(t, s)
308                         }, func(ctx context.Context, method string) context.Context {
309                                 return ctx
310                         })
311                 case pingpong:
312                         go transport.HandleStreams(func(s *Stream) {
313                                 go h.handleStreamPingPong(t, s)
314                         }, func(ctx context.Context, method string) context.Context {
315                                 return ctx
316                         })
317                 default:
318                         go transport.HandleStreams(func(s *Stream) {
319                                 go h.handleStream(t, s)
320                         }, func(ctx context.Context, method string) context.Context {
321                                 return ctx
322                         })
323                 }
324         }
325 }
326
327 func (s *server) wait(t *testing.T, timeout time.Duration) {
328         select {
329         case err := <-s.startedErr:
330                 if err != nil {
331                         t.Fatal(err)
332                 }
333         case <-time.After(timeout):
334                 t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
335         }
336 }
337
338 func (s *server) stop() {
339         s.lis.Close()
340         s.mu.Lock()
341         for c := range s.conns {
342                 c.Close()
343         }
344         s.conns = nil
345         s.mu.Unlock()
346 }
347
348 func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
349         return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
350 }
351
352 func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
353         server := &server{startedErr: make(chan error, 1)}
354         go server.start(t, port, serverConfig, ht)
355         server.wait(t, 2*time.Second)
356         addr := "localhost:" + server.port
357         var (
358                 ct      ClientTransport
359                 connErr error
360         )
361         target := TargetInfo{
362                 Addr: addr,
363         }
364         ct, connErr = NewClientTransport(context.Background(), target, copts, 2*time.Second)
365         if connErr != nil {
366                 t.Fatalf("failed to create transport: %v", connErr)
367         }
368         return server, ct
369 }
370
371 func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
372         lis, err := net.Listen("tcp", "localhost:0")
373         if err != nil {
374                 t.Fatalf("Failed to listen: %v", err)
375         }
376         // Launch a non responsive server.
377         go func() {
378                 defer lis.Close()
379                 conn, err := lis.Accept()
380                 if err != nil {
381                         t.Errorf("Error at server-side while accepting: %v", err)
382                         close(done)
383                         return
384                 }
385                 done <- conn
386         }()
387         tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, 2*time.Second)
388         if err != nil {
389                 // Server clean-up.
390                 lis.Close()
391                 if conn, ok := <-done; ok {
392                         conn.Close()
393                 }
394                 t.Fatalf("Failed to dial: %v", err)
395         }
396         return tr
397 }
398
399 // TestInflightStreamClosing ensures that closing in-flight stream
400 // sends StreamError to concurrent stream reader.
401 func TestInflightStreamClosing(t *testing.T) {
402         serverConfig := &ServerConfig{}
403         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
404         defer server.stop()
405         defer client.Close()
406
407         stream, err := client.NewStream(context.Background(), &CallHdr{})
408         if err != nil {
409                 t.Fatalf("Client failed to create RPC request: %v", err)
410         }
411
412         donec := make(chan struct{})
413         serr := StreamError{Desc: "client connection is closing"}
414         go func() {
415                 defer close(donec)
416                 if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
417                         t.Errorf("unexpected Stream error %v, expected %v", err, serr)
418                 }
419         }()
420
421         // should unblock concurrent stream.Read
422         client.CloseStream(stream, serr)
423
424         // wait for stream.Read error
425         timeout := time.NewTimer(5 * time.Second)
426         select {
427         case <-donec:
428                 if !timeout.Stop() {
429                         <-timeout.C
430                 }
431         case <-timeout.C:
432                 t.Fatalf("Test timed out, expected a StreamError.")
433         }
434 }
435
436 // TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
437 // An idle client is one who doesn't make any RPC calls for a duration of
438 // MaxConnectionIdle time.
439 func TestMaxConnectionIdle(t *testing.T) {
440         serverConfig := &ServerConfig{
441                 KeepaliveParams: keepalive.ServerParameters{
442                         MaxConnectionIdle: 2 * time.Second,
443                 },
444         }
445         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
446         defer server.stop()
447         defer client.Close()
448         stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
449         if err != nil {
450                 t.Fatalf("Client failed to create RPC request: %v", err)
451         }
452         stream.mu.Lock()
453         stream.rstStream = true
454         stream.mu.Unlock()
455         client.CloseStream(stream, nil)
456         // wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode
457         timeout := time.NewTimer(time.Second * 4)
458         select {
459         case <-client.GoAway():
460                 if !timeout.Stop() {
461                         <-timeout.C
462                 }
463         case <-timeout.C:
464                 t.Fatalf("Test timed out, expected a GoAway from the server.")
465         }
466 }
467
468 // TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
469 func TestMaxConnectionIdleNegative(t *testing.T) {
470         serverConfig := &ServerConfig{
471                 KeepaliveParams: keepalive.ServerParameters{
472                         MaxConnectionIdle: 2 * time.Second,
473                 },
474         }
475         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
476         defer server.stop()
477         defer client.Close()
478         _, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
479         if err != nil {
480                 t.Fatalf("Client failed to create RPC request: %v", err)
481         }
482         timeout := time.NewTimer(time.Second * 4)
483         select {
484         case <-client.GoAway():
485                 if !timeout.Stop() {
486                         <-timeout.C
487                 }
488                 t.Fatalf("A non-idle client received a GoAway.")
489         case <-timeout.C:
490         }
491
492 }
493
494 // TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
495 func TestMaxConnectionAge(t *testing.T) {
496         serverConfig := &ServerConfig{
497                 KeepaliveParams: keepalive.ServerParameters{
498                         MaxConnectionAge: 2 * time.Second,
499                 },
500         }
501         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
502         defer server.stop()
503         defer client.Close()
504         _, err := client.NewStream(context.Background(), &CallHdr{})
505         if err != nil {
506                 t.Fatalf("Client failed to create stream: %v", err)
507         }
508         // Wait for max-age logic to send GoAway.
509         timeout := time.NewTimer(4 * time.Second)
510         select {
511         case <-client.GoAway():
512                 if !timeout.Stop() {
513                         <-timeout.C
514                 }
515         case <-timeout.C:
516                 t.Fatalf("Test timer out, expected a GoAway from the server.")
517         }
518 }
519
520 // TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
521 func TestKeepaliveServer(t *testing.T) {
522         serverConfig := &ServerConfig{
523                 KeepaliveParams: keepalive.ServerParameters{
524                         Time:    2 * time.Second,
525                         Timeout: 1 * time.Second,
526                 },
527         }
528         server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
529         defer server.stop()
530         defer c.Close()
531         client, err := net.Dial("tcp", server.lis.Addr().String())
532         if err != nil {
533                 t.Fatalf("Failed to dial: %v", err)
534         }
535         defer client.Close()
536         // Set read deadline on client conn so that it doesn't block forever in errorsome cases.
537         client.SetReadDeadline(time.Now().Add(10 * time.Second))
538         // Wait for keepalive logic to close the connection.
539         time.Sleep(4 * time.Second)
540         b := make([]byte, 24)
541         for {
542                 _, err = client.Read(b)
543                 if err == nil {
544                         continue
545                 }
546                 if err != io.EOF {
547                         t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
548                 }
549                 break
550         }
551 }
552
553 // TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
554 func TestKeepaliveServerNegative(t *testing.T) {
555         serverConfig := &ServerConfig{
556                 KeepaliveParams: keepalive.ServerParameters{
557                         Time:    2 * time.Second,
558                         Timeout: 1 * time.Second,
559                 },
560         }
561         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
562         defer server.stop()
563         defer client.Close()
564         // Give keepalive logic some time by sleeping.
565         time.Sleep(4 * time.Second)
566         // Assert that client is still active.
567         clientTr := client.(*http2Client)
568         clientTr.mu.Lock()
569         defer clientTr.mu.Unlock()
570         if clientTr.state != reachable {
571                 t.Fatalf("Test failed: Expected server-client connection to be healthy.")
572         }
573 }
574
575 func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
576         done := make(chan net.Conn, 1)
577         tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
578                 Time:                2 * time.Second, // Keepalive time = 2 sec.
579                 Timeout:             1 * time.Second, // Keepalive timeout = 1 sec.
580                 PermitWithoutStream: true,            // Run keepalive even with no RPCs.
581         }}, done)
582         defer tr.Close()
583         conn, ok := <-done
584         if !ok {
585                 t.Fatalf("Server didn't return connection object")
586         }
587         defer conn.Close()
588         // Sleep for keepalive to close the connection.
589         time.Sleep(4 * time.Second)
590         // Assert that the connection was closed.
591         ct := tr.(*http2Client)
592         ct.mu.Lock()
593         defer ct.mu.Unlock()
594         if ct.state == reachable {
595                 t.Fatalf("Test Failed: Expected client transport to have closed.")
596         }
597 }
598
599 func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
600         done := make(chan net.Conn, 1)
601         tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
602                 Time:    2 * time.Second, // Keepalive time = 2 sec.
603                 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
604         }}, done)
605         defer tr.Close()
606         conn, ok := <-done
607         if !ok {
608                 t.Fatalf("server didn't reutrn connection object")
609         }
610         defer conn.Close()
611         // Give keepalive some time.
612         time.Sleep(4 * time.Second)
613         // Assert that connections is still healthy.
614         ct := tr.(*http2Client)
615         ct.mu.Lock()
616         defer ct.mu.Unlock()
617         if ct.state != reachable {
618                 t.Fatalf("Test failed: Expected client transport to be healthy.")
619         }
620 }
621
622 func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
623         done := make(chan net.Conn, 1)
624         tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
625                 Time:    2 * time.Second, // Keepalive time = 2 sec.
626                 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
627         }}, done)
628         defer tr.Close()
629         conn, ok := <-done
630         if !ok {
631                 t.Fatalf("Server didn't return connection object")
632         }
633         defer conn.Close()
634         // Create a stream.
635         _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
636         if err != nil {
637                 t.Fatalf("Failed to create a new stream: %v", err)
638         }
639         // Give keepalive some time.
640         time.Sleep(4 * time.Second)
641         // Assert that transport was closed.
642         ct := tr.(*http2Client)
643         ct.mu.Lock()
644         defer ct.mu.Unlock()
645         if ct.state == reachable {
646                 t.Fatalf("Test failed: Expected client transport to have closed.")
647         }
648 }
649
650 func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
651         s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
652                 Time:                2 * time.Second, // Keepalive time = 2 sec.
653                 Timeout:             1 * time.Second, // Keepalive timeout = 1 sec.
654                 PermitWithoutStream: true,            // Run keepalive even with no RPCs.
655         }})
656         defer s.stop()
657         defer tr.Close()
658         // Give keep alive some time.
659         time.Sleep(4 * time.Second)
660         // Assert that transport is healthy.
661         ct := tr.(*http2Client)
662         ct.mu.Lock()
663         defer ct.mu.Unlock()
664         if ct.state != reachable {
665                 t.Fatalf("Test failed: Expected client transport to be healthy.")
666         }
667 }
668
669 func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
670         serverConfig := &ServerConfig{
671                 KeepalivePolicy: keepalive.EnforcementPolicy{
672                         MinTime: 2 * time.Second,
673                 },
674         }
675         clientOptions := ConnectOptions{
676                 KeepaliveParams: keepalive.ClientParameters{
677                         Time:                50 * time.Millisecond,
678                         Timeout:             50 * time.Millisecond,
679                         PermitWithoutStream: true,
680                 },
681         }
682         server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
683         defer server.stop()
684         defer client.Close()
685
686         timeout := time.NewTimer(2 * time.Second)
687         select {
688         case <-client.GoAway():
689                 if !timeout.Stop() {
690                         <-timeout.C
691                 }
692         case <-timeout.C:
693                 t.Fatalf("Test failed: Expected a GoAway from server.")
694         }
695         time.Sleep(500 * time.Millisecond)
696         ct := client.(*http2Client)
697         ct.mu.Lock()
698         defer ct.mu.Unlock()
699         if ct.state == reachable {
700                 t.Fatalf("Test failed: Expected the connection to be closed.")
701         }
702 }
703
704 func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
705         serverConfig := &ServerConfig{
706                 KeepalivePolicy: keepalive.EnforcementPolicy{
707                         MinTime: 2 * time.Second,
708                 },
709         }
710         clientOptions := ConnectOptions{
711                 KeepaliveParams: keepalive.ClientParameters{
712                         Time:    50 * time.Millisecond,
713                         Timeout: 50 * time.Millisecond,
714                 },
715         }
716         server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
717         defer server.stop()
718         defer client.Close()
719
720         if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
721                 t.Fatalf("Client failed to create stream.")
722         }
723         timeout := time.NewTimer(2 * time.Second)
724         select {
725         case <-client.GoAway():
726                 if !timeout.Stop() {
727                         <-timeout.C
728                 }
729         case <-timeout.C:
730                 t.Fatalf("Test failed: Expected a GoAway from server.")
731         }
732         time.Sleep(500 * time.Millisecond)
733         ct := client.(*http2Client)
734         ct.mu.Lock()
735         defer ct.mu.Unlock()
736         if ct.state == reachable {
737                 t.Fatalf("Test failed: Expected the connection to be closed.")
738         }
739 }
740
741 func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
742         serverConfig := &ServerConfig{
743                 KeepalivePolicy: keepalive.EnforcementPolicy{
744                         MinTime:             100 * time.Millisecond,
745                         PermitWithoutStream: true,
746                 },
747         }
748         clientOptions := ConnectOptions{
749                 KeepaliveParams: keepalive.ClientParameters{
750                         Time:                101 * time.Millisecond,
751                         Timeout:             50 * time.Millisecond,
752                         PermitWithoutStream: true,
753                 },
754         }
755         server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
756         defer server.stop()
757         defer client.Close()
758
759         // Give keepalive enough time.
760         time.Sleep(2 * time.Second)
761         // Assert that connection is healthy.
762         ct := client.(*http2Client)
763         ct.mu.Lock()
764         defer ct.mu.Unlock()
765         if ct.state != reachable {
766                 t.Fatalf("Test failed: Expected connection to be healthy.")
767         }
768 }
769
770 func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
771         serverConfig := &ServerConfig{
772                 KeepalivePolicy: keepalive.EnforcementPolicy{
773                         MinTime: 100 * time.Millisecond,
774                 },
775         }
776         clientOptions := ConnectOptions{
777                 KeepaliveParams: keepalive.ClientParameters{
778                         Time:    101 * time.Millisecond,
779                         Timeout: 50 * time.Millisecond,
780                 },
781         }
782         server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
783         defer server.stop()
784         defer client.Close()
785
786         if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
787                 t.Fatalf("Client failed to create stream.")
788         }
789
790         // Give keepalive enough time.
791         time.Sleep(2 * time.Second)
792         // Assert that connection is healthy.
793         ct := client.(*http2Client)
794         ct.mu.Lock()
795         defer ct.mu.Unlock()
796         if ct.state != reachable {
797                 t.Fatalf("Test failed: Expected connection to be healthy.")
798         }
799 }
800
801 func TestClientSendAndReceive(t *testing.T) {
802         server, ct := setUp(t, 0, math.MaxUint32, normal)
803         callHdr := &CallHdr{
804                 Host:   "localhost",
805                 Method: "foo.Small",
806         }
807         s1, err1 := ct.NewStream(context.Background(), callHdr)
808         if err1 != nil {
809                 t.Fatalf("failed to open stream: %v", err1)
810         }
811         if s1.id != 1 {
812                 t.Fatalf("wrong stream id: %d", s1.id)
813         }
814         s2, err2 := ct.NewStream(context.Background(), callHdr)
815         if err2 != nil {
816                 t.Fatalf("failed to open stream: %v", err2)
817         }
818         if s2.id != 3 {
819                 t.Fatalf("wrong stream id: %d", s2.id)
820         }
821         opts := Options{
822                 Last:  true,
823                 Delay: false,
824         }
825         if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
826                 t.Fatalf("failed to send data: %v", err)
827         }
828         p := make([]byte, len(expectedResponse))
829         _, recvErr := s1.Read(p)
830         if recvErr != nil || !bytes.Equal(p, expectedResponse) {
831                 t.Fatalf("Error: %v, want <nil>; Result: %v, want %v", recvErr, p, expectedResponse)
832         }
833         _, recvErr = s1.Read(p)
834         if recvErr != io.EOF {
835                 t.Fatalf("Error: %v; want <EOF>", recvErr)
836         }
837         ct.Close()
838         server.stop()
839 }
840
841 func TestClientErrorNotify(t *testing.T) {
842         server, ct := setUp(t, 0, math.MaxUint32, normal)
843         go server.stop()
844         // ct.reader should detect the error and activate ct.Error().
845         <-ct.Error()
846         ct.Close()
847 }
848
849 func performOneRPC(ct ClientTransport) {
850         callHdr := &CallHdr{
851                 Host:   "localhost",
852                 Method: "foo.Small",
853         }
854         s, err := ct.NewStream(context.Background(), callHdr)
855         if err != nil {
856                 return
857         }
858         opts := Options{
859                 Last:  true,
860                 Delay: false,
861         }
862         if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
863                 time.Sleep(5 * time.Millisecond)
864                 // The following s.Recv()'s could error out because the
865                 // underlying transport is gone.
866                 //
867                 // Read response
868                 p := make([]byte, len(expectedResponse))
869                 s.Read(p)
870                 // Read io.EOF
871                 s.Read(p)
872         }
873 }
874
875 func TestClientMix(t *testing.T) {
876         s, ct := setUp(t, 0, math.MaxUint32, normal)
877         go func(s *server) {
878                 time.Sleep(5 * time.Second)
879                 s.stop()
880         }(s)
881         go func(ct ClientTransport) {
882                 <-ct.Error()
883                 ct.Close()
884         }(ct)
885         for i := 0; i < 1000; i++ {
886                 time.Sleep(10 * time.Millisecond)
887                 go performOneRPC(ct)
888         }
889 }
890
891 func TestLargeMessage(t *testing.T) {
892         server, ct := setUp(t, 0, math.MaxUint32, normal)
893         callHdr := &CallHdr{
894                 Host:   "localhost",
895                 Method: "foo.Large",
896         }
897         var wg sync.WaitGroup
898         for i := 0; i < 2; i++ {
899                 wg.Add(1)
900                 go func() {
901                         defer wg.Done()
902                         s, err := ct.NewStream(context.Background(), callHdr)
903                         if err != nil {
904                                 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
905                         }
906                         if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
907                                 t.Errorf("%v.Write(_, _, _) = %v, want  <nil>", ct, err)
908                         }
909                         p := make([]byte, len(expectedResponseLarge))
910                         if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
911                                 t.Errorf("s.Read(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
912                         }
913                         if _, err = s.Read(p); err != io.EOF {
914                                 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
915                         }
916                 }()
917         }
918         wg.Wait()
919         ct.Close()
920         server.stop()
921 }
922
923 func TestLargeMessageWithDelayRead(t *testing.T) {
924         server, ct := setUp(t, 0, math.MaxUint32, delayRead)
925         callHdr := &CallHdr{
926                 Host:   "localhost",
927                 Method: "foo.Large",
928         }
929         var wg sync.WaitGroup
930         for i := 0; i < 2; i++ {
931                 wg.Add(1)
932                 go func() {
933                         defer wg.Done()
934                         s, err := ct.NewStream(context.Background(), callHdr)
935                         if err != nil {
936                                 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
937                         }
938                         if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
939                                 t.Errorf("%v.Write(_, _, _) = %v, want  <nil>", ct, err)
940                         }
941                         p := make([]byte, len(expectedResponseLarge))
942
943                         // Give time to server to begin sending before client starts reading.
944                         time.Sleep(2 * time.Second)
945                         if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
946                                 t.Errorf("s.Read(_) = _, %v, want _, <nil>", err)
947                         }
948                         if _, err = s.Read(p); err != io.EOF {
949                                 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
950                         }
951                 }()
952         }
953         wg.Wait()
954         ct.Close()
955         server.stop()
956 }
957
958 func TestLargeMessageDelayWrite(t *testing.T) {
959         server, ct := setUp(t, 0, math.MaxUint32, delayWrite)
960         callHdr := &CallHdr{
961                 Host:   "localhost",
962                 Method: "foo.Large",
963         }
964         var wg sync.WaitGroup
965         for i := 0; i < 2; i++ {
966                 wg.Add(1)
967                 go func() {
968                         defer wg.Done()
969                         s, err := ct.NewStream(context.Background(), callHdr)
970                         if err != nil {
971                                 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
972                         }
973
974                         // Give time to server to start reading before client starts sending.
975                         time.Sleep(2 * time.Second)
976                         if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
977                                 t.Errorf("%v.Write(_, _, _) = %v, want  <nil>", ct, err)
978                         }
979                         p := make([]byte, len(expectedResponseLarge))
980                         if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
981                                 t.Errorf("io.ReadFull(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
982                         }
983                         if _, err = s.Read(p); err != io.EOF {
984                                 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
985                         }
986                 }()
987         }
988         wg.Wait()
989         ct.Close()
990         server.stop()
991 }
992
993 func TestGracefulClose(t *testing.T) {
994         server, ct := setUp(t, 0, math.MaxUint32, normal)
995         callHdr := &CallHdr{
996                 Host:   "localhost",
997                 Method: "foo.Small",
998         }
999         s, err := ct.NewStream(context.Background(), callHdr)
1000         if err != nil {
1001                 t.Fatalf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
1002         }
1003         if err = ct.GracefulClose(); err != nil {
1004                 t.Fatalf("%v.GracefulClose() = %v, want <nil>", ct, err)
1005         }
1006         var wg sync.WaitGroup
1007         // Expect the failure for all the follow-up streams because ct has been closed gracefully.
1008         for i := 0; i < 100; i++ {
1009                 wg.Add(1)
1010                 go func() {
1011                         defer wg.Done()
1012                         if _, err := ct.NewStream(context.Background(), callHdr); err != ErrStreamDrain {
1013                                 t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrStreamDrain)
1014                         }
1015                 }()
1016         }
1017         opts := Options{
1018                 Last:  true,
1019                 Delay: false,
1020         }
1021         // The stream which was created before graceful close can still proceed.
1022         if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1023                 t.Fatalf("%v.Write(_, _, _) = %v, want  <nil>", ct, err)
1024         }
1025         p := make([]byte, len(expectedResponse))
1026         if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponse) {
1027                 t.Fatalf("s.Read(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
1028         }
1029         if _, err = s.Read(p); err != io.EOF {
1030                 t.Fatalf("Failed to complete the stream %v; want <EOF>", err)
1031         }
1032         wg.Wait()
1033         ct.Close()
1034         server.stop()
1035 }
1036
1037 func TestLargeMessageSuspension(t *testing.T) {
1038         server, ct := setUp(t, 0, math.MaxUint32, suspended)
1039         callHdr := &CallHdr{
1040                 Host:   "localhost",
1041                 Method: "foo.Large",
1042         }
1043         // Set a long enough timeout for writing a large message out.
1044         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1045         defer cancel()
1046         s, err := ct.NewStream(ctx, callHdr)
1047         if err != nil {
1048                 t.Fatalf("failed to open stream: %v", err)
1049         }
1050         // Write should not be done successfully due to flow control.
1051         msg := make([]byte, initialWindowSize*8)
1052         err = ct.Write(s, nil, msg, &Options{Last: true, Delay: false})
1053         expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
1054         if err != expectedErr {
1055                 t.Fatalf("Write got %v, want %v", err, expectedErr)
1056         }
1057         ct.Close()
1058         server.stop()
1059 }
1060
1061 func TestMaxStreams(t *testing.T) {
1062         server, ct := setUp(t, 0, 1, suspended)
1063         callHdr := &CallHdr{
1064                 Host:   "localhost",
1065                 Method: "foo.Large",
1066         }
1067         // Have a pending stream which takes all streams quota.
1068         s, err := ct.NewStream(context.Background(), callHdr)
1069         if err != nil {
1070                 t.Fatalf("Failed to open stream: %v", err)
1071         }
1072         cc, ok := ct.(*http2Client)
1073         if !ok {
1074                 t.Fatalf("Failed to convert %v to *http2Client", ct)
1075         }
1076         done := make(chan struct{})
1077         ch := make(chan int)
1078         ready := make(chan struct{})
1079         go func() {
1080                 for {
1081                         select {
1082                         case <-time.After(5 * time.Millisecond):
1083                                 select {
1084                                 case ch <- 0:
1085                                 case <-ready:
1086                                         return
1087                                 }
1088                         case <-time.After(5 * time.Second):
1089                                 close(done)
1090                                 return
1091                         case <-ready:
1092                                 return
1093                         }
1094                 }
1095         }()
1096         for {
1097                 select {
1098                 case <-ch:
1099                 case <-done:
1100                         t.Fatalf("Client has not received the max stream setting in 5 seconds.")
1101                 }
1102                 cc.mu.Lock()
1103                 // cc.maxStreams should be equal to 1 after having received settings frame from
1104                 // server.
1105                 if cc.maxStreams == 1 {
1106                         cc.mu.Unlock()
1107                         select {
1108                         case <-cc.streamsQuota.acquire():
1109                                 t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
1110                         default:
1111                                 cc.streamsQuota.mu.Lock()
1112                                 quota := cc.streamsQuota.quota
1113                                 cc.streamsQuota.mu.Unlock()
1114                                 if quota != 0 {
1115                                         t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
1116                                 }
1117                         }
1118                         break
1119                 }
1120                 cc.mu.Unlock()
1121         }
1122         close(ready)
1123         // Close the pending stream so that the streams quota becomes available for the next new stream.
1124         ct.CloseStream(s, nil)
1125         select {
1126         case i := <-cc.streamsQuota.acquire():
1127                 if i != 1 {
1128                         t.Fatalf("streamsQuota.acquire() got %d quota, want 1.", i)
1129                 }
1130                 cc.streamsQuota.add(i)
1131         default:
1132                 t.Fatalf("streamsQuota.acquire() is not readable.")
1133         }
1134         if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
1135                 t.Fatalf("Failed to open stream: %v", err)
1136         }
1137         ct.Close()
1138         server.stop()
1139 }
1140
1141 func TestServerContextCanceledOnClosedConnection(t *testing.T) {
1142         server, ct := setUp(t, 0, math.MaxUint32, suspended)
1143         callHdr := &CallHdr{
1144                 Host:   "localhost",
1145                 Method: "foo",
1146         }
1147         var sc *http2Server
1148         // Wait until the server transport is setup.
1149         for {
1150                 server.mu.Lock()
1151                 if len(server.conns) == 0 {
1152                         server.mu.Unlock()
1153                         time.Sleep(time.Millisecond)
1154                         continue
1155                 }
1156                 for k := range server.conns {
1157                         var ok bool
1158                         sc, ok = k.(*http2Server)
1159                         if !ok {
1160                                 t.Fatalf("Failed to convert %v to *http2Server", k)
1161                         }
1162                 }
1163                 server.mu.Unlock()
1164                 break
1165         }
1166         cc, ok := ct.(*http2Client)
1167         if !ok {
1168                 t.Fatalf("Failed to convert %v to *http2Client", ct)
1169         }
1170         s, err := ct.NewStream(context.Background(), callHdr)
1171         if err != nil {
1172                 t.Fatalf("Failed to open stream: %v", err)
1173         }
1174         cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
1175         // Loop until the server side stream is created.
1176         var ss *Stream
1177         for {
1178                 time.Sleep(time.Second)
1179                 sc.mu.Lock()
1180                 if len(sc.activeStreams) == 0 {
1181                         sc.mu.Unlock()
1182                         continue
1183                 }
1184                 ss = sc.activeStreams[s.id]
1185                 sc.mu.Unlock()
1186                 break
1187         }
1188         cc.Close()
1189         select {
1190         case <-ss.Context().Done():
1191                 if ss.Context().Err() != context.Canceled {
1192                         t.Fatalf("ss.Context().Err() got %v, want %v", ss.Context().Err(), context.Canceled)
1193                 }
1194         case <-time.After(5 * time.Second):
1195                 t.Fatalf("Failed to cancel the context of the sever side stream.")
1196         }
1197         server.stop()
1198 }
1199
1200 func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
1201         connectOptions := ConnectOptions{
1202                 InitialWindowSize:     defaultWindowSize,
1203                 InitialConnWindowSize: defaultWindowSize,
1204         }
1205         server, client := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
1206         defer server.stop()
1207         defer client.Close()
1208
1209         waitWhileTrue(t, func() (bool, error) {
1210                 server.mu.Lock()
1211                 defer server.mu.Unlock()
1212
1213                 if len(server.conns) == 0 {
1214                         return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
1215                 }
1216                 return false, nil
1217         })
1218
1219         var st *http2Server
1220         server.mu.Lock()
1221         for k := range server.conns {
1222                 st = k.(*http2Server)
1223         }
1224         notifyChan := make(chan struct{})
1225         server.h.notify = notifyChan
1226         server.mu.Unlock()
1227         cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1228         if err != nil {
1229                 t.Fatalf("Client failed to create first stream. Err: %v", err)
1230         }
1231
1232         <-notifyChan
1233         var sstream1 *Stream
1234         // Access stream on the server.
1235         st.mu.Lock()
1236         for _, v := range st.activeStreams {
1237                 if v.id == cstream1.id {
1238                         sstream1 = v
1239                 }
1240         }
1241         st.mu.Unlock()
1242         if sstream1 == nil {
1243                 t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id)
1244         }
1245         // Exhaust client's connection window.
1246         if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
1247                 t.Fatalf("Server failed to write data. Err: %v", err)
1248         }
1249         notifyChan = make(chan struct{})
1250         server.mu.Lock()
1251         server.h.notify = notifyChan
1252         server.mu.Unlock()
1253         // Create another stream on client.
1254         cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1255         if err != nil {
1256                 t.Fatalf("Client failed to create second stream. Err: %v", err)
1257         }
1258         <-notifyChan
1259         var sstream2 *Stream
1260         st.mu.Lock()
1261         for _, v := range st.activeStreams {
1262                 if v.id == cstream2.id {
1263                         sstream2 = v
1264                 }
1265         }
1266         st.mu.Unlock()
1267         if sstream2 == nil {
1268                 t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
1269         }
1270         // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
1271         if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
1272                 t.Fatalf("Server failed to write data. Err: %v", err)
1273         }
1274
1275         // Client should be able to read data on second stream.
1276         if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
1277                 t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
1278         }
1279
1280         // Client should be able to read data on first stream.
1281         if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil {
1282                 t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
1283         }
1284 }
1285
1286 func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
1287         serverConfig := &ServerConfig{
1288                 InitialWindowSize:     defaultWindowSize,
1289                 InitialConnWindowSize: defaultWindowSize,
1290         }
1291         server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
1292         defer server.stop()
1293         defer client.Close()
1294         waitWhileTrue(t, func() (bool, error) {
1295                 server.mu.Lock()
1296                 defer server.mu.Unlock()
1297
1298                 if len(server.conns) == 0 {
1299                         return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
1300                 }
1301                 return false, nil
1302         })
1303         var st *http2Server
1304         server.mu.Lock()
1305         for k := range server.conns {
1306                 st = k.(*http2Server)
1307         }
1308         server.mu.Unlock()
1309         cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1310         if err != nil {
1311                 t.Fatalf("Failed to create 1st stream. Err: %v", err)
1312         }
1313         // Exhaust server's connection window.
1314         if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
1315                 t.Fatalf("Client failed to write data. Err: %v", err)
1316         }
1317         //Client should be able to create another stream and send data on it.
1318         cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1319         if err != nil {
1320                 t.Fatalf("Failed to create 2nd stream. Err: %v", err)
1321         }
1322         if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil {
1323                 t.Fatalf("Client failed to write data. Err: %v", err)
1324         }
1325         // Get the streams on server.
1326         waitWhileTrue(t, func() (bool, error) {
1327                 st.mu.Lock()
1328                 defer st.mu.Unlock()
1329
1330                 if len(st.activeStreams) != 2 {
1331                         return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
1332                 }
1333                 return false, nil
1334         })
1335         var sstream1 *Stream
1336         st.mu.Lock()
1337         for _, v := range st.activeStreams {
1338                 if v.id == 1 {
1339                         sstream1 = v
1340                 }
1341         }
1342         st.mu.Unlock()
1343         // Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
1344         ct := client.(*http2Client)
1345         ct.controlBuf.put(&dataFrame{cstream2.id, true, make([]byte, 1), func() {}})
1346         code := http2ErrConvTab[http2.ErrCodeFlowControl]
1347         waitWhileTrue(t, func() (bool, error) {
1348                 cstream2.mu.Lock()
1349                 defer cstream2.mu.Unlock()
1350                 if cstream2.status.Code() != code {
1351                         return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
1352                 }
1353                 return false, nil
1354         })
1355         // Reading from the stream on server should succeed.
1356         if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
1357                 t.Fatalf("_.Read(_) = %v, want <nil>", err)
1358         }
1359
1360         if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
1361                 t.Fatalf("_.Read(_) = %v, want io.EOF", err)
1362         }
1363
1364 }
1365
1366 func TestServerWithMisbehavedClient(t *testing.T) {
1367         server, ct := setUp(t, 0, math.MaxUint32, suspended)
1368         callHdr := &CallHdr{
1369                 Host:   "localhost",
1370                 Method: "foo",
1371         }
1372         var sc *http2Server
1373         // Wait until the server transport is setup.
1374         for {
1375                 server.mu.Lock()
1376                 if len(server.conns) == 0 {
1377                         server.mu.Unlock()
1378                         time.Sleep(time.Millisecond)
1379                         continue
1380                 }
1381                 for k := range server.conns {
1382                         var ok bool
1383                         sc, ok = k.(*http2Server)
1384                         if !ok {
1385                                 t.Fatalf("Failed to convert %v to *http2Server", k)
1386                         }
1387                 }
1388                 server.mu.Unlock()
1389                 break
1390         }
1391         cc, ok := ct.(*http2Client)
1392         if !ok {
1393                 t.Fatalf("Failed to convert %v to *http2Client", ct)
1394         }
1395         // Test server behavior for violation of stream flow control window size restriction.
1396         s, err := ct.NewStream(context.Background(), callHdr)
1397         if err != nil {
1398                 t.Fatalf("Failed to open stream: %v", err)
1399         }
1400         var sent int
1401         // Drain the stream flow control window
1402         cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
1403         sent += http2MaxFrameLen
1404         // Wait until the server creates the corresponding stream and receive some data.
1405         var ss *Stream
1406         for {
1407                 time.Sleep(time.Millisecond)
1408                 sc.mu.Lock()
1409                 if len(sc.activeStreams) == 0 {
1410                         sc.mu.Unlock()
1411                         continue
1412                 }
1413                 ss = sc.activeStreams[s.id]
1414                 sc.mu.Unlock()
1415                 ss.fc.mu.Lock()
1416                 if ss.fc.pendingData > 0 {
1417                         ss.fc.mu.Unlock()
1418                         break
1419                 }
1420                 ss.fc.mu.Unlock()
1421         }
1422         if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 {
1423                 t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, 0, 0)
1424         }
1425         // Keep sending until the server inbound window is drained for that stream.
1426         for sent <= initialWindowSize {
1427                 cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, 1), func() {}})
1428                 sent++
1429         }
1430         // Server sent a resetStream for s already.
1431         code := http2ErrConvTab[http2.ErrCodeFlowControl]
1432         if _, err := s.Read(make([]byte, 1)); err != io.EOF {
1433                 t.Fatalf("%v got err %v want <EOF>", s, err)
1434         }
1435         if s.status.Code() != code {
1436                 t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
1437         }
1438
1439         ct.CloseStream(s, nil)
1440         ct.Close()
1441         server.stop()
1442 }
1443
1444 func TestClientWithMisbehavedServer(t *testing.T) {
1445         // Turn off BDP estimation so that the server can
1446         // violate stream window.
1447         connectOptions := ConnectOptions{
1448                 InitialWindowSize: initialWindowSize,
1449         }
1450         server, ct := setUpWithOptions(t, 0, &ServerConfig{}, misbehaved, connectOptions)
1451         callHdr := &CallHdr{
1452                 Host:   "localhost",
1453                 Method: "foo.Stream",
1454         }
1455         conn, ok := ct.(*http2Client)
1456         if !ok {
1457                 t.Fatalf("Failed to convert %v to *http2Client", ct)
1458         }
1459         // Test the logic for the violation of stream flow control window size restriction.
1460         s, err := ct.NewStream(context.Background(), callHdr)
1461         if err != nil {
1462                 t.Fatalf("Failed to open stream: %v", err)
1463         }
1464         d := make([]byte, 1)
1465         if err := ct.Write(s, nil, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
1466                 t.Fatalf("Failed to write: %v", err)
1467         }
1468         // Read without window update.
1469         for {
1470                 p := make([]byte, http2MaxFrameLen)
1471                 if _, err = s.trReader.(*transportReader).reader.Read(p); err != nil {
1472                         break
1473                 }
1474         }
1475         if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 {
1476                 t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, %d, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, 0, 0)
1477         }
1478
1479         if err != io.EOF {
1480                 t.Fatalf("Got err %v, want <EOF>", err)
1481         }
1482         if s.status.Code() != codes.Internal {
1483                 t.Fatalf("Got s.status %v, want s.status.Code()=Internal", s.status)
1484         }
1485
1486         conn.CloseStream(s, err)
1487         ct.Close()
1488         server.stop()
1489 }
1490
1491 var encodingTestStatus = status.New(codes.Internal, "\n")
1492
1493 func TestEncodingRequiredStatus(t *testing.T) {
1494         server, ct := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
1495         callHdr := &CallHdr{
1496                 Host:   "localhost",
1497                 Method: "foo",
1498         }
1499         s, err := ct.NewStream(context.Background(), callHdr)
1500         if err != nil {
1501                 return
1502         }
1503         opts := Options{
1504                 Last:  true,
1505                 Delay: false,
1506         }
1507         if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1508                 t.Fatalf("Failed to write the request: %v", err)
1509         }
1510         p := make([]byte, http2MaxFrameLen)
1511         if _, err := s.trReader.(*transportReader).Read(p); err != io.EOF {
1512                 t.Fatalf("Read got error %v, want %v", err, io.EOF)
1513         }
1514         if !reflect.DeepEqual(s.Status(), encodingTestStatus) {
1515                 t.Fatalf("stream with status %v, want %v", s.Status(), encodingTestStatus)
1516         }
1517         ct.Close()
1518         server.stop()
1519 }
1520
1521 func TestInvalidHeaderField(t *testing.T) {
1522         server, ct := setUp(t, 0, math.MaxUint32, invalidHeaderField)
1523         callHdr := &CallHdr{
1524                 Host:   "localhost",
1525                 Method: "foo",
1526         }
1527         s, err := ct.NewStream(context.Background(), callHdr)
1528         if err != nil {
1529                 return
1530         }
1531         opts := Options{
1532                 Last:  true,
1533                 Delay: false,
1534         }
1535         if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1536                 t.Fatalf("Failed to write the request: %v", err)
1537         }
1538         p := make([]byte, http2MaxFrameLen)
1539         _, err = s.trReader.(*transportReader).Read(p)
1540         if se, ok := err.(StreamError); !ok || se.Code != codes.FailedPrecondition || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
1541                 t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.FailedPrecondition, expectedInvalidHeaderField)
1542         }
1543         ct.Close()
1544         server.stop()
1545 }
1546
1547 func TestStreamContext(t *testing.T) {
1548         expectedStream := &Stream{}
1549         ctx := newContextWithStream(context.Background(), expectedStream)
1550         s, ok := StreamFromContext(ctx)
1551         if !ok || expectedStream != s {
1552                 t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, s, ok, expectedStream)
1553         }
1554 }
1555
1556 func TestIsReservedHeader(t *testing.T) {
1557         tests := []struct {
1558                 h    string
1559                 want bool
1560         }{
1561                 {"", false}, // but should be rejected earlier
1562                 {"foo", false},
1563                 {"content-type", true},
1564                 {"grpc-message-type", true},
1565                 {"grpc-encoding", true},
1566                 {"grpc-message", true},
1567                 {"grpc-status", true},
1568                 {"grpc-timeout", true},
1569                 {"te", true},
1570         }
1571         for _, tt := range tests {
1572                 got := isReservedHeader(tt.h)
1573                 if got != tt.want {
1574                         t.Errorf("isReservedHeader(%q) = %v; want %v", tt.h, got, tt.want)
1575                 }
1576         }
1577 }
1578
1579 func TestContextErr(t *testing.T) {
1580         for _, test := range []struct {
1581                 // input
1582                 errIn error
1583                 // outputs
1584                 errOut StreamError
1585         }{
1586                 {context.DeadlineExceeded, StreamError{codes.DeadlineExceeded, context.DeadlineExceeded.Error()}},
1587                 {context.Canceled, StreamError{codes.Canceled, context.Canceled.Error()}},
1588         } {
1589                 err := ContextErr(test.errIn)
1590                 if err != test.errOut {
1591                         t.Fatalf("ContextErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
1592                 }
1593         }
1594 }
1595
1596 func max(a, b int32) int32 {
1597         if a > b {
1598                 return a
1599         }
1600         return b
1601 }
1602
1603 type windowSizeConfig struct {
1604         serverStream int32
1605         serverConn   int32
1606         clientStream int32
1607         clientConn   int32
1608 }
1609
1610 func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) {
1611         wc := windowSizeConfig{
1612                 serverStream: 10 * 1024 * 1024,
1613                 serverConn:   12 * 1024 * 1024,
1614                 clientStream: 6 * 1024 * 1024,
1615                 clientConn:   8 * 1024 * 1024,
1616         }
1617         testAccountCheckWindowSize(t, wc)
1618 }
1619
1620 func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) {
1621         wc := windowSizeConfig{
1622                 serverStream: defaultWindowSize,
1623                 // Note this is smaller than initialConnWindowSize which is the current default.
1624                 serverConn:   defaultWindowSize,
1625                 clientStream: defaultWindowSize,
1626                 clientConn:   defaultWindowSize,
1627         }
1628         testAccountCheckWindowSize(t, wc)
1629 }
1630
1631 func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
1632         serverConfig := &ServerConfig{
1633                 InitialWindowSize:     wc.serverStream,
1634                 InitialConnWindowSize: wc.serverConn,
1635         }
1636         connectOptions := ConnectOptions{
1637                 InitialWindowSize:     wc.clientStream,
1638                 InitialConnWindowSize: wc.clientConn,
1639         }
1640         server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions)
1641         defer server.stop()
1642         defer client.Close()
1643
1644         // Wait for server conns to be populated with new server transport.
1645         waitWhileTrue(t, func() (bool, error) {
1646                 server.mu.Lock()
1647                 defer server.mu.Unlock()
1648                 if len(server.conns) == 0 {
1649                         return true, fmt.Errorf("timed out waiting for server transport to be created")
1650                 }
1651                 return false, nil
1652         })
1653         var st *http2Server
1654         server.mu.Lock()
1655         for k := range server.conns {
1656                 st = k.(*http2Server)
1657         }
1658         server.mu.Unlock()
1659         ct := client.(*http2Client)
1660         cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1661         if err != nil {
1662                 t.Fatalf("Failed to create stream. Err: %v", err)
1663         }
1664         // Wait for server to receive headers.
1665         waitWhileTrue(t, func() (bool, error) {
1666                 st.mu.Lock()
1667                 defer st.mu.Unlock()
1668                 if len(st.activeStreams) == 0 {
1669                         return true, fmt.Errorf("timed out waiting for server to receive headers")
1670                 }
1671                 return false, nil
1672         })
1673         // Sleeping to make sure the settings are applied in case of negative test.
1674         time.Sleep(time.Second)
1675
1676         waitWhileTrue(t, func() (bool, error) {
1677                 st.fc.mu.Lock()
1678                 lim := st.fc.limit
1679                 st.fc.mu.Unlock()
1680                 if lim != uint32(serverConfig.InitialConnWindowSize) {
1681                         return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
1682                 }
1683                 return false, nil
1684         })
1685
1686         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1687         serverSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
1688         if err != nil {
1689                 t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
1690         }
1691         cancel()
1692         st.sendQuotaPool.add(serverSendQuota)
1693         if serverSendQuota != int(connectOptions.InitialConnWindowSize) {
1694                 t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
1695         }
1696         st.mu.Lock()
1697         ssq := st.streamSendQuota
1698         st.mu.Unlock()
1699         if ssq != uint32(connectOptions.InitialWindowSize) {
1700                 t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ssq, connectOptions.InitialWindowSize)
1701         }
1702         ct.fc.mu.Lock()
1703         limit := ct.fc.limit
1704         ct.fc.mu.Unlock()
1705         if limit != uint32(connectOptions.InitialConnWindowSize) {
1706                 t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize)
1707         }
1708         ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1709         clientSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
1710         if err != nil {
1711                 t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
1712         }
1713         cancel()
1714         ct.sendQuotaPool.add(clientSendQuota)
1715         if clientSendQuota != int(serverConfig.InitialConnWindowSize) {
1716                 t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
1717         }
1718         ct.mu.Lock()
1719         ssq = ct.streamSendQuota
1720         ct.mu.Unlock()
1721         if ssq != uint32(serverConfig.InitialWindowSize) {
1722                 t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ssq, serverConfig.InitialWindowSize)
1723         }
1724         cstream.fc.mu.Lock()
1725         limit = cstream.fc.limit
1726         cstream.fc.mu.Unlock()
1727         if limit != uint32(connectOptions.InitialWindowSize) {
1728                 t.Fatalf("Client stream flow control window size is %v, want %v", limit, connectOptions.InitialWindowSize)
1729         }
1730         var sstream *Stream
1731         st.mu.Lock()
1732         for _, v := range st.activeStreams {
1733                 sstream = v
1734         }
1735         st.mu.Unlock()
1736         sstream.fc.mu.Lock()
1737         limit = sstream.fc.limit
1738         sstream.fc.mu.Unlock()
1739         if limit != uint32(serverConfig.InitialWindowSize) {
1740                 t.Fatalf("Server stream flow control window size is %v, want %v", limit, serverConfig.InitialWindowSize)
1741         }
1742 }
1743
1744 // Check accounting on both sides after sending and receiving large messages.
1745 func TestAccountCheckExpandingWindow(t *testing.T) {
1746         server, client := setUp(t, 0, 0, pingpong)
1747         defer server.stop()
1748         defer client.Close()
1749         waitWhileTrue(t, func() (bool, error) {
1750                 server.mu.Lock()
1751                 defer server.mu.Unlock()
1752                 if len(server.conns) == 0 {
1753                         return true, fmt.Errorf("timed out while waiting for server transport to be created")
1754                 }
1755                 return false, nil
1756         })
1757         var st *http2Server
1758         server.mu.Lock()
1759         for k := range server.conns {
1760                 st = k.(*http2Server)
1761         }
1762         server.mu.Unlock()
1763         ct := client.(*http2Client)
1764         cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1765         if err != nil {
1766                 t.Fatalf("Failed to create stream. Err: %v", err)
1767         }
1768
1769         msgSize := 65535 * 16 * 2
1770         msg := make([]byte, msgSize)
1771         buf := make([]byte, msgSize+5)
1772         buf[0] = byte(0)
1773         binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
1774         copy(buf[5:], msg)
1775         opts := Options{}
1776         header := make([]byte, 5)
1777         for i := 1; i <= 10; i++ {
1778                 if err := ct.Write(cstream, nil, buf, &opts); err != nil {
1779                         t.Fatalf("Error on client while writing message: %v", err)
1780                 }
1781                 if _, err := cstream.Read(header); err != nil {
1782                         t.Fatalf("Error on client while reading data frame header: %v", err)
1783                 }
1784                 sz := binary.BigEndian.Uint32(header[1:])
1785                 recvMsg := make([]byte, int(sz))
1786                 if _, err := cstream.Read(recvMsg); err != nil {
1787                         t.Fatalf("Error on client while reading data: %v", err)
1788                 }
1789                 if len(recvMsg) != len(msg) {
1790                         t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg))
1791                 }
1792         }
1793         defer func() {
1794                 ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream.
1795                 if _, err := cstream.Read(header); err != io.EOF {
1796                         t.Fatalf("Client expected an EOF from the server. Got: %v", err)
1797                 }
1798         }()
1799         var sstream *Stream
1800         st.mu.Lock()
1801         for _, v := range st.activeStreams {
1802                 sstream = v
1803         }
1804         st.mu.Unlock()
1805
1806         waitWhileTrue(t, func() (bool, error) {
1807                 // Check that pendingData and delta on flow control windows on both sides are 0.
1808                 cstream.fc.mu.Lock()
1809                 if cstream.fc.delta != 0 {
1810                         cstream.fc.mu.Unlock()
1811                         return true, fmt.Errorf("delta on flow control window of client stream is non-zero")
1812                 }
1813                 if cstream.fc.pendingData != 0 {
1814                         cstream.fc.mu.Unlock()
1815                         return true, fmt.Errorf("pendingData on flow control window of client stream is non-zero")
1816                 }
1817                 cstream.fc.mu.Unlock()
1818                 sstream.fc.mu.Lock()
1819                 if sstream.fc.delta != 0 {
1820                         sstream.fc.mu.Unlock()
1821                         return true, fmt.Errorf("delta on flow control window of server stream is non-zero")
1822                 }
1823                 if sstream.fc.pendingData != 0 {
1824                         sstream.fc.mu.Unlock()
1825                         return true, fmt.Errorf("pendingData on flow control window of sercer stream is non-zero")
1826                 }
1827                 sstream.fc.mu.Unlock()
1828                 ct.fc.mu.Lock()
1829                 if ct.fc.delta != 0 {
1830                         ct.fc.mu.Unlock()
1831                         return true, fmt.Errorf("delta on flow control window of client transport is non-zero")
1832                 }
1833                 if ct.fc.pendingData != 0 {
1834                         ct.fc.mu.Unlock()
1835                         return true, fmt.Errorf("pendingData on flow control window of client transport is non-zero")
1836                 }
1837                 ct.fc.mu.Unlock()
1838                 st.fc.mu.Lock()
1839                 if st.fc.delta != 0 {
1840                         st.fc.mu.Unlock()
1841                         return true, fmt.Errorf("delta on flow control window of server transport is non-zero")
1842                 }
1843                 if st.fc.pendingData != 0 {
1844                         st.fc.mu.Unlock()
1845                         return true, fmt.Errorf("pendingData on flow control window of server transport is non-zero")
1846                 }
1847                 st.fc.mu.Unlock()
1848
1849                 // Check flow conrtrol window on client stream is equal to out flow on server stream.
1850                 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1851                 serverStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, sstream.sendQuotaPool.acquire())
1852                 cancel()
1853                 if err != nil {
1854                         return true, fmt.Errorf("error while acquiring server stream send quota. Err: %v", err)
1855                 }
1856                 sstream.sendQuotaPool.add(serverStreamSendQuota)
1857                 cstream.fc.mu.Lock()
1858                 clientEst := cstream.fc.limit - cstream.fc.pendingUpdate
1859                 cstream.fc.mu.Unlock()
1860                 if uint32(serverStreamSendQuota) != clientEst {
1861                         return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, clientEst)
1862                 }
1863
1864                 // Check flow control window on server stream is equal to out flow on client stream.
1865                 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1866                 clientStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, cstream.sendQuotaPool.acquire())
1867                 cancel()
1868                 if err != nil {
1869                         return true, fmt.Errorf("error while acquiring client stream send quota. Err: %v", err)
1870                 }
1871                 cstream.sendQuotaPool.add(clientStreamSendQuota)
1872                 sstream.fc.mu.Lock()
1873                 serverEst := sstream.fc.limit - sstream.fc.pendingUpdate
1874                 sstream.fc.mu.Unlock()
1875                 if uint32(clientStreamSendQuota) != serverEst {
1876                         return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, serverEst)
1877                 }
1878
1879                 // Check flow control window on client transport is equal to out flow of server transport.
1880                 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1881                 serverTrSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
1882                 cancel()
1883                 if err != nil {
1884                         return true, fmt.Errorf("error while acquring server transport send quota. Err: %v", err)
1885                 }
1886                 st.sendQuotaPool.add(serverTrSendQuota)
1887                 ct.fc.mu.Lock()
1888                 clientEst = ct.fc.limit - ct.fc.pendingUpdate
1889                 ct.fc.mu.Unlock()
1890                 if uint32(serverTrSendQuota) != clientEst {
1891                         return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, clientEst)
1892                 }
1893
1894                 // Check flow control window on server transport is equal to out flow of client transport.
1895                 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1896                 clientTrSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
1897                 cancel()
1898                 if err != nil {
1899                         return true, fmt.Errorf("error while acquiring client transport send quota. Err: %v", err)
1900                 }
1901                 ct.sendQuotaPool.add(clientTrSendQuota)
1902                 st.fc.mu.Lock()
1903                 serverEst = st.fc.limit - st.fc.pendingUpdate
1904                 st.fc.mu.Unlock()
1905                 if uint32(clientTrSendQuota) != serverEst {
1906                         return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, serverEst)
1907                 }
1908
1909                 return false, nil
1910         })
1911
1912 }
1913
1914 func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
1915         var (
1916                 wait bool
1917                 err  error
1918         )
1919         timer := time.NewTimer(time.Second * 5)
1920         for {
1921                 wait, err = condition()
1922                 if wait {
1923                         select {
1924                         case <-timer.C:
1925                                 t.Fatalf(err.Error())
1926                         default:
1927                                 time.Sleep(50 * time.Millisecond)
1928                                 continue
1929                         }
1930                 }
1931                 if !timer.Stop() {
1932                         <-timer.C
1933                 }
1934                 break
1935         }
1936 }
1937
1938 // A function of type writeHeaders writes out
1939 // http status with the given stream ID using the given framer.
1940 type writeHeaders func(*http2.Framer, uint32, int) error
1941
1942 func writeOneHeader(framer *http2.Framer, sid uint32, httpStatus int) error {
1943         var buf bytes.Buffer
1944         henc := hpack.NewEncoder(&buf)
1945         henc.WriteField(hpack.HeaderField{Name: ":status", Value: fmt.Sprint(httpStatus)})
1946         return framer.WriteHeaders(http2.HeadersFrameParam{
1947                 StreamID:      sid,
1948                 BlockFragment: buf.Bytes(),
1949                 EndStream:     true,
1950                 EndHeaders:    true,
1951         })
1952 }
1953
1954 func writeTwoHeaders(framer *http2.Framer, sid uint32, httpStatus int) error {
1955         var buf bytes.Buffer
1956         henc := hpack.NewEncoder(&buf)
1957         henc.WriteField(hpack.HeaderField{
1958                 Name:  ":status",
1959                 Value: fmt.Sprint(http.StatusOK),
1960         })
1961         if err := framer.WriteHeaders(http2.HeadersFrameParam{
1962                 StreamID:      sid,
1963                 BlockFragment: buf.Bytes(),
1964                 EndHeaders:    true,
1965         }); err != nil {
1966                 return err
1967         }
1968         buf.Reset()
1969         henc.WriteField(hpack.HeaderField{
1970                 Name:  ":status",
1971                 Value: fmt.Sprint(httpStatus),
1972         })
1973         return framer.WriteHeaders(http2.HeadersFrameParam{
1974                 StreamID:      sid,
1975                 BlockFragment: buf.Bytes(),
1976                 EndStream:     true,
1977                 EndHeaders:    true,
1978         })
1979 }
1980
1981 type httpServer struct {
1982         conn       net.Conn
1983         httpStatus int
1984         wh         writeHeaders
1985 }
1986
1987 func (s *httpServer) start(t *testing.T, lis net.Listener) {
1988         // Launch an HTTP server to send back header with httpStatus.
1989         go func() {
1990                 var err error
1991                 s.conn, err = lis.Accept()
1992                 if err != nil {
1993                         t.Errorf("Error accepting connection: %v", err)
1994                         return
1995                 }
1996                 defer s.conn.Close()
1997                 // Read preface sent by client.
1998                 if _, err = io.ReadFull(s.conn, make([]byte, len(http2.ClientPreface))); err != nil {
1999                         t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err)
2000                         return
2001                 }
2002                 reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize)
2003                 writer := bufio.NewWriterSize(s.conn, defaultReadBufSize)
2004                 framer := http2.NewFramer(writer, reader)
2005                 if err = framer.WriteSettingsAck(); err != nil {
2006                         t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
2007                         return
2008                 }
2009                 var sid uint32
2010                 // Read frames until a header is received.
2011                 for {
2012                         frame, err := framer.ReadFrame()
2013                         if err != nil {
2014                                 t.Errorf("Error at server-side while reading frame. Err: %v", err)
2015                                 return
2016                         }
2017                         if hframe, ok := frame.(*http2.HeadersFrame); ok {
2018                                 sid = hframe.Header().StreamID
2019                                 break
2020                         }
2021                 }
2022                 if err = s.wh(framer, sid, s.httpStatus); err != nil {
2023                         t.Errorf("Error at server-side while writing headers. Err: %v", err)
2024                         return
2025                 }
2026                 writer.Flush()
2027         }()
2028 }
2029
2030 func (s *httpServer) cleanUp() {
2031         if s.conn != nil {
2032                 s.conn.Close()
2033         }
2034 }
2035
2036 func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) {
2037         var (
2038                 err    error
2039                 lis    net.Listener
2040                 server *httpServer
2041                 client ClientTransport
2042         )
2043         cleanUp = func() {
2044                 if lis != nil {
2045                         lis.Close()
2046                 }
2047                 if server != nil {
2048                         server.cleanUp()
2049                 }
2050                 if client != nil {
2051                         client.Close()
2052                 }
2053         }
2054         defer func() {
2055                 if err != nil {
2056                         cleanUp()
2057                 }
2058         }()
2059         lis, err = net.Listen("tcp", "localhost:0")
2060         if err != nil {
2061                 t.Fatalf("Failed to listen. Err: %v", err)
2062         }
2063         server = &httpServer{
2064                 httpStatus: httpStatus,
2065                 wh:         wh,
2066         }
2067         server.start(t, lis)
2068         client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, 2*time.Second)
2069         if err != nil {
2070                 t.Fatalf("Error creating client. Err: %v", err)
2071         }
2072         stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true})
2073         if err != nil {
2074                 t.Fatalf("Error creating stream at client-side. Err: %v", err)
2075         }
2076         return
2077 }
2078
2079 func TestHTTPToGRPCStatusMapping(t *testing.T) {
2080         for k := range httpStatusConvTab {
2081                 testHTTPToGRPCStatusMapping(t, k, writeOneHeader)
2082         }
2083 }
2084
2085 func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) {
2086         stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh)
2087         defer cleanUp()
2088         want := httpStatusConvTab[httpStatus]
2089         buf := make([]byte, 8)
2090         _, err := stream.Read(buf)
2091         if err == nil {
2092                 t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want)
2093         }
2094         serr, ok := err.(StreamError)
2095         if !ok {
2096                 t.Fatalf("err.(Type) = %T, want StreamError", err)
2097         }
2098         if want != serr.Code {
2099                 t.Fatalf("Want error code: %v, got: %v", want, serr.Code)
2100         }
2101 }
2102
2103 func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) {
2104         stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
2105         defer cleanUp()
2106         buf := make([]byte, 8)
2107         _, err := stream.Read(buf)
2108         if err != io.EOF {
2109                 t.Fatalf("stream.Read(_) = _, %v, want _, io.EOF", err)
2110         }
2111         want := codes.Unknown
2112         stream.mu.Lock()
2113         defer stream.mu.Unlock()
2114         if stream.status.Code() != want {
2115                 t.Fatalf("Status code of stream: %v, want: %v", stream.status.Code(), want)
2116         }
2117 }
2118
2119 func TestHTTPStatusNottOKAndMissingGRPCStatusInSecondHeader(t *testing.T) {
2120         testHTTPToGRPCStatusMapping(t, http.StatusUnauthorized, writeTwoHeaders)
2121 }
2122
2123 // If any error occurs on a call to Stream.Read, future calls
2124 // should continue to return that same error.
2125 func TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
2126         testRecvBuffer := newRecvBuffer()
2127         s := &Stream{
2128                 ctx:         context.Background(),
2129                 goAway:      make(chan struct{}),
2130                 buf:         testRecvBuffer,
2131                 requestRead: func(int) {},
2132         }
2133         s.trReader = &transportReader{
2134                 reader: &recvBufferReader{
2135                         ctx:    s.ctx,
2136                         goAway: s.goAway,
2137                         recv:   s.buf,
2138                 },
2139                 windowHandler: func(int) {},
2140         }
2141         testData := make([]byte, 1)
2142         testData[0] = 5
2143         testErr := errors.New("test error")
2144         s.write(recvMsg{data: testData, err: testErr})
2145
2146         inBuf := make([]byte, 1)
2147         actualCount, actualErr := s.Read(inBuf)
2148         if actualCount != 0 {
2149                 t.Errorf("actualCount, _ := s.Read(_) differs; want 0; got %v", actualCount)
2150         }
2151         if actualErr.Error() != testErr.Error() {
2152                 t.Errorf("_ , actualErr := s.Read(_) differs; want actualErr.Error() to be %v; got %v", testErr.Error(), actualErr.Error())
2153         }
2154
2155         s.write(recvMsg{data: testData, err: nil})
2156         s.write(recvMsg{data: testData, err: errors.New("different error from first")})
2157
2158         for i := 0; i < 2; i++ {
2159                 inBuf := make([]byte, 1)
2160                 actualCount, actualErr := s.Read(inBuf)
2161                 if actualCount != 0 {
2162                         t.Errorf("actualCount, _ := s.Read(_) differs; want %v; got %v", 0, actualCount)
2163                 }
2164                 if actualErr.Error() != testErr.Error() {
2165                         t.Errorf("_ , actualErr := s.Read(_) differs; want actualErr.Error() to be %v; got %v", testErr.Error(), actualErr.Error())
2166                 }
2167         }
2168 }
2169
2170 func TestPingPong1B(t *testing.T) {
2171         runPingPongTest(t, 1)
2172 }
2173
2174 func TestPingPong1KB(t *testing.T) {
2175         runPingPongTest(t, 1024)
2176 }
2177
2178 func TestPingPong64KB(t *testing.T) {
2179         runPingPongTest(t, 65536)
2180 }
2181
2182 func TestPingPong1MB(t *testing.T) {
2183         runPingPongTest(t, 1048576)
2184 }
2185
2186 //This is a stress-test of flow control logic.
2187 func runPingPongTest(t *testing.T, msgSize int) {
2188         server, client := setUp(t, 0, 0, pingpong)
2189         defer server.stop()
2190         defer client.Close()
2191         waitWhileTrue(t, func() (bool, error) {
2192                 server.mu.Lock()
2193                 defer server.mu.Unlock()
2194                 if len(server.conns) == 0 {
2195                         return true, fmt.Errorf("timed out while waiting for server transport to be created")
2196                 }
2197                 return false, nil
2198         })
2199         ct := client.(*http2Client)
2200         stream, err := client.NewStream(context.Background(), &CallHdr{})
2201         if err != nil {
2202                 t.Fatalf("Failed to create stream. Err: %v", err)
2203         }
2204         msg := make([]byte, msgSize)
2205         outgoingHeader := make([]byte, 5)
2206         outgoingHeader[0] = byte(0)
2207         binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(msgSize))
2208         opts := &Options{}
2209         incomingHeader := make([]byte, 5)
2210         done := make(chan struct{})
2211         go func() {
2212                 timer := time.NewTimer(time.Second * 5)
2213                 <-timer.C
2214                 close(done)
2215         }()
2216         for {
2217                 select {
2218                 case <-done:
2219                         ct.Write(stream, nil, nil, &Options{Last: true})
2220                         if _, err := stream.Read(incomingHeader); err != io.EOF {
2221                                 t.Fatalf("Client expected EOF from the server. Got: %v", err)
2222                         }
2223                         return
2224                 default:
2225                         if err := ct.Write(stream, outgoingHeader, msg, opts); err != nil {
2226                                 t.Fatalf("Error on client while writing message. Err: %v", err)
2227                         }
2228                         if _, err := stream.Read(incomingHeader); err != nil {
2229                                 t.Fatalf("Error on client while reading data header. Err: %v", err)
2230                         }
2231                         sz := binary.BigEndian.Uint32(incomingHeader[1:])
2232                         recvMsg := make([]byte, int(sz))
2233                         if _, err := stream.Read(recvMsg); err != nil {
2234                                 t.Fatalf("Error on client while reading data. Err: %v", err)
2235                         }
2236                 }
2237         }
2238 }