3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
38 "golang.org/x/net/context"
39 "golang.org/x/net/http2"
40 "golang.org/x/net/http2/hpack"
41 "google.golang.org/grpc/codes"
42 "google.golang.org/grpc/keepalive"
43 "google.golang.org/grpc/status"
49 startedErr chan error // error (or nil) with server start value
51 conns map[ServerTransport]bool
56 expectedRequest = []byte("ping")
57 expectedResponse = []byte("pong")
58 expectedRequestLarge = make([]byte, initialWindowSize*2)
59 expectedResponseLarge = make([]byte, initialWindowSize*2)
60 expectedInvalidHeaderField = "invalid/content-type"
63 type testStreamHandler struct {
75 encodingRequiredStatus
// handleStreamAndNotify is the stream handler that start passes directly to
// transport.HandleStreams for one of the hType cases. Only its signature is
// visible in this excerpt.
// NOTE(review): presumably it signals h.notify, which tests swap out through
// server.h.notify — confirm against the full definition.
82 func (h *testStreamHandler) handleStreamAndNotify(s *Stream) {
// handleStream is the server-side handler for the plain request/response
// exchange. It selects the expected request/response pair (the large pair when
// the method is "foo.Large"), compares the buffer p against the expected
// request, then writes the response followed by an OK status trailer.
// NOTE(review): start launches this with `go h.handleStream(t, s)`, so the
// t.Fatalf below runs on a goroutine other than the test's own — confirm this
// is acceptable, since FailNow is documented as test-goroutine only.
95 func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
96 req := expectedRequest
97 resp := expectedResponse
98 if s.Method() == "foo.Large" {
99 req = expectedRequestLarge
100 resp = expectedResponseLarge
// p is sized to hold exactly one expected request.
102 p := make([]byte, len(req))
107 if !bytes.Equal(p, req) {
108 t.Fatalf("handleStream got %v, want %v", p, req)
110 // send a response back to the client.
111 h.t.Write(s, nil, resp, &Options{})
112 // send the trailer to end the stream.
113 h.t.WriteStatus(s, status.New(codes.OK, ""))
// handleStreamPingPong reads a length-prefixed message from the stream and
// writes a length-prefixed reply of the same payload size. The prefix is 5
// bytes; bytes 1..4 carry the payload length as a big-endian uint32.
// NOTE(review): the surrounding loop and the branch that decides between
// WriteStatus(OK) and t.Fatalf on a header read error are not visible in this
// excerpt — presumably io.EOF ends the stream cleanly; confirm.
116 func (h *testStreamHandler) handleStreamPingPong(t *testing.T, s *Stream) {
117 header := make([]byte, 5)
119 if _, err := s.Read(header); err != nil {
121 h.t.WriteStatus(s, status.New(codes.OK, ""))
124 t.Fatalf("Error on server while reading data header: %v", err)
// Payload size comes from the four bytes after the first header byte.
126 sz := binary.BigEndian.Uint32(header[1:])
127 msg := make([]byte, int(sz))
128 if _, err := s.Read(msg); err != nil {
129 t.Fatalf("Error on server while reading message: %v", err)
// Reply buffer: 5-byte prefix plus sz payload bytes.
131 buf := make([]byte, sz+5)
133 binary.BigEndian.PutUint32(buf[1:], uint32(sz))
135 h.t.Write(s, nil, buf, &Options{})
// handleStreamMisbehave makes the server deliberately violate the client's
// flow control. It bypasses the normal write path by putting dataFrame values
// straight onto the http2Server's controlBuf until `sent` reaches
// initialWindowSize. The final frame is handled differently per method:
// "foo.Connection" targets the connection-level window, and any other method
// gets a frame of n+1 bytes to overrun the stream-level window.
// NOTE(review): the declaration and update of `sent` are not visible in this
// excerpt.
139 func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *Stream) {
// The raw control buffer is only reachable on the concrete *http2Server.
140 conn, ok := s.ServerTransport().(*http2Server)
142 t.Fatalf("Failed to convert %v to *http2Server", s.ServerTransport())
145 p := make([]byte, http2MaxFrameLen)
146 for sent < initialWindowSize {
147 n := initialWindowSize - sent
148 // The last message may be smaller than http2MaxFrameLen
149 if n <= http2MaxFrameLen {
150 if s.Method() == "foo.Connection" {
151 // Violate connection level flow control window of client but do not
152 // violate any stream level windows.
155 // Violate stream level flow control window of client.
156 p = make([]byte, n+1)
159 conn.controlBuf.put(&dataFrame{s.id, false, p, func() {}})
// handleStreamEncodingRequiredStatus ends the stream by writing
// encodingTestStatus as its status. Per the comment below, that status is
// chosen because it contains a raw newline that must be encoded before it
// reaches the HTTP/2 framer.
164 func (h *testStreamHandler) handleStreamEncodingRequiredStatus(t *testing.T, s *Stream) {
165 // raw newline is not accepted by http2 framer so it must be encoded.
166 h.t.WriteStatus(s, encodingTestStatus)
// handleStreamInvalidHeaderField builds a header list whose only visible entry
// is a "content-type" field set to expectedInvalidHeaderField, and queues it
// as a headerFrame on the server transport's controlBuf.
// NOTE(review): the headerFrame's fields are not visible in this excerpt.
169 func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stream) {
170 headerFields := []hpack.HeaderField{}
171 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
172 h.t.controlBuf.put(&headerFrame{
// handleStreamDelayRead is the handleStream exchange with a 2-second pause
// before the server reads, so the client starts sending first. After the
// sleep it checks the received bytes against the expected request (the large
// variant for "foo.Large"), then writes the response and an OK status trailer.
// NOTE(review): the t.Fatalf calls run on a goroutine spawned by start
// (`go h.handleStreamDelayRead(t, s)`), not on the test goroutine — confirm.
179 func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
180 req := expectedRequest
181 resp := expectedResponse
182 if s.Method() == "foo.Large" {
183 req = expectedRequestLarge
184 resp = expectedResponseLarge
186 p := make([]byte, len(req))
188 // Wait before reading. Give time to client to start sending
189 // before server starts reading.
190 time.Sleep(2 * time.Second)
193 t.Fatalf("s.Read(_) = _, %v, want _, <nil>", err)
197 if !bytes.Equal(p, req) {
198 t.Fatalf("handleStream got %v, want %v", p, req)
200 // send a response back to the client.
201 h.t.Write(s, nil, resp, &Options{})
202 // send the trailer to end the stream.
203 h.t.WriteStatus(s, status.New(codes.OK, ""))
// handleStreamDelayWrite is the handleStream exchange with a 2-second pause
// before the server writes, so the client is already reading when data
// arrives. It checks the received bytes against the expected request (the
// large variant for "foo.Large"), sleeps, then writes the response and an OK
// status trailer.
// NOTE(review): the t.Fatalf calls run on a goroutine spawned by start
// (`go h.handleStreamDelayWrite(t, s)`), not on the test goroutine — confirm.
206 func (h *testStreamHandler) handleStreamDelayWrite(t *testing.T, s *Stream) {
207 req := expectedRequest
208 resp := expectedResponse
209 if s.Method() == "foo.Large" {
210 req = expectedRequestLarge
211 resp = expectedResponseLarge
213 p := make([]byte, len(req))
216 t.Fatalf("s.Read(_) = _, %v, want _, <nil>", err)
219 if !bytes.Equal(p, req) {
220 t.Fatalf("handleStream got %v, want %v", p, req)
223 // Wait before sending. Give time to client to start reading
224 // before server starts sending.
225 time.Sleep(2 * time.Second)
226 h.t.Write(s, nil, resp, &Options{})
227 // send the trailer to end the stream.
228 h.t.WriteStatus(s, status.New(codes.OK, ""))
231 // start starts server. Other goroutines should block on s.readyChan for further operations.
// NOTE(review): the visible code reports startup through s.startedErr (and
// wait receives from it); no s.readyChan appears in this excerpt — confirm
// whether the sentence above is stale.
//
// start listens on TCP, either on "localhost:0" or on "localhost:<port>" (the
// selecting condition is not visible here), and sends listen or address-parse
// failures on s.startedErr. For each accepted connection it creates an "http2"
// server transport with serverConfig, records it in s.conns, and runs
// transport.HandleStreams in a new goroutine with the stream handler selected
// by ht. Most handlers are themselves started in a further goroutine per
// stream.
232 func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
235 s.lis, err = net.Listen("tcp", "localhost:0")
237 s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
240 s.startedErr <- fmt.Errorf("failed to listen: %v", err)
// p is the port string split out of the listener address.
243 _, p, err := net.SplitHostPort(s.lis.Addr().String())
245 s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
249 s.conns = make(map[ServerTransport]bool)
252 conn, err := s.lis.Accept()
256 transport, err := NewServerTransport("http2", conn, serverConfig)
266 s.conns[transport] = true
// The handler needs the concrete *http2Server to reach transport internals.
267 h := &testStreamHandler{t: transport.(*http2Server)}
272 go transport.HandleStreams(h.handleStreamAndNotify,
273 func(ctx context.Context, _ string) context.Context {
277 go transport.HandleStreams(func(*Stream) {}, // Do nothing to handle the stream.
278 func(ctx context.Context, method string) context.Context {
282 go transport.HandleStreams(func(s *Stream) {
283 go h.handleStreamMisbehave(t, s)
284 }, func(ctx context.Context, method string) context.Context {
287 case encodingRequiredStatus:
288 go transport.HandleStreams(func(s *Stream) {
289 go h.handleStreamEncodingRequiredStatus(t, s)
290 }, func(ctx context.Context, method string) context.Context {
293 case invalidHeaderField:
294 go transport.HandleStreams(func(s *Stream) {
295 go h.handleStreamInvalidHeaderField(t, s)
296 }, func(ctx context.Context, method string) context.Context {
300 go transport.HandleStreams(func(s *Stream) {
301 go h.handleStreamDelayRead(t, s)
302 }, func(ctx context.Context, method string) context.Context {
306 go transport.HandleStreams(func(s *Stream) {
307 go h.handleStreamDelayWrite(t, s)
308 }, func(ctx context.Context, method string) context.Context {
312 go transport.HandleStreams(func(s *Stream) {
313 go h.handleStreamPingPong(t, s)
314 }, func(ctx context.Context, method string) context.Context {
318 go transport.HandleStreams(func(s *Stream) {
319 go h.handleStream(t, s)
320 }, func(ctx context.Context, method string) context.Context {
// wait blocks until a value arrives on s.startedErr or until timeout elapses.
// On timeout it fails the test with t.Fatalf.
// NOTE(review): how a received error is handled is not visible in this excerpt.
327 func (s *server) wait(t *testing.T, timeout time.Duration) {
329 case err := <-s.startedErr:
333 case <-time.After(timeout):
334 t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
// stop shuts the test server down. The visible part iterates over every
// transport recorded in s.conns.
// NOTE(review): presumably it closes the listener and each transport — the
// loop body and surrounding lines are not visible in this excerpt.
338 func (s *server) stop() {
341 for c := range s.conns {
// setUp is a convenience wrapper around setUpWithOptions. It builds a
// ServerConfig that sets only MaxStreams and uses zero-value ConnectOptions
// for the client. It returns the started test server and a connected client
// transport.
348 func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
349 return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
// setUpWithOptions starts a test server in a background goroutine, waits up to
// 2 seconds for it to report startup, and then creates a client transport to
// "localhost:<server.port>" with copts and a 2-second timeout. It fails the
// test if the client transport cannot be created.
352 func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
// Buffer of 1 lets start send on startedErr without a receiver waiting.
353 server := &server{startedErr: make(chan error, 1)}
354 go server.start(t, port, serverConfig, ht)
355 server.wait(t, 2*time.Second)
356 addr := "localhost:" + server.port
361 target := TargetInfo{
364 ct, connErr = NewClientTransport(context.Background(), target, copts, 2*time.Second)
366 t.Fatalf("failed to create transport: %v", connErr)
// setUpWithNoPingServer listens on an ephemeral localhost TCP port, launches
// what the comment below calls a non-responsive server on it, and dials that
// address with NewClientTransport using copts and a 2-second timeout. It
// returns the client transport.
// NOTE(review): presumably the accept goroutine sends the accepted conn on
// done; the visible code only receives from done on the dial-failure path —
// confirm.
371 func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
372 lis, err := net.Listen("tcp", "localhost:0")
374 t.Fatalf("Failed to listen: %v", err)
376 // Launch a non responsive server.
379 conn, err := lis.Accept()
381 t.Errorf("Error at server-side while accepting: %v", err)
387 tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, 2*time.Second)
391 if conn, ok := <-done; ok {
394 t.Fatalf("Failed to dial: %v", err)
399 // TestInflightStreamClosing ensures that closing in-flight stream
400 // sends StreamError to concurrent stream reader.
// It opens a stream against a `suspended` server, closes it with a specific
// StreamError, and expects stream.Read to return exactly that error before a
// 5-second timer fires.
401 func TestInflightStreamClosing(t *testing.T) {
402 serverConfig := &ServerConfig{}
403 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
407 stream, err := client.NewStream(context.Background(), &CallHdr{})
409 t.Fatalf("Client failed to create RPC request: %v", err)
412 donec := make(chan struct{})
// serr is both the error handed to CloseStream and the one Read must return.
413 serr := StreamError{Desc: "client connection is closing"}
416 if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
417 t.Errorf("unexpected Stream error %v, expected %v", err, serr)
421 // should unblock concurrent stream.Read
422 client.CloseStream(stream, serr)
424 // wait for stream.Read error
425 timeout := time.NewTimer(5 * time.Second)
432 t.Fatalf("Test timed out, expected a StreamError.")
436 // TestMaxConnectionIdle tests that a server will send GoAway to an idle client.
437 // An idle client is one who doesn't make any RPC calls for a duration of
438 // MaxConnectionIdle time.
// The server is configured with MaxConnectionIdle = 2s. The client opens one
// stream, closes it again, and then must observe client.GoAway() before a
// 4-second timer fires.
439 func TestMaxConnectionIdle(t *testing.T) {
440 serverConfig := &ServerConfig{
441 KeepaliveParams: keepalive.ServerParameters{
442 MaxConnectionIdle: 2 * time.Second,
445 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
448 stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
450 t.Fatalf("Client failed to create RPC request: %v", err)
// NOTE(review): presumably forces CloseStream to send RST_STREAM so the
// server sees the stream end — confirm against Stream.rstStream's definition.
453 stream.rstStream = true
455 client.CloseStream(stream, nil)
456 // wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are made
457 timeout := time.NewTimer(time.Second * 4)
459 case <-client.GoAway():
464 t.Fatalf("Test timed out, expected a GoAway from the server.")
468 // TestMaxConnectionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
// The server is configured with MaxConnectionIdle = 2s. The client opens a
// stream and keeps it open; the test fails if a GoAway is observed while the
// 4-second timer is running.
469 func TestMaxConnectionIdleNegative(t *testing.T) {
470 serverConfig := &ServerConfig{
471 KeepaliveParams: keepalive.ServerParameters{
472 MaxConnectionIdle: 2 * time.Second,
475 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
// The stream is never closed, so the connection stays non-idle.
478 _, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
480 t.Fatalf("Client failed to create RPC request: %v", err)
482 timeout := time.NewTimer(time.Second * 4)
484 case <-client.GoAway():
488 t.Fatalf("A non-idle client received a GoAway.")
494 // TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
// The server is configured with MaxConnectionAge = 2s. The client opens a
// stream and must observe client.GoAway() before a 4-second timer fires.
495 func TestMaxConnectionAge(t *testing.T) {
496 serverConfig := &ServerConfig{
497 KeepaliveParams: keepalive.ServerParameters{
498 MaxConnectionAge: 2 * time.Second,
501 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
504 _, err := client.NewStream(context.Background(), &CallHdr{})
506 t.Fatalf("Client failed to create stream: %v", err)
508 // Wait for max-age logic to send GoAway.
509 timeout := time.NewTimer(4 * time.Second)
511 case <-client.GoAway():
516 t.Fatalf("Test timer out, expected a GoAway from the server.")
520 // TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
// The server uses keepalive Time = 2s and Timeout = 1s. The test dials a raw
// TCP connection to the server's listener, sleeps 4 seconds, and then reads
// from that connection; the failure message shows io.EOF is the wanted result.
521 func TestKeepaliveServer(t *testing.T) {
522 serverConfig := &ServerConfig{
523 KeepaliveParams: keepalive.ServerParameters{
524 Time: 2 * time.Second,
525 Timeout: 1 * time.Second,
528 server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
// A plain net.Conn is used as the client.
// NOTE(review): presumably so that nothing answers the server's pings — confirm.
531 client, err := net.Dial("tcp", server.lis.Addr().String())
533 t.Fatalf("Failed to dial: %v", err)
536 // Set read deadline on client conn so that it doesn't block forever in error cases.
537 client.SetReadDeadline(time.Now().Add(10 * time.Second))
538 // Wait for keepalive logic to close the connection.
539 time.Sleep(4 * time.Second)
540 b := make([]byte, 24)
542 _, err = client.Read(b)
547 t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
553 // TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
// The server uses keepalive Time = 2s and Timeout = 1s. After sleeping 4
// seconds the test asserts that the client transport's state is still
// `reachable`.
554 func TestKeepaliveServerNegative(t *testing.T) {
555 serverConfig := &ServerConfig{
556 KeepaliveParams: keepalive.ServerParameters{
557 Time: 2 * time.Second,
558 Timeout: 1 * time.Second,
561 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
564 // Give keepalive logic some time by sleeping.
565 time.Sleep(4 * time.Second)
566 // Assert that client is still active.
567 clientTr := client.(*http2Client)
// state is read while clientTr.mu is held; the Lock call is not visible in
// this excerpt.
569 defer clientTr.mu.Unlock()
570 if clientTr.state != reachable {
571 t.Fatalf("Test failed: Expected server-client connection to be healthy.")
// TestKeepaliveClientClosesIdleTransport connects a client with keepalive
// enabled (Time = 2s, Timeout = 1s, PermitWithoutStream = true) to the server
// from setUpWithNoPingServer. After 4 seconds, with no streams opened, the
// client transport must no longer be in the `reachable` state.
575 func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
576 done := make(chan net.Conn, 1)
577 tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
578 Time: 2 * time.Second, // Keepalive time = 2 sec.
579 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
580 PermitWithoutStream: true, // Run keepalive even with no RPCs.
585 t.Fatalf("Server didn't return connection object")
588 // Sleep for keepalive to close the connection.
589 time.Sleep(4 * time.Second)
590 // Assert that the connection was closed.
591 ct := tr.(*http2Client)
594 if ct.state == reachable {
595 t.Fatalf("Test Failed: Expected client transport to have closed.")
// TestKeepaliveClientStaysHealthyOnIdleTransport connects a client with
// keepalive Time = 2s and Timeout = 1s to the server from
// setUpWithNoPingServer. After 4 seconds with no streams opened, the client
// transport must still be `reachable`.
// NOTE(review): PermitWithoutStream is not set in the visible lines, which
// would explain why the idle transport is expected to survive — confirm.
599 func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
600 done := make(chan net.Conn, 1)
601 tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
602 Time: 2 * time.Second, // Keepalive time = 2 sec.
603 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
608 t.Fatalf("server didn't reutrn connection object")
611 // Give keepalive some time.
612 time.Sleep(4 * time.Second)
613 // Assert that connection is still healthy.
614 ct := tr.(*http2Client)
617 if ct.state != reachable {
618 t.Fatalf("Test failed: Expected client transport to be healthy.")
// TestKeepaliveClientClosesWithActiveStreams connects a client with keepalive
// Time = 2s and Timeout = 1s to the server from setUpWithNoPingServer and
// opens one stream. After 4 seconds the client transport must no longer be in
// the `reachable` state.
622 func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
623 done := make(chan net.Conn, 1)
624 tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
625 Time: 2 * time.Second, // Keepalive time = 2 sec.
626 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
631 t.Fatalf("Server didn't return connection object")
// The stream is left open for the rest of the test.
635 _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
637 t.Fatalf("Failed to create a new stream: %v", err)
639 // Give keepalive some time.
640 time.Sleep(4 * time.Second)
641 // Assert that transport was closed.
642 ct := tr.(*http2Client)
645 if ct.state == reachable {
646 t.Fatalf("Test failed: Expected client transport to have closed.")
// TestKeepaliveClientStaysHealthyWithResponsiveServer connects a client with
// keepalive enabled (Time = 2s, Timeout = 1s, PermitWithoutStream = true) to a
// `normal` test server. After 4 seconds the client transport must still be
// `reachable`.
650 func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
651 s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
652 Time: 2 * time.Second, // Keepalive time = 2 sec.
653 Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
654 PermitWithoutStream: true, // Run keepalive even with no RPCs.
658 // Give keep alive some time.
659 time.Sleep(4 * time.Second)
660 // Assert that transport is healthy.
661 ct := tr.(*http2Client)
664 if ct.state != reachable {
665 t.Fatalf("Test failed: Expected client transport to be healthy.")
// TestKeepaliveServerEnforcementWithAbusiveClientNoRPC checks keepalive
// enforcement against a client that pings too often with no RPC in flight. The
// server's policy sets MinTime = 2s, while the client's keepalive Time is 50ms
// with PermitWithoutStream = true. The client must observe a GoAway before a
// 2-second timer fires and, 500ms later, must no longer be `reachable`.
669 func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
670 serverConfig := &ServerConfig{
671 KeepalivePolicy: keepalive.EnforcementPolicy{
672 MinTime: 2 * time.Second,
675 clientOptions := ConnectOptions{
676 KeepaliveParams: keepalive.ClientParameters{
677 Time: 50 * time.Millisecond,
678 Timeout: 50 * time.Millisecond,
679 PermitWithoutStream: true,
682 server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
686 timeout := time.NewTimer(2 * time.Second)
688 case <-client.GoAway():
693 t.Fatalf("Test failed: Expected a GoAway from server.")
// Allow time for the connection to be torn down after the GoAway.
695 time.Sleep(500 * time.Millisecond)
696 ct := client.(*http2Client)
699 if ct.state == reachable {
700 t.Fatalf("Test failed: Expected the connection to be closed.")
// TestKeepaliveServerEnforcementWithAbusiveClientWithRPC checks keepalive
// enforcement against a client that pings too often while a stream is open.
// The server's policy sets MinTime = 2s, while the client's keepalive Time is
// 50ms. After opening a stream the client must observe a GoAway before a
// 2-second timer fires and, 500ms later, must no longer be `reachable`.
704 func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
705 serverConfig := &ServerConfig{
706 KeepalivePolicy: keepalive.EnforcementPolicy{
707 MinTime: 2 * time.Second,
710 clientOptions := ConnectOptions{
711 KeepaliveParams: keepalive.ClientParameters{
712 Time: 50 * time.Millisecond,
713 Timeout: 50 * time.Millisecond,
716 server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
720 if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
721 t.Fatalf("Client failed to create stream.")
723 timeout := time.NewTimer(2 * time.Second)
725 case <-client.GoAway():
730 t.Fatalf("Test failed: Expected a GoAway from server.")
// Allow time for the connection to be torn down after the GoAway.
732 time.Sleep(500 * time.Millisecond)
733 ct := client.(*http2Client)
736 if ct.state == reachable {
737 t.Fatalf("Test failed: Expected the connection to be closed.")
// TestKeepaliveServerEnforcementWithObeyingClientNoRPC checks that a client
// which respects the server's keepalive policy is left alone when no RPC is in
// flight. The server allows pings without streams with MinTime = 100ms; the
// client's keepalive Time is 101ms, just above that minimum. After 2 seconds
// the client transport must still be `reachable`.
741 func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
742 serverConfig := &ServerConfig{
743 KeepalivePolicy: keepalive.EnforcementPolicy{
744 MinTime: 100 * time.Millisecond,
745 PermitWithoutStream: true,
748 clientOptions := ConnectOptions{
749 KeepaliveParams: keepalive.ClientParameters{
750 Time: 101 * time.Millisecond,
751 Timeout: 50 * time.Millisecond,
752 PermitWithoutStream: true,
755 server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
759 // Give keepalive enough time.
760 time.Sleep(2 * time.Second)
761 // Assert that connection is healthy.
762 ct := client.(*http2Client)
765 if ct.state != reachable {
766 t.Fatalf("Test failed: Expected connection to be healthy.")
// TestKeepaliveServerEnforcementWithObeyingClientWithRPC checks that a client
// which respects the server's keepalive policy is left alone while a stream is
// open. The server's MinTime is 100ms; the client's keepalive Time is 101ms,
// just above that minimum. After opening a stream and sleeping 2 seconds, the
// client transport must still be `reachable`.
770 func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
771 serverConfig := &ServerConfig{
772 KeepalivePolicy: keepalive.EnforcementPolicy{
773 MinTime: 100 * time.Millisecond,
776 clientOptions := ConnectOptions{
777 KeepaliveParams: keepalive.ClientParameters{
778 Time: 101 * time.Millisecond,
779 Timeout: 50 * time.Millisecond,
782 server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
786 if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
787 t.Fatalf("Client failed to create stream.")
790 // Give keepalive enough time.
791 time.Sleep(2 * time.Second)
792 // Assert that connection is healthy.
793 ct := client.(*http2Client)
796 if ct.state != reachable {
797 t.Fatalf("Test failed: Expected connection to be healthy.")
// TestClientSendAndReceive exercises the basic unary-style exchange against a
// `normal` server. It opens two streams and checks their ids (the expected
// values are not visible in this excerpt). On the first stream it writes
// expectedRequest, reads back expectedResponse, and then expects io.EOF on the
// next read.
801 func TestClientSendAndReceive(t *testing.T) {
802 server, ct := setUp(t, 0, math.MaxUint32, normal)
807 s1, err1 := ct.NewStream(context.Background(), callHdr)
809 t.Fatalf("failed to open stream: %v", err1)
812 t.Fatalf("wrong stream id: %d", s1.id)
814 s2, err2 := ct.NewStream(context.Background(), callHdr)
816 t.Fatalf("failed to open stream: %v", err2)
819 t.Fatalf("wrong stream id: %d", s2.id)
// io.EOF from Write is tolerated here as well as nil.
825 if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
826 t.Fatalf("failed to send data: %v", err)
828 p := make([]byte, len(expectedResponse))
829 _, recvErr := s1.Read(p)
830 if recvErr != nil || !bytes.Equal(p, expectedResponse) {
831 t.Fatalf("Error: %v, want <nil>; Result: %v, want %v", recvErr, p, expectedResponse)
// A second read must report end of stream.
833 _, recvErr = s1.Read(p)
834 if recvErr != io.EOF {
835 t.Fatalf("Error: %v; want <EOF>", recvErr)
// TestClientErrorNotify sets up a `normal` server and a client transport. Per
// the comment below, it expects the client's reader to detect an error and
// signal it through ct.Error().
// NOTE(review): the steps that trigger and await the error are not visible in
// this excerpt.
841 func TestClientErrorNotify(t *testing.T) {
842 server, ct := setUp(t, 0, math.MaxUint32, normal)
844 // ct.reader should detect the error and activate ct.Error().
// performOneRPC opens a stream on ct and writes expectedRequest. When the
// write returns nil or io.EOF it sleeps 5ms and allocates a buffer sized for
// expectedResponse. Per the comment below, later receive errors are
// anticipated because the underlying transport may already be gone.
// NOTE(review): the reads themselves and the NewStream error handling are not
// visible in this excerpt.
849 func performOneRPC(ct ClientTransport) {
854 s, err := ct.NewStream(context.Background(), callHdr)
862 if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
863 time.Sleep(5 * time.Millisecond)
864 // The following s.Recv()'s could error out because the
865 // underlying transport is gone.
868 p := make([]byte, len(expectedResponse))
// TestClientMix sets up a `normal` server and client. The visible lines show a
// 5-second sleep, a goroutine taking the client transport, and a 1000-iteration
// loop that sleeps 10ms per iteration.
// NOTE(review): presumably the server is stopped after the 5-second sleep
// while the loop keeps issuing RPCs via performOneRPC — those lines are not
// visible in this excerpt; confirm.
875 func TestClientMix(t *testing.T) {
876 s, ct := setUp(t, 0, math.MaxUint32, normal)
878 time.Sleep(5 * time.Second)
881 go func(ct ClientTransport) {
885 for i := 0; i < 1000; i++ {
886 time.Sleep(10 * time.Millisecond)
// TestLargeMessage runs two iterations of a large-payload exchange against a
// `normal` server. Each iteration opens a stream, writes expectedRequestLarge
// as the last message, reads expectedResponseLarge back, and then expects
// io.EOF. A sync.WaitGroup is declared.
// NOTE(review): presumably the iterations run concurrently under the
// WaitGroup — the goroutine lines are not visible in this excerpt.
891 func TestLargeMessage(t *testing.T) {
892 server, ct := setUp(t, 0, math.MaxUint32, normal)
897 var wg sync.WaitGroup
898 for i := 0; i < 2; i++ {
902 s, err := ct.NewStream(context.Background(), callHdr)
904 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
906 if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
907 t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
909 p := make([]byte, len(expectedResponseLarge))
910 if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
// NOTE(review): the arguments look misordered for this format (err fills the
// "s.Read(%v)" slot), and the "want" value is expectedResponse although the
// comparison above uses expectedResponseLarge — confirm.
911 t.Errorf("s.Read(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
913 if _, err = s.Read(p); err != io.EOF {
914 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
// TestLargeMessageWithDelayRead runs two iterations of the large-payload
// exchange against a `delayRead` server, whose handler sleeps before reading.
// The client writes expectedRequestLarge and then itself sleeps 2 seconds
// before reading, so that the server has begun sending. It then expects
// expectedResponseLarge followed by io.EOF.
923 func TestLargeMessageWithDelayRead(t *testing.T) {
924 server, ct := setUp(t, 0, math.MaxUint32, delayRead)
929 var wg sync.WaitGroup
930 for i := 0; i < 2; i++ {
934 s, err := ct.NewStream(context.Background(), callHdr)
936 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
938 if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
939 t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
941 p := make([]byte, len(expectedResponseLarge))
943 // Give time to server to begin sending before client starts reading.
944 time.Sleep(2 * time.Second)
945 if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
946 t.Errorf("s.Read(_) = _, %v, want _, <nil>", err)
948 if _, err = s.Read(p); err != io.EOF {
949 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
// TestLargeMessageDelayWrite runs two iterations of the large-payload exchange
// against a `delayWrite` server, whose handler sleeps before writing. The
// client opens a stream, sleeps 2 seconds so that the server is already
// reading, writes expectedRequestLarge, and then expects expectedResponseLarge
// followed by io.EOF.
958 func TestLargeMessageDelayWrite(t *testing.T) {
959 server, ct := setUp(t, 0, math.MaxUint32, delayWrite)
964 var wg sync.WaitGroup
965 for i := 0; i < 2; i++ {
969 s, err := ct.NewStream(context.Background(), callHdr)
971 t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
974 // Give time to server to start reading before client starts sending.
975 time.Sleep(2 * time.Second)
976 if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
977 t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
979 p := make([]byte, len(expectedResponseLarge))
980 if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
// NOTE(review): the message names io.ReadFull although s.Read is called, the
// arguments look misordered for the format, and the "want" value is
// expectedResponse rather than expectedResponseLarge — confirm.
981 t.Errorf("io.ReadFull(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
983 if _, err = s.Read(p); err != io.EOF {
984 t.Errorf("Failed to complete the stream %v; want <EOF>", err)
// TestGracefulClose checks GracefulClose semantics on the client transport. A
// stream opened before GracefulClose must still be able to write
// expectedRequest, read expectedResponse and reach io.EOF. Each of 100
// NewStream attempts made after GracefulClose must fail with ErrStreamDrain.
993 func TestGracefulClose(t *testing.T) {
994 server, ct := setUp(t, 0, math.MaxUint32, normal)
// s is created before the graceful close and used after it.
999 s, err := ct.NewStream(context.Background(), callHdr)
1001 t.Fatalf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
1003 if err = ct.GracefulClose(); err != nil {
1004 t.Fatalf("%v.GracefulClose() = %v, want <nil>", ct, err)
1006 var wg sync.WaitGroup
1007 // Expect the failure for all the follow-up streams because ct has been closed gracefully.
1008 for i := 0; i < 100; i++ {
1012 if _, err := ct.NewStream(context.Background(), callHdr); err != ErrStreamDrain {
1013 t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrStreamDrain)
1021 // The stream which was created before graceful close can still proceed.
1022 if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1023 t.Fatalf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
1025 p := make([]byte, len(expectedResponse))
1026 if _, err := s.Read(p); err != nil || !bytes.Equal(p, expectedResponse) {
// NOTE(review): the arguments look misordered for this format (err fills the
// "s.Read(%v)" slot and p the error slot) — confirm.
1027 t.Fatalf("s.Read(%v) = _, %v, want %v, <nil>", err, p, expectedResponse)
1029 if _, err = s.Read(p); err != io.EOF {
1030 t.Fatalf("Failed to complete the stream %v; want <EOF>", err)
// TestLargeMessageSuspension checks that a write larger than the flow-control
// window does not complete against a `suspended` server. The stream is created
// with a 1-second context timeout and the client writes initialWindowSize*8
// bytes. The write must return the stream error built from
// codes.DeadlineExceeded and context.DeadlineExceeded.
1037 func TestLargeMessageSuspension(t *testing.T) {
1038 server, ct := setUp(t, 0, math.MaxUint32, suspended)
1039 callHdr := &CallHdr{
1041 Method: "foo.Large",
1043 // Set a long enough timeout for writing a large message out.
1044 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1046 s, err := ct.NewStream(ctx, callHdr)
1048 t.Fatalf("failed to open stream: %v", err)
1050 // Write should not be done successfully due to flow control.
1051 msg := make([]byte, initialWindowSize*8)
1052 err = ct.Write(s, nil, msg, &Options{Last: true, Delay: false})
// The returned error is compared by value with the expected stream error.
1053 expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
1054 if err != expectedErr {
1055 t.Fatalf("Write got %v, want %v", err, expectedErr)
// TestMaxStreams checks client-side enforcement of the server's MaxStreams
// setting, which is 1 here. With one stream open against a `suspended` server,
// the test waits up to 5 seconds for cc.maxStreams to become 1 and verifies
// that no stream quota can be acquired. After closing the pending stream it
// expects exactly one unit of quota to become acquirable, returns it, and
// opens a new stream.
// NOTE(review): the goroutine and select structure around the done/ch/ready
// channels is only partly visible in this excerpt.
1061 func TestMaxStreams(t *testing.T) {
1062 server, ct := setUp(t, 0, 1, suspended)
1063 callHdr := &CallHdr{
1065 Method: "foo.Large",
1067 // Have a pending stream which takes all streams quota.
1068 s, err := ct.NewStream(context.Background(), callHdr)
1070 t.Fatalf("Failed to open stream: %v", err)
// The concrete client type is needed to inspect maxStreams and streamsQuota.
1072 cc, ok := ct.(*http2Client)
1074 t.Fatalf("Failed to convert %v to *http2Client", ct)
1076 done := make(chan struct{})
1077 ch := make(chan int)
1078 ready := make(chan struct{})
1082 case <-time.After(5 * time.Millisecond):
1088 case <-time.After(5 * time.Second):
1100 t.Fatalf("Client has not received the max stream setting in 5 seconds.")
1103 // cc.maxStreams should be equal to 1 after having received settings frame from
1105 if cc.maxStreams == 1 {
1108 case <-cc.streamsQuota.acquire():
1109 t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
// quota is read while streamsQuota.mu is held.
1111 cc.streamsQuota.mu.Lock()
1112 quota := cc.streamsQuota.quota
1113 cc.streamsQuota.mu.Unlock()
1115 t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
1123 // Close the pending stream so that the streams quota becomes available for the next new stream.
1124 ct.CloseStream(s, nil)
1126 case i := <-cc.streamsQuota.acquire():
1128 t.Fatalf("streamsQuota.acquire() got %d quota, want 1.", i)
// Return the acquired quota so the NewStream call below can take it.
1130 cc.streamsQuota.add(i)
1132 t.Fatalf("streamsQuota.acquire() is not readable.")
1134 if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
1135 t.Fatalf("Failed to open stream: %v", err)
// TestServerContextCanceledOnClosedConnection checks that a server-side
// stream's context ends with context.Canceled. It waits for the server to
// register a transport, opens a client stream, pushes one data frame of
// http2MaxFrameLen bytes through the client's controlBuf, and looks up the
// matching server-side stream by id. That stream's context must be done with
// context.Canceled within 5 seconds.
// NOTE(review): the step that closes the connection is not visible in this
// excerpt.
1141 func TestServerContextCanceledOnClosedConnection(t *testing.T) {
1142 server, ct := setUp(t, 0, math.MaxUint32, suspended)
1143 callHdr := &CallHdr{
1148 // Wait until the server transport is setup.
1151 if len(server.conns) == 0 {
1153 time.Sleep(time.Millisecond)
1156 for k := range server.conns {
1158 sc, ok = k.(*http2Server)
1160 t.Fatalf("Failed to convert %v to *http2Server", k)
1166 cc, ok := ct.(*http2Client)
1168 t.Fatalf("Failed to convert %v to *http2Client", ct)
1170 s, err := ct.NewStream(context.Background(), callHdr)
1172 t.Fatalf("Failed to open stream: %v", err)
// The frame is queued directly on the client's control buffer.
1174 cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
1175 // Loop until the server side stream is created.
1178 time.Sleep(time.Second)
1180 if len(sc.activeStreams) == 0 {
1184 ss = sc.activeStreams[s.id]
1190 case <-ss.Context().Done():
1191 if ss.Context().Err() != context.Canceled {
1192 t.Fatalf("ss.Context().Err() got %v, want %v", ss.Context().Err(), context.Canceled)
1194 case <-time.After(5 * time.Second):
1195 t.Fatalf("Failed to cancel the context of the sever side stream.")
// TestClientConnDecoupledFromApplicationRead checks that the client's
// connection-level flow control does not depend on the application reading.
// Both client windows are set to defaultWindowSize. The server writes
// defaultWindowSize bytes on a first stream, which the client does not read
// yet, and must still be able to write defaultWindowSize bytes on a second
// stream. The client then reads the second stream, and afterwards the first.
1200 func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
1201 connectOptions := ConnectOptions{
1202 InitialWindowSize: defaultWindowSize,
1203 InitialConnWindowSize: defaultWindowSize,
1205 server, client := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
1207 defer client.Close()
// Wait until the server has registered the connection.
1209 waitWhileTrue(t, func() (bool, error) {
1211 defer server.mu.Unlock()
1213 if len(server.conns) == 0 {
1214 return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
1221 for k := range server.conns {
1222 st = k.(*http2Server)
// NOTE(review): presumably the handler signals on this channel once the
// server-side stream exists — the receive is not visible in this excerpt.
1224 notifyChan := make(chan struct{})
1225 server.h.notify = notifyChan
1227 cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1229 t.Fatalf("Client failed to create first stream. Err: %v", err)
1233 var sstream1 *Stream
1234 // Access stream on the server.
1236 for _, v := range st.activeStreams {
1237 if v.id == cstream1.id {
1242 if sstream1 == nil {
1243 t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream1.id)
1245 // Exhaust client's connection window.
1246 if err := st.Write(sstream1, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
1247 t.Fatalf("Server failed to write data. Err: %v", err)
// A fresh notification channel is installed for the second stream.
1249 notifyChan = make(chan struct{})
1251 server.h.notify = notifyChan
1253 // Create another stream on client.
1254 cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1256 t.Fatalf("Client failed to create second stream. Err: %v", err)
1259 var sstream2 *Stream
1261 for _, v := range st.activeStreams {
1262 if v.id == cstream2.id {
1267 if sstream2 == nil {
1268 t.Fatalf("Didn't find stream corresponding to client cstream.id: %v on the server", cstream2.id)
1270 // Server should be able to send data on the new stream, even though the client hasn't read anything on the first stream.
1271 if err := st.Write(sstream2, []byte{}, make([]byte, defaultWindowSize), &Options{}); err != nil {
1272 t.Fatalf("Server failed to write data. Err: %v", err)
1275 // Client should be able to read data on second stream.
1276 if _, err := cstream2.Read(make([]byte, defaultWindowSize)); err != nil {
1277 t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
1280 // Client should be able to read data on first stream.
1281 if _, err := cstream1.Read(make([]byte, defaultWindowSize)); err != nil {
1282 t.Fatalf("_.Read(_) = _, %v, want _, <nil>", err)
// TestServerConnDecoupledFromApplicationRead checks that the server's
// connection-level flow control does not depend on the application reading.
// Both server windows are set to defaultWindowSize. The client writes
// defaultWindowSize bytes on a first stream and must still be able to open a
// second stream and write defaultWindowSize bytes on it. One extra byte on the
// second stream must give that stream the status code mapped from
// http2.ErrCodeFlowControl. The server then reads the first stream's data and
// sees io.EOF.
1286 func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
1287 serverConfig := &ServerConfig{
1288 InitialWindowSize: defaultWindowSize,
1289 InitialConnWindowSize: defaultWindowSize,
1291 server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
1293 defer client.Close()
// Wait until the server has registered the connection.
1294 waitWhileTrue(t, func() (bool, error) {
1296 defer server.mu.Unlock()
1298 if len(server.conns) == 0 {
1299 return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
1305 for k := range server.conns {
1306 st = k.(*http2Server)
1309 cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1311 t.Fatalf("Failed to create 1st stream. Err: %v", err)
1313 // Exhaust server's connection window.
1314 if err := client.Write(cstream1, nil, make([]byte, defaultWindowSize), &Options{Last: true}); err != nil {
1315 t.Fatalf("Client failed to write data. Err: %v", err)
1317 // Client should be able to create another stream and send data on it.
1318 cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1320 t.Fatalf("Failed to create 2nd stream. Err: %v", err)
1322 if err := client.Write(cstream2, nil, make([]byte, defaultWindowSize), &Options{}); err != nil {
1323 t.Fatalf("Client failed to write data. Err: %v", err)
1325 // Get the streams on server.
1326 waitWhileTrue(t, func() (bool, error) {
1328 defer st.mu.Unlock()
1330 if len(st.activeStreams) != 2 {
1331 return true, fmt.Errorf("timed-out while waiting for server to have created the streams")
1335 var sstream1 *Stream
1337 for _, v := range st.activeStreams {
1343 // Trying to write more on a max-ed out stream should result in a RST_STREAM from the server.
1344 ct := client.(*http2Client)
// The extra byte is queued directly on the client's control buffer.
1345 ct.controlBuf.put(&dataFrame{cstream2.id, true, make([]byte, 1), func() {}})
1346 code := http2ErrConvTab[http2.ErrCodeFlowControl]
// Poll until the second client stream carries the flow-control status code.
1347 waitWhileTrue(t, func() (bool, error) {
1349 defer cstream2.mu.Unlock()
1350 if cstream2.status.Code() != code {
1351 return true, fmt.Errorf("want code = %v, got %v", code, cstream2.status.Code())
1355 // Reading from the stream on server should succeed.
1356 if _, err := sstream1.Read(make([]byte, defaultWindowSize)); err != nil {
1357 t.Fatalf("_.Read(_) = %v, want <nil>", err)
// The first stream was written with Last: true, so the next read must hit EOF.
1360 if _, err := sstream1.Read(make([]byte, 1)); err != io.EOF {
1361 t.Fatalf("_.Read(_) = %v, want io.EOF", err)
// TestServerWithMisbehavedClient checks how the server reacts to a client
// that violates the stream flow-control window. Data frames are pushed
// straight into the client's control buffer until more than
// initialWindowSize bytes have been sent; the client stream is then expected
// to read io.EOF and to carry the status code mapped from
// http2.ErrCodeFlowControl.
1366 func TestServerWithMisbehavedClient(t *testing.T) {
1367 server, ct := setUp(t, 0, math.MaxUint32, suspended)
1368 callHdr := &CallHdr{
1373 // Wait until the server transport is set up.
1376 if len(server.conns) == 0 {
1378 time.Sleep(time.Millisecond)
1381 for k := range server.conns {
1383 sc, ok = k.(*http2Server)
1385 t.Fatalf("Failed to convert %v to *http2Server", k)
1391 cc, ok := ct.(*http2Client)
1393 t.Fatalf("Failed to convert %v to *http2Client", ct)
1395 // Test server behavior for violation of stream flow control window size restriction.
1396 s, err := ct.NewStream(context.Background(), callHdr)
1398 t.Fatalf("Failed to open stream: %v", err)
1401 // Drain the stream flow control window
1402 cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, http2MaxFrameLen), func() {}})
1403 sent += http2MaxFrameLen
1404 // Wait until the server creates the corresponding stream and receives some data.
1407 time.Sleep(time.Millisecond)
1409 if len(sc.activeStreams) == 0 {
1413 ss = sc.activeStreams[s.id]
1416 if ss.fc.pendingData > 0 {
// After one full frame, only the stream's pendingData may have moved; the
// stream's pendingUpdate and the connection-level counters must still be zero.
1422 if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 {
1423 t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, 0, 0)
1425 // Keep sending until the server inbound window is drained for that stream.
1426 for sent <= initialWindowSize {
1427 cc.controlBuf.put(&dataFrame{s.id, false, make([]byte, 1), func() {}})
1430 // Server sent a resetStream for s already.
1431 code := http2ErrConvTab[http2.ErrCodeFlowControl]
1432 if _, err := s.Read(make([]byte, 1)); err != io.EOF {
1433 t.Fatalf("%v got err %v want <EOF>", s, err)
1435 if s.status.Code() != code {
1436 t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
1439 ct.CloseStream(s, nil)
// TestClientWithMisbehavedServer starts a server with the misbehaved handler
// type and checks the client's reaction to a server that violates the stream
// flow-control window: the stream's pendingData is expected to exceed
// initialWindowSize while the connection-level counters stay at zero, and the
// stream is expected to end with codes.Internal.
1444 func TestClientWithMisbehavedServer(t *testing.T) {
1445 // Turn off BDP estimation so that the server can
1446 // violate stream window.
1447 connectOptions := ConnectOptions{
1448 InitialWindowSize: initialWindowSize,
1450 server, ct := setUpWithOptions(t, 0, &ServerConfig{}, misbehaved, connectOptions)
1451 callHdr := &CallHdr{
1453 Method: "foo.Stream",
1455 conn, ok := ct.(*http2Client)
1457 t.Fatalf("Failed to convert %v to *http2Client", ct)
1459 // Test the logic for the violation of stream flow control window size restriction.
1460 s, err := ct.NewStream(context.Background(), callHdr)
1462 t.Fatalf("Failed to open stream: %v", err)
1464 d := make([]byte, 1)
// io.EOF from Write is tolerated here as well as nil.
1465 if err := ct.Write(s, nil, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
1466 t.Fatalf("Failed to write: %v", err)
1468 // Read without window update.
1470 p := make([]byte, http2MaxFrameLen)
// Reads go through the inner reader of the transportReader rather than
// through s.Read (per the comment above, to avoid window updates).
1471 if _, err = s.trReader.(*transportReader).reader.Read(p); err != nil {
// NOTE(review): the failure message below prints ">%d" for the last expected
// value, but the condition requires conn.fc.pendingUpdate to be exactly 0 — confirm
// which one is intended.
1475 if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 {
1476 t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, %d, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, 0, 0)
1480 t.Fatalf("Got err %v, want <EOF>", err)
1482 if s.status.Code() != codes.Internal {
1483 t.Fatalf("Got s.status %v, want s.status.Code()=Internal", s.status)
1486 conn.CloseStream(s, err)
// encodingTestStatus is the status TestEncodingRequiredStatus expects to
// find on the client stream. Its code is codes.Internal and its message is a
// single newline (presumably chosen because such a message has to be encoded
// on the wire — confirm against the server-side handler).
1491 var encodingTestStatus = status.New(codes.Internal, "\n")
// TestEncodingRequiredStatus sends expectedRequest to a server set up with
// the encodingRequiredStatus handler type and checks that reading from the
// stream yields io.EOF and that the stream's status deep-equals
// encodingTestStatus.
1493 func TestEncodingRequiredStatus(t *testing.T) {
1494 server, ct := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
1495 callHdr := &CallHdr{
1499 s, err := ct.NewStream(context.Background(), callHdr)
// io.EOF from Write is tolerated here as well as nil.
1507 if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1508 t.Fatalf("Failed to write the request: %v", err)
1510 p := make([]byte, http2MaxFrameLen)
1511 if _, err := s.trReader.(*transportReader).Read(p); err != io.EOF {
1512 t.Fatalf("Read got error %v, want %v", err, io.EOF)
1514 if !reflect.DeepEqual(s.Status(), encodingTestStatus) {
1515 t.Fatalf("stream with status %v, want %v", s.Status(), encodingTestStatus)
// TestInvalidHeaderField sends expectedRequest to a server set up with the
// invalidHeaderField handler type and checks that reading from the stream
// fails with a StreamError whose code is codes.FailedPrecondition and whose
// text contains expectedInvalidHeaderField.
1521 func TestInvalidHeaderField(t *testing.T) {
1522 server, ct := setUp(t, 0, math.MaxUint32, invalidHeaderField)
1523 callHdr := &CallHdr{
1527 s, err := ct.NewStream(context.Background(), callHdr)
// io.EOF from Write is tolerated here as well as nil.
1535 if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != io.EOF {
1536 t.Fatalf("Failed to write the request: %v", err)
1538 p := make([]byte, http2MaxFrameLen)
1539 _, err = s.trReader.(*transportReader).Read(p)
1540 if se, ok := err.(StreamError); !ok || se.Code != codes.FailedPrecondition || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
1541 t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.FailedPrecondition, expectedInvalidHeaderField)
// TestStreamContext checks that a *Stream attached to a context with
// newContextWithStream is returned by StreamFromContext as the same pointer.
1547 func TestStreamContext(t *testing.T) {
1548 expectedStream := &Stream{}
1549 ctx := newContextWithStream(context.Background(), expectedStream)
1550 s, ok := StreamFromContext(ctx)
1551 if !ok || expectedStream != s {
// NOTE(review): the message names GetStreamFromContext, but the function
// called above is StreamFromContext.
1552 t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, s, ok, expectedStream)
// TestIsReservedHeader runs isReservedHeader over a table of header names
// and compares each result with the expected boolean.
1556 func TestIsReservedHeader(t *testing.T) {
1561 {"", false}, // but should be rejected earlier
1563 {"content-type", true},
1564 {"grpc-message-type", true},
1565 {"grpc-encoding", true},
1566 {"grpc-message", true},
1567 {"grpc-status", true},
1568 {"grpc-timeout", true},
1571 for _, tt := range tests {
1572 got := isReservedHeader(tt.h)
// Errorf (not Fatalf) so that the remaining table entries are still checked.
1574 t.Errorf("isReservedHeader(%q) = %v; want %v", tt.h, got, tt.want)
// TestContextErr checks that ContextErr converts context.DeadlineExceeded
// and context.Canceled into StreamErrors carrying the matching gRPC code and
// the original error text.
1579 func TestContextErr(t *testing.T) {
1580 for _, test := range []struct {
1586 {context.DeadlineExceeded, StreamError{codes.DeadlineExceeded, context.DeadlineExceeded.Error()}},
1587 {context.Canceled, StreamError{codes.Canceled, context.Canceled.Error()}},
1589 err := ContextErr(test.errIn)
1590 if err != test.errOut {
1591 t.Fatalf("ContextErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
// max takes two int32 values and returns an int32 (presumably the larger of
// the two — confirm against the body).
// NOTE(review): this name shadows the built-in max available since Go 1.21.
1596 func max(a, b int32) int32 {
// windowSizeConfig groups the window sizes handed to
// testAccountCheckWindowSize: serverStream and serverConn populate the
// server's InitialWindowSize and InitialConnWindowSize, clientStream and
// clientConn populate the client's.
1603 type windowSizeConfig struct {
// TestAccountCheckWindowSizeWithLargeWindow runs testAccountCheckWindowSize
// with window sizes between 6 MiB and 12 MiB, using a different value for
// each of the four windows.
1610 func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) {
1611 wc := windowSizeConfig{
1612 serverStream: 10 * 1024 * 1024,
1613 serverConn: 12 * 1024 * 1024,
1614 clientStream: 6 * 1024 * 1024,
1615 clientConn: 8 * 1024 * 1024,
1617 testAccountCheckWindowSize(t, wc)
// TestAccountCheckWindowSizeWithSmallWindow runs testAccountCheckWindowSize
// with all four windows set to defaultWindowSize.
1620 func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) {
1621 wc := windowSizeConfig{
1622 serverStream: defaultWindowSize,
1623 // Note this is smaller than initialConnWindowSize which is the current default.
1624 serverConn: defaultWindowSize,
1625 clientStream: defaultWindowSize,
1626 clientConn: defaultWindowSize,
1628 testAccountCheckWindowSize(t, wc)
// testAccountCheckWindowSize sets up a server and a client with the window
// sizes in wc, opens one stream, and verifies that the flow-control limits
// and send quotas on each side agree with the configuration:
//   - each side's inbound limits equal its own configured window sizes;
//   - each side's send quotas equal the peer's configured window sizes.
1631 func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
1632 serverConfig := &ServerConfig{
1633 InitialWindowSize: wc.serverStream,
1634 InitialConnWindowSize: wc.serverConn,
1636 connectOptions := ConnectOptions{
1637 InitialWindowSize: wc.clientStream,
1638 InitialConnWindowSize: wc.clientConn,
1640 server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions)
1642 defer client.Close()
1644 // Wait for server conns to be populated with new server transport.
1645 waitWhileTrue(t, func() (bool, error) {
1647 defer server.mu.Unlock()
1648 if len(server.conns) == 0 {
1649 return true, fmt.Errorf("timed out waiting for server transport to be created")
1655 for k := range server.conns {
1656 st = k.(*http2Server)
1659 ct := client.(*http2Client)
1660 cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1662 t.Fatalf("Failed to create stream. Err: %v", err)
1664 // Wait for server to receive headers.
1665 waitWhileTrue(t, func() (bool, error) {
1667 defer st.mu.Unlock()
1668 if len(st.activeStreams) == 0 {
1669 return true, fmt.Errorf("timed out waiting for server to receive headers")
1673 // Sleeping to make sure the settings are applied in case of negative test.
1674 time.Sleep(time.Second)
// Server connection-level inbound limit must match the server's own config.
1676 waitWhileTrue(t, func() (bool, error) {
1680 if lim != uint32(serverConfig.InitialConnWindowSize) {
1681 return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
// Acquire the server's whole connection send quota (with a one second
// timeout), hand it straight back, and compare it with the client's window.
1686 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1687 serverSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
1689 t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
1692 st.sendQuotaPool.add(serverSendQuota)
1693 if serverSendQuota != int(connectOptions.InitialConnWindowSize) {
1694 t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
1697 ssq := st.streamSendQuota
1699 if ssq != uint32(connectOptions.InitialWindowSize) {
1700 t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ssq, connectOptions.InitialWindowSize)
// Same checks from the client's point of view.
1703 limit := ct.fc.limit
1705 if limit != uint32(connectOptions.InitialConnWindowSize) {
1706 t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize)
1708 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1709 clientSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
1711 t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
1714 ct.sendQuotaPool.add(clientSendQuota)
1715 if clientSendQuota != int(serverConfig.InitialConnWindowSize) {
1716 t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
1719 ssq = ct.streamSendQuota
1721 if ssq != uint32(serverConfig.InitialWindowSize) {
1722 t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ssq, serverConfig.InitialWindowSize)
// Stream-level inbound limits, read under each stream's flow-control mutex.
1724 cstream.fc.mu.Lock()
1725 limit = cstream.fc.limit
1726 cstream.fc.mu.Unlock()
1727 if limit != uint32(connectOptions.InitialWindowSize) {
1728 t.Fatalf("Client stream flow control window size is %v, want %v", limit, connectOptions.InitialWindowSize)
1732 for _, v := range st.activeStreams {
1736 sstream.fc.mu.Lock()
1737 limit = sstream.fc.limit
1738 sstream.fc.mu.Unlock()
1739 if limit != uint32(serverConfig.InitialWindowSize) {
1740 t.Fatalf("Server stream flow control window size is %v, want %v", limit, serverConfig.InitialWindowSize)
// TestAccountCheckExpandingWindow exchanges ten large messages
// (65535*16*2 bytes each) with a server set up with the pingpong handler
// type, closes the stream, and then polls until the flow-control bookkeeping
// on both sides is consistent: delta and pendingData are zero everywhere,
// and each side's send quota equals the peer's limit minus pendingUpdate.
1744 // Check accounting on both sides after sending and receiving large messages.
1745 func TestAccountCheckExpandingWindow(t *testing.T) {
1746 server, client := setUp(t, 0, 0, pingpong)
1748 defer client.Close()
1749 waitWhileTrue(t, func() (bool, error) {
1751 defer server.mu.Unlock()
1752 if len(server.conns) == 0 {
1753 return true, fmt.Errorf("timed out while waiting for server transport to be created")
1759 for k := range server.conns {
1760 st = k.(*http2Server)
1763 ct := client.(*http2Client)
1764 cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
1766 t.Fatalf("Failed to create stream. Err: %v", err)
// buf holds a 5-byte header followed by the payload; bytes 1..4 of the header
// carry the payload length in big-endian order.
1769 msgSize := 65535 * 16 * 2
1770 msg := make([]byte, msgSize)
1771 buf := make([]byte, msgSize+5)
1773 binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
1776 header := make([]byte, 5)
1777 for i := 1; i <= 10; i++ {
1778 if err := ct.Write(cstream, nil, buf, &opts); err != nil {
1779 t.Fatalf("Error on client while writing message: %v", err)
1781 if _, err := cstream.Read(header); err != nil {
1782 t.Fatalf("Error on client while reading data frame header: %v", err)
1784 sz := binary.BigEndian.Uint32(header[1:])
1785 recvMsg := make([]byte, int(sz))
1786 if _, err := cstream.Read(recvMsg); err != nil {
1787 t.Fatalf("Error on client while reading data: %v", err)
1789 if len(recvMsg) != len(msg) {
1790 t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg))
1794 ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream.
1795 if _, err := cstream.Read(header); err != io.EOF {
1796 t.Fatalf("Client expected an EOF from the server. Got: %v", err)
1801 for _, v := range st.activeStreams {
1806 waitWhileTrue(t, func() (bool, error) {
1807 // Check that pendingData and delta on flow control windows on both sides are 0.
1808 cstream.fc.mu.Lock()
1809 if cstream.fc.delta != 0 {
1810 cstream.fc.mu.Unlock()
1811 return true, fmt.Errorf("delta on flow control window of client stream is non-zero")
1813 if cstream.fc.pendingData != 0 {
1814 cstream.fc.mu.Unlock()
1815 return true, fmt.Errorf("pendingData on flow control window of client stream is non-zero")
1817 cstream.fc.mu.Unlock()
1818 sstream.fc.mu.Lock()
1819 if sstream.fc.delta != 0 {
1820 sstream.fc.mu.Unlock()
1821 return true, fmt.Errorf("delta on flow control window of server stream is non-zero")
1823 if sstream.fc.pendingData != 0 {
1824 sstream.fc.mu.Unlock()
// NOTE(review): "sercer" in the message below looks like a typo for "server".
1825 return true, fmt.Errorf("pendingData on flow control window of sercer stream is non-zero")
1827 sstream.fc.mu.Unlock()
1829 if ct.fc.delta != 0 {
1831 return true, fmt.Errorf("delta on flow control window of client transport is non-zero")
1833 if ct.fc.pendingData != 0 {
1835 return true, fmt.Errorf("pendingData on flow control window of client transport is non-zero")
1839 if st.fc.delta != 0 {
1841 return true, fmt.Errorf("delta on flow control window of server transport is non-zero")
1843 if st.fc.pendingData != 0 {
1845 return true, fmt.Errorf("pendingData on flow control window of server transport is non-zero")
1849 // Check flow control window on client stream is equal to out flow on server stream.
1850 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1851 serverStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, sstream.sendQuotaPool.acquire())
1854 return true, fmt.Errorf("error while acquiring server stream send quota. Err: %v", err)
// The acquired quota is returned to the pool immediately; it was only taken to read its size.
1856 sstream.sendQuotaPool.add(serverStreamSendQuota)
1857 cstream.fc.mu.Lock()
1858 clientEst := cstream.fc.limit - cstream.fc.pendingUpdate
1859 cstream.fc.mu.Unlock()
1860 if uint32(serverStreamSendQuota) != clientEst {
1861 return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, clientEst)
1864 // Check flow control window on server stream is equal to out flow on client stream.
1865 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1866 clientStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, cstream.sendQuotaPool.acquire())
1869 return true, fmt.Errorf("error while acquiring client stream send quota. Err: %v", err)
1871 cstream.sendQuotaPool.add(clientStreamSendQuota)
1872 sstream.fc.mu.Lock()
1873 serverEst := sstream.fc.limit - sstream.fc.pendingUpdate
1874 sstream.fc.mu.Unlock()
1875 if uint32(clientStreamSendQuota) != serverEst {
1876 return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, serverEst)
1879 // Check flow control window on client transport is equal to out flow of server transport.
1880 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1881 serverTrSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
// NOTE(review): "acquring" in the message below looks like a typo for "acquiring".
1884 return true, fmt.Errorf("error while acquring server transport send quota. Err: %v", err)
1886 st.sendQuotaPool.add(serverTrSendQuota)
1888 clientEst = ct.fc.limit - ct.fc.pendingUpdate
1890 if uint32(serverTrSendQuota) != clientEst {
1891 return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, clientEst)
1894 // Check flow control window on server transport is equal to out flow of client transport.
1895 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
1896 clientTrSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
1899 return true, fmt.Errorf("error while acquiring client transport send quota. Err: %v", err)
1901 ct.sendQuotaPool.add(clientTrSendQuota)
1903 serverEst = st.fc.limit - st.fc.pendingUpdate
1905 if uint32(clientTrSendQuota) != serverEst {
// NOTE(review): the value printed is serverEst, so "estimated by client" in
// the message below probably should read "estimated by server".
1906 return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, serverEst)
// waitWhileTrue evaluates condition with a 50ms pause between attempts and a
// five second timer, failing the test with the condition's error
// (presumably it keeps polling while condition returns true and fails once
// the timer fires — confirm against the full body).
1914 func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
1919 timer := time.NewTimer(time.Second * 5)
1921 wait, err = condition()
// NOTE(review): err.Error() is used as the format string here; any '%' in the
// error text would be interpreted as a verb. t.Fatal(err) would avoid that.
1925 t.Fatalf(err.Error())
1927 time.Sleep(50 * time.Millisecond)
// writeHeaders is implemented in this file by writeOneHeader and
// writeTwoHeaders; its arguments are, in order, the framer to write with,
// the stream ID, and the HTTP status.
1938 // A function of type writeHeaders writes out
1939 // http status with the given stream ID using the given framer.
1940 type writeHeaders func(*http2.Framer, uint32, int) error
// writeOneHeader HPACK-encodes a single ":status" header field carrying
// httpStatus and writes it with framer.WriteHeaders, returning that call's
// error.
1942 func writeOneHeader(framer *http2.Framer, sid uint32, httpStatus int) error {
1943 var buf bytes.Buffer
1944 henc := hpack.NewEncoder(&buf)
1945 henc.WriteField(hpack.HeaderField{Name: ":status", Value: fmt.Sprint(httpStatus)})
1946 return framer.WriteHeaders(http2.HeadersFrameParam{
1948 BlockFragment: buf.Bytes(),
// writeTwoHeaders issues two framer.WriteHeaders calls: the first carries a
// header field whose value is http.StatusOK, the second a header field whose
// value is httpStatus. An error from the first call is checked before the
// second field is encoded; the second call's error is returned directly.
1954 func writeTwoHeaders(framer *http2.Framer, sid uint32, httpStatus int) error {
1955 var buf bytes.Buffer
1956 henc := hpack.NewEncoder(&buf)
1957 henc.WriteField(hpack.HeaderField{
1959 Value: fmt.Sprint(http.StatusOK),
1961 if err := framer.WriteHeaders(http2.HeadersFrameParam{
1963 BlockFragment: buf.Bytes(),
1969 henc.WriteField(hpack.HeaderField{
1971 Value: fmt.Sprint(httpStatus),
1973 return framer.WriteHeaders(http2.HeadersFrameParam{
1975 BlockFragment: buf.Bytes(),
// httpServer is a minimal test server that answers the first HEADERS frame
// it receives with a configurable HTTP status. Its start method uses the
// fields conn (the accepted connection), httpStatus (the status to send) and
// wh (the writeHeaders function used to send it).
1981 type httpServer struct {
// start accepts one connection on lis, consumes the client preface, sends a
// SETTINGS ack, reads frames until a HEADERS frame arrives, and then replies
// on that frame's stream ID by calling s.wh with s.httpStatus. Failures are
// reported through t.Errorf.
1987 func (s *httpServer) start(t *testing.T, lis net.Listener) {
1988 // Launch an HTTP server to send back header with httpStatus.
1991 s.conn, err = lis.Accept()
1993 t.Errorf("Error accepting connection: %v", err)
1996 defer s.conn.Close()
1997 // Read preface sent by client.
1998 if _, err = io.ReadFull(s.conn, make([]byte, len(http2.ClientPreface))); err != nil {
// NOTE(review): "cleint" in the message below looks like a typo for "client".
1999 t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err)
// NOTE(review): the reader is sized with defaultWriteBufSize and the writer
// with defaultReadBufSize — these look swapped; confirm whether it matters.
2002 reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize)
2003 writer := bufio.NewWriterSize(s.conn, defaultReadBufSize)
2004 framer := http2.NewFramer(writer, reader)
2005 if err = framer.WriteSettingsAck(); err != nil {
2006 t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
2010 // Read frames until a header is received.
2012 frame, err := framer.ReadFrame()
2014 t.Errorf("Error at server-side while reading frame. Err: %v", err)
2017 if hframe, ok := frame.(*http2.HeadersFrame); ok {
2018 sid = hframe.Header().StreamID
// Respond on the stream the client opened, using the configured header writer.
2022 if err = s.wh(framer, sid, s.httpStatus); err != nil {
2023 t.Errorf("Error at server-side while writing headers. Err: %v", err)
// cleanUp tears down the httpServer (presumably by closing s.conn — confirm
// against the body).
2030 func (s *httpServer) cleanUp() {
// setUpHTTPStatusTest listens on an ephemeral localhost TCP port, starts an
// httpServer configured with httpStatus on it, connects an HTTP/2 client to
// that address, and opens a stream for "bogus/method". It returns the stream
// together with a cleanup function; any setup failure aborts the test.
2036 func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) {
2041 client ClientTransport
2059 lis, err = net.Listen("tcp", "localhost:0")
2061 t.Fatalf("Failed to listen. Err: %v", err)
2063 server = &httpServer{
2064 httpStatus: httpStatus,
2067 server.start(t, lis)
// 2*time.Second is the final argument of newHTTP2Client (presumably the
// connection timeout — confirm against its signature).
2068 client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, 2*time.Second)
2070 t.Fatalf("Error creating client. Err: %v", err)
2072 stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true})
2074 t.Fatalf("Error creating stream at client-side. Err: %v", err)
// TestHTTPToGRPCStatusMapping runs testHTTPToGRPCStatusMapping with
// writeOneHeader for every HTTP status that is a key of httpStatusConvTab.
2079 func TestHTTPToGRPCStatusMapping(t *testing.T) {
2080 for k := range httpStatusConvTab {
2081 testHTTPToGRPCStatusMapping(t, k, writeOneHeader)
// testHTTPToGRPCStatusMapping sets up a stream against a server that replies
// with httpStatus (written by wh) and checks that stream.Read fails with a
// StreamError whose Code equals httpStatusConvTab[httpStatus].
2085 func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) {
2086 stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh)
2088 want := httpStatusConvTab[httpStatus]
2089 buf := make([]byte, 8)
2090 _, err := stream.Read(buf)
2092 t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want)
2094 serr, ok := err.(StreamError)
2096 t.Fatalf("err.(Type) = %T, want StreamError", err)
2098 if want != serr.Code {
2099 t.Fatalf("Want error code: %v, got: %v", want, serr.Code)
// TestHTTPStatusOKAndMissingGRPCStatus has the server reply with only an
// HTTP 200 status (via writeOneHeader) and checks that the stream ends up
// with status code codes.Unknown. Per the failure message, the Read itself is
// expected to return io.EOF.
2103 func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) {
2104 stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
2106 buf := make([]byte, 8)
2107 _, err := stream.Read(buf)
2109 t.Fatalf("stream.Read(_) = _, %v, want _, io.EOF", err)
2111 want := codes.Unknown
2113 defer stream.mu.Unlock()
2114 if stream.status.Code() != want {
2115 t.Fatalf("Status code of stream: %v, want: %v", stream.status.Code(), want)
// TestHTTPStatusNottOKAndMissingGRPCStatusInSecondHeader runs
// testHTTPToGRPCStatusMapping with http.StatusUnauthorized, using
// writeTwoHeaders so the status arrives in a second HEADERS write.
// NOTE(review): "Nott" in the test name looks like a typo for "Not".
2119 func TestHTTPStatusNottOKAndMissingGRPCStatusInSecondHeader(t *testing.T) {
2120 testHTTPToGRPCStatusMapping(t, http.StatusUnauthorized, writeTwoHeaders)
// TestReadGivesSameErrorAfterAnyErrorOccurs builds a Stream by hand, feeds
// it a message carrying testErr, and checks that Read returns zero bytes and
// that error. It then feeds one error-free message and one message with a
// different error, and checks that the next two Reads still return zero
// bytes and the original error text.
2123 // If any error occurs on a call to Stream.Read, future calls
2124 // should continue to return that same error.
2125 func TestReadGivesSameErrorAfterAnyErrorOccurs(t *testing.T) {
2126 testRecvBuffer := newRecvBuffer()
2128 ctx: context.Background(),
2129 goAway: make(chan struct{}),
2130 buf: testRecvBuffer,
2131 requestRead: func(int) {},
2133 s.trReader = &transportReader{
2134 reader: &recvBufferReader{
2139 windowHandler: func(int) {},
2141 testData := make([]byte, 1)
2143 testErr := errors.New("test error")
2144 s.write(recvMsg{data: testData, err: testErr})
2146 inBuf := make([]byte, 1)
2147 actualCount, actualErr := s.Read(inBuf)
2148 if actualCount != 0 {
2149 t.Errorf("actualCount, _ := s.Read(_) differs; want 0; got %v", actualCount)
2151 if actualErr.Error() != testErr.Error() {
2152 t.Errorf("_ , actualErr := s.Read(_) differs; want actualErr.Error() to be %v; got %v", testErr.Error(), actualErr.Error())
// Queue a clean message and then a different error; neither may replace the
// error already observed.
2155 s.write(recvMsg{data: testData, err: nil})
2156 s.write(recvMsg{data: testData, err: errors.New("different error from first")})
2158 for i := 0; i < 2; i++ {
2159 inBuf := make([]byte, 1)
2160 actualCount, actualErr := s.Read(inBuf)
2161 if actualCount != 0 {
2162 t.Errorf("actualCount, _ := s.Read(_) differs; want %v; got %v", 0, actualCount)
2164 if actualErr.Error() != testErr.Error() {
2165 t.Errorf("_ , actualErr := s.Read(_) differs; want actualErr.Error() to be %v; got %v", testErr.Error(), actualErr.Error())
// TestPingPong1B runs runPingPongTest with a message size of 1 byte.
2170 func TestPingPong1B(t *testing.T) {
2171 runPingPongTest(t, 1)
// TestPingPong1KB runs runPingPongTest with a message size of 1024 bytes.
2174 func TestPingPong1KB(t *testing.T) {
2175 runPingPongTest(t, 1024)
// TestPingPong64KB runs runPingPongTest with a message size of 65536 bytes.
2178 func TestPingPong64KB(t *testing.T) {
2179 runPingPongTest(t, 65536)
// TestPingPong1MB runs runPingPongTest with a message size of 1048576 bytes.
2182 func TestPingPong1MB(t *testing.T) {
2183 runPingPongTest(t, 1048576)
2186 // This is a stress test of the flow control logic.
2187 func runPingPongTest(t *testing.T, msgSize int) {
2188 server, client := setUp(t, 0, 0, pingpong)
2190 defer client.Close()
2191 waitWhileTrue(t, func() (bool, error) {
2193 defer server.mu.Unlock()
2194 if len(server.conns) == 0 {
2195 return true, fmt.Errorf("timed out while waiting for server transport to be created")
2199 ct := client.(*http2Client)
2200 stream, err := client.NewStream(context.Background(), &CallHdr{})
2202 t.Fatalf("Failed to create stream. Err: %v", err)
2204 msg := make([]byte, msgSize)
2205 outgoingHeader := make([]byte, 5)
2206 outgoingHeader[0] = byte(0)
2207 binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(msgSize))
2209 incomingHeader := make([]byte, 5)
2210 done := make(chan struct{})
2212 timer := time.NewTimer(time.Second * 5)
2219 ct.Write(stream, nil, nil, &Options{Last: true})
2220 if _, err := stream.Read(incomingHeader); err != io.EOF {
2221 t.Fatalf("Client expected EOF from the server. Got: %v", err)
2225 if err := ct.Write(stream, outgoingHeader, msg, opts); err != nil {
2226 t.Fatalf("Error on client while writing message. Err: %v", err)
2228 if _, err := stream.Read(incomingHeader); err != nil {
2229 t.Fatalf("Error on client while reading data header. Err: %v", err)
2231 sz := binary.BigEndian.Uint32(incomingHeader[1:])
2232 recvMsg := make([]byte, int(sz))
2233 if _, err := stream.Read(recvMsg); err != nil {
2234 t.Fatalf("Error on client while reading data. Err: %v", err)