OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / test / end2end_test.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 //go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto
20 //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
21
22 package test
23
24 import (
25         "bytes"
26         "crypto/tls"
27         "errors"
28         "flag"
29         "fmt"
30         "io"
31         "math"
32         "net"
33         "os"
34         "reflect"
35         "runtime"
36         "strings"
37         "sync"
38         "sync/atomic"
39         "syscall"
40         "testing"
41         "time"
42
43         "github.com/golang/protobuf/proto"
44         anypb "github.com/golang/protobuf/ptypes/any"
45         "golang.org/x/net/context"
46         "golang.org/x/net/http2"
47         spb "google.golang.org/genproto/googleapis/rpc/status"
48         "google.golang.org/grpc"
49         "google.golang.org/grpc/balancer"
50         _ "google.golang.org/grpc/balancer/roundrobin"
51         "google.golang.org/grpc/codes"
52         "google.golang.org/grpc/connectivity"
53         "google.golang.org/grpc/credentials"
54         _ "google.golang.org/grpc/grpclog/glogger"
55         "google.golang.org/grpc/health"
56         healthpb "google.golang.org/grpc/health/grpc_health_v1"
57         "google.golang.org/grpc/internal"
58         "google.golang.org/grpc/metadata"
59         "google.golang.org/grpc/peer"
60         "google.golang.org/grpc/resolver"
61         "google.golang.org/grpc/resolver/manual"
62         _ "google.golang.org/grpc/resolver/passthrough"
63         "google.golang.org/grpc/stats"
64         "google.golang.org/grpc/status"
65         "google.golang.org/grpc/tap"
66         testpb "google.golang.org/grpc/test/grpc_testing"
67         "google.golang.org/grpc/test/leakcheck"
68         "google.golang.org/grpc/testdata"
69 )
70
71 var (
72         // For headers:
73         testMetadata = metadata.MD{
74                 "key1":     []string{"value1"},
75                 "key2":     []string{"value2"},
76                 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
77         }
78         testMetadata2 = metadata.MD{
79                 "key1": []string{"value12"},
80                 "key2": []string{"value22"},
81         }
82         // For trailers:
83         testTrailerMetadata = metadata.MD{
84                 "tkey1":     []string{"trailerValue1"},
85                 "tkey2":     []string{"trailerValue2"},
86                 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
87         }
88         testTrailerMetadata2 = metadata.MD{
89                 "tkey1": []string{"trailerValue12"},
90                 "tkey2": []string{"trailerValue22"},
91         }
92         // capital "Key" is illegal in HTTP/2.
93         malformedHTTP2Metadata = metadata.MD{
94                 "Key": []string{"foo"},
95         }
96         testAppUA     = "myApp1/1.0 myApp2/0.9"
97         failAppUA     = "fail-this-RPC"
98         detailedError = status.ErrorProto(&spb.Status{
99                 Code:    int32(codes.DataLoss),
100                 Message: "error for testing: " + failAppUA,
101                 Details: []*anypb.Any{{
102                         TypeUrl: "url",
103                         Value:   []byte{6, 0, 0, 6, 1, 3},
104                 }},
105         })
106 )
107
108 var raceMode bool // set by race.go in race mode
109
110 type testServer struct {
111         security           string // indicate the authentication protocol used by this server.
112         earlyFail          bool   // whether to error out the execution of a service handler prematurely.
113         setAndSendHeader   bool   // whether to call setHeader and sendHeader.
114         setHeaderOnly      bool   // whether to only call setHeader, not sendHeader.
115         multipleSetTrailer bool   // whether to call setTrailer multiple times.
116         unaryCallSleepTime time.Duration
117 }
118
119 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
120         if md, ok := metadata.FromIncomingContext(ctx); ok {
121                 // For testing purpose, returns an error if user-agent is failAppUA.
122                 // To test that client gets the correct error.
123                 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
124                         return nil, detailedError
125                 }
126                 var str []string
127                 for _, entry := range md["user-agent"] {
128                         str = append(str, "ua", entry)
129                 }
130                 grpc.SendHeader(ctx, metadata.Pairs(str...))
131         }
132         return new(testpb.Empty), nil
133 }
134
135 func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
136         if size < 0 {
137                 return nil, fmt.Errorf("Requested a response with invalid length %d", size)
138         }
139         body := make([]byte, size)
140         switch t {
141         case testpb.PayloadType_COMPRESSABLE:
142         case testpb.PayloadType_UNCOMPRESSABLE:
143                 return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported")
144         default:
145                 return nil, fmt.Errorf("Unsupported payload type: %d", t)
146         }
147         return &testpb.Payload{
148                 Type: t,
149                 Body: body,
150         }, nil
151 }
152
153 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
154         md, ok := metadata.FromIncomingContext(ctx)
155         if ok {
156                 if _, exists := md[":authority"]; !exists {
157                         return nil, grpc.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
158                 }
159                 if s.setAndSendHeader {
160                         if err := grpc.SetHeader(ctx, md); err != nil {
161                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
162                         }
163                         if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
164                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
165                         }
166                 } else if s.setHeaderOnly {
167                         if err := grpc.SetHeader(ctx, md); err != nil {
168                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
169                         }
170                         if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
171                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
172                         }
173                 } else {
174                         if err := grpc.SendHeader(ctx, md); err != nil {
175                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
176                         }
177                 }
178                 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
179                         return nil, grpc.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
180                 }
181                 if s.multipleSetTrailer {
182                         if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
183                                 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
184                         }
185                 }
186         }
187         pr, ok := peer.FromContext(ctx)
188         if !ok {
189                 return nil, grpc.Errorf(codes.DataLoss, "failed to get peer from ctx")
190         }
191         if pr.Addr == net.Addr(nil) {
192                 return nil, grpc.Errorf(codes.DataLoss, "failed to get peer address")
193         }
194         if s.security != "" {
195                 // Check Auth info
196                 var authType, serverName string
197                 switch info := pr.AuthInfo.(type) {
198                 case credentials.TLSInfo:
199                         authType = info.AuthType()
200                         serverName = info.State.ServerName
201                 default:
202                         return nil, grpc.Errorf(codes.Unauthenticated, "Unknown AuthInfo type")
203                 }
204                 if authType != s.security {
205                         return nil, grpc.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
206                 }
207                 if serverName != "x.test.youtube.com" {
208                         return nil, grpc.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
209                 }
210         }
211         // Simulate some service delay.
212         time.Sleep(s.unaryCallSleepTime)
213
214         payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
215         if err != nil {
216                 return nil, err
217         }
218
219         return &testpb.SimpleResponse{
220                 Payload: payload,
221         }, nil
222 }
223
224 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
225         if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
226                 if _, exists := md[":authority"]; !exists {
227                         return grpc.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
228                 }
229                 // For testing purpose, returns an error if user-agent is failAppUA.
230                 // To test that client gets the correct error.
231                 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
232                         return grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
233                 }
234         }
235         cs := args.GetResponseParameters()
236         for _, c := range cs {
237                 if us := c.GetIntervalUs(); us > 0 {
238                         time.Sleep(time.Duration(us) * time.Microsecond)
239                 }
240
241                 payload, err := newPayload(args.GetResponseType(), c.GetSize())
242                 if err != nil {
243                         return err
244                 }
245
246                 if err := stream.Send(&testpb.StreamingOutputCallResponse{
247                         Payload: payload,
248                 }); err != nil {
249                         return err
250                 }
251         }
252         return nil
253 }
254
255 func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
256         var sum int
257         for {
258                 in, err := stream.Recv()
259                 if err == io.EOF {
260                         return stream.SendAndClose(&testpb.StreamingInputCallResponse{
261                                 AggregatedPayloadSize: int32(sum),
262                         })
263                 }
264                 if err != nil {
265                         return err
266                 }
267                 p := in.GetPayload().GetBody()
268                 sum += len(p)
269                 if s.earlyFail {
270                         return grpc.Errorf(codes.NotFound, "not found")
271                 }
272         }
273 }
274
275 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
276         md, ok := metadata.FromIncomingContext(stream.Context())
277         if ok {
278                 if s.setAndSendHeader {
279                         if err := stream.SetHeader(md); err != nil {
280                                 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
281                         }
282                         if err := stream.SendHeader(testMetadata2); err != nil {
283                                 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
284                         }
285                 } else if s.setHeaderOnly {
286                         if err := stream.SetHeader(md); err != nil {
287                                 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
288                         }
289                         if err := stream.SetHeader(testMetadata2); err != nil {
290                                 return grpc.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
291                         }
292                 } else {
293                         if err := stream.SendHeader(md); err != nil {
294                                 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
295                         }
296                 }
297                 stream.SetTrailer(testTrailerMetadata)
298                 if s.multipleSetTrailer {
299                         stream.SetTrailer(testTrailerMetadata2)
300                 }
301         }
302         for {
303                 in, err := stream.Recv()
304                 if err == io.EOF {
305                         // read done.
306                         return nil
307                 }
308                 if err != nil {
309                         // to facilitate testSvrWriteStatusEarlyWrite
310                         if grpc.Code(err) == codes.ResourceExhausted {
311                                 return grpc.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
312                         }
313                         return err
314                 }
315                 cs := in.GetResponseParameters()
316                 for _, c := range cs {
317                         if us := c.GetIntervalUs(); us > 0 {
318                                 time.Sleep(time.Duration(us) * time.Microsecond)
319                         }
320
321                         payload, err := newPayload(in.GetResponseType(), c.GetSize())
322                         if err != nil {
323                                 return err
324                         }
325
326                         if err := stream.Send(&testpb.StreamingOutputCallResponse{
327                                 Payload: payload,
328                         }); err != nil {
329                                 // to facilitate testSvrWriteStatusEarlyWrite
330                                 if grpc.Code(err) == codes.ResourceExhausted {
331                                         return grpc.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
332                                 }
333                                 return err
334                         }
335                 }
336         }
337 }
338
339 func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
340         var msgBuf []*testpb.StreamingOutputCallRequest
341         for {
342                 in, err := stream.Recv()
343                 if err == io.EOF {
344                         // read done.
345                         break
346                 }
347                 if err != nil {
348                         return err
349                 }
350                 msgBuf = append(msgBuf, in)
351         }
352         for _, m := range msgBuf {
353                 cs := m.GetResponseParameters()
354                 for _, c := range cs {
355                         if us := c.GetIntervalUs(); us > 0 {
356                                 time.Sleep(time.Duration(us) * time.Microsecond)
357                         }
358
359                         payload, err := newPayload(m.GetResponseType(), c.GetSize())
360                         if err != nil {
361                                 return err
362                         }
363
364                         if err := stream.Send(&testpb.StreamingOutputCallResponse{
365                                 Payload: payload,
366                         }); err != nil {
367                                 return err
368                         }
369                 }
370         }
371         return nil
372 }
373
374 type env struct {
375         name         string
376         network      string // The type of network such as tcp, unix, etc.
377         security     string // The security protocol such as TLS, SSH, etc.
378         httpHandler  bool   // whether to use the http.Handler ServerTransport; requires TLS
379         balancer     string // One of "roundrobin", "pickfirst", "v1", or "".
380         customDialer func(string, string, time.Duration) (net.Conn, error)
381 }
382
383 func (e env) runnable() bool {
384         if runtime.GOOS == "windows" && e.network == "unix" {
385                 return false
386         }
387         return true
388 }
389
390 func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
391         if e.customDialer != nil {
392                 return e.customDialer(e.network, addr, timeout)
393         }
394         return net.DialTimeout(e.network, addr, timeout)
395 }
396
397 var (
398         tcpClearEnv   = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"}
399         tcpTLSEnv     = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"}
400         tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "roundrobin"}
401         tcpTLSRREnv   = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "roundrobin"}
402         handlerEnv    = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "roundrobin"}
403         noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
404         allEnv        = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
405 )
406
407 var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
408
409 func listTestEnv() (envs []env) {
410         if *onlyEnv != "" {
411                 for _, e := range allEnv {
412                         if e.name == *onlyEnv {
413                                 if !e.runnable() {
414                                         panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
415                                 }
416                                 return []env{e}
417                         }
418                 }
419                 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
420         }
421         for _, e := range allEnv {
422                 if e.runnable() {
423                         envs = append(envs, e)
424                 }
425         }
426         return envs
427 }
428
429 // test is an end-to-end test. It should be created with the newTest
430 // func, modified as needed, and then started with its startServer method.
431 // It should be cleaned up with the tearDown method.
432 type test struct {
433         t *testing.T
434         e env
435
436         ctx    context.Context // valid for life of test, before tearDown
437         cancel context.CancelFunc
438
439         // Configurable knobs, after newTest returns:
440         testServer                  testpb.TestServiceServer // nil means none
441         healthServer                *health.Server           // nil means disabled
442         maxStream                   uint32
443         tapHandle                   tap.ServerInHandle
444         maxMsgSize                  *int
445         maxClientReceiveMsgSize     *int
446         maxClientSendMsgSize        *int
447         maxServerReceiveMsgSize     *int
448         maxServerSendMsgSize        *int
449         userAgent                   string
450         clientCompression           bool
451         serverCompression           bool
452         unaryClientInt              grpc.UnaryClientInterceptor
453         streamClientInt             grpc.StreamClientInterceptor
454         unaryServerInt              grpc.UnaryServerInterceptor
455         streamServerInt             grpc.StreamServerInterceptor
456         unknownHandler              grpc.StreamHandler
457         sc                          <-chan grpc.ServiceConfig
458         customCodec                 grpc.Codec
459         serverInitialWindowSize     int32
460         serverInitialConnWindowSize int32
461         clientInitialWindowSize     int32
462         clientInitialConnWindowSize int32
463         perRPCCreds                 credentials.PerRPCCredentials
464         resolverScheme              string
465
466         // All test dialing is blocking by default. Set this to true if dial
467         // should be non-blocking.
468         nonBlockingDial bool
469
470         // srv and srvAddr are set once startServer is called.
471         srv     *grpc.Server
472         srvAddr string
473
474         cc          *grpc.ClientConn // nil until requested via clientConn
475         restoreLogs func()           // nil unless declareLogNoise is used
476 }
477
478 func (te *test) tearDown() {
479         if te.cancel != nil {
480                 te.cancel()
481                 te.cancel = nil
482         }
483         if te.cc != nil {
484                 te.cc.Close()
485                 te.cc = nil
486         }
487         if te.restoreLogs != nil {
488                 te.restoreLogs()
489                 te.restoreLogs = nil
490         }
491         if te.srv != nil {
492                 te.srv.Stop()
493         }
494 }
495
496 // newTest returns a new test using the provided testing.T and
497 // environment.  It is returned with default values. Tests should
498 // modify it before calling its startServer and clientConn methods.
499 func newTest(t *testing.T, e env) *test {
500         te := &test{
501                 t:         t,
502                 e:         e,
503                 maxStream: math.MaxUint32,
504         }
505         te.ctx, te.cancel = context.WithCancel(context.Background())
506         return te
507 }
508
509 // startServer starts a gRPC server listening. Callers should defer a
510 // call to te.tearDown to clean up.
511 func (te *test) startServer(ts testpb.TestServiceServer) {
512         te.testServer = ts
513         te.t.Logf("Running test in %s environment...", te.e.name)
514         sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
515         if te.maxMsgSize != nil {
516                 sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
517         }
518         if te.maxServerReceiveMsgSize != nil {
519                 sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
520         }
521         if te.maxServerSendMsgSize != nil {
522                 sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
523         }
524         if te.tapHandle != nil {
525                 sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
526         }
527         if te.serverCompression {
528                 sopts = append(sopts,
529                         grpc.RPCCompressor(grpc.NewGZIPCompressor()),
530                         grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
531                 )
532         }
533         if te.unaryServerInt != nil {
534                 sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
535         }
536         if te.streamServerInt != nil {
537                 sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
538         }
539         if te.unknownHandler != nil {
540                 sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
541         }
542         if te.serverInitialWindowSize > 0 {
543                 sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
544         }
545         if te.serverInitialConnWindowSize > 0 {
546                 sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
547         }
548         la := "localhost:0"
549         switch te.e.network {
550         case "unix":
551                 la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
552                 syscall.Unlink(la)
553         }
554         lis, err := net.Listen(te.e.network, la)
555         if err != nil {
556                 te.t.Fatalf("Failed to listen: %v", err)
557         }
558         switch te.e.security {
559         case "tls":
560                 creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
561                 if err != nil {
562                         te.t.Fatalf("Failed to generate credentials %v", err)
563                 }
564                 sopts = append(sopts, grpc.Creds(creds))
565         case "clientTimeoutCreds":
566                 sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
567         }
568         if te.customCodec != nil {
569                 sopts = append(sopts, grpc.CustomCodec(te.customCodec))
570         }
571         s := grpc.NewServer(sopts...)
572         te.srv = s
573         if te.e.httpHandler {
574                 internal.TestingUseHandlerImpl(s)
575         }
576         if te.healthServer != nil {
577                 healthpb.RegisterHealthServer(s, te.healthServer)
578         }
579         if te.testServer != nil {
580                 testpb.RegisterTestServiceServer(s, te.testServer)
581         }
582         addr := la
583         switch te.e.network {
584         case "unix":
585         default:
586                 _, port, err := net.SplitHostPort(lis.Addr().String())
587                 if err != nil {
588                         te.t.Fatalf("Failed to parse listener address: %v", err)
589                 }
590                 addr = "localhost:" + port
591         }
592
593         go s.Serve(lis)
594         te.srvAddr = addr
595 }
596
597 func (te *test) clientConn() *grpc.ClientConn {
598         if te.cc != nil {
599                 return te.cc
600         }
601         opts := []grpc.DialOption{
602                 grpc.WithDialer(te.e.dialer),
603                 grpc.WithUserAgent(te.userAgent),
604         }
605
606         if te.sc != nil {
607                 opts = append(opts, grpc.WithServiceConfig(te.sc))
608         }
609
610         if te.clientCompression {
611                 opts = append(opts,
612                         grpc.WithCompressor(grpc.NewGZIPCompressor()),
613                         grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
614                 )
615         }
616         if te.unaryClientInt != nil {
617                 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
618         }
619         if te.streamClientInt != nil {
620                 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
621         }
622         if te.maxMsgSize != nil {
623                 opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
624         }
625         if te.maxClientReceiveMsgSize != nil {
626                 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
627         }
628         if te.maxClientSendMsgSize != nil {
629                 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
630         }
631         switch te.e.security {
632         case "tls":
633                 creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
634                 if err != nil {
635                         te.t.Fatalf("Failed to load credentials: %v", err)
636                 }
637                 opts = append(opts, grpc.WithTransportCredentials(creds))
638         case "clientTimeoutCreds":
639                 opts = append(opts, grpc.WithTransportCredentials(&clientTimeoutCreds{}))
640         default:
641                 opts = append(opts, grpc.WithInsecure())
642         }
643         // TODO(bar) switch balancer case "pickfirst".
644         var scheme string
645         if te.resolverScheme == "" {
646                 scheme = "passthrough:///"
647         } else {
648                 scheme = te.resolverScheme + ":///"
649         }
650         switch te.e.balancer {
651         case "v1":
652                 opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
653         case "roundrobin":
654                 rr := balancer.Get("roundrobin")
655                 if rr == nil {
656                         te.t.Fatalf("got nil when trying to get roundrobin balancer builder")
657                 }
658                 opts = append(opts, grpc.WithBalancerBuilder(rr))
659         }
660         if te.clientInitialWindowSize > 0 {
661                 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
662         }
663         if te.clientInitialConnWindowSize > 0 {
664                 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
665         }
666         if te.perRPCCreds != nil {
667                 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
668         }
669         if te.customCodec != nil {
670                 opts = append(opts, grpc.WithCodec(te.customCodec))
671         }
672         if !te.nonBlockingDial && te.srvAddr != "" {
673                 // Only do a blocking dial if server is up.
674                 opts = append(opts, grpc.WithBlock())
675         }
676         if te.srvAddr == "" {
677                 te.srvAddr = "client.side.only.test"
678         }
679         var err error
680         te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
681         if err != nil {
682                 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
683         }
684         return te.cc
685 }
686
687 func (te *test) declareLogNoise(phrases ...string) {
688         te.restoreLogs = declareLogNoise(te.t, phrases...)
689 }
690
691 func (te *test) withServerTester(fn func(st *serverTester)) {
692         c, err := te.e.dialer(te.srvAddr, 10*time.Second)
693         if err != nil {
694                 te.t.Fatal(err)
695         }
696         defer c.Close()
697         if te.e.security == "tls" {
698                 c = tls.Client(c, &tls.Config{
699                         InsecureSkipVerify: true,
700                         NextProtos:         []string{http2.NextProtoTLS},
701                 })
702         }
703         st := newServerTesterFromConn(te.t, c)
704         st.greet()
705         fn(st)
706 }
707
708 type lazyConn struct {
709         net.Conn
710         beLazy int32
711 }
712
713 func (l *lazyConn) Write(b []byte) (int, error) {
714         if atomic.LoadInt32(&(l.beLazy)) == 1 {
715                 // The sleep duration here needs to less than the leakCheck deadline.
716                 time.Sleep(time.Second)
717         }
718         return l.Conn.Write(b)
719 }
720
721 func TestContextDeadlineNotIgnored(t *testing.T) {
722         defer leakcheck.Check(t)
723         e := noBalancerEnv
724         var lc *lazyConn
725         e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
726                 conn, err := net.DialTimeout(network, addr, timeout)
727                 if err != nil {
728                         return nil, err
729                 }
730                 lc = &lazyConn{Conn: conn}
731                 return lc, nil
732         }
733
734         te := newTest(t, e)
735         te.startServer(&testServer{security: e.security})
736         defer te.tearDown()
737
738         cc := te.clientConn()
739         tc := testpb.NewTestServiceClient(cc)
740         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
741                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
742         }
743         atomic.StoreInt32(&(lc.beLazy), 1)
744         ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
745         defer cancel()
746         t1 := time.Now()
747         if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
748                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
749         }
750         if time.Since(t1) > 2*time.Second {
751                 t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
752         }
753 }
754
755 func TestTimeoutOnDeadServer(t *testing.T) {
756         defer leakcheck.Check(t)
757         for _, e := range listTestEnv() {
758                 testTimeoutOnDeadServer(t, e)
759         }
760 }
761
762 func testTimeoutOnDeadServer(t *testing.T, e env) {
763         te := newTest(t, e)
764         te.userAgent = testAppUA
765         te.declareLogNoise(
766                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
767                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
768                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
769         )
770         te.startServer(&testServer{security: e.security})
771         defer te.tearDown()
772
773         cc := te.clientConn()
774         tc := testpb.NewTestServiceClient(cc)
775         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
776                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
777         }
778         te.srv.Stop()
779         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
780         _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
781         cancel()
782         if e.balancer != "" && grpc.Code(err) != codes.DeadlineExceeded {
783                 // If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
784                 // the error will be an internal error.
785                 t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
786         }
787         awaitNewConnLogOutput()
788 }
789
790 func TestServerGracefulStopIdempotent(t *testing.T) {
791         defer leakcheck.Check(t)
792         for _, e := range listTestEnv() {
793                 if e.name == "handler-tls" {
794                         continue
795                 }
796                 testServerGracefulStopIdempotent(t, e)
797         }
798 }
799
800 func testServerGracefulStopIdempotent(t *testing.T, e env) {
801         te := newTest(t, e)
802         te.userAgent = testAppUA
803         te.startServer(&testServer{security: e.security})
804         defer te.tearDown()
805
806         for i := 0; i < 3; i++ {
807                 te.srv.GracefulStop()
808         }
809 }
810
811 func TestServerGoAway(t *testing.T) {
812         defer leakcheck.Check(t)
813         for _, e := range listTestEnv() {
814                 if e.name == "handler-tls" {
815                         continue
816                 }
817                 testServerGoAway(t, e)
818         }
819 }
820
821 func testServerGoAway(t *testing.T, e env) {
822         te := newTest(t, e)
823         te.userAgent = testAppUA
824         te.startServer(&testServer{security: e.security})
825         defer te.tearDown()
826
827         cc := te.clientConn()
828         tc := testpb.NewTestServiceClient(cc)
829         // Finish an RPC to make sure the connection is good.
830         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
831                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
832         }
833         ch := make(chan struct{})
834         go func() {
835                 te.srv.GracefulStop()
836                 close(ch)
837         }()
838         // Loop until the server side GoAway signal is propagated to the client.
839         for {
840                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
841                 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && grpc.Code(err) != codes.DeadlineExceeded {
842                         cancel()
843                         break
844                 }
845                 cancel()
846         }
847         // A new RPC should fail.
848         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable && grpc.Code(err) != codes.Internal {
849                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
850         }
851         <-ch
852         awaitNewConnLogOutput()
853 }
854
855 func TestServerGoAwayPendingRPC(t *testing.T) {
856         defer leakcheck.Check(t)
857         for _, e := range listTestEnv() {
858                 if e.name == "handler-tls" {
859                         continue
860                 }
861                 testServerGoAwayPendingRPC(t, e)
862         }
863 }
864
865 func testServerGoAwayPendingRPC(t *testing.T, e env) {
866         te := newTest(t, e)
867         te.userAgent = testAppUA
868         te.declareLogNoise(
869                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
870                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
871                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
872         )
873         te.startServer(&testServer{security: e.security})
874         defer te.tearDown()
875
876         cc := te.clientConn()
877         tc := testpb.NewTestServiceClient(cc)
878         ctx, cancel := context.WithCancel(context.Background())
879         stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
880         if err != nil {
881                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
882         }
883         // Finish an RPC to make sure the connection is good.
884         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
885                 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
886         }
887         ch := make(chan struct{})
888         go func() {
889                 te.srv.GracefulStop()
890                 close(ch)
891         }()
892         // Loop until the server side GoAway signal is propagated to the client.
893         for {
894                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
895                 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
896                         cancel()
897                         break
898                 }
899                 cancel()
900         }
901         respParam := []*testpb.ResponseParameters{
902                 {
903                         Size: 1,
904                 },
905         }
906         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
907         if err != nil {
908                 t.Fatal(err)
909         }
910         req := &testpb.StreamingOutputCallRequest{
911                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
912                 ResponseParameters: respParam,
913                 Payload:            payload,
914         }
915         // The existing RPC should be still good to proceed.
916         if err := stream.Send(req); err != nil {
917                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
918         }
919         if _, err := stream.Recv(); err != nil {
920                 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
921         }
922         cancel()
923         <-ch
924         awaitNewConnLogOutput()
925 }
926
927 func TestServerMultipleGoAwayPendingRPC(t *testing.T) {
928         defer leakcheck.Check(t)
929         for _, e := range listTestEnv() {
930                 if e.name == "handler-tls" {
931                         continue
932                 }
933                 testServerMultipleGoAwayPendingRPC(t, e)
934         }
935 }
936
937 func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
938         te := newTest(t, e)
939         te.userAgent = testAppUA
940         te.declareLogNoise(
941                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
942                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
943                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
944         )
945         te.startServer(&testServer{security: e.security})
946         defer te.tearDown()
947
948         cc := te.clientConn()
949         tc := testpb.NewTestServiceClient(cc)
950         ctx, cancel := context.WithCancel(context.Background())
951         stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
952         if err != nil {
953                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
954         }
955         // Finish an RPC to make sure the connection is good.
956         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
957                 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
958         }
959         ch1 := make(chan struct{})
960         go func() {
961                 te.srv.GracefulStop()
962                 close(ch1)
963         }()
964         ch2 := make(chan struct{})
965         go func() {
966                 te.srv.GracefulStop()
967                 close(ch2)
968         }()
969         // Loop until the server side GoAway signal is propagated to the client.
970         for {
971                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
972                 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
973                         cancel()
974                         break
975                 }
976                 cancel()
977         }
978         select {
979         case <-ch1:
980                 t.Fatal("GracefulStop() terminated early")
981         case <-ch2:
982                 t.Fatal("GracefulStop() terminated early")
983         default:
984         }
985         respParam := []*testpb.ResponseParameters{
986                 {
987                         Size: 1,
988                 },
989         }
990         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
991         if err != nil {
992                 t.Fatal(err)
993         }
994         req := &testpb.StreamingOutputCallRequest{
995                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
996                 ResponseParameters: respParam,
997                 Payload:            payload,
998         }
999         // The existing RPC should be still good to proceed.
1000         if err := stream.Send(req); err != nil {
1001                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
1002         }
1003         if _, err := stream.Recv(); err != nil {
1004                 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1005         }
1006         if err := stream.CloseSend(); err != nil {
1007                 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
1008         }
1009         <-ch1
1010         <-ch2
1011         cancel()
1012         awaitNewConnLogOutput()
1013 }
1014
1015 func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
1016         defer leakcheck.Check(t)
1017         for _, e := range listTestEnv() {
1018                 if e.name == "handler-tls" {
1019                         continue
1020                 }
1021                 testConcurrentClientConnCloseAndServerGoAway(t, e)
1022         }
1023 }
1024
1025 func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
1026         te := newTest(t, e)
1027         te.userAgent = testAppUA
1028         te.declareLogNoise(
1029                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1030                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1031                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1032         )
1033         te.startServer(&testServer{security: e.security})
1034         defer te.tearDown()
1035
1036         cc := te.clientConn()
1037         tc := testpb.NewTestServiceClient(cc)
1038         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1039                 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1040         }
1041         ch := make(chan struct{})
1042         // Close ClientConn and Server concurrently.
1043         go func() {
1044                 te.srv.GracefulStop()
1045                 close(ch)
1046         }()
1047         go func() {
1048                 cc.Close()
1049         }()
1050         <-ch
1051 }
1052
1053 func TestConcurrentServerStopAndGoAway(t *testing.T) {
1054         defer leakcheck.Check(t)
1055         for _, e := range listTestEnv() {
1056                 if e.name == "handler-tls" {
1057                         continue
1058                 }
1059                 testConcurrentServerStopAndGoAway(t, e)
1060         }
1061 }
1062
1063 func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
1064         te := newTest(t, e)
1065         te.userAgent = testAppUA
1066         te.declareLogNoise(
1067                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1068                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1069                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1070         )
1071         te.startServer(&testServer{security: e.security})
1072         defer te.tearDown()
1073
1074         cc := te.clientConn()
1075         tc := testpb.NewTestServiceClient(cc)
1076         stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
1077         if err != nil {
1078                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1079         }
1080         // Finish an RPC to make sure the connection is good.
1081         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1082                 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1083         }
1084         ch := make(chan struct{})
1085         go func() {
1086                 te.srv.GracefulStop()
1087                 close(ch)
1088         }()
1089         // Loop until the server side GoAway signal is propagated to the client.
1090         for {
1091                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1092                 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1093                         cancel()
1094                         break
1095                 }
1096                 cancel()
1097         }
1098         // Stop the server and close all the connections.
1099         te.srv.Stop()
1100         respParam := []*testpb.ResponseParameters{
1101                 {
1102                         Size: 1,
1103                 },
1104         }
1105         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1106         if err != nil {
1107                 t.Fatal(err)
1108         }
1109         req := &testpb.StreamingOutputCallRequest{
1110                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
1111                 ResponseParameters: respParam,
1112                 Payload:            payload,
1113         }
1114         if err := stream.Send(req); err == nil {
1115                 if _, err := stream.Recv(); err == nil {
1116                         t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1117                 }
1118         }
1119         <-ch
1120         awaitNewConnLogOutput()
1121 }
1122
1123 func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
1124         defer leakcheck.Check(t)
1125         for _, e := range listTestEnv() {
1126                 if e.name == "handler-tls" {
1127                         continue
1128                 }
1129                 testClientConnCloseAfterGoAwayWithActiveStream(t, e)
1130         }
1131 }
1132
1133 func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
1134         te := newTest(t, e)
1135         te.startServer(&testServer{security: e.security})
1136         defer te.tearDown()
1137         cc := te.clientConn()
1138         tc := testpb.NewTestServiceClient(cc)
1139
1140         if _, err := tc.FullDuplexCall(context.Background()); err != nil {
1141                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
1142         }
1143         done := make(chan struct{})
1144         go func() {
1145                 te.srv.GracefulStop()
1146                 close(done)
1147         }()
1148         time.Sleep(50 * time.Millisecond)
1149         cc.Close()
1150         timeout := time.NewTimer(time.Second)
1151         select {
1152         case <-done:
1153         case <-timeout.C:
1154                 t.Fatalf("Test timed-out.")
1155         }
1156 }
1157
1158 func TestFailFast(t *testing.T) {
1159         defer leakcheck.Check(t)
1160         for _, e := range listTestEnv() {
1161                 testFailFast(t, e)
1162         }
1163 }
1164
1165 func testFailFast(t *testing.T, e env) {
1166         te := newTest(t, e)
1167         te.userAgent = testAppUA
1168         te.declareLogNoise(
1169                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1170                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1171                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1172         )
1173         te.startServer(&testServer{security: e.security})
1174         defer te.tearDown()
1175
1176         cc := te.clientConn()
1177         tc := testpb.NewTestServiceClient(cc)
1178         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
1179                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1180         }
1181         // Stop the server and tear down all the exisiting connections.
1182         te.srv.Stop()
1183         // Loop until the server teardown is propagated to the client.
1184         for {
1185                 _, err := tc.EmptyCall(context.Background(), &testpb.Empty{})
1186                 if grpc.Code(err) == codes.Unavailable {
1187                         break
1188                 }
1189                 fmt.Printf("%v.EmptyCall(_, _) = _, %v", tc, err)
1190                 time.Sleep(10 * time.Millisecond)
1191         }
1192         // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
1193         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
1194                 t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
1195         }
1196         if _, err := tc.StreamingInputCall(context.Background()); grpc.Code(err) != codes.Unavailable {
1197                 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
1198         }
1199
1200         awaitNewConnLogOutput()
1201 }
1202
1203 func testServiceConfigSetup(t *testing.T, e env) *test {
1204         te := newTest(t, e)
1205         te.userAgent = testAppUA
1206         te.declareLogNoise(
1207                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1208                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1209                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1210                 "Failed to dial : context canceled; please retry.",
1211         )
1212         return te
1213 }
1214
1215 func newBool(b bool) (a *bool) {
1216         return &b
1217 }
1218
1219 func newInt(b int) (a *int) {
1220         return &b
1221 }
1222
1223 func newDuration(b time.Duration) (a *time.Duration) {
1224         a = new(time.Duration)
1225         *a = b
1226         return
1227 }
1228
1229 func TestGetMethodConfig(t *testing.T) {
1230         te := testServiceConfigSetup(t, tcpClearRREnv)
1231         defer te.tearDown()
1232         r, rcleanup := manual.GenerateAndRegisterManualResolver()
1233         defer rcleanup()
1234
1235         te.resolverScheme = r.Scheme()
1236         cc := te.clientConn()
1237         r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1238         r.NewServiceConfig(`{
1239     "methodConfig": [
1240         {
1241             "name": [
1242                 {
1243                     "service": "grpc.testing.TestService",
1244                     "method": "EmptyCall"
1245                 }
1246             ],
1247             "waitForReady": true,
1248             "timeout": "1ms"
1249         },
1250         {
1251             "name": [
1252                 {
1253                     "service": "grpc.testing.TestService"
1254                 }
1255             ],
1256             "waitForReady": false
1257         }
1258     ]
1259 }`)
1260
1261         tc := testpb.NewTestServiceClient(cc)
1262
1263         // Make sure service config has been processed by grpc.
1264         for {
1265                 if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
1266                         break
1267                 }
1268                 time.Sleep(time.Millisecond)
1269         }
1270
1271         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1272         var err error
1273         if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
1274                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1275         }
1276
1277         r.NewServiceConfig(`{
1278     "methodConfig": [
1279         {
1280             "name": [
1281                 {
1282                     "service": "grpc.testing.TestService",
1283                     "method": "UnaryCall"
1284                 }
1285             ],
1286             "waitForReady": true,
1287             "timeout": "1ms"
1288         },
1289         {
1290             "name": [
1291                 {
1292                     "service": "grpc.testing.TestService"
1293                 }
1294             ],
1295             "waitForReady": false
1296         }
1297     ]
1298 }`)
1299
1300         // Make sure service config has been processed by grpc.
1301         for {
1302                 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
1303                         break
1304                 }
1305                 time.Sleep(time.Millisecond)
1306         }
1307         // The following RPCs are expected to become fail-fast.
1308         if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
1309                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
1310         }
1311 }
1312
1313 func TestServiceConfigWaitForReady(t *testing.T) {
1314         te := testServiceConfigSetup(t, tcpClearRREnv)
1315         defer te.tearDown()
1316         r, rcleanup := manual.GenerateAndRegisterManualResolver()
1317         defer rcleanup()
1318
1319         // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
1320         te.resolverScheme = r.Scheme()
1321         cc := te.clientConn()
1322         r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1323         r.NewServiceConfig(`{
1324     "methodConfig": [
1325         {
1326             "name": [
1327                 {
1328                     "service": "grpc.testing.TestService",
1329                     "method": "EmptyCall"
1330                 },
1331                 {
1332                     "service": "grpc.testing.TestService",
1333                     "method": "FullDuplexCall"
1334                 }
1335             ],
1336             "waitForReady": false,
1337             "timeout": "1ms"
1338         }
1339     ]
1340 }`)
1341
1342         tc := testpb.NewTestServiceClient(cc)
1343
1344         // Make sure service config has been processed by grpc.
1345         for {
1346                 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
1347                         break
1348                 }
1349                 time.Sleep(time.Millisecond)
1350         }
1351
1352         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1353         var err error
1354         if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1355                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1356         }
1357         if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1358                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1359         }
1360
1361         // Generate a service config update.
1362         // Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
1363         r.NewServiceConfig(`{
1364     "methodConfig": [
1365         {
1366             "name": [
1367                 {
1368                     "service": "grpc.testing.TestService",
1369                     "method": "EmptyCall"
1370                 },
1371                 {
1372                     "service": "grpc.testing.TestService",
1373                     "method": "FullDuplexCall"
1374                 }
1375             ],
1376             "waitForReady": true,
1377             "timeout": "1ms"
1378         }
1379     ]
1380 }`)
1381
1382         // Wait for the new service config to take effect.
1383         for {
1384                 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
1385                         break
1386                 }
1387                 time.Sleep(time.Millisecond)
1388         }
1389         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1390         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
1391                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1392         }
1393         if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
1394                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1395         }
1396 }
1397
1398 func TestServiceConfigTimeout(t *testing.T) {
1399         te := testServiceConfigSetup(t, tcpClearRREnv)
1400         defer te.tearDown()
1401         r, rcleanup := manual.GenerateAndRegisterManualResolver()
1402         defer rcleanup()
1403
1404         // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1405         te.resolverScheme = r.Scheme()
1406         cc := te.clientConn()
1407         r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1408         r.NewServiceConfig(`{
1409     "methodConfig": [
1410         {
1411             "name": [
1412                 {
1413                     "service": "grpc.testing.TestService",
1414                     "method": "EmptyCall"
1415                 },
1416                 {
1417                     "service": "grpc.testing.TestService",
1418                     "method": "FullDuplexCall"
1419                 }
1420             ],
1421             "waitForReady": true,
1422             "timeout": "1h"
1423         }
1424     ]
1425 }`)
1426
1427         tc := testpb.NewTestServiceClient(cc)
1428
1429         // Make sure service config has been processed by grpc.
1430         for {
1431                 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1432                         break
1433                 }
1434                 time.Sleep(time.Millisecond)
1435         }
1436
1437         // The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
1438         var err error
1439         ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
1440         if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1441                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1442         }
1443         cancel()
1444
1445         ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
1446         if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1447                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1448         }
1449         cancel()
1450
1451         // Generate a service config update.
1452         // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1453         r.NewServiceConfig(`{
1454     "methodConfig": [
1455         {
1456             "name": [
1457                 {
1458                     "service": "grpc.testing.TestService",
1459                     "method": "EmptyCall"
1460                 },
1461                 {
1462                     "service": "grpc.testing.TestService",
1463                     "method": "FullDuplexCall"
1464                 }
1465             ],
1466             "waitForReady": true,
1467             "timeout": "1ns"
1468         }
1469     ]
1470 }`)
1471
1472         // Wait for the new service config to take effect.
1473         for {
1474                 if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
1475                         break
1476                 }
1477                 time.Sleep(time.Millisecond)
1478         }
1479
1480         ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1481         if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1482                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1483         }
1484         cancel()
1485
1486         ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1487         if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
1488                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1489         }
1490         cancel()
1491 }
1492
1493 func TestServiceConfigMaxMsgSize(t *testing.T) {
1494         e := tcpClearRREnv
1495         r, rcleanup := manual.GenerateAndRegisterManualResolver()
1496         defer rcleanup()
1497
1498         // Setting up values and objects shared across all test cases.
1499         const smallSize = 1
1500         const largeSize = 1024
1501         const extraLargeSize = 2048
1502
1503         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1504         if err != nil {
1505                 t.Fatal(err)
1506         }
1507         largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1508         if err != nil {
1509                 t.Fatal(err)
1510         }
1511         extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
1512         if err != nil {
1513                 t.Fatal(err)
1514         }
1515
1516         scjs := `{
1517     "methodConfig": [
1518         {
1519             "name": [
1520                 {
1521                     "service": "grpc.testing.TestService",
1522                     "method": "UnaryCall"
1523                 },
1524                 {
1525                     "service": "grpc.testing.TestService",
1526                     "method": "FullDuplexCall"
1527                 }
1528             ],
1529             "maxRequestMessageBytes": 2048,
1530             "maxResponseMessageBytes": 2048
1531         }
1532     ]
1533 }`
1534
1535         // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1536         te1 := testServiceConfigSetup(t, e)
1537         defer te1.tearDown()
1538
1539         te1.resolverScheme = r.Scheme()
1540         te1.nonBlockingDial = true
1541         te1.startServer(&testServer{security: e.security})
1542         cc1 := te1.clientConn()
1543
1544         r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}})
1545         r.NewServiceConfig(scjs)
1546         tc := testpb.NewTestServiceClient(cc1)
1547
1548         req := &testpb.SimpleRequest{
1549                 ResponseType: testpb.PayloadType_COMPRESSABLE,
1550                 ResponseSize: int32(extraLargeSize),
1551                 Payload:      smallPayload,
1552         }
1553
1554         for {
1555                 if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1556                         break
1557                 }
1558                 time.Sleep(time.Millisecond)
1559         }
1560
1561         // Test for unary RPC recv.
1562         if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1563                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1564         }
1565
1566         // Test for unary RPC send.
1567         req.Payload = extraLargePayload
1568         req.ResponseSize = int32(smallSize)
1569         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1570                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1571         }
1572
1573         // Test for streaming RPC recv.
1574         respParam := []*testpb.ResponseParameters{
1575                 {
1576                         Size: int32(extraLargeSize),
1577                 },
1578         }
1579         sreq := &testpb.StreamingOutputCallRequest{
1580                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
1581                 ResponseParameters: respParam,
1582                 Payload:            smallPayload,
1583         }
1584         stream, err := tc.FullDuplexCall(te1.ctx)
1585         if err != nil {
1586                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1587         }
1588         if err = stream.Send(sreq); err != nil {
1589                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1590         }
1591         if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1592                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1593         }
1594
1595         // Test for streaming RPC send.
1596         respParam[0].Size = int32(smallSize)
1597         sreq.Payload = extraLargePayload
1598         stream, err = tc.FullDuplexCall(te1.ctx)
1599         if err != nil {
1600                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1601         }
1602         if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1603                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1604         }
1605
1606         // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1607         te2 := testServiceConfigSetup(t, e)
1608         te2.resolverScheme = r.Scheme()
1609         te2.nonBlockingDial = true
1610         te2.maxClientReceiveMsgSize = newInt(1024)
1611         te2.maxClientSendMsgSize = newInt(1024)
1612
1613         te2.startServer(&testServer{security: e.security})
1614         defer te2.tearDown()
1615         cc2 := te2.clientConn()
1616         r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}})
1617         r.NewServiceConfig(scjs)
1618         tc = testpb.NewTestServiceClient(cc2)
1619
1620         for {
1621                 if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1622                         break
1623                 }
1624                 time.Sleep(time.Millisecond)
1625         }
1626
1627         // Test for unary RPC recv.
1628         req.Payload = smallPayload
1629         req.ResponseSize = int32(largeSize)
1630
1631         if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1632                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1633         }
1634
1635         // Test for unary RPC send.
1636         req.Payload = largePayload
1637         req.ResponseSize = int32(smallSize)
1638         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1639                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1640         }
1641
1642         // Test for streaming RPC recv.
1643         stream, err = tc.FullDuplexCall(te2.ctx)
1644         respParam[0].Size = int32(largeSize)
1645         sreq.Payload = smallPayload
1646         if err != nil {
1647                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1648         }
1649         if err = stream.Send(sreq); err != nil {
1650                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1651         }
1652         if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1653                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1654         }
1655
1656         // Test for streaming RPC send.
1657         respParam[0].Size = int32(smallSize)
1658         sreq.Payload = largePayload
1659         stream, err = tc.FullDuplexCall(te2.ctx)
1660         if err != nil {
1661                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1662         }
1663         if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1664                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1665         }
1666
1667         // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1668         te3 := testServiceConfigSetup(t, e)
1669         te3.resolverScheme = r.Scheme()
1670         te3.nonBlockingDial = true
1671         te3.maxClientReceiveMsgSize = newInt(4096)
1672         te3.maxClientSendMsgSize = newInt(4096)
1673
1674         te3.startServer(&testServer{security: e.security})
1675         defer te3.tearDown()
1676
1677         cc3 := te3.clientConn()
1678         r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}})
1679         r.NewServiceConfig(scjs)
1680         tc = testpb.NewTestServiceClient(cc3)
1681
1682         for {
1683                 if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1684                         break
1685                 }
1686                 time.Sleep(time.Millisecond)
1687         }
1688
1689         // Test for unary RPC recv.
1690         req.Payload = smallPayload
1691         req.ResponseSize = int32(largeSize)
1692
1693         if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil {
1694                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1695         }
1696
1697         req.ResponseSize = int32(extraLargeSize)
1698         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1699                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1700         }
1701
1702         // Test for unary RPC send.
1703         req.Payload = largePayload
1704         req.ResponseSize = int32(smallSize)
1705         if _, err := tc.UnaryCall(context.Background(), req); err != nil {
1706                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1707         }
1708
1709         req.Payload = extraLargePayload
1710         if _, err = tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1711                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1712         }
1713
1714         // Test for streaming RPC recv.
1715         stream, err = tc.FullDuplexCall(te3.ctx)
1716         if err != nil {
1717                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1718         }
1719         respParam[0].Size = int32(largeSize)
1720         sreq.Payload = smallPayload
1721
1722         if err = stream.Send(sreq); err != nil {
1723                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1724         }
1725         if _, err = stream.Recv(); err != nil {
1726                 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1727         }
1728
1729         respParam[0].Size = int32(extraLargeSize)
1730
1731         if err = stream.Send(sreq); err != nil {
1732                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1733         }
1734         if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1735                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1736         }
1737
1738         // Test for streaming RPC send.
1739         respParam[0].Size = int32(smallSize)
1740         sreq.Payload = largePayload
1741         stream, err = tc.FullDuplexCall(te3.ctx)
1742         if err != nil {
1743                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1744         }
1745         if err := stream.Send(sreq); err != nil {
1746                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1747         }
1748         sreq.Payload = extraLargePayload
1749         if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1750                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1751         }
1752 }
1753
1754 func TestMaxMsgSizeClientDefault(t *testing.T) {
1755         defer leakcheck.Check(t)
1756         for _, e := range listTestEnv() {
1757                 testMaxMsgSizeClientDefault(t, e)
1758         }
1759 }
1760
1761 func testMaxMsgSizeClientDefault(t *testing.T, e env) {
1762         te := newTest(t, e)
1763         te.userAgent = testAppUA
1764         te.declareLogNoise(
1765                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1766                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1767                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1768                 "Failed to dial : context canceled; please retry.",
1769         )
1770         te.startServer(&testServer{security: e.security})
1771
1772         defer te.tearDown()
1773         tc := testpb.NewTestServiceClient(te.clientConn())
1774
1775         const smallSize = 1
1776         const largeSize = 4 * 1024 * 1024
1777         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1778         if err != nil {
1779                 t.Fatal(err)
1780         }
1781         req := &testpb.SimpleRequest{
1782                 ResponseType: testpb.PayloadType_COMPRESSABLE,
1783                 ResponseSize: int32(largeSize),
1784                 Payload:      smallPayload,
1785         }
1786         // Test for unary RPC recv.
1787         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1788                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1789         }
1790
1791         respParam := []*testpb.ResponseParameters{
1792                 {
1793                         Size: int32(largeSize),
1794                 },
1795         }
1796         sreq := &testpb.StreamingOutputCallRequest{
1797                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
1798                 ResponseParameters: respParam,
1799                 Payload:            smallPayload,
1800         }
1801
1802         // Test for streaming RPC recv.
1803         stream, err := tc.FullDuplexCall(te.ctx)
1804         if err != nil {
1805                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1806         }
1807         if err := stream.Send(sreq); err != nil {
1808                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1809         }
1810         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1811                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1812         }
1813 }
1814
1815 func TestMaxMsgSizeClientAPI(t *testing.T) {
1816         defer leakcheck.Check(t)
1817         for _, e := range listTestEnv() {
1818                 testMaxMsgSizeClientAPI(t, e)
1819         }
1820 }
1821
1822 func testMaxMsgSizeClientAPI(t *testing.T, e env) {
1823         te := newTest(t, e)
1824         te.userAgent = testAppUA
1825         // To avoid error on server side.
1826         te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
1827         te.maxClientReceiveMsgSize = newInt(1024)
1828         te.maxClientSendMsgSize = newInt(1024)
1829         te.declareLogNoise(
1830                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1831                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1832                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1833                 "Failed to dial : context canceled; please retry.",
1834         )
1835         te.startServer(&testServer{security: e.security})
1836
1837         defer te.tearDown()
1838         tc := testpb.NewTestServiceClient(te.clientConn())
1839
1840         const smallSize = 1
1841         const largeSize = 1024
1842         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1843         if err != nil {
1844                 t.Fatal(err)
1845         }
1846
1847         largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1848         if err != nil {
1849                 t.Fatal(err)
1850         }
1851         req := &testpb.SimpleRequest{
1852                 ResponseType: testpb.PayloadType_COMPRESSABLE,
1853                 ResponseSize: int32(largeSize),
1854                 Payload:      smallPayload,
1855         }
1856         // Test for unary RPC recv.
1857         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1858                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1859         }
1860
1861         // Test for unary RPC send.
1862         req.Payload = largePayload
1863         req.ResponseSize = int32(smallSize)
1864         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1865                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1866         }
1867
1868         respParam := []*testpb.ResponseParameters{
1869                 {
1870                         Size: int32(largeSize),
1871                 },
1872         }
1873         sreq := &testpb.StreamingOutputCallRequest{
1874                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
1875                 ResponseParameters: respParam,
1876                 Payload:            smallPayload,
1877         }
1878
1879         // Test for streaming RPC recv.
1880         stream, err := tc.FullDuplexCall(te.ctx)
1881         if err != nil {
1882                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1883         }
1884         if err := stream.Send(sreq); err != nil {
1885                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1886         }
1887         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1888                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1889         }
1890
1891         // Test for streaming RPC send.
1892         respParam[0].Size = int32(smallSize)
1893         sreq.Payload = largePayload
1894         stream, err = tc.FullDuplexCall(te.ctx)
1895         if err != nil {
1896                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1897         }
1898         if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1899                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1900         }
1901 }
1902
1903 func TestMaxMsgSizeServerAPI(t *testing.T) {
1904         defer leakcheck.Check(t)
1905         for _, e := range listTestEnv() {
1906                 testMaxMsgSizeServerAPI(t, e)
1907         }
1908 }
1909
1910 func testMaxMsgSizeServerAPI(t *testing.T, e env) {
1911         te := newTest(t, e)
1912         te.userAgent = testAppUA
1913         te.maxServerReceiveMsgSize = newInt(1024)
1914         te.maxServerSendMsgSize = newInt(1024)
1915         te.declareLogNoise(
1916                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
1917                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1918                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
1919                 "Failed to dial : context canceled; please retry.",
1920         )
1921         te.startServer(&testServer{security: e.security})
1922
1923         defer te.tearDown()
1924         tc := testpb.NewTestServiceClient(te.clientConn())
1925
1926         const smallSize = 1
1927         const largeSize = 1024
1928         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1929         if err != nil {
1930                 t.Fatal(err)
1931         }
1932
1933         largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1934         if err != nil {
1935                 t.Fatal(err)
1936         }
1937         req := &testpb.SimpleRequest{
1938                 ResponseType: testpb.PayloadType_COMPRESSABLE,
1939                 ResponseSize: int32(largeSize),
1940                 Payload:      smallPayload,
1941         }
1942         // Test for unary RPC send.
1943         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1944                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1945         }
1946
1947         // Test for unary RPC recv.
1948         req.Payload = largePayload
1949         req.ResponseSize = int32(smallSize)
1950         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1951                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1952         }
1953
1954         respParam := []*testpb.ResponseParameters{
1955                 {
1956                         Size: int32(largeSize),
1957                 },
1958         }
1959         sreq := &testpb.StreamingOutputCallRequest{
1960                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
1961                 ResponseParameters: respParam,
1962                 Payload:            smallPayload,
1963         }
1964
1965         // Test for streaming RPC send.
1966         stream, err := tc.FullDuplexCall(te.ctx)
1967         if err != nil {
1968                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1969         }
1970         if err := stream.Send(sreq); err != nil {
1971                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1972         }
1973         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1974                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1975         }
1976
1977         // Test for streaming RPC recv.
1978         respParam[0].Size = int32(smallSize)
1979         sreq.Payload = largePayload
1980         stream, err = tc.FullDuplexCall(te.ctx)
1981         if err != nil {
1982                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1983         }
1984         if err := stream.Send(sreq); err != nil {
1985                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1986         }
1987         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
1988                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1989         }
1990 }
1991
1992 func TestTap(t *testing.T) {
1993         defer leakcheck.Check(t)
1994         for _, e := range listTestEnv() {
1995                 if e.name == "handler-tls" {
1996                         continue
1997                 }
1998                 testTap(t, e)
1999         }
2000 }
2001
2002 type myTap struct {
2003         cnt int
2004 }
2005
2006 func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2007         if info != nil {
2008                 if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" {
2009                         t.cnt++
2010                 } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" {
2011                         return nil, fmt.Errorf("tap error")
2012                 }
2013         }
2014         return ctx, nil
2015 }
2016
2017 func testTap(t *testing.T, e env) {
2018         te := newTest(t, e)
2019         te.userAgent = testAppUA
2020         ttap := &myTap{}
2021         te.tapHandle = ttap.handle
2022         te.declareLogNoise(
2023                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
2024                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2025                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
2026         )
2027         te.startServer(&testServer{security: e.security})
2028         defer te.tearDown()
2029
2030         cc := te.clientConn()
2031         tc := testpb.NewTestServiceClient(cc)
2032         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2033                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2034         }
2035         if ttap.cnt != 1 {
2036                 t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
2037         }
2038
2039         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
2040         if err != nil {
2041                 t.Fatal(err)
2042         }
2043
2044         req := &testpb.SimpleRequest{
2045                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2046                 ResponseSize: 45,
2047                 Payload:      payload,
2048         }
2049         if _, err := tc.UnaryCall(context.Background(), req); grpc.Code(err) != codes.Unavailable {
2050                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
2051         }
2052 }
2053
2054 func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
2055         ctx, cancel := context.WithTimeout(context.Background(), d)
2056         defer cancel()
2057         hc := healthpb.NewHealthClient(cc)
2058         req := &healthpb.HealthCheckRequest{
2059                 Service: serviceName,
2060         }
2061         return hc.Check(ctx, req)
2062 }
2063
2064 func TestHealthCheckOnSuccess(t *testing.T) {
2065         defer leakcheck.Check(t)
2066         for _, e := range listTestEnv() {
2067                 testHealthCheckOnSuccess(t, e)
2068         }
2069 }
2070
2071 func testHealthCheckOnSuccess(t *testing.T, e env) {
2072         te := newTest(t, e)
2073         hs := health.NewServer()
2074         hs.SetServingStatus("grpc.health.v1.Health", 1)
2075         te.healthServer = hs
2076         te.startServer(&testServer{security: e.security})
2077         defer te.tearDown()
2078
2079         cc := te.clientConn()
2080         if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil {
2081                 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2082         }
2083 }
2084
2085 func TestHealthCheckOnFailure(t *testing.T) {
2086         defer leakcheck.Check(t)
2087         for _, e := range listTestEnv() {
2088                 testHealthCheckOnFailure(t, e)
2089         }
2090 }
2091
2092 func testHealthCheckOnFailure(t *testing.T, e env) {
2093         defer leakcheck.Check(t)
2094         te := newTest(t, e)
2095         te.declareLogNoise(
2096                 "Failed to dial ",
2097                 "grpc: the client connection is closing; please retry",
2098         )
2099         hs := health.NewServer()
2100         hs.SetServingStatus("grpc.health.v1.HealthCheck", 1)
2101         te.healthServer = hs
2102         te.startServer(&testServer{security: e.security})
2103         defer te.tearDown()
2104
2105         cc := te.clientConn()
2106         wantErr := grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded")
2107         if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2108                 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded)
2109         }
2110         awaitNewConnLogOutput()
2111 }
2112
2113 func TestHealthCheckOff(t *testing.T) {
2114         defer leakcheck.Check(t)
2115         for _, e := range listTestEnv() {
2116                 // TODO(bradfitz): Temporarily skip this env due to #619.
2117                 if e.name == "handler-tls" {
2118                         continue
2119                 }
2120                 testHealthCheckOff(t, e)
2121         }
2122 }
2123
2124 func testHealthCheckOff(t *testing.T, e env) {
2125         te := newTest(t, e)
2126         te.startServer(&testServer{security: e.security})
2127         defer te.tearDown()
2128         want := grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1.Health")
2129         if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2130                 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2131         }
2132 }
2133
2134 func TestUnknownHandler(t *testing.T) {
2135         defer leakcheck.Check(t)
2136         // An example unknownHandler that returns a different code and a different method, making sure that we do not
2137         // expose what methods are implemented to a client that is not authenticated.
2138         unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
2139                 return grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
2140         }
2141         for _, e := range listTestEnv() {
2142                 // TODO(bradfitz): Temporarily skip this env due to #619.
2143                 if e.name == "handler-tls" {
2144                         continue
2145                 }
2146                 testUnknownHandler(t, e, unknownHandler)
2147         }
2148 }
2149
2150 func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
2151         te := newTest(t, e)
2152         te.unknownHandler = unknownHandler
2153         te.startServer(&testServer{security: e.security})
2154         defer te.tearDown()
2155         want := grpc.Errorf(codes.Unauthenticated, "user unauthenticated")
2156         if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2157                 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2158         }
2159 }
2160
2161 func TestHealthCheckServingStatus(t *testing.T) {
2162         defer leakcheck.Check(t)
2163         for _, e := range listTestEnv() {
2164                 testHealthCheckServingStatus(t, e)
2165         }
2166 }
2167
2168 func testHealthCheckServingStatus(t *testing.T, e env) {
2169         te := newTest(t, e)
2170         hs := health.NewServer()
2171         te.healthServer = hs
2172         te.startServer(&testServer{security: e.security})
2173         defer te.tearDown()
2174
2175         cc := te.clientConn()
2176         out, err := healthCheck(1*time.Second, cc, "")
2177         if err != nil {
2178                 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2179         }
2180         if out.Status != healthpb.HealthCheckResponse_SERVING {
2181                 t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2182         }
2183         wantErr := grpc.Errorf(codes.NotFound, "unknown service")
2184         if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2185                 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound)
2186         }
2187         hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING)
2188         out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2189         if err != nil {
2190                 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2191         }
2192         if out.Status != healthpb.HealthCheckResponse_SERVING {
2193                 t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2194         }
2195         hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING)
2196         out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2197         if err != nil {
2198                 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2199         }
2200         if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
2201                 t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status)
2202         }
2203
2204 }
2205
2206 func TestErrorChanNoIO(t *testing.T) {
2207         defer leakcheck.Check(t)
2208         for _, e := range listTestEnv() {
2209                 testErrorChanNoIO(t, e)
2210         }
2211 }
2212
2213 func testErrorChanNoIO(t *testing.T, e env) {
2214         te := newTest(t, e)
2215         te.startServer(&testServer{security: e.security})
2216         defer te.tearDown()
2217
2218         tc := testpb.NewTestServiceClient(te.clientConn())
2219         if _, err := tc.FullDuplexCall(context.Background()); err != nil {
2220                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2221         }
2222 }
2223
2224 func TestEmptyUnaryWithUserAgent(t *testing.T) {
2225         defer leakcheck.Check(t)
2226         for _, e := range listTestEnv() {
2227                 testEmptyUnaryWithUserAgent(t, e)
2228         }
2229 }
2230
2231 func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2232         te := newTest(t, e)
2233         te.userAgent = testAppUA
2234         te.startServer(&testServer{security: e.security})
2235         defer te.tearDown()
2236
2237         cc := te.clientConn()
2238         tc := testpb.NewTestServiceClient(cc)
2239         var header metadata.MD
2240         reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
2241         if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2242                 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
2243         }
2244         if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2245                 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
2246         }
2247
2248         te.srv.Stop()
2249 }
2250
2251 func TestFailedEmptyUnary(t *testing.T) {
2252         defer leakcheck.Check(t)
2253         for _, e := range listTestEnv() {
2254                 if e.name == "handler-tls" {
2255                         // This test covers status details, but
2256                         // Grpc-Status-Details-Bin is not support in handler_server.
2257                         continue
2258                 }
2259                 testFailedEmptyUnary(t, e)
2260         }
2261 }
2262
2263 func testFailedEmptyUnary(t *testing.T, e env) {
2264         te := newTest(t, e)
2265         te.userAgent = failAppUA
2266         te.startServer(&testServer{security: e.security})
2267         defer te.tearDown()
2268         tc := testpb.NewTestServiceClient(te.clientConn())
2269
2270         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2271         wantErr := detailedError
2272         if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
2273                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
2274         }
2275 }
2276
2277 func TestLargeUnary(t *testing.T) {
2278         defer leakcheck.Check(t)
2279         for _, e := range listTestEnv() {
2280                 testLargeUnary(t, e)
2281         }
2282 }
2283
2284 func testLargeUnary(t *testing.T, e env) {
2285         te := newTest(t, e)
2286         te.startServer(&testServer{security: e.security})
2287         defer te.tearDown()
2288         tc := testpb.NewTestServiceClient(te.clientConn())
2289
2290         const argSize = 271828
2291         const respSize = 314159
2292
2293         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2294         if err != nil {
2295                 t.Fatal(err)
2296         }
2297
2298         req := &testpb.SimpleRequest{
2299                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2300                 ResponseSize: respSize,
2301                 Payload:      payload,
2302         }
2303         reply, err := tc.UnaryCall(context.Background(), req)
2304         if err != nil {
2305                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2306         }
2307         pt := reply.GetPayload().GetType()
2308         ps := len(reply.GetPayload().GetBody())
2309         if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2310                 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2311         }
2312 }
2313
2314 // Test backward-compatibility API for setting msg size limit.
2315 func TestExceedMsgLimit(t *testing.T) {
2316         defer leakcheck.Check(t)
2317         for _, e := range listTestEnv() {
2318                 testExceedMsgLimit(t, e)
2319         }
2320 }
2321
2322 func testExceedMsgLimit(t *testing.T, e env) {
2323         te := newTest(t, e)
2324         te.maxMsgSize = newInt(1024)
2325         te.startServer(&testServer{security: e.security})
2326         defer te.tearDown()
2327         tc := testpb.NewTestServiceClient(te.clientConn())
2328
2329         argSize := int32(*te.maxMsgSize + 1)
2330         const smallSize = 1
2331
2332         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2333         if err != nil {
2334                 t.Fatal(err)
2335         }
2336         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2337         if err != nil {
2338                 t.Fatal(err)
2339         }
2340
2341         // Test on server side for unary RPC.
2342         req := &testpb.SimpleRequest{
2343                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2344                 ResponseSize: smallSize,
2345                 Payload:      payload,
2346         }
2347         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2348                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2349         }
2350         // Test on client side for unary RPC.
2351         req.ResponseSize = int32(*te.maxMsgSize) + 1
2352         req.Payload = smallPayload
2353         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2354                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2355         }
2356
2357         // Test on server side for streaming RPC.
2358         stream, err := tc.FullDuplexCall(te.ctx)
2359         if err != nil {
2360                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2361         }
2362         respParam := []*testpb.ResponseParameters{
2363                 {
2364                         Size: 1,
2365                 },
2366         }
2367
2368         spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
2369         if err != nil {
2370                 t.Fatal(err)
2371         }
2372
2373         sreq := &testpb.StreamingOutputCallRequest{
2374                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
2375                 ResponseParameters: respParam,
2376                 Payload:            spayload,
2377         }
2378         if err := stream.Send(sreq); err != nil {
2379                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2380         }
2381         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2382                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2383         }
2384
2385         // Test on client side for streaming RPC.
2386         stream, err = tc.FullDuplexCall(te.ctx)
2387         if err != nil {
2388                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2389         }
2390         respParam[0].Size = int32(*te.maxMsgSize) + 1
2391         sreq.Payload = smallPayload
2392         if err := stream.Send(sreq); err != nil {
2393                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2394         }
2395         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2396                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2397         }
2398
2399 }
2400
2401 func TestPeerClientSide(t *testing.T) {
2402         defer leakcheck.Check(t)
2403         for _, e := range listTestEnv() {
2404                 testPeerClientSide(t, e)
2405         }
2406 }
2407
2408 func testPeerClientSide(t *testing.T, e env) {
2409         te := newTest(t, e)
2410         te.userAgent = testAppUA
2411         te.startServer(&testServer{security: e.security})
2412         defer te.tearDown()
2413         tc := testpb.NewTestServiceClient(te.clientConn())
2414         peer := new(peer.Peer)
2415         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
2416                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2417         }
2418         pa := peer.Addr.String()
2419         if e.network == "unix" {
2420                 if pa != te.srvAddr {
2421                         t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2422                 }
2423                 return
2424         }
2425         _, pp, err := net.SplitHostPort(pa)
2426         if err != nil {
2427                 t.Fatalf("Failed to parse address from peer.")
2428         }
2429         _, sp, err := net.SplitHostPort(te.srvAddr)
2430         if err != nil {
2431                 t.Fatalf("Failed to parse address of test server.")
2432         }
2433         if pp != sp {
2434                 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2435         }
2436 }
2437
2438 // TestPeerNegative tests that if call fails setting peer
2439 // doesn't cause a segmentation fault.
2440 // issue#1141 https://github.com/grpc/grpc-go/issues/1141
2441 func TestPeerNegative(t *testing.T) {
2442         defer leakcheck.Check(t)
2443         for _, e := range listTestEnv() {
2444                 testPeerNegative(t, e)
2445         }
2446 }
2447
2448 func testPeerNegative(t *testing.T, e env) {
2449         te := newTest(t, e)
2450         te.startServer(&testServer{security: e.security})
2451         defer te.tearDown()
2452
2453         cc := te.clientConn()
2454         tc := testpb.NewTestServiceClient(cc)
2455         peer := new(peer.Peer)
2456         ctx, cancel := context.WithCancel(context.Background())
2457         cancel()
2458         tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
2459 }
2460
2461 func TestPeerFailedRPC(t *testing.T) {
2462         defer leakcheck.Check(t)
2463         for _, e := range listTestEnv() {
2464                 testPeerFailedRPC(t, e)
2465         }
2466 }
2467
2468 func testPeerFailedRPC(t *testing.T, e env) {
2469         te := newTest(t, e)
2470         te.maxServerReceiveMsgSize = newInt(1 * 1024)
2471         te.startServer(&testServer{security: e.security})
2472
2473         defer te.tearDown()
2474         tc := testpb.NewTestServiceClient(te.clientConn())
2475
2476         // first make a successful request to the server
2477         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2478                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2479         }
2480
2481         // make a second request that will be rejected by the server
2482         const largeSize = 5 * 1024
2483         largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2484         if err != nil {
2485                 t.Fatal(err)
2486         }
2487         req := &testpb.SimpleRequest{
2488                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2489                 Payload:      largePayload,
2490         }
2491
2492         peer := new(peer.Peer)
2493         if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
2494                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2495         } else {
2496                 pa := peer.Addr.String()
2497                 if e.network == "unix" {
2498                         if pa != te.srvAddr {
2499                                 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2500                         }
2501                         return
2502                 }
2503                 _, pp, err := net.SplitHostPort(pa)
2504                 if err != nil {
2505                         t.Fatalf("Failed to parse address from peer.")
2506                 }
2507                 _, sp, err := net.SplitHostPort(te.srvAddr)
2508                 if err != nil {
2509                         t.Fatalf("Failed to parse address of test server.")
2510                 }
2511                 if pp != sp {
2512                         t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2513                 }
2514         }
2515 }
2516
2517 func TestMetadataUnaryRPC(t *testing.T) {
2518         defer leakcheck.Check(t)
2519         for _, e := range listTestEnv() {
2520                 testMetadataUnaryRPC(t, e)
2521         }
2522 }
2523
2524 func testMetadataUnaryRPC(t *testing.T, e env) {
2525         te := newTest(t, e)
2526         te.startServer(&testServer{security: e.security})
2527         defer te.tearDown()
2528         tc := testpb.NewTestServiceClient(te.clientConn())
2529
2530         const argSize = 2718
2531         const respSize = 314
2532
2533         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2534         if err != nil {
2535                 t.Fatal(err)
2536         }
2537
2538         req := &testpb.SimpleRequest{
2539                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2540                 ResponseSize: respSize,
2541                 Payload:      payload,
2542         }
2543         var header, trailer metadata.MD
2544         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2545         if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
2546                 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2547         }
2548         // Ignore optional response headers that Servers may set:
2549         if header != nil {
2550                 delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
2551                 delete(header, "date")    // the Date header is also optional
2552                 delete(header, "user-agent")
2553         }
2554         if !reflect.DeepEqual(header, testMetadata) {
2555                 t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
2556         }
2557         if !reflect.DeepEqual(trailer, testTrailerMetadata) {
2558                 t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
2559         }
2560 }
2561
2562 func TestMultipleSetTrailerUnaryRPC(t *testing.T) {
2563         defer leakcheck.Check(t)
2564         for _, e := range listTestEnv() {
2565                 testMultipleSetTrailerUnaryRPC(t, e)
2566         }
2567 }
2568
2569 func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
2570         te := newTest(t, e)
2571         te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2572         defer te.tearDown()
2573         tc := testpb.NewTestServiceClient(te.clientConn())
2574
2575         const (
2576                 argSize  = 1
2577                 respSize = 1
2578         )
2579         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2580         if err != nil {
2581                 t.Fatal(err)
2582         }
2583
2584         req := &testpb.SimpleRequest{
2585                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2586                 ResponseSize: respSize,
2587                 Payload:      payload,
2588         }
2589         var trailer metadata.MD
2590         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2591         if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
2592                 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2593         }
2594         expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2595         if !reflect.DeepEqual(trailer, expectedTrailer) {
2596                 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
2597         }
2598 }
2599
2600 func TestMultipleSetTrailerStreamingRPC(t *testing.T) {
2601         defer leakcheck.Check(t)
2602         for _, e := range listTestEnv() {
2603                 testMultipleSetTrailerStreamingRPC(t, e)
2604         }
2605 }
2606
2607 func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
2608         te := newTest(t, e)
2609         te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2610         defer te.tearDown()
2611         tc := testpb.NewTestServiceClient(te.clientConn())
2612
2613         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2614         stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
2615         if err != nil {
2616                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2617         }
2618         if err := stream.CloseSend(); err != nil {
2619                 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2620         }
2621         if _, err := stream.Recv(); err != io.EOF {
2622                 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2623         }
2624
2625         trailer := stream.Trailer()
2626         expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2627         if !reflect.DeepEqual(trailer, expectedTrailer) {
2628                 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
2629         }
2630 }
2631
2632 func TestSetAndSendHeaderUnaryRPC(t *testing.T) {
2633         defer leakcheck.Check(t)
2634         for _, e := range listTestEnv() {
2635                 if e.name == "handler-tls" {
2636                         continue
2637                 }
2638                 testSetAndSendHeaderUnaryRPC(t, e)
2639         }
2640 }
2641
2642 // To test header metadata is sent on SendHeader().
2643 func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
2644         te := newTest(t, e)
2645         te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2646         defer te.tearDown()
2647         tc := testpb.NewTestServiceClient(te.clientConn())
2648
2649         const (
2650                 argSize  = 1
2651                 respSize = 1
2652         )
2653         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2654         if err != nil {
2655                 t.Fatal(err)
2656         }
2657
2658         req := &testpb.SimpleRequest{
2659                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2660                 ResponseSize: respSize,
2661                 Payload:      payload,
2662         }
2663         var header metadata.MD
2664         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2665         if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
2666                 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2667         }
2668         delete(header, "user-agent")
2669         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2670         if !reflect.DeepEqual(header, expectedHeader) {
2671                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2672         }
2673 }
2674
2675 func TestMultipleSetHeaderUnaryRPC(t *testing.T) {
2676         defer leakcheck.Check(t)
2677         for _, e := range listTestEnv() {
2678                 if e.name == "handler-tls" {
2679                         continue
2680                 }
2681                 testMultipleSetHeaderUnaryRPC(t, e)
2682         }
2683 }
2684
2685 // To test header metadata is sent when sending response.
2686 func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
2687         te := newTest(t, e)
2688         te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2689         defer te.tearDown()
2690         tc := testpb.NewTestServiceClient(te.clientConn())
2691
2692         const (
2693                 argSize  = 1
2694                 respSize = 1
2695         )
2696         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2697         if err != nil {
2698                 t.Fatal(err)
2699         }
2700
2701         req := &testpb.SimpleRequest{
2702                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2703                 ResponseSize: respSize,
2704                 Payload:      payload,
2705         }
2706
2707         var header metadata.MD
2708         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2709         if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
2710                 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2711         }
2712         delete(header, "user-agent")
2713         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2714         if !reflect.DeepEqual(header, expectedHeader) {
2715                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2716         }
2717 }
2718
2719 func TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
2720         defer leakcheck.Check(t)
2721         for _, e := range listTestEnv() {
2722                 if e.name == "handler-tls" {
2723                         continue
2724                 }
2725                 testMultipleSetHeaderUnaryRPCError(t, e)
2726         }
2727 }
2728
2729 // To test header metadata is sent when sending status.
2730 func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
2731         te := newTest(t, e)
2732         te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2733         defer te.tearDown()
2734         tc := testpb.NewTestServiceClient(te.clientConn())
2735
2736         const (
2737                 argSize  = 1
2738                 respSize = -1 // Invalid respSize to make RPC fail.
2739         )
2740         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2741         if err != nil {
2742                 t.Fatal(err)
2743         }
2744
2745         req := &testpb.SimpleRequest{
2746                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2747                 ResponseSize: respSize,
2748                 Payload:      payload,
2749         }
2750         var header metadata.MD
2751         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2752         if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
2753                 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
2754         }
2755         delete(header, "user-agent")
2756         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2757         if !reflect.DeepEqual(header, expectedHeader) {
2758                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2759         }
2760 }
2761
2762 func TestSetAndSendHeaderStreamingRPC(t *testing.T) {
2763         defer leakcheck.Check(t)
2764         for _, e := range listTestEnv() {
2765                 if e.name == "handler-tls" {
2766                         continue
2767                 }
2768                 testSetAndSendHeaderStreamingRPC(t, e)
2769         }
2770 }
2771
2772 // To test header metadata is sent on SendHeader().
2773 func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
2774         te := newTest(t, e)
2775         te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2776         defer te.tearDown()
2777         tc := testpb.NewTestServiceClient(te.clientConn())
2778
2779         const (
2780                 argSize  = 1
2781                 respSize = 1
2782         )
2783         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2784         stream, err := tc.FullDuplexCall(ctx)
2785         if err != nil {
2786                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2787         }
2788         if err := stream.CloseSend(); err != nil {
2789                 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2790         }
2791         if _, err := stream.Recv(); err != io.EOF {
2792                 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2793         }
2794
2795         header, err := stream.Header()
2796         if err != nil {
2797                 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2798         }
2799         delete(header, "user-agent")
2800         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2801         if !reflect.DeepEqual(header, expectedHeader) {
2802                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2803         }
2804 }
2805
2806 func TestMultipleSetHeaderStreamingRPC(t *testing.T) {
2807         defer leakcheck.Check(t)
2808         for _, e := range listTestEnv() {
2809                 if e.name == "handler-tls" {
2810                         continue
2811                 }
2812                 testMultipleSetHeaderStreamingRPC(t, e)
2813         }
2814 }
2815
2816 // To test header metadata is sent when sending response.
2817 func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
2818         te := newTest(t, e)
2819         te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2820         defer te.tearDown()
2821         tc := testpb.NewTestServiceClient(te.clientConn())
2822
2823         const (
2824                 argSize  = 1
2825                 respSize = 1
2826         )
2827         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2828         stream, err := tc.FullDuplexCall(ctx)
2829         if err != nil {
2830                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2831         }
2832
2833         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2834         if err != nil {
2835                 t.Fatal(err)
2836         }
2837
2838         req := &testpb.StreamingOutputCallRequest{
2839                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2840                 ResponseParameters: []*testpb.ResponseParameters{
2841                         {Size: respSize},
2842                 },
2843                 Payload: payload,
2844         }
2845         if err := stream.Send(req); err != nil {
2846                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2847         }
2848         if _, err := stream.Recv(); err != nil {
2849                 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
2850         }
2851         if err := stream.CloseSend(); err != nil {
2852                 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2853         }
2854         if _, err := stream.Recv(); err != io.EOF {
2855                 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2856         }
2857
2858         header, err := stream.Header()
2859         if err != nil {
2860                 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2861         }
2862         delete(header, "user-agent")
2863         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2864         if !reflect.DeepEqual(header, expectedHeader) {
2865                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2866         }
2867
2868 }
2869
2870 func TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
2871         defer leakcheck.Check(t)
2872         for _, e := range listTestEnv() {
2873                 if e.name == "handler-tls" {
2874                         continue
2875                 }
2876                 testMultipleSetHeaderStreamingRPCError(t, e)
2877         }
2878 }
2879
2880 // To test header metadata is sent when sending status.
2881 func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
2882         te := newTest(t, e)
2883         te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2884         defer te.tearDown()
2885         tc := testpb.NewTestServiceClient(te.clientConn())
2886
2887         const (
2888                 argSize  = 1
2889                 respSize = -1
2890         )
2891         ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2892         stream, err := tc.FullDuplexCall(ctx)
2893         if err != nil {
2894                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2895         }
2896
2897         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2898         if err != nil {
2899                 t.Fatal(err)
2900         }
2901
2902         req := &testpb.StreamingOutputCallRequest{
2903                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2904                 ResponseParameters: []*testpb.ResponseParameters{
2905                         {Size: respSize},
2906                 },
2907                 Payload: payload,
2908         }
2909         if err := stream.Send(req); err != nil {
2910                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2911         }
2912         if _, err := stream.Recv(); err == nil {
2913                 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
2914         }
2915
2916         header, err := stream.Header()
2917         if err != nil {
2918                 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2919         }
2920         delete(header, "user-agent")
2921         expectedHeader := metadata.Join(testMetadata, testMetadata2)
2922         if !reflect.DeepEqual(header, expectedHeader) {
2923                 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2924         }
2925
2926         if err := stream.CloseSend(); err != nil {
2927                 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2928         }
2929 }
2930
2931 // TestMalformedHTTP2Metedata verfies the returned error when the client
2932 // sends an illegal metadata.
2933 func TestMalformedHTTP2Metadata(t *testing.T) {
2934         defer leakcheck.Check(t)
2935         for _, e := range listTestEnv() {
2936                 if e.name == "handler-tls" {
2937                         // Failed with "server stops accepting new RPCs".
2938                         // Server stops accepting new RPCs when the client sends an illegal http2 header.
2939                         continue
2940                 }
2941                 testMalformedHTTP2Metadata(t, e)
2942         }
2943 }
2944
2945 func testMalformedHTTP2Metadata(t *testing.T, e env) {
2946         te := newTest(t, e)
2947         te.startServer(&testServer{security: e.security})
2948         defer te.tearDown()
2949         tc := testpb.NewTestServiceClient(te.clientConn())
2950
2951         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
2952         if err != nil {
2953                 t.Fatal(err)
2954         }
2955
2956         req := &testpb.SimpleRequest{
2957                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2958                 ResponseSize: 314,
2959                 Payload:      payload,
2960         }
2961         ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata)
2962         if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Internal {
2963                 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
2964         }
2965 }
2966
2967 func performOneRPC(t *testing.T, tc testpb.TestServiceClient, wg *sync.WaitGroup) {
2968         defer wg.Done()
2969         const argSize = 2718
2970         const respSize = 314
2971
2972         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2973         if err != nil {
2974                 t.Error(err)
2975                 return
2976         }
2977
2978         req := &testpb.SimpleRequest{
2979                 ResponseType: testpb.PayloadType_COMPRESSABLE,
2980                 ResponseSize: respSize,
2981                 Payload:      payload,
2982         }
2983         reply, err := tc.UnaryCall(context.Background(), req, grpc.FailFast(false))
2984         if err != nil {
2985                 t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2986                 return
2987         }
2988         pt := reply.GetPayload().GetType()
2989         ps := len(reply.GetPayload().GetBody())
2990         if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2991                 t.Errorf("Got reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2992                 return
2993         }
2994 }
2995
2996 func TestRetry(t *testing.T) {
2997         defer leakcheck.Check(t)
2998         for _, e := range listTestEnv() {
2999                 if e.name == "handler-tls" {
3000                         // Fails with RST_STREAM / FLOW_CONTROL_ERROR
3001                         continue
3002                 }
3003                 testRetry(t, e)
3004         }
3005 }
3006
3007 // This test mimics a user who sends 1000 RPCs concurrently on a faulty transport.
3008 // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
3009 // and error-prone paths.
3010 func testRetry(t *testing.T, e env) {
3011         te := newTest(t, e)
3012         te.declareLogNoise("transport: http2Client.notifyError got notified that the client transport was broken")
3013         te.startServer(&testServer{security: e.security})
3014         defer te.tearDown()
3015
3016         cc := te.clientConn()
3017         tc := testpb.NewTestServiceClient(cc)
3018         var wg sync.WaitGroup
3019
3020         numRPC := 1000
3021         rpcSpacing := 2 * time.Millisecond
3022         if raceMode {
3023                 // The race detector has a limit on how many goroutines it can track.
3024                 // This test is near the upper limit, and goes over the limit
3025                 // depending on the environment (the http.Handler environment uses
3026                 // more goroutines)
3027                 t.Logf("Shortening test in race mode.")
3028                 numRPC /= 2
3029                 rpcSpacing *= 2
3030         }
3031
3032         wg.Add(1)
3033         go func() {
3034                 // Halfway through starting RPCs, kill all connections:
3035                 time.Sleep(time.Duration(numRPC/2) * rpcSpacing)
3036
3037                 // The server shuts down the network connection to make a
3038                 // transport error which will be detected by the client side
3039                 // code.
3040                 internal.TestingCloseConns(te.srv)
3041                 wg.Done()
3042         }()
3043         // All these RPCs should succeed eventually.
3044         for i := 0; i < numRPC; i++ {
3045                 time.Sleep(rpcSpacing)
3046                 wg.Add(1)
3047                 go performOneRPC(t, tc, &wg)
3048         }
3049         wg.Wait()
3050 }
3051
3052 func TestRPCTimeout(t *testing.T) {
3053         defer leakcheck.Check(t)
3054         for _, e := range listTestEnv() {
3055                 testRPCTimeout(t, e)
3056         }
3057 }
3058
3059 // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
3060 func testRPCTimeout(t *testing.T, e env) {
3061         te := newTest(t, e)
3062         te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond})
3063         defer te.tearDown()
3064
3065         cc := te.clientConn()
3066         tc := testpb.NewTestServiceClient(cc)
3067
3068         const argSize = 2718
3069         const respSize = 314
3070
3071         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3072         if err != nil {
3073                 t.Fatal(err)
3074         }
3075
3076         req := &testpb.SimpleRequest{
3077                 ResponseType: testpb.PayloadType_COMPRESSABLE,
3078                 ResponseSize: respSize,
3079                 Payload:      payload,
3080         }
3081         for i := -1; i <= 10; i++ {
3082                 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
3083                 if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.DeadlineExceeded {
3084                         t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
3085                 }
3086                 cancel()
3087         }
3088 }
3089
3090 func TestCancel(t *testing.T) {
3091         defer leakcheck.Check(t)
3092         for _, e := range listTestEnv() {
3093                 testCancel(t, e)
3094         }
3095 }
3096
3097 func testCancel(t *testing.T, e env) {
3098         te := newTest(t, e)
3099         te.declareLogNoise("grpc: the client connection is closing; please retry")
3100         te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3101         defer te.tearDown()
3102
3103         cc := te.clientConn()
3104         tc := testpb.NewTestServiceClient(cc)
3105
3106         const argSize = 2718
3107         const respSize = 314
3108
3109         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3110         if err != nil {
3111                 t.Fatal(err)
3112         }
3113
3114         req := &testpb.SimpleRequest{
3115                 ResponseType: testpb.PayloadType_COMPRESSABLE,
3116                 ResponseSize: respSize,
3117                 Payload:      payload,
3118         }
3119         ctx, cancel := context.WithCancel(context.Background())
3120         time.AfterFunc(1*time.Millisecond, cancel)
3121         if r, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Canceled {
3122                 t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3123         }
3124         awaitNewConnLogOutput()
3125 }
3126
3127 func TestCancelNoIO(t *testing.T) {
3128         defer leakcheck.Check(t)
3129         for _, e := range listTestEnv() {
3130                 testCancelNoIO(t, e)
3131         }
3132 }
3133
3134 func testCancelNoIO(t *testing.T, e env) {
3135         te := newTest(t, e)
3136         te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
3137         te.maxStream = 1 // Only allows 1 live stream per server transport.
3138         te.startServer(&testServer{security: e.security})
3139         defer te.tearDown()
3140
3141         cc := te.clientConn()
3142         tc := testpb.NewTestServiceClient(cc)
3143
3144         // Start one blocked RPC for which we'll never send streaming
3145         // input. This will consume the 1 maximum concurrent streams,
3146         // causing future RPCs to hang.
3147         ctx, cancelFirst := context.WithCancel(context.Background())
3148         _, err := tc.StreamingInputCall(ctx)
3149         if err != nil {
3150                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3151         }
3152
3153         // Loop until the ClientConn receives the initial settings
3154         // frame from the server, notifying it about the maximum
3155         // concurrent streams. We know when it's received it because
3156         // an RPC will fail with codes.DeadlineExceeded instead of
3157         // succeeding.
3158         // TODO(bradfitz): add internal test hook for this (Issue 534)
3159         for {
3160                 ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
3161                 _, err := tc.StreamingInputCall(ctx)
3162                 cancelSecond()
3163                 if err == nil {
3164                         continue
3165                 }
3166                 if grpc.Code(err) == codes.DeadlineExceeded {
3167                         break
3168                 }
3169                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3170         }
3171         // If there are any RPCs in flight before the client receives
3172         // the max streams setting, let them be expired.
3173         // TODO(bradfitz): add internal test hook for this (Issue 534)
3174         time.Sleep(50 * time.Millisecond)
3175
3176         go func() {
3177                 time.Sleep(50 * time.Millisecond)
3178                 cancelFirst()
3179         }()
3180
3181         // This should be blocked until the 1st is canceled, then succeed.
3182         ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
3183         if _, err := tc.StreamingInputCall(ctx); err != nil {
3184                 t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3185         }
3186         cancelThird()
3187 }
3188
3189 // The following tests the gRPC streaming RPC implementations.
3190 // TODO(zhaoq): Have better coverage on error cases.
3191 var (
3192         reqSizes  = []int{27182, 8, 1828, 45904}
3193         respSizes = []int{31415, 9, 2653, 58979}
3194 )
3195
3196 func TestNoService(t *testing.T) {
3197         defer leakcheck.Check(t)
3198         for _, e := range listTestEnv() {
3199                 testNoService(t, e)
3200         }
3201 }
3202
3203 func testNoService(t *testing.T, e env) {
3204         te := newTest(t, e)
3205         te.startServer(nil)
3206         defer te.tearDown()
3207
3208         cc := te.clientConn()
3209         tc := testpb.NewTestServiceClient(cc)
3210
3211         stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
3212         if err != nil {
3213                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3214         }
3215         if _, err := stream.Recv(); grpc.Code(err) != codes.Unimplemented {
3216                 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
3217         }
3218 }
3219
3220 func TestPingPong(t *testing.T) {
3221         defer leakcheck.Check(t)
3222         for _, e := range listTestEnv() {
3223                 testPingPong(t, e)
3224         }
3225 }
3226
3227 func testPingPong(t *testing.T, e env) {
3228         te := newTest(t, e)
3229         te.startServer(&testServer{security: e.security})
3230         defer te.tearDown()
3231         tc := testpb.NewTestServiceClient(te.clientConn())
3232
3233         stream, err := tc.FullDuplexCall(te.ctx)
3234         if err != nil {
3235                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3236         }
3237         var index int
3238         for index < len(reqSizes) {
3239                 respParam := []*testpb.ResponseParameters{
3240                         {
3241                                 Size: int32(respSizes[index]),
3242                         },
3243                 }
3244
3245                 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3246                 if err != nil {
3247                         t.Fatal(err)
3248                 }
3249
3250                 req := &testpb.StreamingOutputCallRequest{
3251                         ResponseType:       testpb.PayloadType_COMPRESSABLE,
3252                         ResponseParameters: respParam,
3253                         Payload:            payload,
3254                 }
3255                 if err := stream.Send(req); err != nil {
3256                         t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3257                 }
3258                 reply, err := stream.Recv()
3259                 if err != nil {
3260                         t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3261                 }
3262                 pt := reply.GetPayload().GetType()
3263                 if pt != testpb.PayloadType_COMPRESSABLE {
3264                         t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3265                 }
3266                 size := len(reply.GetPayload().GetBody())
3267                 if size != int(respSizes[index]) {
3268                         t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3269                 }
3270                 index++
3271         }
3272         if err := stream.CloseSend(); err != nil {
3273                 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3274         }
3275         if _, err := stream.Recv(); err != io.EOF {
3276                 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
3277         }
3278 }
3279
3280 func TestMetadataStreamingRPC(t *testing.T) {
3281         defer leakcheck.Check(t)
3282         for _, e := range listTestEnv() {
3283                 testMetadataStreamingRPC(t, e)
3284         }
3285 }
3286
3287 func testMetadataStreamingRPC(t *testing.T, e env) {
3288         te := newTest(t, e)
3289         te.startServer(&testServer{security: e.security})
3290         defer te.tearDown()
3291         tc := testpb.NewTestServiceClient(te.clientConn())
3292
3293         ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3294         stream, err := tc.FullDuplexCall(ctx)
3295         if err != nil {
3296                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3297         }
3298         go func() {
3299                 headerMD, err := stream.Header()
3300                 if e.security == "tls" {
3301                         delete(headerMD, "transport_security_type")
3302                 }
3303                 delete(headerMD, "trailer") // ignore if present
3304                 delete(headerMD, "user-agent")
3305                 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3306                         t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3307                 }
3308                 // test the cached value.
3309                 headerMD, err = stream.Header()
3310                 delete(headerMD, "trailer") // ignore if present
3311                 delete(headerMD, "user-agent")
3312                 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3313                         t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3314                 }
3315                 err = func() error {
3316                         for index := 0; index < len(reqSizes); index++ {
3317                                 respParam := []*testpb.ResponseParameters{
3318                                         {
3319                                                 Size: int32(respSizes[index]),
3320                                         },
3321                                 }
3322
3323                                 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3324                                 if err != nil {
3325                                         return err
3326                                 }
3327
3328                                 req := &testpb.StreamingOutputCallRequest{
3329                                         ResponseType:       testpb.PayloadType_COMPRESSABLE,
3330                                         ResponseParameters: respParam,
3331                                         Payload:            payload,
3332                                 }
3333                                 if err := stream.Send(req); err != nil {
3334                                         return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3335                                 }
3336                         }
3337                         return nil
3338                 }()
3339                 // Tell the server we're done sending args.
3340                 stream.CloseSend()
3341                 if err != nil {
3342                         t.Error(err)
3343                 }
3344         }()
3345         for {
3346                 if _, err := stream.Recv(); err != nil {
3347                         break
3348                 }
3349         }
3350         trailerMD := stream.Trailer()
3351         if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
3352                 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
3353         }
3354 }
3355
3356 func TestServerStreaming(t *testing.T) {
3357         defer leakcheck.Check(t)
3358         for _, e := range listTestEnv() {
3359                 testServerStreaming(t, e)
3360         }
3361 }
3362
3363 func testServerStreaming(t *testing.T, e env) {
3364         te := newTest(t, e)
3365         te.startServer(&testServer{security: e.security})
3366         defer te.tearDown()
3367         tc := testpb.NewTestServiceClient(te.clientConn())
3368
3369         respParam := make([]*testpb.ResponseParameters, len(respSizes))
3370         for i, s := range respSizes {
3371                 respParam[i] = &testpb.ResponseParameters{
3372                         Size: int32(s),
3373                 }
3374         }
3375         req := &testpb.StreamingOutputCallRequest{
3376                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3377                 ResponseParameters: respParam,
3378         }
3379         stream, err := tc.StreamingOutputCall(context.Background(), req)
3380         if err != nil {
3381                 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3382         }
3383         var rpcStatus error
3384         var respCnt int
3385         var index int
3386         for {
3387                 reply, err := stream.Recv()
3388                 if err != nil {
3389                         rpcStatus = err
3390                         break
3391                 }
3392                 pt := reply.GetPayload().GetType()
3393                 if pt != testpb.PayloadType_COMPRESSABLE {
3394                         t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3395                 }
3396                 size := len(reply.GetPayload().GetBody())
3397                 if size != int(respSizes[index]) {
3398                         t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3399                 }
3400                 index++
3401                 respCnt++
3402         }
3403         if rpcStatus != io.EOF {
3404                 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3405         }
3406         if respCnt != len(respSizes) {
3407                 t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
3408         }
3409 }
3410
3411 func TestFailedServerStreaming(t *testing.T) {
3412         defer leakcheck.Check(t)
3413         for _, e := range listTestEnv() {
3414                 testFailedServerStreaming(t, e)
3415         }
3416 }
3417
3418 func testFailedServerStreaming(t *testing.T, e env) {
3419         te := newTest(t, e)
3420         te.userAgent = failAppUA
3421         te.startServer(&testServer{security: e.security})
3422         defer te.tearDown()
3423         tc := testpb.NewTestServiceClient(te.clientConn())
3424
3425         respParam := make([]*testpb.ResponseParameters, len(respSizes))
3426         for i, s := range respSizes {
3427                 respParam[i] = &testpb.ResponseParameters{
3428                         Size: int32(s),
3429                 }
3430         }
3431         req := &testpb.StreamingOutputCallRequest{
3432                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3433                 ResponseParameters: respParam,
3434         }
3435         ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3436         stream, err := tc.StreamingOutputCall(ctx, req)
3437         if err != nil {
3438                 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3439         }
3440         wantErr := grpc.Errorf(codes.DataLoss, "error for testing: "+failAppUA)
3441         if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) {
3442                 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
3443         }
3444 }
3445
3446 // concurrentSendServer is a TestServiceServer whose
3447 // StreamingOutputCall makes ten serial Send calls, sending payloads
3448 // "0".."9", inclusive.  TestServerStreamingConcurrent verifies they
3449 // were received in the correct order, and that there were no races.
3450 //
3451 // All other TestServiceServer methods crash if called.
3452 type concurrentSendServer struct {
3453         testpb.TestServiceServer
3454 }
3455
3456 func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
3457         for i := 0; i < 10; i++ {
3458                 stream.Send(&testpb.StreamingOutputCallResponse{
3459                         Payload: &testpb.Payload{
3460                                 Body: []byte{'0' + uint8(i)},
3461                         },
3462                 })
3463         }
3464         return nil
3465 }
3466
3467 // Tests doing a bunch of concurrent streaming output calls.
3468 func TestServerStreamingConcurrent(t *testing.T) {
3469         defer leakcheck.Check(t)
3470         for _, e := range listTestEnv() {
3471                 testServerStreamingConcurrent(t, e)
3472         }
3473 }
3474
3475 func testServerStreamingConcurrent(t *testing.T, e env) {
3476         te := newTest(t, e)
3477         te.startServer(concurrentSendServer{})
3478         defer te.tearDown()
3479
3480         cc := te.clientConn()
3481         tc := testpb.NewTestServiceClient(cc)
3482
3483         doStreamingCall := func() {
3484                 req := &testpb.StreamingOutputCallRequest{}
3485                 stream, err := tc.StreamingOutputCall(context.Background(), req)
3486                 if err != nil {
3487                         t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3488                         return
3489                 }
3490                 var ngot int
3491                 var buf bytes.Buffer
3492                 for {
3493                         reply, err := stream.Recv()
3494                         if err == io.EOF {
3495                                 break
3496                         }
3497                         if err != nil {
3498                                 t.Fatal(err)
3499                         }
3500                         ngot++
3501                         if buf.Len() > 0 {
3502                                 buf.WriteByte(',')
3503                         }
3504                         buf.Write(reply.GetPayload().GetBody())
3505                 }
3506                 if want := 10; ngot != want {
3507                         t.Errorf("Got %d replies, want %d", ngot, want)
3508                 }
3509                 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3510                         t.Errorf("Got replies %q; want %q", got, want)
3511                 }
3512         }
3513
3514         var wg sync.WaitGroup
3515         for i := 0; i < 20; i++ {
3516                 wg.Add(1)
3517                 go func() {
3518                         defer wg.Done()
3519                         doStreamingCall()
3520                 }()
3521         }
3522         wg.Wait()
3523
3524 }
3525
3526 func generatePayloadSizes() [][]int {
3527         reqSizes := [][]int{
3528                 {27182, 8, 1828, 45904},
3529         }
3530
3531         num8KPayloads := 1024
3532         eightKPayloads := []int{}
3533         for i := 0; i < num8KPayloads; i++ {
3534                 eightKPayloads = append(eightKPayloads, (1 << 13))
3535         }
3536         reqSizes = append(reqSizes, eightKPayloads)
3537
3538         num2MPayloads := 8
3539         twoMPayloads := []int{}
3540         for i := 0; i < num2MPayloads; i++ {
3541                 twoMPayloads = append(twoMPayloads, (1 << 21))
3542         }
3543         reqSizes = append(reqSizes, twoMPayloads)
3544
3545         return reqSizes
3546 }
3547
3548 func TestClientStreaming(t *testing.T) {
3549         defer leakcheck.Check(t)
3550         for _, s := range generatePayloadSizes() {
3551                 for _, e := range listTestEnv() {
3552                         testClientStreaming(t, e, s)
3553                 }
3554         }
3555 }
3556
3557 func testClientStreaming(t *testing.T, e env, sizes []int) {
3558         te := newTest(t, e)
3559         te.startServer(&testServer{security: e.security})
3560         defer te.tearDown()
3561         tc := testpb.NewTestServiceClient(te.clientConn())
3562
3563         ctx, cancel := context.WithTimeout(te.ctx, time.Second*30)
3564         defer cancel()
3565         stream, err := tc.StreamingInputCall(ctx)
3566         if err != nil {
3567                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3568         }
3569
3570         var sum int
3571         for _, s := range sizes {
3572                 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
3573                 if err != nil {
3574                         t.Fatal(err)
3575                 }
3576
3577                 req := &testpb.StreamingInputCallRequest{
3578                         Payload: payload,
3579                 }
3580                 if err := stream.Send(req); err != nil {
3581                         t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3582                 }
3583                 sum += s
3584         }
3585         reply, err := stream.CloseAndRecv()
3586         if err != nil {
3587                 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
3588         }
3589         if reply.GetAggregatedPayloadSize() != int32(sum) {
3590                 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
3591         }
3592 }
3593
3594 func TestClientStreamingError(t *testing.T) {
3595         defer leakcheck.Check(t)
3596         for _, e := range listTestEnv() {
3597                 if e.name == "handler-tls" {
3598                         continue
3599                 }
3600                 testClientStreamingError(t, e)
3601         }
3602 }
3603
3604 func testClientStreamingError(t *testing.T, e env) {
3605         te := newTest(t, e)
3606         te.startServer(&testServer{security: e.security, earlyFail: true})
3607         defer te.tearDown()
3608         tc := testpb.NewTestServiceClient(te.clientConn())
3609
3610         stream, err := tc.StreamingInputCall(te.ctx)
3611         if err != nil {
3612                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3613         }
3614         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
3615         if err != nil {
3616                 t.Fatal(err)
3617         }
3618
3619         req := &testpb.StreamingInputCallRequest{
3620                 Payload: payload,
3621         }
3622         // The 1st request should go through.
3623         if err := stream.Send(req); err != nil {
3624                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3625         }
3626         for {
3627                 if err := stream.Send(req); err != io.EOF {
3628                         continue
3629                 }
3630                 if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound {
3631                         t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
3632                 }
3633                 break
3634         }
3635 }
3636
3637 func TestExceedMaxStreamsLimit(t *testing.T) {
3638         defer leakcheck.Check(t)
3639         for _, e := range listTestEnv() {
3640                 testExceedMaxStreamsLimit(t, e)
3641         }
3642 }
3643
3644 func testExceedMaxStreamsLimit(t *testing.T, e env) {
3645         te := newTest(t, e)
3646         te.declareLogNoise(
3647                 "http2Client.notifyError got notified that the client transport was broken",
3648                 "Conn.resetTransport failed to create client transport",
3649                 "grpc: the connection is closing",
3650         )
3651         te.maxStream = 1 // Only allows 1 live stream per server transport.
3652         te.startServer(&testServer{security: e.security})
3653         defer te.tearDown()
3654
3655         cc := te.clientConn()
3656         tc := testpb.NewTestServiceClient(cc)
3657
3658         _, err := tc.StreamingInputCall(te.ctx)
3659         if err != nil {
3660                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3661         }
3662         // Loop until receiving the new max stream setting from the server.
3663         for {
3664                 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
3665                 defer cancel()
3666                 _, err := tc.StreamingInputCall(ctx)
3667                 if err == nil {
3668                         time.Sleep(50 * time.Millisecond)
3669                         continue
3670                 }
3671                 if grpc.Code(err) == codes.DeadlineExceeded {
3672                         break
3673                 }
3674                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3675         }
3676 }
3677
3678 func TestStreamsQuotaRecovery(t *testing.T) {
3679         defer leakcheck.Check(t)
3680         for _, e := range listTestEnv() {
3681                 testStreamsQuotaRecovery(t, e)
3682         }
3683 }
3684
3685 func testStreamsQuotaRecovery(t *testing.T, e env) {
3686         te := newTest(t, e)
3687         te.declareLogNoise(
3688                 "http2Client.notifyError got notified that the client transport was broken",
3689                 "Conn.resetTransport failed to create client transport",
3690                 "grpc: the connection is closing",
3691         )
3692         te.maxStream = 1 // Allows 1 live stream.
3693         te.startServer(&testServer{security: e.security})
3694         defer te.tearDown()
3695
3696         cc := te.clientConn()
3697         tc := testpb.NewTestServiceClient(cc)
3698         if _, err := tc.StreamingInputCall(context.Background()); err != nil {
3699                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3700         }
3701         // Loop until the new max stream setting is effective.
3702         for {
3703                 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
3704                 defer cancel()
3705                 _, err := tc.StreamingInputCall(ctx)
3706                 if err == nil {
3707                         time.Sleep(50 * time.Millisecond)
3708                         continue
3709                 }
3710                 if grpc.Code(err) == codes.DeadlineExceeded {
3711                         break
3712                 }
3713                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3714         }
3715
3716         var wg sync.WaitGroup
3717         for i := 0; i < 10; i++ {
3718                 wg.Add(1)
3719                 go func() {
3720                         defer wg.Done()
3721                         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
3722                         if err != nil {
3723                                 t.Error(err)
3724                                 return
3725                         }
3726                         req := &testpb.SimpleRequest{
3727                                 ResponseType: testpb.PayloadType_COMPRESSABLE,
3728                                 ResponseSize: 1592,
3729                                 Payload:      payload,
3730                         }
3731                         // No rpc should go through due to the max streams limit.
3732                         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
3733                         defer cancel()
3734                         if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
3735                                 t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
3736                         }
3737                 }()
3738         }
3739         wg.Wait()
3740 }
3741
3742 func TestCompressServerHasNoSupport(t *testing.T) {
3743         defer leakcheck.Check(t)
3744         for _, e := range listTestEnv() {
3745                 testCompressServerHasNoSupport(t, e)
3746         }
3747 }
3748
3749 func testCompressServerHasNoSupport(t *testing.T, e env) {
3750         te := newTest(t, e)
3751         te.serverCompression = false
3752         te.clientCompression = true
3753         te.startServer(&testServer{security: e.security})
3754         defer te.tearDown()
3755         tc := testpb.NewTestServiceClient(te.clientConn())
3756
3757         const argSize = 271828
3758         const respSize = 314159
3759         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3760         if err != nil {
3761                 t.Fatal(err)
3762         }
3763         req := &testpb.SimpleRequest{
3764                 ResponseType: testpb.PayloadType_COMPRESSABLE,
3765                 ResponseSize: respSize,
3766                 Payload:      payload,
3767         }
3768         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Unimplemented {
3769                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
3770         }
3771         // Streaming RPC
3772         stream, err := tc.FullDuplexCall(context.Background())
3773         if err != nil {
3774                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3775         }
3776         respParam := []*testpb.ResponseParameters{
3777                 {
3778                         Size: 31415,
3779                 },
3780         }
3781         payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
3782         if err != nil {
3783                 t.Fatal(err)
3784         }
3785         sreq := &testpb.StreamingOutputCallRequest{
3786                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3787                 ResponseParameters: respParam,
3788                 Payload:            payload,
3789         }
3790         if err := stream.Send(sreq); err != nil {
3791                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
3792         }
3793         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Unimplemented {
3794                 t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
3795         }
3796 }
3797
3798 func TestCompressOK(t *testing.T) {
3799         defer leakcheck.Check(t)
3800         for _, e := range listTestEnv() {
3801                 testCompressOK(t, e)
3802         }
3803 }
3804
3805 func testCompressOK(t *testing.T, e env) {
3806         te := newTest(t, e)
3807         te.serverCompression = true
3808         te.clientCompression = true
3809         te.startServer(&testServer{security: e.security})
3810         defer te.tearDown()
3811         tc := testpb.NewTestServiceClient(te.clientConn())
3812
3813         // Unary call
3814         const argSize = 271828
3815         const respSize = 314159
3816         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3817         if err != nil {
3818                 t.Fatal(err)
3819         }
3820         req := &testpb.SimpleRequest{
3821                 ResponseType: testpb.PayloadType_COMPRESSABLE,
3822                 ResponseSize: respSize,
3823                 Payload:      payload,
3824         }
3825         ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
3826         if _, err := tc.UnaryCall(ctx, req); err != nil {
3827                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
3828         }
3829         // Streaming RPC
3830         ctx, cancel := context.WithCancel(context.Background())
3831         defer cancel()
3832         stream, err := tc.FullDuplexCall(ctx)
3833         if err != nil {
3834                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3835         }
3836         respParam := []*testpb.ResponseParameters{
3837                 {
3838                         Size: 31415,
3839                 },
3840         }
3841         payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
3842         if err != nil {
3843                 t.Fatal(err)
3844         }
3845         sreq := &testpb.StreamingOutputCallRequest{
3846                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3847                 ResponseParameters: respParam,
3848                 Payload:            payload,
3849         }
3850         if err := stream.Send(sreq); err != nil {
3851                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
3852         }
3853         if _, err := stream.Recv(); err != nil {
3854                 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3855         }
3856 }
3857
3858 func TestUnaryClientInterceptor(t *testing.T) {
3859         defer leakcheck.Check(t)
3860         for _, e := range listTestEnv() {
3861                 testUnaryClientInterceptor(t, e)
3862         }
3863 }
3864
3865 func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
3866         err := invoker(ctx, method, req, reply, cc, opts...)
3867         if err == nil {
3868                 return grpc.Errorf(codes.NotFound, "")
3869         }
3870         return err
3871 }
3872
3873 func testUnaryClientInterceptor(t *testing.T, e env) {
3874         te := newTest(t, e)
3875         te.userAgent = testAppUA
3876         te.unaryClientInt = failOkayRPC
3877         te.startServer(&testServer{security: e.security})
3878         defer te.tearDown()
3879
3880         tc := testpb.NewTestServiceClient(te.clientConn())
3881         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.NotFound {
3882                 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
3883         }
3884 }
3885
3886 func TestStreamClientInterceptor(t *testing.T) {
3887         defer leakcheck.Check(t)
3888         for _, e := range listTestEnv() {
3889                 testStreamClientInterceptor(t, e)
3890         }
3891 }
3892
3893 func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
3894         s, err := streamer(ctx, desc, cc, method, opts...)
3895         if err == nil {
3896                 return nil, grpc.Errorf(codes.NotFound, "")
3897         }
3898         return s, nil
3899 }
3900
3901 func testStreamClientInterceptor(t *testing.T, e env) {
3902         te := newTest(t, e)
3903         te.streamClientInt = failOkayStream
3904         te.startServer(&testServer{security: e.security})
3905         defer te.tearDown()
3906
3907         tc := testpb.NewTestServiceClient(te.clientConn())
3908         respParam := []*testpb.ResponseParameters{
3909                 {
3910                         Size: int32(1),
3911                 },
3912         }
3913         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3914         if err != nil {
3915                 t.Fatal(err)
3916         }
3917         req := &testpb.StreamingOutputCallRequest{
3918                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3919                 ResponseParameters: respParam,
3920                 Payload:            payload,
3921         }
3922         if _, err := tc.StreamingOutputCall(context.Background(), req); grpc.Code(err) != codes.NotFound {
3923                 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
3924         }
3925 }
3926
3927 func TestUnaryServerInterceptor(t *testing.T) {
3928         defer leakcheck.Check(t)
3929         for _, e := range listTestEnv() {
3930                 testUnaryServerInterceptor(t, e)
3931         }
3932 }
3933
3934 func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
3935         return nil, grpc.Errorf(codes.PermissionDenied, "")
3936 }
3937
3938 func testUnaryServerInterceptor(t *testing.T, e env) {
3939         te := newTest(t, e)
3940         te.unaryServerInt = errInjector
3941         te.startServer(&testServer{security: e.security})
3942         defer te.tearDown()
3943
3944         tc := testpb.NewTestServiceClient(te.clientConn())
3945         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.PermissionDenied {
3946                 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
3947         }
3948 }
3949
3950 func TestStreamServerInterceptor(t *testing.T) {
3951         defer leakcheck.Check(t)
3952         for _, e := range listTestEnv() {
3953                 // TODO(bradfitz): Temporarily skip this env due to #619.
3954                 if e.name == "handler-tls" {
3955                         continue
3956                 }
3957                 testStreamServerInterceptor(t, e)
3958         }
3959 }
3960
3961 func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
3962         if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
3963                 return handler(srv, ss)
3964         }
3965         // Reject the other methods.
3966         return grpc.Errorf(codes.PermissionDenied, "")
3967 }
3968
3969 func testStreamServerInterceptor(t *testing.T, e env) {
3970         te := newTest(t, e)
3971         te.streamServerInt = fullDuplexOnly
3972         te.startServer(&testServer{security: e.security})
3973         defer te.tearDown()
3974
3975         tc := testpb.NewTestServiceClient(te.clientConn())
3976         respParam := []*testpb.ResponseParameters{
3977                 {
3978                         Size: int32(1),
3979                 },
3980         }
3981         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3982         if err != nil {
3983                 t.Fatal(err)
3984         }
3985         req := &testpb.StreamingOutputCallRequest{
3986                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
3987                 ResponseParameters: respParam,
3988                 Payload:            payload,
3989         }
3990         s1, err := tc.StreamingOutputCall(context.Background(), req)
3991         if err != nil {
3992                 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
3993         }
3994         if _, err := s1.Recv(); grpc.Code(err) != codes.PermissionDenied {
3995                 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
3996         }
3997         s2, err := tc.FullDuplexCall(context.Background())
3998         if err != nil {
3999                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4000         }
4001         if err := s2.Send(req); err != nil {
4002                 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
4003         }
4004         if _, err := s2.Recv(); err != nil {
4005                 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
4006         }
4007 }
4008
4009 // funcServer implements methods of TestServiceServer using funcs,
4010 // similar to an http.HandlerFunc.
4011 // Any unimplemented method will crash. Tests implement the method(s)
4012 // they need.
4013 type funcServer struct {
4014         testpb.TestServiceServer
4015         unaryCall          func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
4016         streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
4017 }
4018
4019 func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4020         return s.unaryCall(ctx, in)
4021 }
4022
4023 func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
4024         return s.streamingInputCall(stream)
4025 }
4026
4027 func TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
4028         defer leakcheck.Check(t)
4029         for _, e := range listTestEnv() {
4030                 testClientRequestBodyErrorUnexpectedEOF(t, e)
4031         }
4032 }
4033
4034 func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
4035         te := newTest(t, e)
4036         ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4037                 errUnexpectedCall := errors.New("unexpected call func server method")
4038                 t.Error(errUnexpectedCall)
4039                 return nil, errUnexpectedCall
4040         }}
4041         te.startServer(ts)
4042         defer te.tearDown()
4043         te.withServerTester(func(st *serverTester) {
4044                 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4045                 // Say we have 5 bytes coming, but set END_STREAM flag:
4046                 st.writeData(1, true, []byte{0, 0, 0, 0, 5})
4047                 st.wantAnyFrame() // wait for server to crash (it used to crash)
4048         })
4049 }
4050
4051 func TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
4052         defer leakcheck.Check(t)
4053         for _, e := range listTestEnv() {
4054                 testClientRequestBodyErrorCloseAfterLength(t, e)
4055         }
4056 }
4057
4058 func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
4059         te := newTest(t, e)
4060         te.declareLogNoise("Server.processUnaryRPC failed to write status")
4061         ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4062                 errUnexpectedCall := errors.New("unexpected call func server method")
4063                 t.Error(errUnexpectedCall)
4064                 return nil, errUnexpectedCall
4065         }}
4066         te.startServer(ts)
4067         defer te.tearDown()
4068         te.withServerTester(func(st *serverTester) {
4069                 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4070                 // say we're sending 5 bytes, but then close the connection instead.
4071                 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4072                 st.cc.Close()
4073         })
4074 }
4075
4076 func TestClientRequestBodyErrorCancel(t *testing.T) {
4077         defer leakcheck.Check(t)
4078         for _, e := range listTestEnv() {
4079                 testClientRequestBodyErrorCancel(t, e)
4080         }
4081 }
4082
4083 func testClientRequestBodyErrorCancel(t *testing.T, e env) {
4084         te := newTest(t, e)
4085         gotCall := make(chan bool, 1)
4086         ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4087                 gotCall <- true
4088                 return new(testpb.SimpleResponse), nil
4089         }}
4090         te.startServer(ts)
4091         defer te.tearDown()
4092         te.withServerTester(func(st *serverTester) {
4093                 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4094                 // Say we have 5 bytes coming, but cancel it instead.
4095                 st.writeRSTStream(1, http2.ErrCodeCancel)
4096                 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4097
4098                 // Verify we didn't a call yet.
4099                 select {
4100                 case <-gotCall:
4101                         t.Fatal("unexpected call")
4102                 default:
4103                 }
4104
4105                 // And now send an uncanceled (but still invalid), just to get a response.
4106                 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall")
4107                 st.writeData(3, true, []byte{0, 0, 0, 0, 0})
4108                 <-gotCall
4109                 st.wantAnyFrame()
4110         })
4111 }
4112
4113 func TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4114         defer leakcheck.Check(t)
4115         for _, e := range listTestEnv() {
4116                 testClientRequestBodyErrorCancelStreamingInput(t, e)
4117         }
4118 }
4119
4120 func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4121         te := newTest(t, e)
4122         recvErr := make(chan error, 1)
4123         ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
4124                 _, err := stream.Recv()
4125                 recvErr <- err
4126                 return nil
4127         }}
4128         te.startServer(ts)
4129         defer te.tearDown()
4130         te.withServerTester(func(st *serverTester) {
4131                 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
4132                 // Say we have 5 bytes coming, but cancel it instead.
4133                 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4134                 st.writeRSTStream(1, http2.ErrCodeCancel)
4135
4136                 var got error
4137                 select {
4138                 case got = <-recvErr:
4139                 case <-time.After(3 * time.Second):
4140                         t.Fatal("timeout waiting for error")
4141                 }
4142                 if grpc.Code(got) != codes.Canceled {
4143                         t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
4144                 }
4145         })
4146 }
4147
4148 type clientTimeoutCreds struct {
4149         timeoutReturned bool
4150 }
4151
4152 func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4153         if !c.timeoutReturned {
4154                 c.timeoutReturned = true
4155                 return nil, nil, context.DeadlineExceeded
4156         }
4157         return rawConn, nil, nil
4158 }
4159 func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4160         return rawConn, nil, nil
4161 }
4162 func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo {
4163         return credentials.ProtocolInfo{}
4164 }
4165 func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials {
4166         return nil
4167 }
4168 func (c *clientTimeoutCreds) OverrideServerName(s string) error {
4169         return nil
4170 }
4171
4172 func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
4173         te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"})
4174         te.userAgent = testAppUA
4175         te.startServer(&testServer{security: te.e.security})
4176         defer te.tearDown()
4177
4178         cc := te.clientConn()
4179         tc := testpb.NewTestServiceClient(cc)
4180         // This unary call should succeed, because ClientHandshake will succeed for the second time.
4181         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
4182                 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
4183         }
4184 }
4185
4186 type serverDispatchCred struct {
4187         rawConnCh chan net.Conn
4188 }
4189
4190 func newServerDispatchCred() *serverDispatchCred {
4191         return &serverDispatchCred{
4192                 rawConnCh: make(chan net.Conn, 1),
4193         }
4194 }
4195 func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4196         return rawConn, nil, nil
4197 }
4198 func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4199         select {
4200         case c.rawConnCh <- rawConn:
4201         default:
4202         }
4203         return nil, nil, credentials.ErrConnDispatched
4204 }
4205 func (c *serverDispatchCred) Info() credentials.ProtocolInfo {
4206         return credentials.ProtocolInfo{}
4207 }
4208 func (c *serverDispatchCred) Clone() credentials.TransportCredentials {
4209         return nil
4210 }
4211 func (c *serverDispatchCred) OverrideServerName(s string) error {
4212         return nil
4213 }
4214 func (c *serverDispatchCred) getRawConn() net.Conn {
4215         return <-c.rawConnCh
4216 }
4217
4218 func TestServerCredsDispatch(t *testing.T) {
4219         lis, err := net.Listen("tcp", "localhost:0")
4220         if err != nil {
4221                 t.Fatalf("Failed to listen: %v", err)
4222         }
4223         cred := newServerDispatchCred()
4224         s := grpc.NewServer(grpc.Creds(cred))
4225         go s.Serve(lis)
4226         defer s.Stop()
4227
4228         cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred))
4229         if err != nil {
4230                 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4231         }
4232         defer cc.Close()
4233
4234         rawConn := cred.getRawConn()
4235         // Give grpc a chance to see the error and potentially close the connection.
4236         // And check that connection is not closed after that.
4237         time.Sleep(100 * time.Millisecond)
4238         // Check rawConn is not closed.
4239         if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil {
4240                 t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err)
4241         }
4242 }
4243
4244 type authorityCheckCreds struct {
4245         got string
4246 }
4247
4248 func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4249         return rawConn, nil, nil
4250 }
4251 func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4252         c.got = authority
4253         return rawConn, nil, nil
4254 }
4255 func (c *authorityCheckCreds) Info() credentials.ProtocolInfo {
4256         return credentials.ProtocolInfo{}
4257 }
4258 func (c *authorityCheckCreds) Clone() credentials.TransportCredentials {
4259         return c
4260 }
4261 func (c *authorityCheckCreds) OverrideServerName(s string) error {
4262         return nil
4263 }
4264
4265 // This test makes sure that the authority client handshake gets is the endpoint
4266 // in dial target, not the resolved ip address.
4267 func TestCredsHandshakeAuthority(t *testing.T) {
4268         const testAuthority = "test.auth.ori.ty"
4269
4270         lis, err := net.Listen("tcp", "localhost:0")
4271         if err != nil {
4272                 t.Fatalf("Failed to listen: %v", err)
4273         }
4274         cred := &authorityCheckCreds{}
4275         s := grpc.NewServer()
4276         go s.Serve(lis)
4277         defer s.Stop()
4278
4279         r, rcleanup := manual.GenerateAndRegisterManualResolver()
4280         defer rcleanup()
4281
4282         cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred))
4283         if err != nil {
4284                 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4285         }
4286         defer cc.Close()
4287         r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
4288
4289         ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
4290         defer cancel()
4291         for {
4292                 s := cc.GetState()
4293                 if s == connectivity.Ready {
4294                         break
4295                 }
4296                 if !cc.WaitForStateChange(ctx, s) {
4297                         // ctx got timeout or canceled.
4298                         t.Fatalf("ClientConn is not ready after 100 ms")
4299                 }
4300         }
4301
4302         if cred.got != testAuthority {
4303                 t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority)
4304         }
4305 }
4306
4307 func TestFlowControlLogicalRace(t *testing.T) {
4308         // Test for a regression of https://github.com/grpc/grpc-go/issues/632,
4309         // and other flow control bugs.
4310
4311         defer leakcheck.Check(t)
4312
4313         const (
4314                 itemCount   = 100
4315                 itemSize    = 1 << 10
4316                 recvCount   = 2
4317                 maxFailures = 3
4318
4319                 requestTimeout = time.Second * 5
4320         )
4321
4322         requestCount := 10000
4323         if raceMode {
4324                 requestCount = 1000
4325         }
4326
4327         lis, err := net.Listen("tcp", "localhost:0")
4328         if err != nil {
4329                 t.Fatalf("Failed to listen: %v", err)
4330         }
4331         defer lis.Close()
4332
4333         s := grpc.NewServer()
4334         testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
4335                 itemCount: itemCount,
4336                 itemSize:  itemSize,
4337         })
4338         defer s.Stop()
4339
4340         go s.Serve(lis)
4341
4342         ctx := context.Background()
4343
4344         cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4345         if err != nil {
4346                 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4347         }
4348         defer cc.Close()
4349         cl := testpb.NewTestServiceClient(cc)
4350
4351         failures := 0
4352         for i := 0; i < requestCount; i++ {
4353                 ctx, cancel := context.WithTimeout(ctx, requestTimeout)
4354                 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
4355                 if err != nil {
4356                         t.Fatalf("StreamingOutputCall; err = %q", err)
4357                 }
4358
4359                 j := 0
4360         loop:
4361                 for ; j < recvCount; j++ {
4362                         _, err := output.Recv()
4363                         if err != nil {
4364                                 if err == io.EOF {
4365                                         break loop
4366                                 }
4367                                 switch grpc.Code(err) {
4368                                 case codes.DeadlineExceeded:
4369                                         break loop
4370                                 default:
4371                                         t.Fatalf("Recv; err = %q", err)
4372                                 }
4373                         }
4374                 }
4375                 cancel()
4376                 <-ctx.Done()
4377
4378                 if j < recvCount {
4379                         t.Errorf("got %d responses to request %d", j, i)
4380                         failures++
4381                         if failures >= maxFailures {
4382                                 // Continue past the first failure to see if the connection is
4383                                 // entirely broken, or if only a single RPC was affected
4384                                 break
4385                         }
4386                 }
4387         }
4388 }
4389
4390 type flowControlLogicalRaceServer struct {
4391         testpb.TestServiceServer
4392
4393         itemSize  int
4394         itemCount int
4395 }
4396
4397 func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
4398         for i := 0; i < s.itemCount; i++ {
4399                 err := srv.Send(&testpb.StreamingOutputCallResponse{
4400                         Payload: &testpb.Payload{
4401                                 // Sending a large stream of data which the client reject
4402                                 // helps to trigger some types of flow control bugs.
4403                                 //
4404                                 // Reallocating memory here is inefficient, but the stress it
4405                                 // puts on the GC leads to more frequent flow control
4406                                 // failures. The GC likely causes more variety in the
4407                                 // goroutine scheduling orders.
4408                                 Body: bytes.Repeat([]byte("a"), s.itemSize),
4409                         },
4410                 })
4411                 if err != nil {
4412                         return err
4413                 }
4414         }
4415         return nil
4416 }
4417
4418 type lockingWriter struct {
4419         mu sync.Mutex
4420         w  io.Writer
4421 }
4422
4423 func (lw *lockingWriter) Write(p []byte) (n int, err error) {
4424         lw.mu.Lock()
4425         defer lw.mu.Unlock()
4426         return lw.w.Write(p)
4427 }
4428
4429 func (lw *lockingWriter) setWriter(w io.Writer) {
4430         lw.mu.Lock()
4431         defer lw.mu.Unlock()
4432         lw.w = w
4433 }
4434
4435 var testLogOutput = &lockingWriter{w: os.Stderr}
4436
4437 // awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
4438 // terminate, if they're still running. It spams logs with this
4439 // message.  We wait for it so our log filter is still
4440 // active. Otherwise the "defer restore()" at the top of various test
4441 // functions restores our log filter and then the goroutine spams.
4442 func awaitNewConnLogOutput() {
4443         awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
4444 }
4445
4446 func awaitLogOutput(maxWait time.Duration, phrase string) {
4447         pb := []byte(phrase)
4448
4449         timer := time.NewTimer(maxWait)
4450         defer timer.Stop()
4451         wakeup := make(chan bool, 1)
4452         for {
4453                 if logOutputHasContents(pb, wakeup) {
4454                         return
4455                 }
4456                 select {
4457                 case <-timer.C:
4458                         // Too slow. Oh well.
4459                         return
4460                 case <-wakeup:
4461                 }
4462         }
4463 }
4464
4465 func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
4466         testLogOutput.mu.Lock()
4467         defer testLogOutput.mu.Unlock()
4468         fw, ok := testLogOutput.w.(*filterWriter)
4469         if !ok {
4470                 return false
4471         }
4472         fw.mu.Lock()
4473         defer fw.mu.Unlock()
4474         if bytes.Contains(fw.buf.Bytes(), v) {
4475                 return true
4476         }
4477         fw.wakeup = wakeup
4478         return false
4479 }
4480
4481 var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
4482
4483 func noop() {}
4484
4485 // declareLogNoise declares that t is expected to emit the following noisy phrases,
4486 // even on success. Those phrases will be filtered from grpclog output
4487 // and only be shown if *verbose_logs or t ends up failing.
4488 // The returned restore function should be called with defer to be run
4489 // before the test ends.
4490 func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
4491         if *verboseLogs {
4492                 return noop
4493         }
4494         fw := &filterWriter{dst: os.Stderr, filter: phrases}
4495         testLogOutput.setWriter(fw)
4496         return func() {
4497                 if t.Failed() {
4498                         fw.mu.Lock()
4499                         defer fw.mu.Unlock()
4500                         if fw.buf.Len() > 0 {
4501                                 t.Logf("Complete log output:\n%s", fw.buf.Bytes())
4502                         }
4503                 }
4504                 testLogOutput.setWriter(os.Stderr)
4505         }
4506 }
4507
4508 type filterWriter struct {
4509         dst    io.Writer
4510         filter []string
4511
4512         mu     sync.Mutex
4513         buf    bytes.Buffer
4514         wakeup chan<- bool // if non-nil, gets true on write
4515 }
4516
4517 func (fw *filterWriter) Write(p []byte) (n int, err error) {
4518         fw.mu.Lock()
4519         fw.buf.Write(p)
4520         if fw.wakeup != nil {
4521                 select {
4522                 case fw.wakeup <- true:
4523                 default:
4524                 }
4525         }
4526         fw.mu.Unlock()
4527
4528         ps := string(p)
4529         for _, f := range fw.filter {
4530                 if strings.Contains(ps, f) {
4531                         return len(p), nil
4532                 }
4533         }
4534         return fw.dst.Write(p)
4535 }
4536
4537 // stubServer is a server that is easy to customize within individual test
4538 // cases.
4539 type stubServer struct {
4540         // Guarantees we satisfy this interface; panics if unimplemented methods are called.
4541         testpb.TestServiceServer
4542
4543         // Customizable implementations of server handlers.
4544         emptyCall      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
4545         fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
4546
4547         // A client connected to this service the test may use.  Created in Start().
4548         client testpb.TestServiceClient
4549
4550         cleanups []func() // Lambdas executed in Stop(); populated by Start().
4551 }
4552
4553 func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4554         return ss.emptyCall(ctx, in)
4555 }
4556
4557 func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
4558         return ss.fullDuplexCall(stream)
4559 }
4560
4561 // Start starts the server and creates a client connected to it.
4562 func (ss *stubServer) Start(sopts []grpc.ServerOption) error {
4563         lis, err := net.Listen("tcp", "localhost:0")
4564         if err != nil {
4565                 return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
4566         }
4567         ss.cleanups = append(ss.cleanups, func() { lis.Close() })
4568
4569         s := grpc.NewServer(sopts...)
4570         testpb.RegisterTestServiceServer(s, ss)
4571         go s.Serve(lis)
4572         ss.cleanups = append(ss.cleanups, s.Stop)
4573
4574         cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4575         if err != nil {
4576                 return fmt.Errorf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4577         }
4578         ss.cleanups = append(ss.cleanups, func() { cc.Close() })
4579
4580         ss.client = testpb.NewTestServiceClient(cc)
4581         return nil
4582 }
4583
4584 func (ss *stubServer) Stop() {
4585         for i := len(ss.cleanups) - 1; i >= 0; i-- {
4586                 ss.cleanups[i]()
4587         }
4588 }
4589
4590 func TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
4591         const mdkey = "somedata"
4592
4593         // endpoint ensures mdkey is NOT in metadata and returns an error if it is.
4594         endpoint := &stubServer{
4595                 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4596                         if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4597                                 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4598                         }
4599                         return &testpb.Empty{}, nil
4600                 },
4601         }
4602         if err := endpoint.Start(nil); err != nil {
4603                 t.Fatalf("Error starting endpoint server: %v", err)
4604         }
4605         defer endpoint.Stop()
4606
4607         // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
4608         // without explicitly copying the metadata.
4609         proxy := &stubServer{
4610                 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4611                         if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4612                                 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
4613                         }
4614                         return endpoint.client.EmptyCall(ctx, in)
4615                 },
4616         }
4617         if err := proxy.Start(nil); err != nil {
4618                 t.Fatalf("Error starting proxy server: %v", err)
4619         }
4620         defer proxy.Stop()
4621
4622         ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4623         defer cancel()
4624         md := metadata.Pairs(mdkey, "val")
4625         ctx = metadata.NewOutgoingContext(ctx, md)
4626
4627         // Sanity check that endpoint properly errors when it sees mdkey.
4628         _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{})
4629         if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4630                 t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
4631         }
4632
4633         if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
4634                 t.Fatal(err.Error())
4635         }
4636 }
4637
4638 func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
4639         const mdkey = "somedata"
4640
4641         // doFDC performs a FullDuplexCall with client and returns the error from the
4642         // first stream.Recv call, or nil if that error is io.EOF.  Calls t.Fatal if
4643         // the stream cannot be established.
4644         doFDC := func(ctx context.Context, client testpb.TestServiceClient) error {
4645                 stream, err := client.FullDuplexCall(ctx)
4646                 if err != nil {
4647                         t.Fatalf("Unwanted error: %v", err)
4648                 }
4649                 if _, err := stream.Recv(); err != io.EOF {
4650                         return err
4651                 }
4652                 return nil
4653         }
4654
4655         // endpoint ensures mdkey is NOT in metadata and returns an error if it is.
4656         endpoint := &stubServer{
4657                 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4658                         ctx := stream.Context()
4659                         if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4660                                 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4661                         }
4662                         return nil
4663                 },
4664         }
4665         if err := endpoint.Start(nil); err != nil {
4666                 t.Fatalf("Error starting endpoint server: %v", err)
4667         }
4668         defer endpoint.Stop()
4669
4670         // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
4671         // without explicitly copying the metadata.
4672         proxy := &stubServer{
4673                 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4674                         ctx := stream.Context()
4675                         if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4676                                 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4677                         }
4678                         return doFDC(ctx, endpoint.client)
4679                 },
4680         }
4681         if err := proxy.Start(nil); err != nil {
4682                 t.Fatalf("Error starting proxy server: %v", err)
4683         }
4684         defer proxy.Stop()
4685
4686         ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4687         defer cancel()
4688         md := metadata.Pairs(mdkey, "val")
4689         ctx = metadata.NewOutgoingContext(ctx, md)
4690
4691         // Sanity check that endpoint properly errors when it sees mdkey in ctx.
4692         err := doFDC(ctx, endpoint.client)
4693         if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4694                 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
4695         }
4696
4697         if err := doFDC(ctx, proxy.client); err != nil {
4698                 t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
4699         }
4700 }
4701
4702 func TestStatsTagsAndTrace(t *testing.T) {
4703         // Data added to context by client (typically in a stats handler).
4704         tags := []byte{1, 5, 2, 4, 3}
4705         trace := []byte{5, 2, 1, 3, 4}
4706
4707         // endpoint ensures Tags() and Trace() in context match those that were added
4708         // by the client and returns an error if not.
4709         endpoint := &stubServer{
4710                 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4711                         md, _ := metadata.FromIncomingContext(ctx)
4712                         if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
4713                                 return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
4714                         }
4715                         if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
4716                                 return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
4717                         }
4718                         if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
4719                                 return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
4720                         }
4721                         if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
4722                                 return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
4723                         }
4724                         return &testpb.Empty{}, nil
4725                 },
4726         }
4727         if err := endpoint.Start(nil); err != nil {
4728                 t.Fatalf("Error starting endpoint server: %v", err)
4729         }
4730         defer endpoint.Stop()
4731
4732         ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4733         defer cancel()
4734
4735         testCases := []struct {
4736                 ctx  context.Context
4737                 want codes.Code
4738         }{
4739                 {ctx: ctx, want: codes.Internal},
4740                 {ctx: stats.SetTags(ctx, tags), want: codes.Internal},
4741                 {ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
4742                 {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
4743                 {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
4744         }
4745
4746         for _, tc := range testCases {
4747                 _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
4748                 if tc.want == codes.OK && err != nil {
4749                         t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
4750                 }
4751                 if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
4752                         t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
4753                 }
4754         }
4755 }
4756
4757 func TestTapTimeout(t *testing.T) {
4758         sopts := []grpc.ServerOption{
4759                 grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
4760                         c, cancel := context.WithCancel(ctx)
4761                         // Call cancel instead of setting a deadline so we can detect which error
4762                         // occurred -- this cancellation (desired) or the client's deadline
4763                         // expired (indicating this cancellation did not affect the RPC).
4764                         time.AfterFunc(10*time.Millisecond, cancel)
4765                         return c, nil
4766                 }),
4767         }
4768
4769         ss := &stubServer{
4770                 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4771                         <-ctx.Done()
4772                         return &testpb.Empty{}, nil
4773                 },
4774         }
4775         if err := ss.Start(sopts); err != nil {
4776                 t.Fatalf("Error starting endpoint server: %v", err)
4777         }
4778         defer ss.Stop()
4779
4780         // This was known to be flaky; test several times.
4781         for i := 0; i < 10; i++ {
4782                 // Set our own deadline in case the server hangs.
4783                 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4784                 res, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
4785                 cancel()
4786                 if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
4787                         t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
4788                 }
4789         }
4790 }
4791
4792 type windowSizeConfig struct {
4793         serverStream int32
4794         serverConn   int32
4795         clientStream int32
4796         clientConn   int32
4797 }
4798
4799 func max(a, b int32) int32 {
4800         if a > b {
4801                 return a
4802         }
4803         return b
4804 }
4805
4806 func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
4807         defer leakcheck.Check(t)
4808         wc := windowSizeConfig{
4809                 serverStream: 8 * 1024 * 1024,
4810                 serverConn:   12 * 1024 * 1024,
4811                 clientStream: 6 * 1024 * 1024,
4812                 clientConn:   8 * 1024 * 1024,
4813         }
4814         for _, e := range listTestEnv() {
4815                 testConfigurableWindowSize(t, e, wc)
4816         }
4817 }
4818
4819 func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
4820         defer leakcheck.Check(t)
4821         wc := windowSizeConfig{
4822                 serverStream: 1,
4823                 serverConn:   1,
4824                 clientStream: 1,
4825                 clientConn:   1,
4826         }
4827         for _, e := range listTestEnv() {
4828                 testConfigurableWindowSize(t, e, wc)
4829         }
4830 }
4831
4832 func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
4833         te := newTest(t, e)
4834         te.serverInitialWindowSize = wc.serverStream
4835         te.serverInitialConnWindowSize = wc.serverConn
4836         te.clientInitialWindowSize = wc.clientStream
4837         te.clientInitialConnWindowSize = wc.clientConn
4838
4839         te.startServer(&testServer{security: e.security})
4840         defer te.tearDown()
4841
4842         cc := te.clientConn()
4843         tc := testpb.NewTestServiceClient(cc)
4844         stream, err := tc.FullDuplexCall(context.Background())
4845         if err != nil {
4846                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4847         }
4848         numOfIter := 11
4849         // Set message size to exhaust largest of window sizes.
4850         messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
4851         messageSize = max(messageSize, 64*1024)
4852         payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
4853         if err != nil {
4854                 t.Fatal(err)
4855         }
4856         respParams := []*testpb.ResponseParameters{
4857                 {
4858                         Size: messageSize,
4859                 },
4860         }
4861         req := &testpb.StreamingOutputCallRequest{
4862                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
4863                 ResponseParameters: respParams,
4864                 Payload:            payload,
4865         }
4866         for i := 0; i < numOfIter; i++ {
4867                 if err := stream.Send(req); err != nil {
4868                         t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4869                 }
4870                 if _, err := stream.Recv(); err != nil {
4871                         t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
4872                 }
4873         }
4874         if err := stream.CloseSend(); err != nil {
4875                 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
4876         }
4877 }
4878
4879 var (
4880         // test authdata
4881         authdata = map[string]string{
4882                 "test-key":      "test-value",
4883                 "test-key2-bin": string([]byte{1, 2, 3}),
4884         }
4885 )
4886
4887 type testPerRPCCredentials struct{}
4888
4889 func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
4890         return authdata, nil
4891 }
4892
4893 func (cr testPerRPCCredentials) RequireTransportSecurity() bool {
4894         return false
4895 }
4896
4897 func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
4898         md, ok := metadata.FromIncomingContext(ctx)
4899         if !ok {
4900                 return ctx, fmt.Errorf("didn't find metadata in context")
4901         }
4902         for k, vwant := range authdata {
4903                 vgot, ok := md[k]
4904                 if !ok {
4905                         return ctx, fmt.Errorf("didn't find authdata key %v in context", k)
4906                 }
4907                 if vgot[0] != vwant {
4908                         return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant)
4909                 }
4910         }
4911         return ctx, nil
4912 }
4913
4914 func TestPerRPCCredentialsViaDialOptions(t *testing.T) {
4915         defer leakcheck.Check(t)
4916         for _, e := range listTestEnv() {
4917                 testPerRPCCredentialsViaDialOptions(t, e)
4918         }
4919 }
4920
4921 func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) {
4922         te := newTest(t, e)
4923         te.tapHandle = authHandle
4924         te.perRPCCreds = testPerRPCCredentials{}
4925         te.startServer(&testServer{security: e.security})
4926         defer te.tearDown()
4927
4928         cc := te.clientConn()
4929         tc := testpb.NewTestServiceClient(cc)
4930         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
4931                 t.Fatalf("Test failed. Reason: %v", err)
4932         }
4933 }
4934
4935 func TestPerRPCCredentialsViaCallOptions(t *testing.T) {
4936         defer leakcheck.Check(t)
4937         for _, e := range listTestEnv() {
4938                 testPerRPCCredentialsViaCallOptions(t, e)
4939         }
4940 }
4941
4942 func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) {
4943         te := newTest(t, e)
4944         te.tapHandle = authHandle
4945         te.startServer(&testServer{security: e.security})
4946         defer te.tearDown()
4947
4948         cc := te.clientConn()
4949         tc := testpb.NewTestServiceClient(cc)
4950         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
4951                 t.Fatalf("Test failed. Reason: %v", err)
4952         }
4953 }
4954
4955 func TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) {
4956         defer leakcheck.Check(t)
4957         for _, e := range listTestEnv() {
4958                 testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e)
4959         }
4960 }
4961
4962 func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) {
4963         te := newTest(t, e)
4964         te.perRPCCreds = testPerRPCCredentials{}
4965         // When credentials are provided via both dial options and call options,
4966         // we apply both sets.
4967         te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
4968                 md, ok := metadata.FromIncomingContext(ctx)
4969                 if !ok {
4970                         return ctx, fmt.Errorf("couldn't find metadata in context")
4971                 }
4972                 for k, vwant := range authdata {
4973                         vgot, ok := md[k]
4974                         if !ok {
4975                                 return ctx, fmt.Errorf("couldn't find metadata for key %v", k)
4976                         }
4977                         if len(vgot) != 2 {
4978                                 return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot))
4979                         }
4980                         if vgot[0] != vwant || vgot[1] != vwant {
4981                                 return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant)
4982                         }
4983                 }
4984                 return ctx, nil
4985         }
4986         te.startServer(&testServer{security: e.security})
4987         defer te.tearDown()
4988
4989         cc := te.clientConn()
4990         tc := testpb.NewTestServiceClient(cc)
4991         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
4992                 t.Fatalf("Test failed. Reason: %v", err)
4993         }
4994 }
4995
4996 func TestWaitForReadyConnection(t *testing.T) {
4997         defer leakcheck.Check(t)
4998         for _, e := range listTestEnv() {
4999                 testWaitForReadyConnection(t, e)
5000         }
5001
5002 }
5003
5004 func testWaitForReadyConnection(t *testing.T, e env) {
5005         te := newTest(t, e)
5006         te.userAgent = testAppUA
5007         te.startServer(&testServer{security: e.security})
5008         defer te.tearDown()
5009
5010         cc := te.clientConn() // Non-blocking dial.
5011         tc := testpb.NewTestServiceClient(cc)
5012         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
5013         defer cancel()
5014         state := cc.GetState()
5015         // Wait for connection to be Ready.
5016         for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
5017         }
5018         if state != connectivity.Ready {
5019                 t.Fatalf("Want connection state to be Ready, got %v", state)
5020         }
5021         ctx, cancel = context.WithTimeout(context.Background(), time.Second)
5022         defer cancel()
5023         // Make a fail-fast RPC.
5024         if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5025                 t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
5026         }
5027 }
5028
5029 type errCodec struct {
5030         noError bool
5031 }
5032
5033 func (c *errCodec) Marshal(v interface{}) ([]byte, error) {
5034         if c.noError {
5035                 return []byte{}, nil
5036         }
5037         return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12")
5038 }
5039
5040 func (c *errCodec) Unmarshal(data []byte, v interface{}) error {
5041         return nil
5042 }
5043
5044 func (c *errCodec) String() string {
5045         return "Fermat's near-miss."
5046 }
5047
5048 func TestEncodeDoesntPanic(t *testing.T) {
5049         defer leakcheck.Check(t)
5050         for _, e := range listTestEnv() {
5051                 testEncodeDoesntPanic(t, e)
5052         }
5053 }
5054
5055 func testEncodeDoesntPanic(t *testing.T, e env) {
5056         te := newTest(t, e)
5057         erc := &errCodec{}
5058         te.customCodec = erc
5059         te.startServer(&testServer{security: e.security})
5060         defer te.tearDown()
5061         te.customCodec = nil
5062         tc := testpb.NewTestServiceClient(te.clientConn())
5063         // Failure case, should not panic.
5064         tc.EmptyCall(context.Background(), &testpb.Empty{})
5065         erc.noError = true
5066         // Passing case.
5067         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
5068                 t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
5069         }
5070 }
5071
5072 func TestSvrWriteStatusEarlyWrite(t *testing.T) {
5073         defer leakcheck.Check(t)
5074         for _, e := range listTestEnv() {
5075                 testSvrWriteStatusEarlyWrite(t, e)
5076         }
5077 }
5078
5079 func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
5080         te := newTest(t, e)
5081         const smallSize = 1024
5082         const largeSize = 2048
5083         const extraLargeSize = 4096
5084         te.maxServerReceiveMsgSize = newInt(largeSize)
5085         te.maxServerSendMsgSize = newInt(largeSize)
5086         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
5087         if err != nil {
5088                 t.Fatal(err)
5089         }
5090         extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
5091         if err != nil {
5092                 t.Fatal(err)
5093         }
5094         te.startServer(&testServer{security: e.security})
5095         defer te.tearDown()
5096         tc := testpb.NewTestServiceClient(te.clientConn())
5097         respParam := []*testpb.ResponseParameters{
5098                 {
5099                         Size: int32(smallSize),
5100                 },
5101         }
5102         sreq := &testpb.StreamingOutputCallRequest{
5103                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
5104                 ResponseParameters: respParam,
5105                 Payload:            extraLargePayload,
5106         }
5107         // Test recv case: server receives a message larger than maxServerReceiveMsgSize.
5108         stream, err := tc.FullDuplexCall(te.ctx)
5109         if err != nil {
5110                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5111         }
5112         if err = stream.Send(sreq); err != nil {
5113                 t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
5114         }
5115         if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5116                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5117         }
5118         // Test send case: server sends a message larger than maxServerSendMsgSize.
5119         sreq.Payload = smallPayload
5120         respParam[0].Size = int32(extraLargeSize)
5121
5122         stream, err = tc.FullDuplexCall(te.ctx)
5123         if err != nil {
5124                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5125         }
5126         if err = stream.Send(sreq); err != nil {
5127                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5128         }
5129         if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5130                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5131         }
5132 }
5133
5134 // The following functions with function name ending with TD indicates that they
5135 // should be deleted after old service config API is deprecated and deleted.
5136 func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
5137         te := newTest(t, e)
5138         // We write before read.
5139         ch := make(chan grpc.ServiceConfig, 1)
5140         te.sc = ch
5141         te.userAgent = testAppUA
5142         te.declareLogNoise(
5143                 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
5144                 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
5145                 "grpc: addrConn.resetTransport failed to create client transport: connection error",
5146                 "Failed to dial : context canceled; please retry.",
5147         )
5148         return te, ch
5149 }
5150
5151 func TestServiceConfigGetMethodConfigTD(t *testing.T) {
5152         defer leakcheck.Check(t)
5153         for _, e := range listTestEnv() {
5154                 testGetMethodConfigTD(t, e)
5155         }
5156 }
5157
5158 func testGetMethodConfigTD(t *testing.T, e env) {
5159         te, ch := testServiceConfigSetupTD(t, e)
5160         defer te.tearDown()
5161
5162         mc1 := grpc.MethodConfig{
5163                 WaitForReady: newBool(true),
5164                 Timeout:      newDuration(time.Millisecond),
5165         }
5166         mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
5167         m := make(map[string]grpc.MethodConfig)
5168         m["/grpc.testing.TestService/EmptyCall"] = mc1
5169         m["/grpc.testing.TestService/"] = mc2
5170         sc := grpc.ServiceConfig{
5171                 Methods: m,
5172         }
5173         ch <- sc
5174
5175         cc := te.clientConn()
5176         tc := testpb.NewTestServiceClient(cc)
5177         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5178         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
5179                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5180         }
5181
5182         m = make(map[string]grpc.MethodConfig)
5183         m["/grpc.testing.TestService/UnaryCall"] = mc1
5184         m["/grpc.testing.TestService/"] = mc2
5185         sc = grpc.ServiceConfig{
5186                 Methods: m,
5187         }
5188         ch <- sc
5189         // Wait for the new service config to propagate.
5190         for {
5191                 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
5192                         continue
5193                 }
5194                 break
5195         }
5196         // The following RPCs are expected to become fail-fast.
5197         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
5198                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
5199         }
5200 }
5201
5202 func TestServiceConfigWaitForReadyTD(t *testing.T) {
5203         defer leakcheck.Check(t)
5204         for _, e := range listTestEnv() {
5205                 testServiceConfigWaitForReadyTD(t, e)
5206         }
5207 }
5208
5209 func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
5210         te, ch := testServiceConfigSetupTD(t, e)
5211         defer te.tearDown()
5212
5213         // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
5214         mc := grpc.MethodConfig{
5215                 WaitForReady: newBool(false),
5216                 Timeout:      newDuration(time.Millisecond),
5217         }
5218         m := make(map[string]grpc.MethodConfig)
5219         m["/grpc.testing.TestService/EmptyCall"] = mc
5220         m["/grpc.testing.TestService/FullDuplexCall"] = mc
5221         sc := grpc.ServiceConfig{
5222                 Methods: m,
5223         }
5224         ch <- sc
5225
5226         cc := te.clientConn()
5227         tc := testpb.NewTestServiceClient(cc)
5228         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5229         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5230                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5231         }
5232         if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5233                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5234         }
5235
5236         // Generate a service config update.
5237         // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
5238         mc.WaitForReady = newBool(true)
5239         m = make(map[string]grpc.MethodConfig)
5240         m["/grpc.testing.TestService/EmptyCall"] = mc
5241         m["/grpc.testing.TestService/FullDuplexCall"] = mc
5242         sc = grpc.ServiceConfig{
5243                 Methods: m,
5244         }
5245         ch <- sc
5246
5247         // Wait for the new service config to take effect.
5248         mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5249         for {
5250                 if !*mc.WaitForReady {
5251                         time.Sleep(100 * time.Millisecond)
5252                         mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5253                         continue
5254                 }
5255                 break
5256         }
5257         // The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5258         if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
5259                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5260         }
5261         if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded {
5262                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5263         }
5264 }
5265
5266 func TestServiceConfigTimeoutTD(t *testing.T) {
5267         defer leakcheck.Check(t)
5268         for _, e := range listTestEnv() {
5269                 testServiceConfigTimeoutTD(t, e)
5270         }
5271 }
5272
5273 func testServiceConfigTimeoutTD(t *testing.T, e env) {
5274         te, ch := testServiceConfigSetupTD(t, e)
5275         defer te.tearDown()
5276
5277         // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
5278         mc := grpc.MethodConfig{
5279                 Timeout: newDuration(time.Hour),
5280         }
5281         m := make(map[string]grpc.MethodConfig)
5282         m["/grpc.testing.TestService/EmptyCall"] = mc
5283         m["/grpc.testing.TestService/FullDuplexCall"] = mc
5284         sc := grpc.ServiceConfig{
5285                 Methods: m,
5286         }
5287         ch <- sc
5288
5289         cc := te.clientConn()
5290         tc := testpb.NewTestServiceClient(cc)
5291         // The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
5292         ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
5293         if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5294                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5295         }
5296         cancel()
5297         ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
5298         if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5299                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5300         }
5301         cancel()
5302
5303         // Generate a service config update.
5304         // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
5305         mc.Timeout = newDuration(time.Nanosecond)
5306         m = make(map[string]grpc.MethodConfig)
5307         m["/grpc.testing.TestService/EmptyCall"] = mc
5308         m["/grpc.testing.TestService/FullDuplexCall"] = mc
5309         sc = grpc.ServiceConfig{
5310                 Methods: m,
5311         }
5312         ch <- sc
5313
5314         // Wait for the new service config to take effect.
5315         mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
5316         for {
5317                 if *mc.Timeout != time.Nanosecond {
5318                         time.Sleep(100 * time.Millisecond)
5319                         mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
5320                         continue
5321                 }
5322                 break
5323         }
5324
5325         ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
5326         if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5327                 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5328         }
5329         cancel()
5330
5331         ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
5332         if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
5333                 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5334         }
5335         cancel()
5336 }
5337
5338 func TestServiceConfigMaxMsgSizeTD(t *testing.T) {
5339         defer leakcheck.Check(t)
5340         for _, e := range listTestEnv() {
5341                 testServiceConfigMaxMsgSizeTD(t, e)
5342         }
5343 }
5344
5345 func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
5346         // Setting up values and objects shared across all test cases.
5347         const smallSize = 1
5348         const largeSize = 1024
5349         const extraLargeSize = 2048
5350
5351         smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
5352         if err != nil {
5353                 t.Fatal(err)
5354         }
5355         largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
5356         if err != nil {
5357                 t.Fatal(err)
5358         }
5359         extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
5360         if err != nil {
5361                 t.Fatal(err)
5362         }
5363
5364         mc := grpc.MethodConfig{
5365                 MaxReqSize:  newInt(extraLargeSize),
5366                 MaxRespSize: newInt(extraLargeSize),
5367         }
5368
5369         m := make(map[string]grpc.MethodConfig)
5370         m["/grpc.testing.TestService/UnaryCall"] = mc
5371         m["/grpc.testing.TestService/FullDuplexCall"] = mc
5372         sc := grpc.ServiceConfig{
5373                 Methods: m,
5374         }
5375         // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
5376         te1, ch1 := testServiceConfigSetupTD(t, e)
5377         te1.startServer(&testServer{security: e.security})
5378         defer te1.tearDown()
5379
5380         ch1 <- sc
5381         tc := testpb.NewTestServiceClient(te1.clientConn())
5382
5383         req := &testpb.SimpleRequest{
5384                 ResponseType: testpb.PayloadType_COMPRESSABLE,
5385                 ResponseSize: int32(extraLargeSize),
5386                 Payload:      smallPayload,
5387         }
5388         // Test for unary RPC recv.
5389         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5390                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5391         }
5392
5393         // Test for unary RPC send.
5394         req.Payload = extraLargePayload
5395         req.ResponseSize = int32(smallSize)
5396         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5397                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5398         }
5399
5400         // Test for streaming RPC recv.
5401         respParam := []*testpb.ResponseParameters{
5402                 {
5403                         Size: int32(extraLargeSize),
5404                 },
5405         }
5406         sreq := &testpb.StreamingOutputCallRequest{
5407                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
5408                 ResponseParameters: respParam,
5409                 Payload:            smallPayload,
5410         }
5411         stream, err := tc.FullDuplexCall(te1.ctx)
5412         if err != nil {
5413                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5414         }
5415         if err := stream.Send(sreq); err != nil {
5416                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5417         }
5418         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5419                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5420         }
5421
5422         // Test for streaming RPC send.
5423         respParam[0].Size = int32(smallSize)
5424         sreq.Payload = extraLargePayload
5425         stream, err = tc.FullDuplexCall(te1.ctx)
5426         if err != nil {
5427                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5428         }
5429         if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5430                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
5431         }
5432
5433         // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
5434         te2, ch2 := testServiceConfigSetupTD(t, e)
5435         te2.maxClientReceiveMsgSize = newInt(1024)
5436         te2.maxClientSendMsgSize = newInt(1024)
5437         te2.startServer(&testServer{security: e.security})
5438         defer te2.tearDown()
5439         ch2 <- sc
5440         tc = testpb.NewTestServiceClient(te2.clientConn())
5441
5442         // Test for unary RPC recv.
5443         req.Payload = smallPayload
5444         req.ResponseSize = int32(largeSize)
5445
5446         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5447                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5448         }
5449
5450         // Test for unary RPC send.
5451         req.Payload = largePayload
5452         req.ResponseSize = int32(smallSize)
5453         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5454                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5455         }
5456
5457         // Test for streaming RPC recv.
5458         stream, err = tc.FullDuplexCall(te2.ctx)
5459         respParam[0].Size = int32(largeSize)
5460         sreq.Payload = smallPayload
5461         if err != nil {
5462                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5463         }
5464         if err := stream.Send(sreq); err != nil {
5465                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5466         }
5467         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5468                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5469         }
5470
5471         // Test for streaming RPC send.
5472         respParam[0].Size = int32(smallSize)
5473         sreq.Payload = largePayload
5474         stream, err = tc.FullDuplexCall(te2.ctx)
5475         if err != nil {
5476                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5477         }
5478         if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5479                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
5480         }
5481
5482         // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
5483         te3, ch3 := testServiceConfigSetupTD(t, e)
5484         te3.maxClientReceiveMsgSize = newInt(4096)
5485         te3.maxClientSendMsgSize = newInt(4096)
5486         te3.startServer(&testServer{security: e.security})
5487         defer te3.tearDown()
5488         ch3 <- sc
5489         tc = testpb.NewTestServiceClient(te3.clientConn())
5490
5491         // Test for unary RPC recv.
5492         req.Payload = smallPayload
5493         req.ResponseSize = int32(largeSize)
5494
5495         if _, err := tc.UnaryCall(context.Background(), req); err != nil {
5496                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
5497         }
5498
5499         req.ResponseSize = int32(extraLargeSize)
5500         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5501                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5502         }
5503
5504         // Test for unary RPC send.
5505         req.Payload = largePayload
5506         req.ResponseSize = int32(smallSize)
5507         if _, err := tc.UnaryCall(context.Background(), req); err != nil {
5508                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
5509         }
5510
5511         req.Payload = extraLargePayload
5512         if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5513                 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
5514         }
5515
5516         // Test for streaming RPC recv.
5517         stream, err = tc.FullDuplexCall(te3.ctx)
5518         if err != nil {
5519                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5520         }
5521         respParam[0].Size = int32(largeSize)
5522         sreq.Payload = smallPayload
5523
5524         if err := stream.Send(sreq); err != nil {
5525                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5526         }
5527         if _, err := stream.Recv(); err != nil {
5528                 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
5529         }
5530
5531         respParam[0].Size = int32(extraLargeSize)
5532
5533         if err := stream.Send(sreq); err != nil {
5534                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5535         }
5536         if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5537                 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5538         }
5539
5540         // Test for streaming RPC send.
5541         respParam[0].Size = int32(smallSize)
5542         sreq.Payload = largePayload
5543         stream, err = tc.FullDuplexCall(te3.ctx)
5544         if err != nil {
5545                 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5546         }
5547         if err := stream.Send(sreq); err != nil {
5548                 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5549         }
5550         sreq.Payload = extraLargePayload
5551         if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted {
5552                 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
5553         }
5554 }
5555
5556 func TestMethodFromServerStream(t *testing.T) {
5557         defer leakcheck.Check(t)
5558         const testMethod = "/package.service/method"
5559         e := tcpClearRREnv
5560         te := newTest(t, e)
5561         var method string
5562         var ok bool
5563         te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
5564                 method, ok = grpc.MethodFromServerStream(stream)
5565                 return nil
5566         }
5567
5568         te.startServer(nil)
5569         defer te.tearDown()
5570         _ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
5571         if !ok || method != testMethod {
5572                 t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
5573         }
5574 }