3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 //go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto
20 //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
43 "github.com/golang/protobuf/proto"
44 anypb "github.com/golang/protobuf/ptypes/any"
45 "golang.org/x/net/context"
46 "golang.org/x/net/http2"
47 spb "google.golang.org/genproto/googleapis/rpc/status"
48 "google.golang.org/grpc"
49 "google.golang.org/grpc/balancer"
50 _ "google.golang.org/grpc/balancer/roundrobin"
51 "google.golang.org/grpc/codes"
52 "google.golang.org/grpc/connectivity"
53 "google.golang.org/grpc/credentials"
54 _ "google.golang.org/grpc/grpclog/glogger"
55 "google.golang.org/grpc/health"
56 healthpb "google.golang.org/grpc/health/grpc_health_v1"
57 "google.golang.org/grpc/internal"
58 "google.golang.org/grpc/metadata"
59 "google.golang.org/grpc/peer"
60 "google.golang.org/grpc/resolver"
61 "google.golang.org/grpc/resolver/manual"
62 _ "google.golang.org/grpc/resolver/passthrough"
63 "google.golang.org/grpc/stats"
64 "google.golang.org/grpc/status"
65 "google.golang.org/grpc/tap"
66 testpb "google.golang.org/grpc/test/grpc_testing"
67 "google.golang.org/grpc/test/leakcheck"
68 "google.golang.org/grpc/testdata"
// NOTE(review): this file is a non-contiguous sample of a larger Go test file;
// each line carries a residual original line number and intermediate lines are
// missing, so only comments are added here — code bytes are untouched.
// Package-level metadata fixtures sent by tests and echoed/validated by the
// test server. Keys with the "-bin" suffix carry binary values per the gRPC
// metadata convention.
73 testMetadata = metadata.MD{
74 "key1": []string{"value1"},
75 "key2": []string{"value2"},
76 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
// Second header set, used when the server sets/sends headers twice.
78 testMetadata2 = metadata.MD{
79 "key1": []string{"value12"},
80 "key2": []string{"value22"},
// Trailer fixtures mirroring the header fixtures above.
83 testTrailerMetadata = metadata.MD{
84 "tkey1": []string{"trailerValue1"},
85 "tkey2": []string{"trailerValue2"},
86 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
88 testTrailerMetadata2 = metadata.MD{
89 "tkey1": []string{"trailerValue12"},
90 "tkey2": []string{"trailerValue22"},
92 // capital "Key" is illegal in HTTP/2.
93 malformedHTTP2Metadata = metadata.MD{
94 "Key": []string{"foo"},
// testAppUA is the client user-agent; failAppUA is a sentinel UA prefix the
// test server rejects with detailedError (see EmptyCall).
96 testAppUA = "myApp1/1.0 myApp2/0.9"
97 failAppUA = "fail-this-RPC"
// detailedError is a rich status (code + message + details) built via
// status.ErrorProto, returned to exercise client-side status decoding.
98 detailedError = status.ErrorProto(&spb.Status{
99 Code: int32(codes.DataLoss),
100 Message: "error for testing: " + failAppUA,
101 Details: []*anypb.Any{{
103 Value: []byte{6, 0, 0, 6, 1, 3},
108 var raceMode bool // set by race.go in race mode
// testServer is the TestService implementation used by every end-to-end test.
// Its boolean knobs steer header/trailer behavior per test case.
110 type testServer struct {
111 security string // indicate the authentication protocol used by this server.
112 earlyFail bool // whether to error out the execution of a service handler prematurely.
113 setAndSendHeader bool // whether to call setHeader and sendHeader.
114 setHeaderOnly bool // whether to only call setHeader, not sendHeader.
115 multipleSetTrailer bool // whether to call setTrailer multiple times.
// unaryCallSleepTime simulates service latency inside UnaryCall.
116 unaryCallSleepTime time.Duration
// EmptyCall echoes an empty message. It fails with detailedError when the
// client's user-agent starts with failAppUA, and otherwise sends the observed
// user-agent values back as response headers ("ua" pairs).
119 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
120 if md, ok := metadata.FromIncomingContext(ctx); ok {
121 // For testing purpose, returns an error if user-agent is failAppUA.
122 // To test that client gets the correct error.
123 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
124 return nil, detailedError
127 for _, entry := range md["user-agent"] {
128 str = append(str, "ua", entry)
// Best-effort header send; error intentionally ignored in this test helper.
130 grpc.SendHeader(ctx, metadata.Pairs(str...))
132 return new(testpb.Empty), nil
// newPayload builds a testpb.Payload of the given type with a zeroed body of
// `size` bytes. Negative sizes and non-COMPRESSABLE types yield an error.
135 func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
137 return nil, fmt.Errorf("Requested a response with invalid length %d", size)
139 body := make([]byte, size)
// Only COMPRESSABLE is supported; it falls through with the zeroed body.
141 case testpb.PayloadType_COMPRESSABLE:
142 case testpb.PayloadType_UNCOMPRESSABLE:
143 return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported")
145 return nil, fmt.Errorf("Unsupported payload type: %d", t)
147 return &testpb.Payload{
// UnaryCall validates incoming metadata (requires ":authority"), exercises the
// server's header/trailer knobs (setAndSendHeader, setHeaderOnly,
// multipleSetTrailer), checks peer identity when TLS is configured, sleeps
// unaryCallSleepTime to simulate work, then echoes a payload of the requested
// type/size.
153 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
154 md, ok := metadata.FromIncomingContext(ctx)
156 if _, exists := md[":authority"]; !exists {
157 return nil, grpc.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
// Header behavior: either Set+Send, Set twice (headers merged), or a plain
// Send of the incoming metadata.
159 if s.setAndSendHeader {
160 if err := grpc.SetHeader(ctx, md); err != nil {
161 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
163 if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
164 return nil, grpc.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
166 } else if s.setHeaderOnly {
167 if err := grpc.SetHeader(ctx, md); err != nil {
168 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
170 if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
171 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
174 if err := grpc.SendHeader(ctx, md); err != nil {
175 return nil, grpc.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
// Trailers: always set once; optionally a second time to test merging.
178 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
179 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
181 if s.multipleSetTrailer {
182 if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
183 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
// Peer checks: address must be present; when security is set, AuthInfo must
// be TLS with the expected auth type and server name.
187 pr, ok := peer.FromContext(ctx)
189 return nil, grpc.Errorf(codes.DataLoss, "failed to get peer from ctx")
191 if pr.Addr == net.Addr(nil) {
192 return nil, grpc.Errorf(codes.DataLoss, "failed to get peer address")
194 if s.security != "" {
196 var authType, serverName string
197 switch info := pr.AuthInfo.(type) {
198 case credentials.TLSInfo:
199 authType = info.AuthType()
200 serverName = info.State.ServerName
202 return nil, grpc.Errorf(codes.Unauthenticated, "Unknown AuthInfo type")
204 if authType != s.security {
205 return nil, grpc.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
// "x.test.youtube.com" matches the CN/SAN of the test certificates in testdata.
207 if serverName != "x.test.youtube.com" {
208 return nil, grpc.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
211 // Simulate some service delay.
212 time.Sleep(s.unaryCallSleepTime)
214 payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
219 return &testpb.SimpleResponse{
// StreamingOutputCall streams one response per entry in the request's
// ResponseParameters, honoring per-message interval delays, after validating
// incoming metadata and the client user-agent.
224 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
225 if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
226 if _, exists := md[":authority"]; !exists {
227 return grpc.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
229 // For testing purpose, returns an error if user-agent is failAppUA.
230 // To test that client gets the correct error.
231 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
232 return grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
235 cs := args.GetResponseParameters()
236 for _, c := range cs {
// Optional per-response pacing requested by the client (microseconds).
237 if us := c.GetIntervalUs(); us > 0 {
238 time.Sleep(time.Duration(us) * time.Microsecond)
241 payload, err := newPayload(args.GetResponseType(), c.GetSize())
246 if err := stream.Send(&testpb.StreamingOutputCallResponse{
// StreamingInputCall receives client messages, accumulating payload sizes,
// and replies once with the aggregated size when the client closes the stream.
255 func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
258 in, err := stream.Recv()
// On client close-send (EOF from Recv — line elided in this view), reply with
// the running total.
260 return stream.SendAndClose(&testpb.StreamingInputCallResponse{
261 AggregatedPayloadSize: int32(sum),
267 p := in.GetPayload().GetBody()
// NOTE(review): the condition guarding this NotFound return is elided from
// this view; presumably tied to s.earlyFail — confirm against full source.
270 return grpc.Errorf(codes.NotFound, "not found")
// FullDuplexCall is a bidirectional echo: for each received request it sends
// one response per ResponseParameters entry. It exercises the same
// header/trailer knobs as UnaryCall, and rewrites ResourceExhausted transport
// errors to Internal so testSvrWriteStatusEarlyWrite can distinguish them.
275 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
276 md, ok := metadata.FromIncomingContext(stream.Context())
278 if s.setAndSendHeader {
279 if err := stream.SetHeader(md); err != nil {
280 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
282 if err := stream.SendHeader(testMetadata2); err != nil {
283 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
285 } else if s.setHeaderOnly {
286 if err := stream.SetHeader(md); err != nil {
287 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
289 if err := stream.SetHeader(testMetadata2); err != nil {
290 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
293 if err := stream.SendHeader(md); err != nil {
294 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
// Trailers are set up-front; stream trailer setters return no error.
297 stream.SetTrailer(testTrailerMetadata)
298 if s.multipleSetTrailer {
299 stream.SetTrailer(testTrailerMetadata2)
// Receive loop (loop header elided in this view).
303 in, err := stream.Recv()
309 // to facilitate testSvrWriteStatusEarlyWrite
310 if grpc.Code(err) == codes.ResourceExhausted {
311 return grpc.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
315 cs := in.GetResponseParameters()
316 for _, c := range cs {
317 if us := c.GetIntervalUs(); us > 0 {
318 time.Sleep(time.Duration(us) * time.Microsecond)
321 payload, err := newPayload(in.GetResponseType(), c.GetSize())
326 if err := stream.Send(&testpb.StreamingOutputCallResponse{
329 // to facilitate testSvrWriteStatusEarlyWrite
330 if grpc.Code(err) == codes.ResourceExhausted {
331 return grpc.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
// HalfDuplexCall buffers all incoming requests first, then replays responses
// for each buffered request's ResponseParameters — half-duplex semantics.
339 func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
340 var msgBuf []*testpb.StreamingOutputCallRequest
// Phase 1: drain the client stream into msgBuf (loop header elided in view).
342 in, err := stream.Recv()
350 msgBuf = append(msgBuf, in)
// Phase 2: send responses for every buffered request.
352 for _, m := range msgBuf {
353 cs := m.GetResponseParameters()
354 for _, c := range cs {
355 if us := c.GetIntervalUs(); us > 0 {
356 time.Sleep(time.Duration(us) * time.Microsecond)
359 payload, err := newPayload(m.GetResponseType(), c.GetSize())
364 if err := stream.Send(&testpb.StreamingOutputCallResponse{
// Fields of the test environment descriptor (the enclosing `type env struct`
// line is elided from this view).
376 network string // The type of network such as tcp, unix, etc.
377 security string // The security protocol such as TLS, SSH, etc.
378 httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS
379 balancer string // One of "roundrobin", "pickfirst", "v1", or "".
// customDialer overrides net.DialTimeout when set; see env.dialer.
380 customDialer func(string, string, time.Duration) (net.Conn, error)
// runnable reports whether this environment can run on the current OS
// (unix sockets are excluded on Windows).
383 func (e env) runnable() bool {
384 if runtime.GOOS == "windows" && e.network == "unix" {
// dialer dials using customDialer when set, otherwise net.DialTimeout on the
// environment's network.
390 func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
391 if e.customDialer != nil {
392 return e.customDialer(e.network, addr, timeout)
394 return net.DialTimeout(e.network, addr, timeout)
// The canonical set of test environments, crossing network type, TLS, balancer
// flavor (v1 vs. roundrobin vs. none), and the http.Handler transport.
398 tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"}
399 tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"}
400 tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "roundrobin"}
401 tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "roundrobin"}
402 handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "roundrobin"}
403 noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
404 allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
// onlyEnv restricts the run to a single named environment when non-empty.
407 var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
// listTestEnv returns the environments to test against: the single one named
// by --only_env (panicking if it is unknown or not runnable on this OS), or
// all runnable environments when the flag is empty.
409 func listTestEnv() (envs []env) {
411 for _, e := range allEnv {
412 if e.name == *onlyEnv {
414 panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
419 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
421 for _, e := range allEnv {
423 envs = append(envs, e)
429 // test is an end-to-end test. It should be created with the newTest
430 // func, modified as needed, and then started with its startServer method.
431 // It should be cleaned up with the tearDown method.
436 ctx context.Context // valid for life of test, before tearDown
437 cancel context.CancelFunc
439 // Configurable knobs, after newTest returns:
440 testServer testpb.TestServiceServer // nil means none
441 healthServer *health.Server // nil means disabled
443 tapHandle tap.ServerInHandle
// Per-side message-size limits; nil leaves the gRPC default in effect.
445 maxClientReceiveMsgSize *int
446 maxClientSendMsgSize *int
447 maxServerReceiveMsgSize *int
448 maxServerSendMsgSize *int
450 clientCompression bool
451 serverCompression bool
// Optional interceptors and handlers wired into server/dial options.
452 unaryClientInt grpc.UnaryClientInterceptor
453 streamClientInt grpc.StreamClientInterceptor
454 unaryServerInt grpc.UnaryServerInterceptor
455 streamServerInt grpc.StreamServerInterceptor
456 unknownHandler grpc.StreamHandler
457 sc <-chan grpc.ServiceConfig
458 customCodec grpc.Codec
// HTTP/2 flow-control window overrides; 0 means use defaults.
459 serverInitialWindowSize int32
460 serverInitialConnWindowSize int32
461 clientInitialWindowSize int32
462 clientInitialConnWindowSize int32
463 perRPCCreds credentials.PerRPCCredentials
464 resolverScheme string
466 // All test dialing is blocking by default. Set this to true if dial
467 // should be non-blocking.
470 // srv and srvAddr are set once startServer is called.
474 cc *grpc.ClientConn // nil until requested via clientConn
475 restoreLogs func() // nil unless declareLogNoise is used
// tearDown cancels the test context, restores suppressed log output, and
// (in elided lines) closes the client conn and stops the server.
478 func (te *test) tearDown() {
479 if te.cancel != nil {
487 if te.restoreLogs != nil {
496 // newTest returns a new test using the provided testing.T and
497 // environment. It is returned with default values. Tests should
498 // modify it before calling its startServer and clientConn methods.
499 func newTest(t *testing.T, e env) *test {
503 maxStream: math.MaxUint32,
505 te.ctx, te.cancel = context.WithCancel(context.Background())
509 // startServer starts a gRPC server listening. Callers should defer a
510 // call to te.tearDown to clean up.
511 func (te *test) startServer(ts testpb.TestServiceServer) {
513 te.t.Logf("Running test in %s environment...", te.e.name)
// Assemble server options from the test's knobs.
514 sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
515 if te.maxMsgSize != nil {
516 sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
518 if te.maxServerReceiveMsgSize != nil {
519 sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
521 if te.maxServerSendMsgSize != nil {
522 sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
524 if te.tapHandle != nil {
525 sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
527 if te.serverCompression {
528 sopts = append(sopts,
529 grpc.RPCCompressor(grpc.NewGZIPCompressor()),
530 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
533 if te.unaryServerInt != nil {
534 sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
536 if te.streamServerInt != nil {
537 sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
539 if te.unknownHandler != nil {
540 sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
542 if te.serverInitialWindowSize > 0 {
543 sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
545 if te.serverInitialConnWindowSize > 0 {
546 sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
// Unix-socket envs get a unique temp socket path per run.
549 switch te.e.network {
551 la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
554 lis, err := net.Listen(te.e.network, la)
556 te.t.Fatalf("Failed to listen: %v", err)
// Credentials come from the testdata certificates or a test-only creds impl.
558 switch te.e.security {
560 creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
562 te.t.Fatalf("Failed to generate credentials %v", err)
564 sopts = append(sopts, grpc.Creds(creds))
565 case "clientTimeoutCreds":
566 sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
568 if te.customCodec != nil {
569 sopts = append(sopts, grpc.CustomCodec(te.customCodec))
571 s := grpc.NewServer(sopts...)
573 if te.e.httpHandler {
574 internal.TestingUseHandlerImpl(s)
576 if te.healthServer != nil {
577 healthpb.RegisterHealthServer(s, te.healthServer)
579 if te.testServer != nil {
580 testpb.RegisterTestServiceServer(s, te.testServer)
// For TCP, rewrite the listen address to localhost:<port> so TLS server-name
// verification against the test cert works.
583 switch te.e.network {
586 _, port, err := net.SplitHostPort(lis.Addr().String())
588 te.t.Fatalf("Failed to parse listener address: %v", err)
590 addr = "localhost:" + port
// clientConn lazily dials the test server, translating the test's knobs into
// grpc.DialOptions, and caches the resulting *grpc.ClientConn in te.cc.
597 func (te *test) clientConn() *grpc.ClientConn {
601 opts := []grpc.DialOption{
602 grpc.WithDialer(te.e.dialer),
603 grpc.WithUserAgent(te.userAgent),
607 opts = append(opts, grpc.WithServiceConfig(te.sc))
610 if te.clientCompression {
612 grpc.WithCompressor(grpc.NewGZIPCompressor()),
613 grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
616 if te.unaryClientInt != nil {
617 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
619 if te.streamClientInt != nil {
620 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
622 if te.maxMsgSize != nil {
623 opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
625 if te.maxClientReceiveMsgSize != nil {
626 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
628 if te.maxClientSendMsgSize != nil {
629 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
// Client-side credentials mirror the server's security setting.
631 switch te.e.security {
633 creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
635 te.t.Fatalf("Failed to load credentials: %v", err)
637 opts = append(opts, grpc.WithTransportCredentials(creds))
638 case "clientTimeoutCreds":
639 opts = append(opts, grpc.WithTransportCredentials(&clientTimeoutCreds{}))
641 opts = append(opts, grpc.WithInsecure())
643 // TODO(bar) switch balancer case "pickfirst".
// Resolver scheme defaults to passthrough unless a test registered its own.
645 if te.resolverScheme == "" {
646 scheme = "passthrough:///"
648 scheme = te.resolverScheme + ":///"
// "v1" uses the legacy grpc.RoundRobin balancer; "roundrobin" the new builder.
650 switch te.e.balancer {
652 opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
654 rr := balancer.Get("roundrobin")
656 te.t.Fatalf("got nil when trying to get roundrobin balancer builder")
658 opts = append(opts, grpc.WithBalancerBuilder(rr))
660 if te.clientInitialWindowSize > 0 {
661 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
663 if te.clientInitialConnWindowSize > 0 {
664 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
666 if te.perRPCCreds != nil {
667 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
669 if te.customCodec != nil {
670 opts = append(opts, grpc.WithCodec(te.customCodec))
672 if !te.nonBlockingDial && te.srvAddr != "" {
673 // Only do a blocking dial if server is up.
674 opts = append(opts, grpc.WithBlock())
// Client-only tests (no server started) dial a dummy address.
676 if te.srvAddr == "" {
677 te.srvAddr = "client.side.only.test"
680 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
682 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
// declareLogNoise suppresses expected-noise log phrases for this test; the
// returned restore func is invoked from tearDown.
687 func (te *test) declareLogNoise(phrases ...string) {
688 te.restoreLogs = declareLogNoise(te.t, phrases...)
// withServerTester dials the raw server transport (wrapping in TLS when the
// env requires it) and hands an HTTP/2-level serverTester to fn.
691 func (te *test) withServerTester(fn func(st *serverTester)) {
692 c, err := te.e.dialer(te.srvAddr, 10*time.Second)
697 if te.e.security == "tls" {
698 c = tls.Client(c, &tls.Config{
699 InsecureSkipVerify: true,
700 NextProtos: []string{http2.NextProtoTLS},
703 st := newServerTesterFromConn(te.t, c)
// lazyConn wraps a net.Conn and, once beLazy is set, delays every Write by a
// second to simulate a slow peer.
708 type lazyConn struct {
713 func (l *lazyConn) Write(b []byte) (int, error) {
714 if atomic.LoadInt32(&(l.beLazy)) == 1 {
715 // The sleep duration here needs to less than the leakCheck deadline.
716 time.Sleep(time.Second)
718 return l.Conn.Write(b)
// TestContextDeadlineNotIgnored verifies that a short RPC deadline is enforced
// even when the connection's writes stall (via lazyConn): the call must fail
// DeadlineExceeded and return well before the artificial write delay would.
721 func TestContextDeadlineNotIgnored(t *testing.T) {
722 defer leakcheck.Check(t)
725 e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
726 conn, err := net.DialTimeout(network, addr, timeout)
730 lc = &lazyConn{Conn: conn}
735 te.startServer(&testServer{security: e.security})
738 cc := te.clientConn()
739 tc := testpb.NewTestServiceClient(cc)
// Sanity RPC before making the conn lazy.
740 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
741 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
743 atomic.StoreInt32(&(lc.beLazy), 1)
744 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
747 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
748 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
750 if time.Since(t1) > 2*time.Second {
751 t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
// TestTimeoutOnDeadServer: an RPC issued after the server dies must fail with
// DeadlineExceeded (when a balancer keeps reconnecting) rather than hang.
755 func TestTimeoutOnDeadServer(t *testing.T) {
756 defer leakcheck.Check(t)
757 for _, e := range listTestEnv() {
758 testTimeoutOnDeadServer(t, e)
762 func testTimeoutOnDeadServer(t *testing.T, e env) {
764 te.userAgent = testAppUA
// Suppress the reconnect noise expected when the server goes away.
766 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
767 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
768 "grpc: addrConn.resetTransport failed to create client transport: connection error",
770 te.startServer(&testServer{security: e.security})
773 cc := te.clientConn()
774 tc := testpb.NewTestServiceClient(cc)
775 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
776 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
// Server is stopped in elided lines; then a 1ms-deadline RPC must time out.
779 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
780 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
782 if e.balancer != "" && grpc.Code(err) != codes.DeadlineExceeded {
783 // If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
784 // the error will be an internal error.
785 t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
787 awaitNewConnLogOutput()
// TestServerGracefulStopIdempotent: calling GracefulStop repeatedly must be
// safe. Skipped for handler-tls, which doesn't support GracefulStop semantics.
790 func TestServerGracefulStopIdempotent(t *testing.T) {
791 defer leakcheck.Check(t)
792 for _, e := range listTestEnv() {
793 if e.name == "handler-tls" {
796 testServerGracefulStopIdempotent(t, e)
800 func testServerGracefulStopIdempotent(t *testing.T, e env) {
802 te.userAgent = testAppUA
803 te.startServer(&testServer{security: e.security})
// GracefulStop three times; any call after the first must be a no-op.
806 for i := 0; i < 3; i++ {
807 te.srv.GracefulStop()
// TestServerGoAway: after the server sends GoAway (GracefulStop), new RPCs on
// the drained connection must fail with Unavailable or Internal.
811 func TestServerGoAway(t *testing.T) {
812 defer leakcheck.Check(t)
813 for _, e := range listTestEnv() {
814 if e.name == "handler-tls" {
817 testServerGoAway(t, e)
821 func testServerGoAway(t *testing.T, e env) {
823 te.userAgent = testAppUA
824 te.startServer(&testServer{security: e.security})
827 cc := te.clientConn()
828 tc := testpb.NewTestServiceClient(cc)
829 // Finish an RPC to make sure the connection is good.
830 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
831 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
// GracefulStop runs concurrently (goroutine wrapper elided in this view).
833 ch := make(chan struct{})
835 te.srv.GracefulStop()
838 // Loop until the server side GoAway signal is propagated to the client.
840 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
841 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && grpc.Code(err) != codes.DeadlineExceeded {
847 // A new RPC should fail.
848 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable && grpc.Code(err) != codes.Internal {
849 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
852 awaitNewConnLogOutput()
// TestServerGoAwayPendingRPC: a stream opened before GoAway must remain fully
// usable (send and receive) while the server is gracefully stopping.
855 func TestServerGoAwayPendingRPC(t *testing.T) {
856 defer leakcheck.Check(t)
857 for _, e := range listTestEnv() {
858 if e.name == "handler-tls" {
861 testServerGoAwayPendingRPC(t, e)
865 func testServerGoAwayPendingRPC(t *testing.T, e env) {
867 te.userAgent = testAppUA
869 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
870 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
871 "grpc: addrConn.resetTransport failed to create client transport: connection error",
873 te.startServer(&testServer{security: e.security})
876 cc := te.clientConn()
877 tc := testpb.NewTestServiceClient(cc)
// Open the long-lived stream that must survive the GoAway.
878 ctx, cancel := context.WithCancel(context.Background())
879 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
881 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
883 // Finish an RPC to make sure the connection is good.
884 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
885 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
887 ch := make(chan struct{})
889 te.srv.GracefulStop()
892 // Loop until the server side GoAway signal is propagated to the client.
894 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
895 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
// Exercise the pre-GoAway stream with a real request/response round trip.
901 respParam := []*testpb.ResponseParameters{
906 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
910 req := &testpb.StreamingOutputCallRequest{
911 ResponseType: testpb.PayloadType_COMPRESSABLE,
912 ResponseParameters: respParam,
915 // The existing RPC should be still good to proceed.
916 if err := stream.Send(req); err != nil {
917 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
919 if _, err := stream.Recv(); err != nil {
920 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
924 awaitNewConnLogOutput()
// TestServerMultipleGoAwayPendingRPC: two concurrent GracefulStop calls must
// both block until the pending stream finishes, and the stream stays usable.
927 func TestServerMultipleGoAwayPendingRPC(t *testing.T) {
928 defer leakcheck.Check(t)
929 for _, e := range listTestEnv() {
930 if e.name == "handler-tls" {
933 testServerMultipleGoAwayPendingRPC(t, e)
937 func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
939 te.userAgent = testAppUA
941 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
942 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
943 "grpc: addrConn.resetTransport failed to create client transport: connection error",
945 te.startServer(&testServer{security: e.security})
948 cc := te.clientConn()
949 tc := testpb.NewTestServiceClient(cc)
950 ctx, cancel := context.WithCancel(context.Background())
951 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
953 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
955 // Finish an RPC to make sure the connection is good.
956 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
957 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
// Two concurrent GracefulStop calls, each signaled on its own channel.
959 ch1 := make(chan struct{})
961 te.srv.GracefulStop()
964 ch2 := make(chan struct{})
966 te.srv.GracefulStop()
969 // Loop until the server side GoAway signal is propagated to the client.
971 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
972 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
// Neither GracefulStop may return while the stream is still open.
980 t.Fatal("GracefulStop() terminated early")
982 t.Fatal("GracefulStop() terminated early")
985 respParam := []*testpb.ResponseParameters{
990 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
994 req := &testpb.StreamingOutputCallRequest{
995 ResponseType: testpb.PayloadType_COMPRESSABLE,
996 ResponseParameters: respParam,
999 // The existing RPC should be still good to proceed.
1000 if err := stream.Send(req); err != nil {
1001 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
1003 if _, err := stream.Recv(); err != nil {
1004 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
// Closing the send side lets both GracefulStop calls complete.
1006 if err := stream.CloseSend(); err != nil {
1007 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
1012 awaitNewConnLogOutput()
// TestConcurrentClientConnCloseAndServerGoAway: closing the ClientConn while
// the server issues GoAway must not deadlock or panic.
1015 func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
1016 defer leakcheck.Check(t)
1017 for _, e := range listTestEnv() {
1018 if e.name == "handler-tls" {
1021 testConcurrentClientConnCloseAndServerGoAway(t, e)
1025 func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
1027 te.userAgent = testAppUA
1029 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1030 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1031 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1033 te.startServer(&testServer{security: e.security})
1036 cc := te.clientConn()
1037 tc := testpb.NewTestServiceClient(cc)
1038 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1039 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1041 ch := make(chan struct{})
1042 // Close ClientConn and Server concurrently.
1044 te.srv.GracefulStop()
// TestConcurrentServerStopAndGoAway: a hard Stop racing with GracefulStop must
// cleanly kill the pending stream — a post-Stop Send may succeed into buffers,
// but the subsequent Recv must fail.
1053 func TestConcurrentServerStopAndGoAway(t *testing.T) {
1054 defer leakcheck.Check(t)
1055 for _, e := range listTestEnv() {
1056 if e.name == "handler-tls" {
1059 testConcurrentServerStopAndGoAway(t, e)
1063 func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
1065 te.userAgent = testAppUA
1067 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1068 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1069 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1071 te.startServer(&testServer{security: e.security})
1074 cc := te.clientConn()
1075 tc := testpb.NewTestServiceClient(cc)
1076 stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
1078 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1080 // Finish an RPC to make sure the connection is good.
1081 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1082 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1084 ch := make(chan struct{})
1086 te.srv.GracefulStop()
1089 // Loop until the server side GoAway signal is propagated to the client.
1091 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1092 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1098 // Stop the server and close all the connections.
1100 respParam := []*testpb.ResponseParameters{
1105 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1109 req := &testpb.StreamingOutputCallRequest{
1110 ResponseType: testpb.PayloadType_COMPRESSABLE,
1111 ResponseParameters: respParam,
// After the hard Stop, Send may land in local buffers, but Recv must error.
1114 if err := stream.Send(req); err == nil {
1115 if _, err := stream.Recv(); err == nil {
1116 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1120 awaitNewConnLogOutput()
// TestClientConnCloseAfterGoAwayWithActiveStream: closing the ClientConn while
// a stream is active during GoAway must let GracefulStop finish promptly
// (within the one-second timer) instead of hanging on the orphaned stream.
1123 func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
1124 defer leakcheck.Check(t)
1125 for _, e := range listTestEnv() {
1126 if e.name == "handler-tls" {
1129 testClientConnCloseAfterGoAwayWithActiveStream(t, e)
1133 func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
1135 te.startServer(&testServer{security: e.security})
1137 cc := te.clientConn()
1138 tc := testpb.NewTestServiceClient(cc)
// Open a stream but never use it; its mere existence blocks GracefulStop.
1140 if _, err := tc.FullDuplexCall(context.Background()); err != nil {
1141 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
1143 done := make(chan struct{})
1145 te.srv.GracefulStop()
// Give the GoAway time to reach the client before closing the conn.
1148 time.Sleep(50 * time.Millisecond)
1150 timeout := time.NewTimer(time.Second)
1154 t.Fatalf("Test timed-out.")
// TestFailFast: once the server is torn down, fail-fast RPCs (the default)
// must fail immediately with Unavailable while the client reconnects, rather
// than queueing.
1158 func TestFailFast(t *testing.T) {
1159 defer leakcheck.Check(t)
1160 for _, e := range listTestEnv() {
1165 func testFailFast(t *testing.T, e env) {
1167 te.userAgent = testAppUA
1169 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1170 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1171 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1173 te.startServer(&testServer{security: e.security})
1176 cc := te.clientConn()
1177 tc := testpb.NewTestServiceClient(cc)
1178 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
1179 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1181 // Stop the server and tear down all the exisiting connections.
1183 // Loop until the server teardown is propagated to the client.
1185 _, err := tc.EmptyCall(context.Background(), &testpb.Empty{})
1186 if grpc.Code(err) == codes.Unavailable {
1189 fmt.Printf("%v.EmptyCall(_, _) = _, %v", tc, err)
1190 time.Sleep(10 * time.Millisecond)
1192 // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
1193 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
1194 t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
1196 if _, err := tc.StreamingInputCall(context.Background()); grpc.Code(err) != codes.Unavailable {
1197 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
1200 awaitNewConnLogOutput()
// testServiceConfigSetup builds the shared test fixture for the service-config
// tests, pre-registering the benign log lines those tests are expected to emit.
1203 func testServiceConfigSetup(t *testing.T, e env) *test {
1205 te.userAgent = testAppUA
1207 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1208 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1209 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1210 "Failed to dial : context canceled; please retry.",
// newBool returns a pointer to a bool holding the given value.
1215 func newBool(b bool) (a *bool) {
// newInt returns a pointer to an int holding the given value.
1219 func newInt(b int) (a *int) {
// newDuration returns a pointer to a time.Duration holding the given value.
1223 func newDuration(b time.Duration) (a *time.Duration) {
1224 a = new(time.Duration)
// TestGetMethodConfig verifies that ClientConn.GetMethodConfig tracks
// service-config updates pushed through the manual resolver: first a config
// with a method-specific entry for EmptyCall (waitForReady=true), then one
// where EmptyCall only matches the service-level default (waitForReady=false),
// and that RPC behavior changes accordingly.
1229 func TestGetMethodConfig(t *testing.T) {
1230 te := testServiceConfigSetup(t, tcpClearRREnv)
1232 r, rcleanup := manual.GenerateAndRegisterManualResolver()
1235 te.resolverScheme = r.Scheme()
1236 cc := te.clientConn()
1237 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1238 r.NewServiceConfig(`{
1243 "service": "grpc.testing.TestService",
1244 "method": "EmptyCall"
1247 "waitForReady": true,
1253 "service": "grpc.testing.TestService"
1256 "waitForReady": false
1261 tc := testpb.NewTestServiceClient(cc)
1263 // Make sure service config has been processed by grpc.
// Busy-wait (1ms polls) until the config is visible via GetMethodConfig.
1265 if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
1268 time.Sleep(time.Millisecond)
1271 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1273 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
1274 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
// Push an updated config: the method entry now names UnaryCall, so EmptyCall
// falls back to the service-level default (waitForReady=false).
1277 r.NewServiceConfig(`{
1282 "service": "grpc.testing.TestService",
1283 "method": "UnaryCall"
1286 "waitForReady": true,
1292 "service": "grpc.testing.TestService"
1295 "waitForReady": false
1300 // Make sure service config has been processed by grpc.
1302 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
1305 time.Sleep(time.Millisecond)
1307 // The following RPCs are expected to become fail-fast.
1308 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
1309 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
// TestServiceConfigWaitForReady checks precedence between the per-call
// FailFast option and the service config's waitForReady field, for both
// unary (EmptyCall) and streaming (FullDuplexCall) RPCs.
1313 func TestServiceConfigWaitForReady(t *testing.T) {
1314 te := testServiceConfigSetup(t, tcpClearRREnv)
1316 r, rcleanup := manual.GenerateAndRegisterManualResolver()
1319 // Case1: Client API sets failfast to false and service config sets wait_for_ready to false; the Client API should win, and the RPC will wait until the deadline exceeds.
1320 te.resolverScheme = r.Scheme()
1321 cc := te.clientConn()
1322 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1323 r.NewServiceConfig(`{
1328 "service": "grpc.testing.TestService",
1329 "method": "EmptyCall"
1332 "service": "grpc.testing.TestService",
1333 "method": "FullDuplexCall"
1336 "waitForReady": false,
1342 tc := testpb.NewTestServiceClient(cc)
1344 // Make sure service config has been processed by grpc.
1346 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
1349 time.Sleep(time.Millisecond)
1352 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1354 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1355 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1357 if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1358 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1361 // Generate a service config update.
1362 // Case2: Client API sets failfast to false and service config sets wait_for_ready to true; the RPC will wait until the deadline exceeds.
1363 r.NewServiceConfig(`{
1368 "service": "grpc.testing.TestService",
1369 "method": "EmptyCall"
1372 "service": "grpc.testing.TestService",
1373 "method": "FullDuplexCall"
1376 "waitForReady": true,
1382 // Wait for the new service config to take effect.
1384 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
1387 time.Sleep(time.Millisecond)
1389 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1390 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
1391 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1393 if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
1394 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
// TestServiceConfigTimeout checks that the effective RPC deadline is the
// minimum of the per-call context timeout and the service config's timeout,
// in both directions (call shorter than config, config shorter than call).
1398 func TestServiceConfigTimeout(t *testing.T) {
1399 te := testServiceConfigSetup(t, tcpClearRREnv)
1401 r, rcleanup := manual.GenerateAndRegisterManualResolver()
1404 // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1405 te.resolverScheme = r.Scheme()
1406 cc := te.clientConn()
1407 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1408 r.NewServiceConfig(`{
1413 "service": "grpc.testing.TestService",
1414 "method": "EmptyCall"
1417 "service": "grpc.testing.TestService",
1418 "method": "FullDuplexCall"
1421 "waitForReady": true,
1427 tc := testpb.NewTestServiceClient(cc)
1429 // Make sure service config has been processed by grpc.
1431 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1434 time.Sleep(time.Millisecond)
1437 // The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
1439 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
1440 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1441 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1445 ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
1446 if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1447 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1451 // Generate a service config update.
1452 // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1453 r.NewServiceConfig(`{
1458 "service": "grpc.testing.TestService",
1459 "method": "EmptyCall"
1462 "service": "grpc.testing.TestService",
1463 "method": "FullDuplexCall"
1466 "waitForReady": true,
1472 // Wait for the new service config to take effect.
1474 if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
1477 time.Sleep(time.Millisecond)
// With a generous 1hr call deadline, the 1ns config timeout must dominate.
1480 ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1481 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1482 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1486 ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1487 if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1488 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
// TestServiceConfigMaxMsgSize checks how the service config's
// maxRequestMessageBytes/maxResponseMessageBytes (both 2048 here) interact
// with the client API's max-send/receive-size options across three cases:
// config only, client limits smaller than config, client limits larger than
// config. Each case covers unary and streaming, send and recv directions.
1493 func TestServiceConfigMaxMsgSize(t *testing.T) {
1495 r, rcleanup := manual.GenerateAndRegisterManualResolver()
1498 // Setting up values and objects shared across all test cases.
1500 const largeSize = 1024
1501 const extraLargeSize = 2048
1503 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1507 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1511 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
// scjs (service-config JSON, assembly elided) applies 2048-byte limits to
// UnaryCall and FullDuplexCall.
1521 "service": "grpc.testing.TestService",
1522 "method": "UnaryCall"
1525 "service": "grpc.testing.TestService",
1526 "method": "FullDuplexCall"
1529 "maxRequestMessageBytes": 2048,
1530 "maxResponseMessageBytes": 2048
1535 // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1536 te1 := testServiceConfigSetup(t, e)
1537 defer te1.tearDown()
1539 te1.resolverScheme = r.Scheme()
1540 te1.nonBlockingDial = true
1541 te1.startServer(&testServer{security: e.security})
1542 cc1 := te1.clientConn()
1544 r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}})
1545 r.NewServiceConfig(scjs)
1546 tc := testpb.NewTestServiceClient(cc1)
1548 req := &testpb.SimpleRequest{
1549 ResponseType: testpb.PayloadType_COMPRESSABLE,
1550 ResponseSize: int32(extraLargeSize),
1551 Payload: smallPayload,
// Poll until the service config has been applied to the connection.
1555 if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1558 time.Sleep(time.Millisecond)
1561 // Test for unary RPC recv.
1562 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1563 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1566 // Test for unary RPC send.
1567 req.Payload = extraLargePayload
1568 req.ResponseSize = int32(smallSize)
1569 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1570 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1573 // Test for streaming RPC recv.
1574 respParam := []*testpb.ResponseParameters{
1576 Size: int32(extraLargeSize),
1579 sreq := &testpb.StreamingOutputCallRequest{
1580 ResponseType: testpb.PayloadType_COMPRESSABLE,
1581 ResponseParameters: respParam,
1582 Payload: smallPayload,
1584 stream, err := tc.FullDuplexCall(te1.ctx)
1586 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1588 if err = stream.Send(sreq); err != nil {
1589 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1591 if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1592 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1595 // Test for streaming RPC send.
1596 respParam[0].Size = int32(smallSize)
1597 sreq.Payload = extraLargePayload
1598 stream, err = tc.FullDuplexCall(te1.ctx)
1600 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1602 if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1603 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1606 // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1607 te2 := testServiceConfigSetup(t, e)
1608 te2.resolverScheme = r.Scheme()
1609 te2.nonBlockingDial = true
1610 te2.maxClientReceiveMsgSize = newInt(1024)
1611 te2.maxClientSendMsgSize = newInt(1024)
1613 te2.startServer(&testServer{security: e.security})
1614 defer te2.tearDown()
1615 cc2 := te2.clientConn()
1616 r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}})
1617 r.NewServiceConfig(scjs)
1618 tc = testpb.NewTestServiceClient(cc2)
1621 if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1624 time.Sleep(time.Millisecond)
1627 // Test for unary RPC recv.
// largeSize (1024) now exceeds the tighter client-side limit.
1628 req.Payload = smallPayload
1629 req.ResponseSize = int32(largeSize)
1631 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1632 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1635 // Test for unary RPC send.
1636 req.Payload = largePayload
1637 req.ResponseSize = int32(smallSize)
1638 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1639 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1642 // Test for streaming RPC recv.
1643 stream, err = tc.FullDuplexCall(te2.ctx)
1644 respParam[0].Size = int32(largeSize)
1645 sreq.Payload = smallPayload
1647 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1649 if err = stream.Send(sreq); err != nil {
1650 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1652 if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1653 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1656 // Test for streaming RPC send.
1657 respParam[0].Size = int32(smallSize)
1658 sreq.Payload = largePayload
1659 stream, err = tc.FullDuplexCall(te2.ctx)
1661 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1663 if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1664 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1667 // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1668 te3 := testServiceConfigSetup(t, e)
1669 te3.resolverScheme = r.Scheme()
1670 te3.nonBlockingDial = true
1671 te3.maxClientReceiveMsgSize = newInt(4096)
1672 te3.maxClientSendMsgSize = newInt(4096)
1674 te3.startServer(&testServer{security: e.security})
1675 defer te3.tearDown()
1677 cc3 := te3.clientConn()
1678 r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}})
1679 r.NewServiceConfig(scjs)
1680 tc = testpb.NewTestServiceClient(cc3)
1683 if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1686 time.Sleep(time.Millisecond)
1689 // Test for unary RPC recv.
// With 4096-byte client limits, the 2048-byte config limit is the binding
// one: largeSize passes, extraLargeSize is rejected.
1690 req.Payload = smallPayload
1691 req.ResponseSize = int32(largeSize)
1693 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil {
1694 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1697 req.ResponseSize = int32(extraLargeSize)
1698 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1699 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1702 // Test for unary RPC send.
1703 req.Payload = largePayload
1704 req.ResponseSize = int32(smallSize)
1705 if _, err := tc.UnaryCall(context.Background(), req); err != nil {
1706 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1709 req.Payload = extraLargePayload
1710 if _, err = tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1711 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1714 // Test for streaming RPC recv.
1715 stream, err = tc.FullDuplexCall(te3.ctx)
1717 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1719 respParam[0].Size = int32(largeSize)
1720 sreq.Payload = smallPayload
1722 if err = stream.Send(sreq); err != nil {
1723 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1725 if _, err = stream.Recv(); err != nil {
1726 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1729 respParam[0].Size = int32(extraLargeSize)
1731 if err = stream.Send(sreq); err != nil {
1732 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1734 if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1735 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1738 // Test for streaming RPC send.
1739 respParam[0].Size = int32(smallSize)
1740 sreq.Payload = largePayload
1741 stream, err = tc.FullDuplexCall(te3.ctx)
1743 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1745 if err := stream.Send(sreq); err != nil {
1746 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1748 sreq.Payload = extraLargePayload
1749 if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1750 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
// TestMaxMsgSizeClientDefault runs the default client max-message-size check
// in every test environment.
1754 func TestMaxMsgSizeClientDefault(t *testing.T) {
1755 defer leakcheck.Check(t)
1756 for _, e := range listTestEnv() {
1757 testMaxMsgSizeClientDefault(t, e)
// testMaxMsgSizeClientDefault verifies that, with no explicit client options,
// responses larger than the default client receive limit (4MB here) are
// rejected with codes.ResourceExhausted for both unary and streaming RPCs.
1761 func testMaxMsgSizeClientDefault(t *testing.T, e env) {
1763 te.userAgent = testAppUA
1765 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1766 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1767 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1768 "Failed to dial : context canceled; please retry.",
1770 te.startServer(&testServer{security: e.security})
1773 tc := testpb.NewTestServiceClient(te.clientConn())
// largeSize matches the default 4MB receive limit boundary.
1776 const largeSize = 4 * 1024 * 1024
1777 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1781 req := &testpb.SimpleRequest{
1782 ResponseType: testpb.PayloadType_COMPRESSABLE,
1783 ResponseSize: int32(largeSize),
1784 Payload: smallPayload,
1786 // Test for unary RPC recv.
1787 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1788 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1791 respParam := []*testpb.ResponseParameters{
1793 Size: int32(largeSize),
1796 sreq := &testpb.StreamingOutputCallRequest{
1797 ResponseType: testpb.PayloadType_COMPRESSABLE,
1798 ResponseParameters: respParam,
1799 Payload: smallPayload,
1802 // Test for streaming RPC recv.
1803 stream, err := tc.FullDuplexCall(te.ctx)
1805 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1807 if err := stream.Send(sreq); err != nil {
1808 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1810 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1811 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
// TestMaxMsgSizeClientAPI runs the client-API max-message-size check in every
// test environment.
1815 func TestMaxMsgSizeClientAPI(t *testing.T) {
1816 defer leakcheck.Check(t)
1817 for _, e := range listTestEnv() {
1818 testMaxMsgSizeClientAPI(t, e)
// testMaxMsgSizeClientAPI verifies that client-side 1024-byte send/receive
// limits set via the dial options reject oversized messages with
// codes.ResourceExhausted, in all four unary/streaming x send/recv cases.
1822 func testMaxMsgSizeClientAPI(t *testing.T, e env) {
1824 te.userAgent = testAppUA
1825 // To avoid error on server side.
1826 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
1827 te.maxClientReceiveMsgSize = newInt(1024)
1828 te.maxClientSendMsgSize = newInt(1024)
1830 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1831 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1832 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1833 "Failed to dial : context canceled; please retry.",
1835 te.startServer(&testServer{security: e.security})
1838 tc := testpb.NewTestServiceClient(te.clientConn())
// largeSize equals the configured 1024-byte client limits.
1841 const largeSize = 1024
1842 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1847 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1851 req := &testpb.SimpleRequest{
1852 ResponseType: testpb.PayloadType_COMPRESSABLE,
1853 ResponseSize: int32(largeSize),
1854 Payload: smallPayload,
1856 // Test for unary RPC recv.
1857 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1858 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1861 // Test for unary RPC send.
1862 req.Payload = largePayload
1863 req.ResponseSize = int32(smallSize)
1864 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1865 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1868 respParam := []*testpb.ResponseParameters{
1870 Size: int32(largeSize),
1873 sreq := &testpb.StreamingOutputCallRequest{
1874 ResponseType: testpb.PayloadType_COMPRESSABLE,
1875 ResponseParameters: respParam,
1876 Payload: smallPayload,
1879 // Test for streaming RPC recv.
1880 stream, err := tc.FullDuplexCall(te.ctx)
1882 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1884 if err := stream.Send(sreq); err != nil {
1885 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1887 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1888 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1891 // Test for streaming RPC send.
1892 respParam[0].Size = int32(smallSize)
1893 sreq.Payload = largePayload
1894 stream, err = tc.FullDuplexCall(te.ctx)
1896 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1898 if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1899 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
// TestMaxMsgSizeServerAPI runs the server-API max-message-size check in every
// test environment.
1903 func TestMaxMsgSizeServerAPI(t *testing.T) {
1904 defer leakcheck.Check(t)
1905 for _, e := range listTestEnv() {
1906 testMaxMsgSizeServerAPI(t, e)
// testMaxMsgSizeServerAPI verifies that server-side 1024-byte send/receive
// limits reject oversized messages with codes.ResourceExhausted, for unary and
// streaming RPCs in both directions (note: from the server's viewpoint, the
// client's request is a recv and the response is a send).
1910 func testMaxMsgSizeServerAPI(t *testing.T, e env) {
1912 te.userAgent = testAppUA
1913 te.maxServerReceiveMsgSize = newInt(1024)
1914 te.maxServerSendMsgSize = newInt(1024)
1916 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1917 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1918 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1919 "Failed to dial : context canceled; please retry.",
1921 te.startServer(&testServer{security: e.security})
1924 tc := testpb.NewTestServiceClient(te.clientConn())
1927 const largeSize = 1024
1928 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1933 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1937 req := &testpb.SimpleRequest{
1938 ResponseType: testpb.PayloadType_COMPRESSABLE,
1939 ResponseSize: int32(largeSize),
1940 Payload: smallPayload,
1942 // Test for unary RPC send.
1943 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1944 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1947 // Test for unary RPC recv.
1948 req.Payload = largePayload
1949 req.ResponseSize = int32(smallSize)
1950 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1951 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1954 respParam := []*testpb.ResponseParameters{
1956 Size: int32(largeSize),
1959 sreq := &testpb.StreamingOutputCallRequest{
1960 ResponseType: testpb.PayloadType_COMPRESSABLE,
1961 ResponseParameters: respParam,
1962 Payload: smallPayload,
1965 // Test for streaming RPC send.
1966 stream, err := tc.FullDuplexCall(te.ctx)
1968 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1970 if err := stream.Send(sreq); err != nil {
1971 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1973 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1974 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1977 // Test for streaming RPC recv.
1978 respParam[0].Size = int32(smallSize)
1979 sreq.Payload = largePayload
1980 stream, err = tc.FullDuplexCall(te.ctx)
1982 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1984 if err := stream.Send(sreq); err != nil {
1985 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1987 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1988 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
// TestTap runs the server tap-handle test in every test environment;
// handler-tls is excluded (skip body elided here).
1992 func TestTap(t *testing.T) {
1993 defer leakcheck.Check(t)
1994 for _, e := range listTestEnv() {
1995 if e.name == "handler-tls" {
// handle is a tap.ServerInHandle: it counts EmptyCall invocations and rejects
// UnaryCall outright with an error (all other methods pass through).
2006 func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2008 if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" {
2010 } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" {
2011 return nil, fmt.Errorf("tap error")
// testTap installs the myTap handle and verifies that EmptyCall is counted
// once, while UnaryCall — rejected by the tap before reaching the handler —
// surfaces to the client as codes.Unavailable.
2017 func testTap(t *testing.T, e env) {
2019 te.userAgent = testAppUA
2021 te.tapHandle = ttap.handle
2023 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
2024 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2025 "grpc: addrConn.resetTransport failed to create client transport: connection error",
2027 te.startServer(&testServer{security: e.security})
2030 cc := te.clientConn()
2031 tc := testpb.NewTestServiceClient(cc)
2032 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2033 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2036 t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
2039 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
2044 req := &testpb.SimpleRequest{
2045 ResponseType: testpb.PayloadType_COMPRESSABLE,
2049 if _, err := tc.UnaryCall(context.Background(), req); grpc.Code(err) != codes.Unavailable {
2050 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
// healthCheck issues a health-check RPC for serviceName over cc with deadline
// d, returning the server's response (or the RPC error).
2054 func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
2055 ctx, cancel := context.WithTimeout(context.Background(), d)
2057 hc := healthpb.NewHealthClient(cc)
2058 req := &healthpb.HealthCheckRequest{
2059 Service: serviceName,
2061 return hc.Check(ctx, req)
// TestHealthCheckOnSuccess runs the successful health-check test in every
// test environment.
2064 func TestHealthCheckOnSuccess(t *testing.T) {
2065 defer leakcheck.Check(t)
2066 for _, e := range listTestEnv() {
2067 testHealthCheckOnSuccess(t, e)
// testHealthCheckOnSuccess registers a health server marked SERVING for
// "grpc.health.v1.Health" and expects the health check to succeed.
2071 func testHealthCheckOnSuccess(t *testing.T, e env) {
2073 hs := health.NewServer()
// NOTE(review): the magic 1 is presumably
// healthpb.HealthCheckResponse_SERVING — prefer the named constant.
2074 hs.SetServingStatus("grpc.health.v1.Health", 1)
2075 te.healthServer = hs
2076 te.startServer(&testServer{security: e.security})
2079 cc := te.clientConn()
2080 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil {
2081 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
// TestHealthCheckOnFailure runs the failing health-check test in every test
// environment.
2085 func TestHealthCheckOnFailure(t *testing.T) {
2086 defer leakcheck.Check(t)
2087 for _, e := range listTestEnv() {
2088 testHealthCheckOnFailure(t, e)
// testHealthCheckOnFailure issues a health check with a zero deadline and
// expects DeadlineExceeded.
2092 func testHealthCheckOnFailure(t *testing.T, e env) {
2093 defer leakcheck.Check(t)
2097 "grpc: the client connection is closing; please retry",
2099 hs := health.NewServer()
// NOTE(review): the status is set for "grpc.health.v1.HealthCheck" but the
// check below queries "grpc.health.v1.Health" — apparently deliberate (the
// 0s deadline fails the RPC before the lookup matters); confirm.
2100 hs.SetServingStatus("grpc.health.v1.HealthCheck", 1)
2101 te.healthServer = hs
2102 te.startServer(&testServer{security: e.security})
2105 cc := te.clientConn()
2106 wantErr := grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded")
2107 if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2108 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded)
2110 awaitNewConnLogOutput()
// TestHealthCheckOff runs the no-health-server test in every test environment
// except handler-tls.
2113 func TestHealthCheckOff(t *testing.T) {
2114 defer leakcheck.Check(t)
2115 for _, e := range listTestEnv() {
2116 // TODO(bradfitz): Temporarily skip this env due to #619.
2117 if e.name == "handler-tls" {
2120 testHealthCheckOff(t, e)
// testHealthCheckOff verifies that when no health server is registered, a
// health check fails with Unimplemented for the unknown service.
2124 func testHealthCheckOff(t *testing.T, e env) {
2126 te.startServer(&testServer{security: e.security})
2128 want := grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1.Health")
2129 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2130 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
// TestUnknownHandler verifies that a custom unknown-service handler is invoked
// for unregistered methods, returning its own status rather than leaking which
// methods exist.
2134 func TestUnknownHandler(t *testing.T) {
2135 defer leakcheck.Check(t)
2136 // An example unknownHandler that returns a different code and a different method, making sure that we do not
2137 // expose what methods are implemented to a client that is not authenticated.
2138 unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
2139 return grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
2141 for _, e := range listTestEnv() {
2142 // TODO(bradfitz): Temporarily skip this env due to #619.
2143 if e.name == "handler-tls" {
2146 testUnknownHandler(t, e, unknownHandler)
// testUnknownHandler installs unknownHandler on the server and confirms that a
// health check (an unregistered service here) gets the handler's error back.
2150 func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
2152 te.unknownHandler = unknownHandler
2153 te.startServer(&testServer{security: e.security})
2155 want := grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
2156 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2157 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
// TestHealthCheckServingStatus runs the serving-status transition test in
// every test environment.
2161 func TestHealthCheckServingStatus(t *testing.T) {
2162 defer leakcheck.Check(t)
2163 for _, e := range listTestEnv() {
2164 testHealthCheckServingStatus(t, e)
// testHealthCheckServingStatus walks a health server through its states:
// empty service name reports SERVING; an unregistered service reports
// NotFound; after SetServingStatus, SERVING and then NOT_SERVING are observed.
2168 func testHealthCheckServingStatus(t *testing.T, e env) {
2170 hs := health.NewServer()
2171 te.healthServer = hs
2172 te.startServer(&testServer{security: e.security})
2175 cc := te.clientConn()
// The empty service name queries overall server health, SERVING by default.
2176 out, err := healthCheck(1*time.Second, cc, "")
2178 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2180 if out.Status != healthpb.HealthCheckResponse_SERVING {
2181 t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2183 wantErr := grpc.Errorf(codes.NotFound, "unknown service")
2184 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2185 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound)
2187 hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING)
2188 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2190 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2192 if out.Status != healthpb.HealthCheckResponse_SERVING {
2193 t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2195 hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING)
2196 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2198 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2200 if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
2201 t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status)
// TestErrorChanNoIO runs the no-I/O stream-open test in every test
// environment.
2206 func TestErrorChanNoIO(t *testing.T) {
2207 defer leakcheck.Check(t)
2208 for _, e := range listTestEnv() {
2209 testErrorChanNoIO(t, e)
// testErrorChanNoIO opens a full-duplex stream and performs no I/O on it,
// verifying that merely opening the stream succeeds.
2213 func testErrorChanNoIO(t *testing.T, e env) {
2215 te.startServer(&testServer{security: e.security})
2218 tc := testpb.NewTestServiceClient(te.clientConn())
2219 if _, err := tc.FullDuplexCall(context.Background()); err != nil {
2220 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
// TestEmptyUnaryWithUserAgent runs the user-agent header test in every test
// environment.
2224 func TestEmptyUnaryWithUserAgent(t *testing.T) {
2225 defer leakcheck.Check(t)
2226 for _, e := range listTestEnv() {
2227 testEmptyUnaryWithUserAgent(t, e)
// testEmptyUnaryWithUserAgent issues EmptyCall with a custom user agent and
// verifies the server echoes it back in the "ua" response header.
2231 func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2233 te.userAgent = testAppUA
2234 te.startServer(&testServer{security: e.security})
2237 cc := te.clientConn()
2238 tc := testpb.NewTestServiceClient(cc)
2239 var header metadata.MD
2240 reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
2241 if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2242 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
// The configured UA becomes a prefix; gRPC appends its own version suffix.
2244 if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2245 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
// TestFailedEmptyUnary runs testFailedEmptyUnary in every environment except
// handler-tls, which lacks Grpc-Status-Details-Bin support.
2251 func TestFailedEmptyUnary(t *testing.T) {
2252 defer leakcheck.Check(t)
2253 for _, e := range listTestEnv() {
2254 if e.name == "handler-tls" {
2255 // This test covers status details, but
2256 // Grpc-Status-Details-Bin is not supported in handler_server.
2259 testFailedEmptyUnary(t, e)
// testFailedEmptyUnary uses the failing user agent to make the server reject
// EmptyCall, and checks the returned error equals the expected detailed error.
2263 func testFailedEmptyUnary(t *testing.T, e env) {
2265 te.userAgent = failAppUA
2266 te.startServer(&testServer{security: e.security})
2268 tc := testpb.NewTestServiceClient(te.clientConn())
2270 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2271 wantErr := detailedError
2272 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
2273 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
// TestLargeUnary runs testLargeUnary in every test environment.
2277 func TestLargeUnary(t *testing.T) {
2278 defer leakcheck.Check(t)
2279 for _, e := range listTestEnv() {
2280 testLargeUnary(t, e)
// testLargeUnary sends a ~272KB request and requests a ~314KB response,
// then validates the reply's payload type and length.
2284 func testLargeUnary(t *testing.T, e env) {
2286 te.startServer(&testServer{security: e.security})
2288 tc := testpb.NewTestServiceClient(te.clientConn())
2290 const argSize = 271828
2291 const respSize = 314159
2293 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2298 req := &testpb.SimpleRequest{
2299 ResponseType: testpb.PayloadType_COMPRESSABLE,
2300 ResponseSize: respSize,
2303 reply, err := tc.UnaryCall(context.Background(), req)
2305 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2307 pt := reply.GetPayload().GetType()
2308 ps := len(reply.GetPayload().GetBody())
2309 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2310 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2314 // Test backward-compatibility API for setting msg size limit.
2315 func TestExceedMsgLimit(t *testing.T) {
2316 defer leakcheck.Check(t)
2317 for _, e := range listTestEnv() {
2318 testExceedMsgLimit(t, e)
// testExceedMsgLimit configures a 1KB message size limit (legacy maxMsgSize
// API) and confirms ResourceExhausted on both server and client sides for
// unary and streaming RPCs that exceed it.
2322 func testExceedMsgLimit(t *testing.T, e env) {
2324 te.maxMsgSize = newInt(1024)
2325 te.startServer(&testServer{security: e.security})
2327 tc := testpb.NewTestServiceClient(te.clientConn())
2329 argSize := int32(*te.maxMsgSize + 1)
2332 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2336 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2341 // Test on server side for unary RPC.
2342 req := &testpb.SimpleRequest{
2343 ResponseType: testpb.PayloadType_COMPRESSABLE,
2344 ResponseSize: smallSize,
2347 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2348 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2350 // Test on client side for unary RPC.
2351 req.ResponseSize = int32(*te.maxMsgSize) + 1
2352 req.Payload = smallPayload
2353 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2354 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2357 // Test on server side for streaming RPC.
2358 stream, err := tc.FullDuplexCall(te.ctx)
2360 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2362 respParam := []*testpb.ResponseParameters{
// Oversized payload: one byte past the configured limit.
2368 spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
2373 sreq := &testpb.StreamingOutputCallRequest{
2374 ResponseType: testpb.PayloadType_COMPRESSABLE,
2375 ResponseParameters: respParam,
2378 if err := stream.Send(sreq); err != nil {
2379 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2381 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2382 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2385 // Test on client side for streaming RPC.
2386 stream, err = tc.FullDuplexCall(te.ctx)
2388 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2390 respParam[0].Size = int32(*te.maxMsgSize) + 1
2391 sreq.Payload = smallPayload
2392 if err := stream.Send(sreq); err != nil {
2393 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2395 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2396 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
// TestPeerClientSide runs testPeerClientSide in every test environment.
2401 func TestPeerClientSide(t *testing.T) {
2402 defer leakcheck.Check(t)
2403 for _, e := range listTestEnv() {
2404 testPeerClientSide(t, e)
// testPeerClientSide checks grpc.Peer() populated on a successful RPC
// matches the server's listen address (exact match for unix sockets,
// port match for TCP since host strings may differ).
2408 func testPeerClientSide(t *testing.T, e env) {
2410 te.userAgent = testAppUA
2411 te.startServer(&testServer{security: e.security})
2413 tc := testpb.NewTestServiceClient(te.clientConn())
2414 peer := new(peer.Peer)
2415 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
2416 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2418 pa := peer.Addr.String()
2419 if e.network == "unix" {
2420 if pa != te.srvAddr {
2421 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
// TCP case: compare ports only.
2425 _, pp, err := net.SplitHostPort(pa)
2427 t.Fatalf("Failed to parse address from peer.")
2429 _, sp, err := net.SplitHostPort(te.srvAddr)
2431 t.Fatalf("Failed to parse address of test server.")
2434 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2438 // TestPeerNegative tests that if call fails setting peer
2439 // doesn't cause a segmentation fault.
2440 // issue#1141 https://github.com/grpc/grpc-go/issues/1141
2441 func TestPeerNegative(t *testing.T) {
2442 defer leakcheck.Check(t)
2443 for _, e := range listTestEnv() {
2444 testPeerNegative(t, e)
// testPeerNegative issues an RPC on an already-canceled context with
// grpc.Peer set; the call's error is intentionally ignored — the test
// only asserts (via not crashing) that peer handling tolerates failure.
2448 func testPeerNegative(t *testing.T, e env) {
2450 te.startServer(&testServer{security: e.security})
2453 cc := te.clientConn()
2454 tc := testpb.NewTestServiceClient(cc)
2455 peer := new(peer.Peer)
2456 ctx, cancel := context.WithCancel(context.Background())
2458 tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
// TestPeerFailedRPC runs testPeerFailedRPC in every test environment.
2461 func TestPeerFailedRPC(t *testing.T) {
2462 defer leakcheck.Check(t)
2463 for _, e := range listTestEnv() {
2464 testPeerFailedRPC(t, e)
// testPeerFailedRPC verifies grpc.Peer() is still populated when the RPC
// itself fails (server rejects an oversized message with ResourceExhausted).
2468 func testPeerFailedRPC(t *testing.T, e env) {
2470 te.maxServerReceiveMsgSize = newInt(1 * 1024)
2471 te.startServer(&testServer{security: e.security})
2474 tc := testpb.NewTestServiceClient(te.clientConn())
2476 // first make a successful request to the server
2477 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2478 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2481 // make a second request that will be rejected by the server
2482 const largeSize = 5 * 1024
2483 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2487 req := &testpb.SimpleRequest{
2488 ResponseType: testpb.PayloadType_COMPRESSABLE,
2489 Payload: largePayload,
2492 peer := new(peer.Peer)
2493 if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2494 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
// Despite the failure, the peer address must match the server address
// (exact for unix sockets, port comparison for TCP).
2496 pa := peer.Addr.String()
2497 if e.network == "unix" {
2498 if pa != te.srvAddr {
2499 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2503 _, pp, err := net.SplitHostPort(pa)
2505 t.Fatalf("Failed to parse address from peer.")
2507 _, sp, err := net.SplitHostPort(te.srvAddr)
2509 t.Fatalf("Failed to parse address of test server.")
2512 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
// TestMetadataUnaryRPC runs testMetadataUnaryRPC in every test environment.
2517 func TestMetadataUnaryRPC(t *testing.T) {
2518 defer leakcheck.Check(t)
2519 for _, e := range listTestEnv() {
2520 testMetadataUnaryRPC(t, e)
// testMetadataUnaryRPC sends testMetadata with a unary call and checks that
// the server echoes it back as headers and returns testTrailerMetadata as
// trailers (after stripping optional server-set headers).
2524 func testMetadataUnaryRPC(t *testing.T, e env) {
2526 te.startServer(&testServer{security: e.security})
2528 tc := testpb.NewTestServiceClient(te.clientConn())
2530 const argSize = 2718
2531 const respSize = 314
2533 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2538 req := &testpb.SimpleRequest{
2539 ResponseType: testpb.PayloadType_COMPRESSABLE,
2540 ResponseSize: respSize,
2543 var header, trailer metadata.MD
2544 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2545 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
2546 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2548 // Ignore optional response headers that Servers may set:
2550 delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
2551 delete(header, "date") // the Date header is also optional
2552 delete(header, "user-agent")
2554 if !reflect.DeepEqual(header, testMetadata) {
2555 t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
2557 if !reflect.DeepEqual(trailer, testTrailerMetadata) {
2558 t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
// TestMultipleSetTrailerUnaryRPC runs testMultipleSetTrailerUnaryRPC in every environment.
2562 func TestMultipleSetTrailerUnaryRPC(t *testing.T) {
2563 defer leakcheck.Check(t)
2564 for _, e := range listTestEnv() {
2565 testMultipleSetTrailerUnaryRPC(t, e)
// testMultipleSetTrailerUnaryRPC has the server call SetTrailer twice and
// verifies the client receives the join of both trailer metadata sets.
2569 func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
2571 te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2573 tc := testpb.NewTestServiceClient(te.clientConn())
2579 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2584 req := &testpb.SimpleRequest{
2585 ResponseType: testpb.PayloadType_COMPRESSABLE,
2586 ResponseSize: respSize,
2589 var trailer metadata.MD
2590 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2591 if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
2592 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2594 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2595 if !reflect.DeepEqual(trailer, expectedTrailer) {
2596 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
// TestMultipleSetTrailerStreamingRPC runs testMultipleSetTrailerStreamingRPC in every environment.
2600 func TestMultipleSetTrailerStreamingRPC(t *testing.T) {
2601 defer leakcheck.Check(t)
2602 for _, e := range listTestEnv() {
2603 testMultipleSetTrailerStreamingRPC(t, e)
// testMultipleSetTrailerStreamingRPC has the server call SetTrailer twice on a
// full-duplex stream and verifies the client sees the join of both trailer sets
// after the stream completes with io.EOF.
// Fix: corrected "complele" -> "complete" in the Fatalf message.
2607 func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
2609 te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2611 tc := testpb.NewTestServiceClient(te.clientConn())
2613 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2614 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
2616 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2618 if err := stream.CloseSend(); err != nil {
2619 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2621 if _, err := stream.Recv(); err != io.EOF {
2622 t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err)
2625 trailer := stream.Trailer()
2626 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2627 if !reflect.DeepEqual(trailer, expectedTrailer) {
2628 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
// TestSetAndSendHeaderUnaryRPC runs testSetAndSendHeaderUnaryRPC in every
// environment except handler-tls.
2632 func TestSetAndSendHeaderUnaryRPC(t *testing.T) {
2633 defer leakcheck.Check(t)
2634 for _, e := range listTestEnv() {
2635 if e.name == "handler-tls" {
2638 testSetAndSendHeaderUnaryRPC(t, e)
2642 // To test header metadata is sent on SendHeader().
// testSetAndSendHeaderUnaryRPC expects the server (configured with
// setAndSendHeader) to return the join of testMetadata and testMetadata2
// as response headers.
2643 func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
2645 te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2647 tc := testpb.NewTestServiceClient(te.clientConn())
2653 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2658 req := &testpb.SimpleRequest{
2659 ResponseType: testpb.PayloadType_COMPRESSABLE,
2660 ResponseSize: respSize,
2663 var header metadata.MD
2664 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2665 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
2666 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2668 delete(header, "user-agent")
2669 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2670 if !reflect.DeepEqual(header, expectedHeader) {
2671 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
// TestMultipleSetHeaderUnaryRPC runs testMultipleSetHeaderUnaryRPC in every
// environment except handler-tls.
2675 func TestMultipleSetHeaderUnaryRPC(t *testing.T) {
2676 defer leakcheck.Check(t)
2677 for _, e := range listTestEnv() {
2678 if e.name == "handler-tls" {
2681 testMultipleSetHeaderUnaryRPC(t, e)
2685 // To test header metadata is sent when sending response.
// testMultipleSetHeaderUnaryRPC: server calls SetHeader multiple times
// (setHeaderOnly); client must receive the join of both metadata sets.
2686 func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
2688 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2690 tc := testpb.NewTestServiceClient(te.clientConn())
2696 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2701 req := &testpb.SimpleRequest{
2702 ResponseType: testpb.PayloadType_COMPRESSABLE,
2703 ResponseSize: respSize,
2707 var header metadata.MD
2708 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2709 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
2710 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2712 delete(header, "user-agent")
2713 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2714 if !reflect.DeepEqual(header, expectedHeader) {
2715 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
// TestMultipleSetHeaderUnaryRPCError runs testMultipleSetHeaderUnaryRPCError
// in every environment except handler-tls.
2719 func TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
2720 defer leakcheck.Check(t)
2721 for _, e := range listTestEnv() {
2722 if e.name == "handler-tls" {
2725 testMultipleSetHeaderUnaryRPCError(t, e)
2729 // To test header metadata is sent when sending status.
// testMultipleSetHeaderUnaryRPCError forces the RPC to fail (negative
// response size) and checks headers set via SetHeader are still delivered.
2730 func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
2732 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2734 tc := testpb.NewTestServiceClient(te.clientConn())
2738 respSize = -1 // Invalid respSize to make RPC fail.
2740 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2745 req := &testpb.SimpleRequest{
2746 ResponseType: testpb.PayloadType_COMPRESSABLE,
2747 ResponseSize: respSize,
2750 var header metadata.MD
2751 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2752 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
2753 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
2755 delete(header, "user-agent")
2756 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2757 if !reflect.DeepEqual(header, expectedHeader) {
2758 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
// TestSetAndSendHeaderStreamingRPC runs testSetAndSendHeaderStreamingRPC in
// every environment except handler-tls.
2762 func TestSetAndSendHeaderStreamingRPC(t *testing.T) {
2763 defer leakcheck.Check(t)
2764 for _, e := range listTestEnv() {
2765 if e.name == "handler-tls" {
2768 testSetAndSendHeaderStreamingRPC(t, e)
2772 // To test header metadata is sent on SendHeader().
// testSetAndSendHeaderStreamingRPC: after the stream completes, Header()
// must return the join of testMetadata and testMetadata2 set by the server.
// Fix: corrected "complele" -> "complete" in the Fatalf message.
2773 func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
2775 te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2777 tc := testpb.NewTestServiceClient(te.clientConn())
2783 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2784 stream, err := tc.FullDuplexCall(ctx)
2786 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2788 if err := stream.CloseSend(); err != nil {
2789 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2791 if _, err := stream.Recv(); err != io.EOF {
2792 t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err)
2795 header, err := stream.Header()
2797 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2799 delete(header, "user-agent")
2800 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2801 if !reflect.DeepEqual(header, expectedHeader) {
2802 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
// TestMultipleSetHeaderStreamingRPC runs testMultipleSetHeaderStreamingRPC in
// every environment except handler-tls.
2806 func TestMultipleSetHeaderStreamingRPC(t *testing.T) {
2807 defer leakcheck.Check(t)
2808 for _, e := range listTestEnv() {
2809 if e.name == "handler-tls" {
2812 testMultipleSetHeaderStreamingRPC(t, e)
2816 // To test header metadata is sent when sending response.
// testMultipleSetHeaderStreamingRPC exchanges one message on a full-duplex
// stream with a setHeaderOnly server and verifies the joined header metadata.
// Fix: corrected "complele" -> "complete" in the Fatalf message.
2817 func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
2819 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2821 tc := testpb.NewTestServiceClient(te.clientConn())
2827 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2828 stream, err := tc.FullDuplexCall(ctx)
2830 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2833 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2838 req := &testpb.StreamingOutputCallRequest{
2839 ResponseType: testpb.PayloadType_COMPRESSABLE,
2840 ResponseParameters: []*testpb.ResponseParameters{
2845 if err := stream.Send(req); err != nil {
2846 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2848 if _, err := stream.Recv(); err != nil {
2849 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
2851 if err := stream.CloseSend(); err != nil {
2852 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2854 if _, err := stream.Recv(); err != io.EOF {
2855 t.Fatalf("%v failed to complete the FullDuplexCall: %v", stream, err)
2858 header, err := stream.Header()
2860 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2862 delete(header, "user-agent")
2863 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2864 if !reflect.DeepEqual(header, expectedHeader) {
2865 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
// TestMultipleSetHeaderStreamingRPCError runs
// testMultipleSetHeaderStreamingRPCError in every environment except handler-tls.
2870 func TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
2871 defer leakcheck.Check(t)
2872 for _, e := range listTestEnv() {
2873 if e.name == "handler-tls" {
2876 testMultipleSetHeaderStreamingRPCError(t, e)
2880 // To test header metadata is sent when sending status.
// testMultipleSetHeaderStreamingRPCError forces a stream error (negative
// response size) and verifies SetHeader metadata is still delivered.
2881 func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
2883 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2885 tc := testpb.NewTestServiceClient(te.clientConn())
2891 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2892 stream, err := tc.FullDuplexCall(ctx)
2894 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2897 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2902 req := &testpb.StreamingOutputCallRequest{
2903 ResponseType: testpb.PayloadType_COMPRESSABLE,
2904 ResponseParameters: []*testpb.ResponseParameters{
2909 if err := stream.Send(req); err != nil {
2910 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2912 if _, err := stream.Recv(); err == nil {
2913 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
2916 header, err := stream.Header()
2918 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2920 delete(header, "user-agent")
2921 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2922 if !reflect.DeepEqual(header, expectedHeader) {
2923 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2926 if err := stream.CloseSend(); err != nil {
2927 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2931 // TestMalformedHTTP2Metadata verifies the returned error when the client
2932 // sends an illegal metadata.
2933 func TestMalformedHTTP2Metadata(t *testing.T) {
2934 defer leakcheck.Check(t)
2935 for _, e := range listTestEnv() {
2936 if e.name == "handler-tls" {
2937 // Failed with "server stops accepting new RPCs".
2938 // Server stops accepting new RPCs when the client sends an illegal http2 header.
2941 testMalformedHTTP2Metadata(t, e)
// testMalformedHTTP2Metadata sends metadata that violates HTTP/2 header rules
// and expects the RPC to fail with codes.Internal.
2945 func testMalformedHTTP2Metadata(t *testing.T, e env) {
2947 te.startServer(&testServer{security: e.security})
2949 tc := testpb.NewTestServiceClient(te.clientConn())
2951 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
2956 req := &testpb.SimpleRequest{
2957 ResponseType: testpb.PayloadType_COMPRESSABLE,
2961 ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata)
2962 if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Internal {
2963 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
// performOneRPC issues a single wait-for-ready UnaryCall and validates the
// reply's payload type and length. Uses t.Errorf (not Fatalf) because it is
// run concurrently from many goroutines; wg presumably tracks completion —
// NOTE(review): the wg.Done/defer line is not visible in this listing.
2967 func performOneRPC(t *testing.T, tc testpb.TestServiceClient, wg *sync.WaitGroup) {
2969 const argSize = 2718
2970 const respSize = 314
2972 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2978 req := &testpb.SimpleRequest{
2979 ResponseType: testpb.PayloadType_COMPRESSABLE,
2980 ResponseSize: respSize,
2983 reply, err := tc.UnaryCall(context.Background(), req, grpc.FailFast(false))
2985 t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2988 pt := reply.GetPayload().GetType()
2989 ps := len(reply.GetPayload().GetBody())
2990 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2991 t.Errorf("Got reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
// TestRetry runs testRetry in every environment except handler-tls,
// which fails with a flow-control error.
2996 func TestRetry(t *testing.T) {
2997 defer leakcheck.Check(t)
2998 for _, e := range listTestEnv() {
2999 if e.name == "handler-tls" {
3000 // Fails with RST_STREAM / FLOW_CONTROL_ERROR
3007 // This test mimics a user who sends 1000 RPCs concurrently on a faulty transport.
3008 // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
3009 // and error-prone paths.
3010 func testRetry(t *testing.T, e env) {
3012 te.declareLogNoise("transport: http2Client.notifyError got notified that the client transport was broken")
3013 te.startServer(&testServer{security: e.security})
3016 cc := te.clientConn()
3017 tc := testpb.NewTestServiceClient(cc)
3018 var wg sync.WaitGroup
3021 rpcSpacing := 2 * time.Millisecond
3023 // The race detector has a limit on how many goroutines it can track.
3024 // This test is near the upper limit, and goes over the limit
3025 // depending on the environment (the http.Handler environment uses
3027 t.Logf("Shortening test in race mode.")
3034 // Halfway through starting RPCs, kill all connections:
3035 time.Sleep(time.Duration(numRPC/2) * rpcSpacing)
3037 // The server shuts down the network connection to make a
3038 // transport error which will be detected by the client side
3040 internal.TestingCloseConns(te.srv)
3043 // All these RPCs should succeed eventually.
3044 for i := 0; i < numRPC; i++ {
3045 time.Sleep(rpcSpacing)
3047 go performOneRPC(t, tc, &wg)
// TestRPCTimeout runs testRPCTimeout in every test environment.
3052 func TestRPCTimeout(t *testing.T) {
3053 defer leakcheck.Check(t)
3054 for _, e := range listTestEnv() {
3055 testRPCTimeout(t, e)
3059 // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
// testRPCTimeout calls a server whose handler sleeps 50ms with deadlines of
// -1ms..10ms, all shorter than the handler, and expects DeadlineExceeded each time.
// Fix: corrected "UnaryCallv" -> "UnaryCall" in the Fatalf message.
3060 func testRPCTimeout(t *testing.T, e env) {
3062 te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond})
3065 cc := te.clientConn()
3066 tc := testpb.NewTestServiceClient(cc)
3068 const argSize = 2718
3069 const respSize = 314
3071 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3076 req := &testpb.SimpleRequest{
3077 ResponseType: testpb.PayloadType_COMPRESSABLE,
3078 ResponseSize: respSize,
3081 for i := -1; i <= 10; i++ {
3082 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
3083 if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.DeadlineExceeded {
3084 t.Fatalf("TestService/UnaryCall(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
// TestCancel runs testCancel in every test environment.
3090 func TestCancel(t *testing.T) {
3091 defer leakcheck.Check(t)
3092 for _, e := range listTestEnv() {
// testCancel cancels the context 1ms into a call whose handler sleeps for a
// second, and expects the RPC to fail with codes.Canceled.
3097 func testCancel(t *testing.T, e env) {
3099 te.declareLogNoise("grpc: the client connection is closing; please retry")
3100 te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3103 cc := te.clientConn()
3104 tc := testpb.NewTestServiceClient(cc)
3106 const argSize = 2718
3107 const respSize = 314
3109 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3114 req := &testpb.SimpleRequest{
3115 ResponseType: testpb.PayloadType_COMPRESSABLE,
3116 ResponseSize: respSize,
3119 ctx, cancel := context.WithCancel(context.Background())
3120 time.AfterFunc(1*time.Millisecond, cancel)
3121 if r, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Canceled {
3122 t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3124 awaitNewConnLogOutput()
// TestCancelNoIO runs testCancelNoIO in every test environment.
3127 func TestCancelNoIO(t *testing.T) {
3128 defer leakcheck.Check(t)
3129 for _, e := range listTestEnv() {
3130 testCancelNoIO(t, e)
// testCancelNoIO saturates a server limited to 1 concurrent stream with an
// idle RPC, verifies further RPCs block until deadline, then checks that
// canceling the first stream unblocks a subsequent RPC.
3134 func testCancelNoIO(t *testing.T, e env) {
3136 te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
3137 te.maxStream = 1 // Only allows 1 live stream per server transport.
3138 te.startServer(&testServer{security: e.security})
3141 cc := te.clientConn()
3142 tc := testpb.NewTestServiceClient(cc)
3144 // Start one blocked RPC for which we'll never send streaming
3145 // input. This will consume the 1 maximum concurrent streams,
3146 // causing future RPCs to hang.
3147 ctx, cancelFirst := context.WithCancel(context.Background())
3148 _, err := tc.StreamingInputCall(ctx)
3150 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3153 // Loop until the ClientConn receives the initial settings
3154 // frame from the server, notifying it about the maximum
3155 // concurrent streams. We know when it's received it because
3156 // an RPC will fail with codes.DeadlineExceeded instead of
3158 // TODO(bradfitz): add internal test hook for this (Issue 534)
3160 ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
3161 _, err := tc.StreamingInputCall(ctx)
3166 if grpc.Code(err) == codes.DeadlineExceeded {
3169 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3171 // If there are any RPCs in flight before the client receives
3172 // the max streams setting, let them be expired.
3173 // TODO(bradfitz): add internal test hook for this (Issue 534)
3174 time.Sleep(50 * time.Millisecond)
3177 time.Sleep(50 * time.Millisecond)
3181 // This should be blocked until the 1st is canceled, then succeed.
3182 ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
3183 if _, err := tc.StreamingInputCall(ctx); err != nil {
3184 t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3189 // The following tests the gRPC streaming RPC implementations.
3190 // TODO(zhaoq): Have better coverage on error cases.
// Paired request/response payload sizes used by the ping-pong style
// streaming tests below.
3192 reqSizes = []int{27182, 8, 1828, 45904}
3193 respSizes = []int{31415, 9, 2653, 58979}
// TestNoService runs testNoService in every test environment.
3196 func TestNoService(t *testing.T) {
3197 defer leakcheck.Check(t)
3198 for _, e := range listTestEnv() {
// testNoService calls a server with no registered service and expects the
// stream to fail with codes.Unimplemented on first Recv.
3203 func testNoService(t *testing.T, e env) {
3208 cc := te.clientConn()
3209 tc := testpb.NewTestServiceClient(cc)
3211 stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
3213 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3215 if _, err := stream.Recv(); grpc.Code(err) != codes.Unimplemented {
3216 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
// TestPingPong runs testPingPong in every test environment.
3220 func TestPingPong(t *testing.T) {
3221 defer leakcheck.Check(t)
3222 for _, e := range listTestEnv() {
// testPingPong exchanges len(reqSizes) request/response pairs on one
// full-duplex stream, checking each reply's type and size, then closes the
// send side and expects io.EOF.
// Fix: corrected "complele" -> "complete" in the final Fatalf message.
3227 func testPingPong(t *testing.T, e env) {
3229 te.startServer(&testServer{security: e.security})
3231 tc := testpb.NewTestServiceClient(te.clientConn())
3233 stream, err := tc.FullDuplexCall(te.ctx)
3235 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3238 for index < len(reqSizes) {
3239 respParam := []*testpb.ResponseParameters{
3241 Size: int32(respSizes[index]),
3245 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3250 req := &testpb.StreamingOutputCallRequest{
3251 ResponseType: testpb.PayloadType_COMPRESSABLE,
3252 ResponseParameters: respParam,
3255 if err := stream.Send(req); err != nil {
3256 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3258 reply, err := stream.Recv()
3260 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3262 pt := reply.GetPayload().GetType()
3263 if pt != testpb.PayloadType_COMPRESSABLE {
3264 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3266 size := len(reply.GetPayload().GetBody())
3267 if size != int(respSizes[index]) {
3268 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3272 if err := stream.CloseSend(); err != nil {
3273 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3275 if _, err := stream.Recv(); err != io.EOF {
3276 t.Fatalf("%v failed to complete the ping pong test: %v", stream, err)
// TestMetadataStreamingRPC runs testMetadataStreamingRPC in every test environment.
3280 func TestMetadataStreamingRPC(t *testing.T) {
3281 defer leakcheck.Check(t)
3282 for _, e := range listTestEnv() {
3283 testMetadataStreamingRPC(t, e)
// testMetadataStreamingRPC verifies header metadata (fresh and cached via a
// second Header() call) and trailer metadata on a full-duplex stream that
// sends len(reqSizes) messages.
3287 func testMetadataStreamingRPC(t *testing.T, e env) {
3289 te.startServer(&testServer{security: e.security})
3291 tc := testpb.NewTestServiceClient(te.clientConn())
3293 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3294 stream, err := tc.FullDuplexCall(ctx)
3296 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3299 headerMD, err := stream.Header()
3300 if e.security == "tls" {
3301 delete(headerMD, "transport_security_type")
3303 delete(headerMD, "trailer") // ignore if present
3304 delete(headerMD, "user-agent")
3305 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3306 t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3308 // test the cached value.
3309 headerMD, err = stream.Header()
3310 delete(headerMD, "trailer") // ignore if present
3311 delete(headerMD, "user-agent")
3312 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3313 t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3315 err = func() error {
3316 for index := 0; index < len(reqSizes); index++ {
3317 respParam := []*testpb.ResponseParameters{
3319 Size: int32(respSizes[index]),
3323 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3328 req := &testpb.StreamingOutputCallRequest{
3329 ResponseType: testpb.PayloadType_COMPRESSABLE,
3330 ResponseParameters: respParam,
3333 if err := stream.Send(req); err != nil {
3334 return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3339 // Tell the server we're done sending args.
3346 if _, err := stream.Recv(); err != nil {
3350 trailerMD := stream.Trailer()
3351 if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
3352 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
// TestServerStreaming runs testServerStreaming in every test environment.
3356 func TestServerStreaming(t *testing.T) {
3357 defer leakcheck.Check(t)
3358 for _, e := range listTestEnv() {
3359 testServerStreaming(t, e)
// testServerStreaming requests one response per entry in respSizes on a
// server-streaming call, validates each reply, and checks the stream ends
// with io.EOF after exactly len(respSizes) replies.
// Fix: the final Fatalf's arguments were swapped — the message is
// "Got %d reply, want %d" but the received count (respCnt) was passed as the
// "want" value; pass respCnt first so got/want labels match the values.
3363 func testServerStreaming(t *testing.T, e env) {
3365 te.startServer(&testServer{security: e.security})
3367 tc := testpb.NewTestServiceClient(te.clientConn())
3369 respParam := make([]*testpb.ResponseParameters, len(respSizes))
3370 for i, s := range respSizes {
3371 respParam[i] = &testpb.ResponseParameters{
3375 req := &testpb.StreamingOutputCallRequest{
3376 ResponseType: testpb.PayloadType_COMPRESSABLE,
3377 ResponseParameters: respParam,
3379 stream, err := tc.StreamingOutputCall(context.Background(), req)
3381 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3387 reply, err := stream.Recv()
3392 pt := reply.GetPayload().GetType()
3393 if pt != testpb.PayloadType_COMPRESSABLE {
3394 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3396 size := len(reply.GetPayload().GetBody())
3397 if size != int(respSizes[index]) {
3398 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3403 if rpcStatus != io.EOF {
3404 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3406 if respCnt != len(respSizes) {
3407 t.Fatalf("Got %d reply, want %d", respCnt, len(respSizes))
// TestFailedServerStreaming runs the failing server-streaming test against
// every environment, checking for goroutine leaks.
3411 func TestFailedServerStreaming(t *testing.T) {
3412 defer leakcheck.Check(t)
3413 for _, e := range listTestEnv() {
3414 testFailedServerStreaming(t, e)
// testFailedServerStreaming configures the client with failAppUA (which the
// test server rejects) and verifies that Recv on the stream returns the
// exact codes.DataLoss error the server produces.
3418 func testFailedServerStreaming(t *testing.T, e env) {
3420 te.userAgent = failAppUA
3421 te.startServer(&testServer{security: e.security})
3423 tc := testpb.NewTestServiceClient(te.clientConn())
3425 respParam := make([]*testpb.ResponseParameters, len(respSizes))
3426 for i, s := range respSizes {
3427 respParam[i] = &testpb.ResponseParameters{
3431 req := &testpb.StreamingOutputCallRequest{
3432 ResponseType: testpb.PayloadType_COMPRESSABLE,
3433 ResponseParameters: respParam,
3435 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3436 stream, err := tc.StreamingOutputCall(ctx, req)
3438 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3440 wantErr := grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
3441 if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) {
3442 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
3446 // concurrentSendServer is a TestServiceServer whose
3447 // StreamingOutputCall makes ten serial Send calls, sending payloads
3448 // "0".."9", inclusive. TestServerStreamingConcurrent verifies they
3449 // were received in the correct order, and that there were no races.
3451 // All other TestServiceServer methods crash if called.
// concurrentSendServer embeds TestServiceServer so unimplemented methods
// panic if called; only StreamingOutputCall is overridden below.
3452 type concurrentSendServer struct {
3453 testpb.TestServiceServer
// StreamingOutputCall sends ten one-byte payloads, '0' through '9', in order.
3456 func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
3457 for i := 0; i < 10; i++ {
// NOTE(review): Send's error is ignored here; a failed Send mid-stream would
// go unnoticed by the server side.
3458 stream.Send(&testpb.StreamingOutputCallResponse{
3459 Payload: &testpb.Payload{
3460 Body: []byte{'0' + uint8(i)},
3467 // Tests doing a bunch of concurrent streaming output calls.
// TestServerStreamingConcurrent runs the concurrent server-streaming test
// against every environment, checking for goroutine leaks.
3468 func TestServerStreamingConcurrent(t *testing.T) {
3469 defer leakcheck.Check(t)
3470 for _, e := range listTestEnv() {
3471 testServerStreamingConcurrent(t, e)
// testServerStreamingConcurrent launches 20 concurrent streaming calls
// against concurrentSendServer; each call expects exactly ten replies whose
// concatenated bodies read "0,1,2,3,4,5,6,7,8,9".
// NOTE(review): the server sends bare digits; the comma separators are
// presumably written by code in the elided receive loop — confirm.
3475 func testServerStreamingConcurrent(t *testing.T, e env) {
3477 te.startServer(concurrentSendServer{})
3480 cc := te.clientConn()
3481 tc := testpb.NewTestServiceClient(cc)
3483 doStreamingCall := func() {
3484 req := &testpb.StreamingOutputCallRequest{}
3485 stream, err := tc.StreamingOutputCall(context.Background(), req)
3487 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3491 var buf bytes.Buffer
3493 reply, err := stream.Recv()
3504 buf.Write(reply.GetPayload().GetBody())
3506 if want := 10; ngot != want {
3507 t.Errorf("Got %d replies, want %d", ngot, want)
3509 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3510 t.Errorf("Got replies %q; want %q", got, want)
3514 var wg sync.WaitGroup
3515 for i := 0; i < 20; i++ {
// generatePayloadSizes builds the request-size matrix for the client
// streaming tests: a mixed-size row, 1024 8KiB payloads, and a row of 2MiB
// payloads (count declared on an elided line).
3526 func generatePayloadSizes() [][]int {
3527 reqSizes := [][]int{
3528 {27182, 8, 1828, 45904},
3531 num8KPayloads := 1024
3532 eightKPayloads := []int{}
3533 for i := 0; i < num8KPayloads; i++ {
3534 eightKPayloads = append(eightKPayloads, (1 << 13))
3536 reqSizes = append(reqSizes, eightKPayloads)
3539 twoMPayloads := []int{}
3540 for i := 0; i < num2MPayloads; i++ {
3541 twoMPayloads = append(twoMPayloads, (1 << 21))
3543 reqSizes = append(reqSizes, twoMPayloads)
// TestClientStreaming runs the client-streaming test for every payload-size
// row and every environment.
3548 func TestClientStreaming(t *testing.T) {
3549 defer leakcheck.Check(t)
3550 for _, s := range generatePayloadSizes() {
3551 for _, e := range listTestEnv() {
3552 testClientStreaming(t, e, s)
// testClientStreaming sends one request per size in sizes over a
// StreamingInputCall (30s deadline) and verifies the server's aggregated
// payload size equals the sum of all payload sizes sent.
3557 func testClientStreaming(t *testing.T, e env, sizes []int) {
3559 te.startServer(&testServer{security: e.security})
3561 tc := testpb.NewTestServiceClient(te.clientConn())
3563 ctx, cancel := context.WithTimeout(te.ctx, time.Second*30)
3565 stream, err := tc.StreamingInputCall(ctx)
3567 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3571 for _, s := range sizes {
3572 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
3577 req := &testpb.StreamingInputCallRequest{
3580 if err := stream.Send(req); err != nil {
3581 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3585 reply, err := stream.CloseAndRecv()
3587 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
3589 if reply.GetAggregatedPayloadSize() != int32(sum) {
3590 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
// TestClientStreamingError runs the failing client-streaming test on every
// environment except handler-tls (skipped by the elided continue).
3594 func TestClientStreamingError(t *testing.T) {
3595 defer leakcheck.Check(t)
3596 for _, e := range listTestEnv() {
3597 if e.name == "handler-tls" {
3600 testClientStreamingError(t, e)
// testClientStreamingError uses a server configured to fail early
// (earlyFail: true): the first Send succeeds, the second is expected to
// return io.EOF, and CloseAndRecv must surface the server's NotFound status.
3604 func testClientStreamingError(t *testing.T, e env) {
3606 te.startServer(&testServer{security: e.security, earlyFail: true})
3608 tc := testpb.NewTestServiceClient(te.clientConn())
3610 stream, err := tc.StreamingInputCall(te.ctx)
3612 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3614 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
3619 req := &testpb.StreamingInputCallRequest{
3622 // The 1st request should go through.
3623 if err := stream.Send(req); err != nil {
3624 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3627 if err := stream.Send(req); err != io.EOF {
3630 if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound {
3631 t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
// TestExceedMaxStreamsLimit runs the max-concurrent-streams test on every
// environment, checking for goroutine leaks.
3637 func TestExceedMaxStreamsLimit(t *testing.T) {
3638 defer leakcheck.Check(t)
3639 for _, e := range listTestEnv() {
3640 testExceedMaxStreamsLimit(t, e)
// testExceedMaxStreamsLimit caps the server at one live stream, opens that
// stream, then loops on short-deadline StreamingInputCalls until one fails
// with DeadlineExceeded — proving the second stream was blocked by the limit.
3644 func testExceedMaxStreamsLimit(t *testing.T, e env) {
// Expected log noise declared for this test (string literals; do not edit).
3647 "http2Client.notifyError got notified that the client transport was broken",
3648 "Conn.resetTransport failed to create client transport",
3649 "grpc: the connection is closing",
3651 te.maxStream = 1 // Only allows 1 live stream per server transport.
3652 te.startServer(&testServer{security: e.security})
3655 cc := te.clientConn()
3656 tc := testpb.NewTestServiceClient(cc)
3658 _, err := tc.StreamingInputCall(te.ctx)
3660 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3662 // Loop until receiving the new max stream setting from the server.
3664 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
3666 _, err := tc.StreamingInputCall(ctx)
3668 time.Sleep(50 * time.Millisecond)
3671 if grpc.Code(err) == codes.DeadlineExceeded {
3674 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
// TestStreamsQuotaRecovery runs the stream-quota recovery test on every
// environment, checking for goroutine leaks.
3678 func TestStreamsQuotaRecovery(t *testing.T) {
3679 defer leakcheck.Check(t)
3680 for _, e := range listTestEnv() {
3681 testStreamsQuotaRecovery(t, e)
// testStreamsQuotaRecovery caps the server at one live stream, saturates it,
// then fires 10 concurrent non-fail-fast UnaryCalls with 10ms deadlines —
// every one must time out with DeadlineExceeded because the single stream
// slot is taken.
3685 func testStreamsQuotaRecovery(t *testing.T, e env) {
// Expected log noise declared for this test (string literals; do not edit).
3688 "http2Client.notifyError got notified that the client transport was broken",
3689 "Conn.resetTransport failed to create client transport",
3690 "grpc: the connection is closing",
3692 te.maxStream = 1 // Allows 1 live stream.
3693 te.startServer(&testServer{security: e.security})
3696 cc := te.clientConn()
3697 tc := testpb.NewTestServiceClient(cc)
3698 if _, err := tc.StreamingInputCall(context.Background()); err != nil {
3699 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3701 // Loop until the new max stream setting is effective.
3703 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
3705 _, err := tc.StreamingInputCall(ctx)
3707 time.Sleep(50 * time.Millisecond)
3710 if grpc.Code(err) == codes.DeadlineExceeded {
3713 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3716 var wg sync.WaitGroup
3717 for i := 0; i < 10; i++ {
3721 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
3726 req := &testpb.SimpleRequest{
3727 ResponseType: testpb.PayloadType_COMPRESSABLE,
3731 // No rpc should go through due to the max streams limit.
3732 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
3734 if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
3735 t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
// TestCompressServerHasNoSupport runs the compression-mismatch test on
// every environment.
3742 func TestCompressServerHasNoSupport(t *testing.T) {
3743 defer leakcheck.Check(t)
3744 for _, e := range listTestEnv() {
3745 testCompressServerHasNoSupport(t, e)
// testCompressServerHasNoSupport enables compression on the client only and
// expects both a unary call and a full-duplex Recv to fail with
// codes.Unimplemented from the non-compressing server.
3749 func testCompressServerHasNoSupport(t *testing.T, e env) {
3751 te.serverCompression = false
3752 te.clientCompression = true
3753 te.startServer(&testServer{security: e.security})
3755 tc := testpb.NewTestServiceClient(te.clientConn())
3757 const argSize = 271828
3758 const respSize = 314159
3759 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3763 req := &testpb.SimpleRequest{
3764 ResponseType: testpb.PayloadType_COMPRESSABLE,
3765 ResponseSize: respSize,
3768 if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Unimplemented {
3769 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
// Streaming RPC must fail the same way.
3772 stream, err := tc.FullDuplexCall(context.Background())
3774 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3776 respParam := []*testpb.ResponseParameters{
3781 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
3785 sreq := &testpb.StreamingOutputCallRequest{
3786 ResponseType: testpb.PayloadType_COMPRESSABLE,
3787 ResponseParameters: respParam,
3790 if err := stream.Send(sreq); err != nil {
3791 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
3793 if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Unimplemented {
3794 t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
// TestCompressOK runs the both-sides-compressed happy path on every
// environment.
3798 func TestCompressOK(t *testing.T) {
3799 defer leakcheck.Check(t)
3800 for _, e := range listTestEnv() {
3801 testCompressOK(t, e)
// testCompressOK enables compression on both client and server and verifies
// that a unary call and a full-duplex exchange both succeed.
3805 func testCompressOK(t *testing.T, e env) {
3807 te.serverCompression = true
3808 te.clientCompression = true
3809 te.startServer(&testServer{security: e.security})
3811 tc := testpb.NewTestServiceClient(te.clientConn())
3814 const argSize = 271828
3815 const respSize = 314159
3816 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3820 req := &testpb.SimpleRequest{
3821 ResponseType: testpb.PayloadType_COMPRESSABLE,
3822 ResponseSize: respSize,
3825 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
3826 if _, err := tc.UnaryCall(ctx, req); err != nil {
3827 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
// Streaming path with an explicit cancel to clean up the stream.
3830 ctx, cancel := context.WithCancel(context.Background())
3832 stream, err := tc.FullDuplexCall(ctx)
3834 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3836 respParam := []*testpb.ResponseParameters{
3841 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
3845 sreq := &testpb.StreamingOutputCallRequest{
3846 ResponseType: testpb.PayloadType_COMPRESSABLE,
3847 ResponseParameters: respParam,
3850 if err := stream.Send(sreq); err != nil {
3851 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
3853 if _, err := stream.Recv(); err != nil {
3854 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
// TestUnaryClientInterceptor runs the unary client interceptor test on
// every environment.
3858 func TestUnaryClientInterceptor(t *testing.T) {
3859 defer leakcheck.Check(t)
3860 for _, e := range listTestEnv() {
3861 testUnaryClientInterceptor(t, e)
// failOkayRPC is a unary client interceptor that invokes the RPC and then
// replaces a successful result with a NotFound error (the condition guarding
// the return is on an elided line).
3865 func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
3866 err := invoker(ctx, method, req, reply, cc, opts...)
3868 return grpc.Errorf(codes.NotFound, "")
// testUnaryClientInterceptor installs failOkayRPC and verifies that an
// otherwise-successful EmptyCall surfaces the interceptor's NotFound error.
3873 func testUnaryClientInterceptor(t *testing.T, e env) {
3875 te.userAgent = testAppUA
3876 te.unaryClientInt = failOkayRPC
3877 te.startServer(&testServer{security: e.security})
3880 tc := testpb.NewTestServiceClient(te.clientConn())
3881 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.NotFound {
3882 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
// TestStreamClientInterceptor runs the stream client interceptor test on
// every environment.
3886 func TestStreamClientInterceptor(t *testing.T) {
3887 defer leakcheck.Check(t)
3888 for _, e := range listTestEnv() {
3889 testStreamClientInterceptor(t, e)
// failOkayStream is a stream client interceptor that creates the stream and
// then replaces a successful result with a NotFound error (the guarding
// condition is on an elided line).
3893 func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
3894 s, err := streamer(ctx, desc, cc, method, opts...)
3896 return nil, grpc.Errorf(codes.NotFound, "")
// testStreamClientInterceptor installs failOkayStream and verifies that
// StreamingOutputCall surfaces the interceptor's NotFound error.
3901 func testStreamClientInterceptor(t *testing.T, e env) {
3903 te.streamClientInt = failOkayStream
3904 te.startServer(&testServer{security: e.security})
3907 tc := testpb.NewTestServiceClient(te.clientConn())
3908 respParam := []*testpb.ResponseParameters{
3913 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3917 req := &testpb.StreamingOutputCallRequest{
3918 ResponseType: testpb.PayloadType_COMPRESSABLE,
3919 ResponseParameters: respParam,
3922 if _, err := tc.StreamingOutputCall(context.Background(), req); grpc.Code(err) != codes.NotFound {
3923 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
// TestUnaryServerInterceptor runs the unary server interceptor test on
// every environment.
3927 func TestUnaryServerInterceptor(t *testing.T) {
3928 defer leakcheck.Check(t)
3929 for _, e := range listTestEnv() {
3930 testUnaryServerInterceptor(t, e)
// errInjector is a unary server interceptor that rejects every RPC with
// PermissionDenied without invoking the handler.
3934 func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
3935 return nil, grpc.Errorf(codes.PermissionDenied, "")
// testUnaryServerInterceptor installs errInjector and verifies EmptyCall is
// rejected with PermissionDenied.
3938 func testUnaryServerInterceptor(t *testing.T, e env) {
3940 te.unaryServerInt = errInjector
3941 te.startServer(&testServer{security: e.security})
3944 tc := testpb.NewTestServiceClient(te.clientConn())
3945 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.PermissionDenied {
3946 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
// TestStreamServerInterceptor runs the stream server interceptor test on
// every environment except handler-tls (skipped due to issue #619).
3950 func TestStreamServerInterceptor(t *testing.T) {
3951 defer leakcheck.Check(t)
3952 for _, e := range listTestEnv() {
3953 // TODO(bradfitz): Temporarily skip this env due to #619.
3954 if e.name == "handler-tls" {
3957 testStreamServerInterceptor(t, e)
// fullDuplexOnly is a stream server interceptor that allows only
// FullDuplexCall through to the handler; every other streaming method is
// rejected with PermissionDenied.
3961 func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
3962 if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
3963 return handler(srv, ss)
3965 // Reject the other methods.
3966 return grpc.Errorf(codes.PermissionDenied, "")
// testStreamServerInterceptor installs fullDuplexOnly and verifies that
// StreamingOutputCall is rejected with PermissionDenied while FullDuplexCall
// goes through.
3969 func testStreamServerInterceptor(t *testing.T, e env) {
3971 te.streamServerInt = fullDuplexOnly
3972 te.startServer(&testServer{security: e.security})
3975 tc := testpb.NewTestServiceClient(te.clientConn())
3976 respParam := []*testpb.ResponseParameters{
3981 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3985 req := &testpb.StreamingOutputCallRequest{
3986 ResponseType: testpb.PayloadType_COMPRESSABLE,
3987 ResponseParameters: respParam,
3990 s1, err := tc.StreamingOutputCall(context.Background(), req)
3992 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
3994 if _, err := s1.Recv(); grpc.Code(err) != codes.PermissionDenied {
// NOTE(review): the message below names StreamingInputCall but the failing
// RPC is StreamingOutputCall.
3995 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
3997 s2, err := tc.FullDuplexCall(context.Background())
3999 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4001 if err := s2.Send(req); err != nil {
4002 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
4004 if _, err := s2.Recv(); err != nil {
4005 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
4009 // funcServer implements methods of TestServiceServer using funcs,
4010 // similar to an http.HandlerFunc.
4011 // Any unimplemented method will crash. Tests implement the method(s)
// funcServer implements TestServiceServer via per-method funcs; unset
// methods fall through to the embedded interface and panic when called.
4013 type funcServer struct {
4014 testpb.TestServiceServer
// Handler for UnaryCall when set.
4015 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
// Handler for StreamingInputCall when set.
4016 streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
// UnaryCall delegates to the configured unaryCall func.
4019 func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4020 return s.unaryCall(ctx, in)
// StreamingInputCall delegates to the configured streamingInputCall func.
4023 func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
4024 return s.streamingInputCall(stream)
// TestClientRequestBodyErrorUnexpectedEOF runs the truncated-request-body
// test on every environment.
4027 func TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
4028 defer leakcheck.Check(t)
4029 for _, e := range listTestEnv() {
4030 testClientRequestBodyErrorUnexpectedEOF(t, e)
// testClientRequestBodyErrorUnexpectedEOF sends a DATA frame that promises 5
// bytes but sets END_STREAM; the handler must never run (it fails the test
// if invoked) and the server must respond with a frame instead of crashing.
4034 func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
4036 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4037 errUnexpectedCall := errors.New("unexpected call func server method")
4038 t.Error(errUnexpectedCall)
4039 return nil, errUnexpectedCall
4043 te.withServerTester(func(st *serverTester) {
4044 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4045 // Say we have 5 bytes coming, but set END_STREAM flag:
4046 st.writeData(1, true, []byte{0, 0, 0, 0, 5})
4047 st.wantAnyFrame() // wait for server to crash (it used to crash)
// TestClientRequestBodyErrorCloseAfterLength runs the close-after-length
// test on every environment.
4051 func TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
4052 defer leakcheck.Check(t)
4053 for _, e := range listTestEnv() {
4054 testClientRequestBodyErrorCloseAfterLength(t, e)
// testClientRequestBodyErrorCloseAfterLength announces a 5-byte message but
// closes the connection before sending it; the handler must never run.
4058 func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
4060 te.declareLogNoise("Server.processUnaryRPC failed to write status")
4061 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4062 errUnexpectedCall := errors.New("unexpected call func server method")
4063 t.Error(errUnexpectedCall)
4064 return nil, errUnexpectedCall
4068 te.withServerTester(func(st *serverTester) {
4069 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4070 // say we're sending 5 bytes, but then close the connection instead.
4071 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
// TestClientRequestBodyErrorCancel runs the cancel-before-body test on
// every environment.
4076 func TestClientRequestBodyErrorCancel(t *testing.T) {
4077 defer leakcheck.Check(t)
4078 for _, e := range listTestEnv() {
4079 testClientRequestBodyErrorCancel(t, e)
// testClientRequestBodyErrorCancel cancels stream 1 via RST_STREAM before
// its body arrives and checks the handler is never invoked for it; a second,
// valid-enough request on stream 3 is then used to elicit a response.
4083 func testClientRequestBodyErrorCancel(t *testing.T, e env) {
4085 gotCall := make(chan bool, 1)
4086 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4088 return new(testpb.SimpleResponse), nil
4092 te.withServerTester(func(st *serverTester) {
4093 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4094 // Say we have 5 bytes coming, but cancel it instead.
4095 st.writeRSTStream(1, http2.ErrCodeCancel)
4096 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4098 // Verify we didn't get a call yet.
4101 t.Fatal("unexpected call")
4105 // And now send an uncanceled (but still invalid), just to get a response.
4106 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall")
4107 st.writeData(3, true, []byte{0, 0, 0, 0, 0})
// TestClientRequestBodyErrorCancelStreamingInput runs the streaming-input
// cancel test on every environment.
4113 func TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4114 defer leakcheck.Check(t)
4115 for _, e := range listTestEnv() {
4116 testClientRequestBodyErrorCancelStreamingInput(t, e)
// testClientRequestBodyErrorCancelStreamingInput cancels a streaming-input
// stream mid-body and verifies the server handler's Recv returns a
// codes.Canceled error within 3 seconds.
4120 func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4122 recvErr := make(chan error, 1)
4123 ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
4124 _, err := stream.Recv()
4130 te.withServerTester(func(st *serverTester) {
4131 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
4132 // Say we have 5 bytes coming, but cancel it instead.
4133 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4134 st.writeRSTStream(1, http2.ErrCodeCancel)
4138 case got = <-recvErr:
4139 case <-time.After(3 * time.Second):
4140 t.Fatal("timeout waiting for error")
4142 if grpc.Code(got) != codes.Canceled {
4143 t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
// clientTimeoutCreds is a TransportCredentials stub whose first
// ClientHandshake fails with DeadlineExceeded and all later ones succeed.
4148 type clientTimeoutCreds struct {
// True once the first handshake has returned the timeout error.
4149 timeoutReturned bool
// ClientHandshake fails with DeadlineExceeded exactly once, then passes the
// raw connection through untouched.
4152 func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4153 if !c.timeoutReturned {
4154 c.timeoutReturned = true
4155 return nil, nil, context.DeadlineExceeded
4157 return rawConn, nil, nil
// ServerHandshake is a no-op pass-through.
4159 func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4160 return rawConn, nil, nil
// Info returns an empty ProtocolInfo.
4162 func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo {
4163 return credentials.ProtocolInfo{}
// Clone and OverrideServerName satisfy the interface (bodies elided here).
4165 func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials {
4168 func (c *clientTimeoutCreds) OverrideServerName(s string) error {
// TestNonFailFastRPCSucceedOnTimeoutCreds verifies that a non-fail-fast RPC
// survives a first handshake timeout: clientTimeoutCreds fails once, the
// retry handshake succeeds, and EmptyCall completes without error.
4172 func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
4173 te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"})
4174 te.userAgent = testAppUA
4175 te.startServer(&testServer{security: te.e.security})
4178 cc := te.clientConn()
4179 tc := testpb.NewTestServiceClient(cc)
4180 // This unary call should succeed, because ClientHandshake will succeed for the second time.
4181 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
4182 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
// serverDispatchCred hands every raw server connection to the test via
// rawConnCh and tells gRPC the connection was dispatched elsewhere.
4186 type serverDispatchCred struct {
4187 rawConnCh chan net.Conn
// newServerDispatchCred returns a serverDispatchCred with a 1-slot buffer so
// ServerHandshake never blocks on the first connection.
4190 func newServerDispatchCred() *serverDispatchCred {
4191 return &serverDispatchCred{
4192 rawConnCh: make(chan net.Conn, 1),
// ClientHandshake is a no-op pass-through.
4195 func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4196 return rawConn, nil, nil
// ServerHandshake pushes the raw connection to the test (if the buffer has
// room) and reports ErrConnDispatched so gRPC leaves the conn alone.
4198 func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4200 case c.rawConnCh <- rawConn:
4203 return nil, nil, credentials.ErrConnDispatched
// Info returns an empty ProtocolInfo.
4205 func (c *serverDispatchCred) Info() credentials.ProtocolInfo {
4206 return credentials.ProtocolInfo{}
// Clone and OverrideServerName satisfy the interface (bodies elided here).
4208 func (c *serverDispatchCred) Clone() credentials.TransportCredentials {
4211 func (c *serverDispatchCred) OverrideServerName(s string) error {
// getRawConn blocks until ServerHandshake delivers a connection.
4214 func (c *serverDispatchCred) getRawConn() net.Conn {
4215 return <-c.rawConnCh
// TestServerCredsDispatch verifies that when server credentials report
// ErrConnDispatched, gRPC does not close the raw connection: after a grace
// period the test must still be able to write a byte on it.
4218 func TestServerCredsDispatch(t *testing.T) {
4219 lis, err := net.Listen("tcp", "localhost:0")
4221 t.Fatalf("Failed to listen: %v", err)
4223 cred := newServerDispatchCred()
4224 s := grpc.NewServer(grpc.Creds(cred))
4228 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred))
4230 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4234 rawConn := cred.getRawConn()
4235 // Give grpc a chance to see the error and potentially close the connection.
4236 // And check that connection is not closed after that.
4237 time.Sleep(100 * time.Millisecond)
4238 // Check rawConn is not closed.
4239 if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil {
// NOTE(review): the message says Read() but the checked call is Write().
4240 t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err)
// authorityCheckCreds records the authority string passed to ClientHandshake
// (field declarations are on elided lines).
4244 type authorityCheckCreds struct {
// ServerHandshake is a no-op pass-through.
4248 func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4249 return rawConn, nil, nil
// ClientHandshake passes the conn through; the line recording authority into
// the struct is elided here.
4251 func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4253 return rawConn, nil, nil
// Info returns an empty ProtocolInfo.
4255 func (c *authorityCheckCreds) Info() credentials.ProtocolInfo {
4256 return credentials.ProtocolInfo{}
// Clone and OverrideServerName satisfy the interface (bodies elided here).
4258 func (c *authorityCheckCreds) Clone() credentials.TransportCredentials {
4261 func (c *authorityCheckCreds) OverrideServerName(s string) error {
4265 // This test makes sure that the authority the client handshake receives is
4266 // the endpoint from the dial target, not the resolved IP address.
// TestCredsHandshakeAuthority dials a manual-resolver target whose endpoint
// is testAuthority, waits (up to 100ms) for the conn to become Ready, and
// asserts the credentials saw testAuthority — not the listener's address.
4267 func TestCredsHandshakeAuthority(t *testing.T) {
4268 const testAuthority = "test.auth.ori.ty"
4270 lis, err := net.Listen("tcp", "localhost:0")
4272 t.Fatalf("Failed to listen: %v", err)
4274 cred := &authorityCheckCreds{}
4275 s := grpc.NewServer()
4279 r, rcleanup := manual.GenerateAndRegisterManualResolver()
4282 cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred))
4284 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4287 r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
4289 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
4293 if s == connectivity.Ready {
4296 if !cc.WaitForStateChange(ctx, s) {
4297 // ctx got timeout or canceled.
4298 t.Fatalf("ClientConn is not ready after 100 ms")
4302 if cred.got != testAuthority {
4303 t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority)
// TestFlowControlLogicalRace stresses flow control (regression test for
// grpc/grpc-go#632): it makes many streaming calls with short deadlines,
// reading only part of each stream, and tolerates DeadlineExceeded while
// counting unexpected failures against maxFailures.
4307 func TestFlowControlLogicalRace(t *testing.T) {
4308 // Test for a regression of https://github.com/grpc/grpc-go/issues/632,
4309 // and other flow control bugs.
4311 defer leakcheck.Check(t)
4319 requestTimeout = time.Second * 5
4322 requestCount := 10000
4327 lis, err := net.Listen("tcp", "localhost:0")
4329 t.Fatalf("Failed to listen: %v", err)
4333 s := grpc.NewServer()
4334 testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
4335 itemCount: itemCount,
4342 ctx := context.Background()
4344 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4346 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4349 cl := testpb.NewTestServiceClient(cc)
4352 for i := 0; i < requestCount; i++ {
4353 ctx, cancel := context.WithTimeout(ctx, requestTimeout)
4354 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
4356 t.Fatalf("StreamingOutputCall; err = %q", err)
4361 for ; j < recvCount; j++ {
4362 _, err := output.Recv()
4367 switch grpc.Code(err) {
4368 case codes.DeadlineExceeded:
4371 t.Fatalf("Recv; err = %q", err)
4379 t.Errorf("got %d responses to request %d", j, i)
4381 if failures >= maxFailures {
4382 // Continue past the first failure to see if the connection is
4383 // entirely broken, or if only a single RPC was affected
// flowControlLogicalRaceServer streams itemCount large payloads per call;
// unimplemented methods panic via the embedded interface.
4390 type flowControlLogicalRaceServer struct {
4391 testpb.TestServiceServer
// StreamingOutputCall sends s.itemCount responses of s.itemSize bytes each;
// the deliberate per-send allocation stresses the GC to widen the race
// window being tested.
4397 func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
4398 for i := 0; i < s.itemCount; i++ {
4399 err := srv.Send(&testpb.StreamingOutputCallResponse{
4400 Payload: &testpb.Payload{
4401 // Sending a large stream of data which the client reject
4402 // helps to trigger some types of flow control bugs.
4404 // Reallocating memory here is inefficient, but the stress it
4405 // puts on the GC leads to more frequent flow control
4406 // failures. The GC likely causes more variety in the
4407 // goroutine scheduling orders.
4408 Body: bytes.Repeat([]byte("a"), s.itemSize),
// lockingWriter serializes writes to an underlying io.Writer with a mutex
// (field declarations are on elided lines).
4418 type lockingWriter struct {
// Write forwards to the wrapped writer while holding the mutex.
4423 func (lw *lockingWriter) Write(p []byte) (n int, err error) {
4425 defer lw.mu.Unlock()
4426 return lw.w.Write(p)
// setWriter swaps the destination writer under the mutex.
4429 func (lw *lockingWriter) setWriter(w io.Writer) {
4431 defer lw.mu.Unlock()
// testLogOutput is the process-wide grpclog sink; defaults to stderr and is
// retargeted by declareLogNoise.
4435 var testLogOutput = &lockingWriter{w: os.Stderr}
4437 // awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
4438 // terminate, if they're still running. It spams logs with this
4439 // message. We wait for it so our log filter is still
4440 // active. Otherwise the "defer restore()" at the top of various test
4441 // functions restores our log filter and then the goroutine spams.
// awaitNewConnLogOutput waits briefly for the known connection-closing log
// phrase so log filters stay active while stragglers finish.
4442 func awaitNewConnLogOutput() {
4443 awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
// awaitLogOutput polls the captured log buffer for phrase, giving up after
// maxWait (the polling loop body is on elided lines).
4446 func awaitLogOutput(maxWait time.Duration, phrase string) {
4447 pb := []byte(phrase)
4449 timer := time.NewTimer(maxWait)
4451 wakeup := make(chan bool, 1)
4453 if logOutputHasContents(pb, wakeup) {
4458 // Too slow. Oh well.
// logOutputHasContents reports whether the captured log buffer contains v;
// it also registers wakeup on the filterWriter (registration line elided).
// Returns false when no filterWriter is installed.
4465 func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
4466 testLogOutput.mu.Lock()
4467 defer testLogOutput.mu.Unlock()
4468 fw, ok := testLogOutput.w.(*filterWriter)
4473 defer fw.mu.Unlock()
4474 if bytes.Contains(fw.buf.Bytes(), v) {
// verboseLogs disables grpclog filtering when the -verbose_logs flag is set.
4481 var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
4485 // declareLogNoise declares that t is expected to emit the following noisy phrases,
4486 // even on success. Those phrases will be filtered from grpclog output
4487 // and only be shown if *verbose_logs or t ends up failing.
4488 // The returned restore function should be called with defer to be run
4489 // before the test ends.
// declareLogNoise installs a filterWriter that suppresses the given phrases;
// the returned restore func dumps the captured log (on failure) and resets
// the sink to stderr.
4490 func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
4494 fw := &filterWriter{dst: os.Stderr, filter: phrases}
4495 testLogOutput.setWriter(fw)
4499 defer fw.mu.Unlock()
4500 if fw.buf.Len() > 0 {
4501 t.Logf("Complete log output:\n%s", fw.buf.Bytes())
4504 testLogOutput.setWriter(os.Stderr)
// filterWriter buffers log output, drops lines matching its filter phrases,
// and forwards the rest to dst (most fields are on elided lines).
4508 type filterWriter struct {
4514 wakeup chan<- bool // if non-nil, gets true on write
// Write signals wakeup (non-blocking), suppresses output containing any
// filter phrase, and forwards everything else to dst.
4517 func (fw *filterWriter) Write(p []byte) (n int, err error) {
4520 if fw.wakeup != nil {
4522 case fw.wakeup <- true:
4529 for _, f := range fw.filter {
4530 if strings.Contains(ps, f) {
4534 return fw.dst.Write(p)
4537 // stubServer is a server that is easy to customize within individual test
// stubServer is a TestServiceServer whose handlers are supplied per-test as
// funcs; unimplemented methods panic via the embedded interface.
4539 type stubServer struct {
4540 // Guarantees we satisfy this interface; panics if unimplemented methods are called.
4541 testpb.TestServiceServer
4543 // Customizable implementations of server handlers.
4544 emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
4545 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
4547 // A client connected to this service the test may use. Created in Start().
4548 client testpb.TestServiceClient
4550 cleanups []func() // Lambdas executed in Stop(); populated by Start().
// EmptyCall delegates to the configured emptyCall func.
4553 func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4554 return ss.emptyCall(ctx, in)
// FullDuplexCall delegates to the configured fullDuplexCall func.
4557 func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
4558 return ss.fullDuplexCall(stream)
4561 // Start starts the server and creates a client connected to it.
// Start listens on an ephemeral localhost port, serves ss, dials it with a
// blocking insecure connection, and stores the client; every resource is
// registered in cleanups for Stop to tear down in reverse order.
4562 func (ss *stubServer) Start(sopts []grpc.ServerOption) error {
4563 lis, err := net.Listen("tcp", "localhost:0")
4565 return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
4567 ss.cleanups = append(ss.cleanups, func() { lis.Close() })
4569 s := grpc.NewServer(sopts...)
4570 testpb.RegisterTestServiceServer(s, ss)
4572 ss.cleanups = append(ss.cleanups, s.Stop)
4574 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4576 return fmt.Errorf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4578 ss.cleanups = append(ss.cleanups, func() { cc.Close() })
4580 ss.client = testpb.NewTestServiceClient(cc)
// Stop runs the registered cleanups in reverse (LIFO) order.
4584 func (ss *stubServer) Stop() {
4585 for i := len(ss.cleanups) - 1; i >= 0; i-- {
// TestUnaryProxyDoesNotForwardMetadata chains two stubServers: the proxy
// receives mdkey and forwards the RPC using its *incoming* context. Because
// incoming metadata is not copied to the outgoing context automatically,
// the endpoint must NOT see mdkey; both servers return Internal if the
// expectation is violated.
4590 func TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
4591 const mdkey = "somedata"
4593 // endpoint ensures mdkey is NOT in metadata and returns an error if it is.
4594 endpoint := &stubServer{
4595 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4596 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4597 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4599 return &testpb.Empty{}, nil
4602 if err := endpoint.Start(nil); err != nil {
4603 t.Fatalf("Error starting endpoint server: %v", err)
4605 defer endpoint.Stop()
4607 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
4608 // without explicitly copying the metadata.
4609 proxy := &stubServer{
4610 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4611 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4612 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
4614 return endpoint.client.EmptyCall(ctx, in)
4617 if err := proxy.Start(nil); err != nil {
4618 t.Fatalf("Error starting proxy server: %v", err)
4622 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4624 md := metadata.Pairs(mdkey, "val")
4625 ctx = metadata.NewOutgoingContext(ctx, md)
4627 // Sanity check that endpoint properly errors when it sees mdkey.
4628 _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{})
4629 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4630 t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
4633 if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
4634 t.Fatal(err.Error())
4638 func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
4639 const mdkey = "somedata"
4641 // doFDC performs a FullDuplexCall with client and returns the error from the
4642 // first stream.Recv call, or nil if that error is io.EOF. Calls t.Fatal if
4643 // the stream cannot be established.
4644 doFDC := func(ctx context.Context, client testpb.TestServiceClient) error {
4645 stream, err := client.FullDuplexCall(ctx)
4647 t.Fatalf("Unwanted error: %v", err)
4649 if _, err := stream.Recv(); err != io.EOF {
4655 // endpoint ensures mdkey is NOT in metadata and returns an error if it is.
4656 endpoint := &stubServer{
4657 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4658 ctx := stream.Context()
4659 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4660 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4665 if err := endpoint.Start(nil); err != nil {
4666 t.Fatalf("Error starting endpoint server: %v", err)
4668 defer endpoint.Stop()
4670 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
4671 // without explicitly copying the metadata.
4672 proxy := &stubServer{
4673 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4674 ctx := stream.Context()
4675 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4676 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4678 return doFDC(ctx, endpoint.client)
4681 if err := proxy.Start(nil); err != nil {
4682 t.Fatalf("Error starting proxy server: %v", err)
4686 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4688 md := metadata.Pairs(mdkey, "val")
4689 ctx = metadata.NewOutgoingContext(ctx, md)
4691 // Sanity check that endpoint properly errors when it sees mdkey in ctx.
4692 err := doFDC(ctx, endpoint.client)
4693 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4694 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
4697 if err := doFDC(ctx, proxy.client); err != nil {
4698 t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
// TestStatsTagsAndTrace verifies that stats tags/trace bytes set on the
// client context are transmitted as grpc-tags-bin / grpc-trace-bin metadata
// and surface on the server via stats.Tags / stats.Trace.
func TestStatsTagsAndTrace(t *testing.T) {
	// Data added to context by client (typically in a stats handler).
	tags := []byte{1, 5, 2, 4, 3}
	trace := []byte{5, 2, 1, 3, 4}

	// endpoint ensures Tags() and Trace() in context match those that were added
	// by the client and returns an error if not.
	endpoint := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			md, _ := metadata.FromIncomingContext(ctx)
			if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
				return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
			}
			if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
				return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
			}
			if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
				return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
			}
			if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
				return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
			}
			return &testpb.Empty{}, nil
		},
	}
	if err := endpoint.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer endpoint.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	testCases := []struct {
		ctx  context.Context
		want codes.Code
	}{
		{ctx: ctx, want: codes.Internal},
		{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
		{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
		// Trace deliberately set to the tags bytes: Trace() mismatches, so Internal.
		{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
		// Both values correct: the RPC succeeds.
		{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
	}

	for _, tc := range testCases {
		_, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
		if tc.want == codes.OK && err != nil {
			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
		}
		if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
		}
	}
}
// TestTapTimeout verifies that a context returned by an InTapHandle is
// honored: when the tap handler's context is canceled, in-flight RPCs fail
// with codes.Canceled rather than running to the client's deadline.
func TestTapTimeout(t *testing.T) {
	sopts := []grpc.ServerOption{
		grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
			c, cancel := context.WithCancel(ctx)
			// Call cancel instead of setting a deadline so we can detect which error
			// occurred -- this cancellation (desired) or the client's deadline
			// expired (indicating this cancellation did not affect the RPC).
			time.AfterFunc(10*time.Millisecond, cancel)
			return c, nil
		}),
	}

	ss := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			// Block until the tap handler's cancellation fires.
			<-ctx.Done()
			return &testpb.Empty{}, nil
		},
	}
	if err := ss.Start(sopts); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// This was known to be flaky; test several times.
	for i := 0; i < 10; i++ {
		// Set our own deadline in case the server hangs.
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		res, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
			t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
		}
	}
}
4792 type windowSizeConfig struct {
4799 func max(a, b int32) int32 {
// TestConfigurableWindowSizeWithLargeWindow exercises flow control with
// windows far above the HTTP/2 defaults, in every test environment.
func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
	defer leakcheck.Check(t)
	wc := windowSizeConfig{
		serverStream: 8 * 1024 * 1024,
		serverConn:   12 * 1024 * 1024,
		clientStream: 6 * 1024 * 1024,
		clientConn:   8 * 1024 * 1024,
	}
	for _, e := range listTestEnv() {
		testConfigurableWindowSize(t, e, wc)
	}
}
// TestConfigurableWindowSizeWithSmallWindow exercises flow control with
// minimal window sizes, in every test environment.
func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
	defer leakcheck.Check(t)
	wc := windowSizeConfig{
		// NOTE(review): field values reconstructed (listing elided them);
		// minimal windows — the transport clamps to its internal minimum.
		serverStream: 1,
		serverConn:   1,
		clientStream: 1,
		clientConn:   1,
	}
	for _, e := range listTestEnv() {
		testConfigurableWindowSize(t, e, wc)
	}
}
// testConfigurableWindowSize drives a full-duplex stream with messages sized
// to exhaust the largest configured window, verifying that flow control makes
// progress with the given window configuration.
func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
	te := newTest(t, e)
	te.serverInitialWindowSize = wc.serverStream
	te.serverInitialConnWindowSize = wc.serverConn
	te.clientInitialWindowSize = wc.clientStream
	te.clientInitialConnWindowSize = wc.clientConn

	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	stream, err := tc.FullDuplexCall(context.Background())
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	numOfIter := 11
	// Set message size to exhaust largest of window sizes.
	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
	messageSize = max(messageSize, 64*1024)
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
	if err != nil {
		t.Fatal(err)
	}
	respParams := []*testpb.ResponseParameters{
		{
			Size: messageSize,
		},
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParams,
		Payload:            payload,
	}
	for i := 0; i < numOfIter; i++ {
		if err := stream.Send(req); err != nil {
			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
		}
		if _, err := stream.Recv(); err != nil {
			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
		}
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
}
// authdata is the static per-RPC metadata attached by testPerRPCCredentials;
// the -bin suffix on the second key marks it as binary-valued metadata.
var authdata = map[string]string{
	"test-key":      "test-value",
	"test-key2-bin": string([]byte{1, 2, 3}),
}
// testPerRPCCredentials implements credentials.PerRPCCredentials by attaching
// the fixed authdata map to every RPC.
type testPerRPCCredentials struct{}

// GetRequestMetadata returns the static authdata map for every request.
func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return authdata, nil
}

// RequireTransportSecurity reports false so these credentials can be used
// over insecure test connections.
func (cr testPerRPCCredentials) RequireTransportSecurity() bool {
	return false
}
// authHandle is a tap handler that verifies every authdata key/value pair is
// present in the incoming metadata, rejecting the RPC otherwise.
func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return ctx, fmt.Errorf("didn't find metadata in context")
	}
	for k, vwant := range authdata {
		vgot, ok := md[k]
		if !ok {
			return ctx, fmt.Errorf("didn't find authdata key %v in context", k)
		}
		if vgot[0] != vwant {
			return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant)
		}
	}
	return ctx, nil
}
// TestPerRPCCredentialsViaDialOptions runs the dial-option credentials test
// in every test environment.
func TestPerRPCCredentialsViaDialOptions(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testPerRPCCredentialsViaDialOptions(t, e)
	}
}
// testPerRPCCredentialsViaDialOptions installs testPerRPCCredentials as a
// dial option and uses the authHandle tap to verify the metadata arrives.
func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) {
	te := newTest(t, e)
	te.tapHandle = authHandle
	te.perRPCCreds = testPerRPCCredentials{}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("Test failed. Reason: %v", err)
	}
}
// TestPerRPCCredentialsViaCallOptions runs the call-option credentials test
// in every test environment.
func TestPerRPCCredentialsViaCallOptions(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testPerRPCCredentialsViaCallOptions(t, e)
	}
}
// testPerRPCCredentialsViaCallOptions attaches testPerRPCCredentials on a
// single call and uses the authHandle tap to verify the metadata arrives.
func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) {
	te := newTest(t, e)
	te.tapHandle = authHandle
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
		t.Fatalf("Test failed. Reason: %v", err)
	}
}
// TestPerRPCCredentialsViaDialOptionsAndCallOptions runs the combined
// dial-option + call-option credentials test in every test environment.
func TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e)
	}
}
// testPerRPCCredentialsViaDialOptionsAndCallOptions supplies the same
// credentials via dial AND call options and verifies each metadata value
// appears exactly twice (both sets applied).
func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) {
	te := newTest(t, e)
	te.perRPCCreds = testPerRPCCredentials{}
	// When credentials are provided via both dial options and call options,
	// we apply both sets.
	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			return ctx, fmt.Errorf("couldn't find metadata in context")
		}
		for k, vwant := range authdata {
			vgot, ok := md[k]
			if !ok {
				return ctx, fmt.Errorf("couldn't find metadata for key %v", k)
			}
			if len(vgot) != 2 {
				return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot))
			}
			if vgot[0] != vwant || vgot[1] != vwant {
				return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant)
			}
		}
		return ctx, nil
	}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
		t.Fatalf("Test failed. Reason: %v", err)
	}
}
// TestWaitForReadyConnection runs the wait-for-ready connectivity test in
// every test environment.
func TestWaitForReadyConnection(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testWaitForReadyConnection(t, e)
	}
}
// testWaitForReadyConnection polls the connection's connectivity state until
// it is Ready, then issues a fail-fast RPC that must succeed.
func testWaitForReadyConnection(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn() // Non-blocking dial.
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	state := cc.GetState()
	// Wait for connection to be Ready.
	for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	if state != connectivity.Ready {
		t.Fatalf("Want connection state to be Ready, got %v", state)
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// Make a fail-fast RPC.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
	}
}
// errCodec is a grpc.Codec whose Marshal fails unless noError is set; it is
// used to verify that encode failures do not panic the client.
type errCodec struct {
	noError bool
}

// Marshal succeeds with an empty payload when noError is set, and otherwise
// returns an error (the message is a joke false equation, hence String below).
func (c *errCodec) Marshal(v interface{}) ([]byte, error) {
	if c.noError {
		return []byte{}, nil
	}
	return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12")
}

// Unmarshal is a no-op that always succeeds.
func (c *errCodec) Unmarshal(data []byte, v interface{}) error {
	return nil
}

func (c *errCodec) String() string {
	return "Fermat's near-miss."
}
// TestEncodeDoesntPanic runs the encode-failure test in every test
// environment.
func TestEncodeDoesntPanic(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testEncodeDoesntPanic(t, e)
	}
}
// testEncodeDoesntPanic installs errCodec on the server so the first RPC's
// encode fails, checking that the failure does not panic, then flips the
// codec to the success mode and checks a normal RPC completes.
func testEncodeDoesntPanic(t *testing.T, e env) {
	te := newTest(t, e)
	erc := &errCodec{}
	te.customCodec = erc
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	te.customCodec = nil
	tc := testpb.NewTestServiceClient(te.clientConn())
	// Failure case, should not panic.
	tc.EmptyCall(context.Background(), &testpb.Empty{})
	erc.noError = true
	// Passing case.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
}
// TestSvrWriteStatusEarlyWrite runs the early-status-write test in every test
// environment.
func TestSvrWriteStatusEarlyWrite(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testSvrWriteStatusEarlyWrite(t, e)
	}
}
// testSvrWriteStatusEarlyWrite verifies the server writes a ResourceExhausted
// status when a streamed message exceeds its receive limit (recv case) or its
// send limit (send case), with the status delivered cleanly to the client.
func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
	te := newTest(t, e)
	const smallSize = 1024
	const largeSize = 2048
	const extraLargeSize = 4096
	te.maxServerReceiveMsgSize = newInt(largeSize)
	te.maxServerSendMsgSize = newInt(largeSize)
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(smallSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            extraLargePayload,
	}
	// Test recv case: server receives a message larger than maxServerReceiveMsgSize.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
	}
	if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
	// Test send case: server sends a message larger than maxServerSendMsgSize.
	sreq.Payload = smallPayload
	respParam[0].Size = int32(extraLargeSize)

	stream, err = tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
}
// The following functions with function name ending with TD indicates that they
// should be deleted after old service config API is deprecated and deleted.

// testServiceConfigSetupTD creates a test environment wired to a buffered
// service-config channel; callers push grpc.ServiceConfig values into the
// returned channel to update the client's config. Note: the server is NOT
// started here — the TD tests below rely on that.
func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
	te := newTest(t, e)
	// We write before read.
	ch := make(chan grpc.ServiceConfig, 1)
	te.sc = ch // NOTE(review): reconstructed — the channel must be handed to te; confirm field name.
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	return te, ch
}
// TestServiceConfigGetMethodConfigTD runs the old-API method-config lookup
// test in every test environment.
func TestServiceConfigGetMethodConfigTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testGetMethodConfigTD(t, e)
	}
}
// testGetMethodConfigTD checks that a method-specific config entry takes
// precedence over the service-default ("/grpc.testing.TestService/") entry,
// and that a config update re-mapping the method is picked up. No server is
// started, so wait-for-ready RPCs hit their 1ms deadline and fail-fast RPCs
// return Unavailable.
func testGetMethodConfigTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	mc1 := grpc.MethodConfig{
		WaitForReady: newBool(true),
		Timeout:      newDuration(time.Millisecond),
	}
	mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc1
	m["/grpc.testing.TestService/"] = mc2
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	// Remap: EmptyCall now falls back to the service default (fail-fast).
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc1
	m["/grpc.testing.TestService/"] = mc2
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc
	// Wait for the new service config to propagate.
	for {
		if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
			continue
		}
		break
	}
	// The following RPCs are expected to become fail-fast.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}
// TestServiceConfigWaitForReadyTD runs the old-API wait-for-ready test in
// every test environment.
func TestServiceConfigWaitForReadyTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigWaitForReadyTD(t, e)
	}
}
// testServiceConfigWaitForReadyTD verifies precedence between the client API
// FailFast option and the service config's WaitForReady: the explicit call
// option wins (Case1), and the config applies when the caller sets nothing
// (Case2). No server is started, so waiting RPCs end in DeadlineExceeded.
func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		WaitForReady: newBool(false),
		Timeout:      newDuration(time.Millisecond),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}

	// Generate a service config update.
	// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
	mc.WaitForReady = newBool(true)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
	for {
		if !*mc.WaitForReady {
			time.Sleep(100 * time.Millisecond)
			mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
			continue
		}
		break
	}
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
}
// TestServiceConfigTimeoutTD runs the old-API timeout test in every test
// environment.
func TestServiceConfigTimeoutTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigTimeoutTD(t, e)
	}
}
// testServiceConfigTimeoutTD verifies that the effective deadline is the
// minimum of the caller's context deadline and the service config Timeout,
// in both directions (caller shorter in Case1, config shorter in Case2).
func testServiceConfigTimeoutTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		Timeout: newDuration(time.Hour),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()

	// Generate a service config update.
	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc.Timeout = newDuration(time.Nanosecond)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
	for {
		if *mc.Timeout != time.Nanosecond {
			time.Sleep(100 * time.Millisecond)
			mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
			continue
		}
		break
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()
}
// TestServiceConfigMaxMsgSizeTD runs the old-API max-message-size test in
// every test environment.
func TestServiceConfigMaxMsgSizeTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigMaxMsgSizeTD(t, e)
	}
}
// testServiceConfigMaxMsgSizeTD verifies how service-config MaxReqSize /
// MaxRespSize interact with the client-API limits:
//   Case1: config alone (2048/2048) caps both directions.
//   Case2: client limits (1024) are tighter than the config — client wins.
//   Case3: client limits (4096) are looser than the config — config wins.
func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
	// Setting up values and objects shared across all test cases.
	const smallSize = 1 // NOTE(review): reconstructed constant; confirm against the non-TD variant.
	const largeSize = 1024
	const extraLargeSize = 2048

	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}

	mc := grpc.MethodConfig{
		MaxReqSize:  newInt(extraLargeSize),
		MaxRespSize: newInt(extraLargeSize),
	}

	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te1, ch1 := testServiceConfigSetupTD(t, e)
	te1.startServer(&testServer{security: e.security})
	defer te1.tearDown()
	ch1 <- sc
	tc := testpb.NewTestServiceClient(te1.clientConn())

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(extraLargeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC recv.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = extraLargePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(extraLargeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}
	stream, err := tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = extraLargePayload
	stream, err = tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te2, ch2 := testServiceConfigSetupTD(t, e)
	te2.maxClientReceiveMsgSize = newInt(1024)
	te2.maxClientSendMsgSize = newInt(1024)
	te2.startServer(&testServer{security: e.security})
	defer te2.tearDown()
	ch2 <- sc
	tc = testpb.NewTestServiceClient(te2.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te2.ctx)
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te3, ch3 := testServiceConfigSetupTD(t, e)
	te3.maxClientReceiveMsgSize = newInt(4096)
	te3.maxClientSendMsgSize = newInt(4096)
	te3.startServer(&testServer{security: e.security})
	defer te3.tearDown()
	ch3 <- sc
	tc = testpb.NewTestServiceClient(te3.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.ResponseSize = int32(extraLargeSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.Payload = extraLargePayload
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
	}

	respParam[0].Size = int32(extraLargeSize)

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	sreq.Payload = extraLargePayload
	if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}
}
5556 func TestMethodFromServerStream(t *testing.T) {
5557 defer leakcheck.Check(t)
5558 const testMethod = "/package.service/method"
5563 te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
5564 method, ok = grpc.MethodFromServerStream(stream)
5570 _ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
5571 if !ok || method != testMethod {
5572 t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)