3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 //go:generate protoc --go_out=plugins=:$GOPATH grpc_lb_v1/messages/messages.proto
20 //go:generate protoc --go_out=plugins=grpc:$GOPATH grpc_lb_v1/service/service.proto
22 // Package grpclb_test is currently used only for grpclb testing.
35 "github.com/golang/protobuf/proto"
36 "golang.org/x/net/context"
37 "google.golang.org/grpc"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
41 lbspb "google.golang.org/grpc/grpclb/grpc_lb_v1/service"
42 _ "google.golang.org/grpc/grpclog/glogger"
43 "google.golang.org/grpc/metadata"
44 "google.golang.org/grpc/naming"
45 testpb "google.golang.org/grpc/test/grpc_testing"
46 "google.golang.org/grpc/test/leakcheck"
54 // Resolver replaces localhost with fakeName in Next().
55 // Dialer replaces fakeName with localhost when dialing.
56 // This will test that custom dialer is passed from Dial to grpclb.
57 fakeName = "fake.Name"
60 type testWatcher struct {
61 // the channel to receives name resolution updates
62 update chan *naming.Update
63 // the side channel to get to know how many updates in a batch
65 // the channel to notifiy update injector that the update reading is done
69 func (w *testWatcher) Next() (updates []*naming.Update, err error) {
72 return nil, fmt.Errorf("w.side is closed")
74 for i := 0; i < n; i++ {
80 // Resolver replaces localhost with fakeName in Next().
81 // Custom dialer will replace fakeName with localhost when dialing.
82 u.Addr = strings.Replace(u.Addr, "localhost", fakeName, 1)
83 updates = append(updates, u)
90 func (w *testWatcher) Close() {
94 // Inject naming resolution updates to the testWatcher.
95 func (w *testWatcher) inject(updates []*naming.Update) {
96 w.side <- len(updates)
97 for _, u := range updates {
103 type testNameResolver struct {
108 func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
110 update: make(chan *naming.Update, len(r.addrs)),
111 side: make(chan int, 1),
112 readDone: make(chan int),
114 r.w.side <- len(r.addrs)
115 for _, addr := range r.addrs {
116 r.w.update <- &naming.Update{
119 Metadata: &naming.AddrMetadataGRPCLB{
120 AddrType: naming.GRPCLB,
131 func (r *testNameResolver) inject(updates []*naming.Update) {
137 type serverNameCheckCreds struct {
143 func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
144 if _, err := io.WriteString(rawConn, c.sn); err != nil {
145 fmt.Printf("Failed to write the server name %s to the client %v", c.sn, err)
148 return rawConn, nil, nil
150 func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
153 b := make([]byte, len(c.expected))
154 errCh := make(chan error, 1)
156 _, err := rawConn.Read(b)
162 fmt.Printf("Failed to read the server name from the server %v", err)
166 return nil, nil, ctx.Err()
168 if c.expected != string(b) {
169 fmt.Printf("Read the server name %s want %s", string(b), c.expected)
170 return nil, nil, errors.New("received unexpected server name")
172 return rawConn, nil, nil
174 func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
177 return credentials.ProtocolInfo{}
179 func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
182 return &serverNameCheckCreds{
183 expected: c.expected,
186 func (c *serverNameCheckCreds) OverrideServerName(s string) error {
193 // fakeNameDialer replaces fakeName with localhost when dialing.
194 // This will test that custom dialer is passed from Dial to grpclb.
195 func fakeNameDialer(addr string, timeout time.Duration) (net.Conn, error) {
196 addr = strings.Replace(addr, fakeName, "localhost", 1)
197 return net.DialTimeout("tcp", addr, timeout)
200 type remoteBalancer struct {
201 sls []*lbmpb.ServerList
202 intervals []time.Duration
203 statsDura time.Duration
206 stats lbmpb.ClientStats
209 func newRemoteBalancer(sls []*lbmpb.ServerList, intervals []time.Duration) *remoteBalancer {
210 return &remoteBalancer{
212 intervals: intervals,
213 done: make(chan struct{}),
217 func (b *remoteBalancer) stop() {
221 func (b *remoteBalancer) BalanceLoad(stream lbspb.LoadBalancer_BalanceLoadServer) error {
222 req, err := stream.Recv()
226 initReq := req.GetInitialRequest()
227 if initReq.Name != besn {
228 return grpc.Errorf(codes.InvalidArgument, "invalid service name: %v", initReq.Name)
230 resp := &lbmpb.LoadBalanceResponse{
231 LoadBalanceResponseType: &lbmpb.LoadBalanceResponse_InitialResponse{
232 InitialResponse: &lbmpb.InitialLoadBalanceResponse{
233 ClientStatsReportInterval: &lbmpb.Duration{
234 Seconds: int64(b.statsDura.Seconds()),
235 Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
240 if err := stream.Send(resp); err != nil {
246 req *lbmpb.LoadBalanceRequest
249 if req, err = stream.Recv(); err != nil {
253 b.stats.NumCallsStarted += req.GetClientStats().NumCallsStarted
254 b.stats.NumCallsFinished += req.GetClientStats().NumCallsFinished
255 b.stats.NumCallsFinishedWithDropForRateLimiting += req.GetClientStats().NumCallsFinishedWithDropForRateLimiting
256 b.stats.NumCallsFinishedWithDropForLoadBalancing += req.GetClientStats().NumCallsFinishedWithDropForLoadBalancing
257 b.stats.NumCallsFinishedWithClientFailedToSend += req.GetClientStats().NumCallsFinishedWithClientFailedToSend
258 b.stats.NumCallsFinishedKnownReceived += req.GetClientStats().NumCallsFinishedKnownReceived
262 for k, v := range b.sls {
263 time.Sleep(b.intervals[k])
264 resp = &lbmpb.LoadBalanceResponse{
265 LoadBalanceResponseType: &lbmpb.LoadBalanceResponse_ServerList{
269 if err := stream.Send(resp); err != nil {
277 type testServer struct {
278 testpb.TestServiceServer
283 const testmdkey = "testmd"
285 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
286 md, ok := metadata.FromIncomingContext(ctx)
288 return nil, grpc.Errorf(codes.Internal, "failed to receive metadata")
290 if md == nil || md["lb-token"][0] != lbToken {
291 return nil, grpc.Errorf(codes.Internal, "received unexpected metadata: %v", md)
293 grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr))
294 return &testpb.Empty{}, nil
297 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
301 func startBackends(sn string, lis ...net.Listener) (servers []*grpc.Server) {
302 for _, l := range lis {
303 creds := &serverNameCheckCreds{
306 s := grpc.NewServer(grpc.Creds(creds))
307 testpb.RegisterTestServiceServer(s, &testServer{addr: l.Addr().String()})
308 servers = append(servers, s)
309 go func(s *grpc.Server, l net.Listener) {
316 func stopBackends(servers []*grpc.Server) {
317 for _, s := range servers {
322 type testServers struct {
330 func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
332 beListeners []net.Listener
338 for i := 0; i < numberOfBackends; i++ {
340 beLis, e := net.Listen("tcp", "localhost:0")
342 err = fmt.Errorf("Failed to listen %v", err)
345 beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
346 bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
348 beListeners = append(beListeners, beLis)
350 backends := startBackends(besn, beListeners...)
352 // Start a load balancer.
353 lbLis, err := net.Listen("tcp", "localhost:0")
355 err = fmt.Errorf("Failed to create the listener for the load balancer %v", err)
358 lbCreds := &serverNameCheckCreds{
361 lb = grpc.NewServer(grpc.Creds(lbCreds))
363 err = fmt.Errorf("Failed to generate the port number %v", err)
366 ls = newRemoteBalancer(nil, nil)
367 lbspb.RegisterLoadBalancerServer(lb, ls)
373 lbAddr: lbLis.Addr().String(),
380 defer stopBackends(backends)
389 func TestGRPCLB(t *testing.T) {
390 defer leakcheck.Check(t)
391 tss, cleanup, err := newLoadBalancer(1)
393 t.Fatalf("failed to create new load balancer: %v", err)
398 IpAddress: tss.beIPs[0],
399 Port: int32(tss.bePorts[0]),
400 LoadBalanceToken: lbToken,
402 var bes []*lbmpb.Server
403 bes = append(bes, be)
404 sl := &lbmpb.ServerList{
407 tss.ls.sls = []*lbmpb.ServerList{sl}
408 tss.ls.intervals = []time.Duration{0}
409 creds := serverNameCheckCreds{
412 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
414 cc, err := grpc.DialContext(ctx, besn,
415 grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
416 grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
418 t.Fatalf("Failed to dial to the backend %v", err)
421 testC := testpb.NewTestServiceClient(cc)
422 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
423 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
427 func TestDropRequest(t *testing.T) {
428 defer leakcheck.Check(t)
429 tss, cleanup, err := newLoadBalancer(2)
431 t.Fatalf("failed to create new load balancer: %v", err)
434 tss.ls.sls = []*lbmpb.ServerList{{
435 Servers: []*lbmpb.Server{{
436 IpAddress: tss.beIPs[0],
437 Port: int32(tss.bePorts[0]),
438 LoadBalanceToken: lbToken,
439 DropForLoadBalancing: true,
441 IpAddress: tss.beIPs[1],
442 Port: int32(tss.bePorts[1]),
443 LoadBalanceToken: lbToken,
444 DropForLoadBalancing: false,
447 tss.ls.intervals = []time.Duration{0}
448 creds := serverNameCheckCreds{
451 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
453 cc, err := grpc.DialContext(ctx, besn,
454 grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
455 grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
457 t.Fatalf("Failed to dial to the backend %v", err)
460 testC := testpb.NewTestServiceClient(cc)
461 // Wait until the first connection is up.
462 // The first one has Drop set to true, error should contain "drop requests".
464 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
465 if strings.Contains(err.Error(), "drops requests") {
470 // The 1st, non-fail-fast RPC should succeed. This ensures both server
471 // connections are made, because the first one has DropForLoadBalancing set to true.
472 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
473 t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
475 for i := 0; i < 3; i++ {
476 // Odd fail-fast RPCs should fail, because the 1st backend has DropForLoadBalancing
478 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
479 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
481 // Even fail-fast RPCs should succeed since they choose the
482 // non-drop-request backend according to the round robin policy.
483 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
484 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
489 func TestDropRequestFailedNonFailFast(t *testing.T) {
490 defer leakcheck.Check(t)
491 tss, cleanup, err := newLoadBalancer(1)
493 t.Fatalf("failed to create new load balancer: %v", err)
497 IpAddress: tss.beIPs[0],
498 Port: int32(tss.bePorts[0]),
499 LoadBalanceToken: lbToken,
500 DropForLoadBalancing: true,
502 var bes []*lbmpb.Server
503 bes = append(bes, be)
504 sl := &lbmpb.ServerList{
507 tss.ls.sls = []*lbmpb.ServerList{sl}
508 tss.ls.intervals = []time.Duration{0}
509 creds := serverNameCheckCreds{
512 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
514 cc, err := grpc.DialContext(ctx, besn,
515 grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
516 grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
518 t.Fatalf("Failed to dial to the backend %v", err)
521 testC := testpb.NewTestServiceClient(cc)
522 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
524 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
525 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.DeadlineExceeded)
529 // When the balancer in use disconnects, grpclb should connect to the next address from resolved balancer address list.
530 func TestBalancerDisconnects(t *testing.T) {
531 defer leakcheck.Check(t)
536 for i := 0; i < 3; i++ {
537 tss, cleanup, err := newLoadBalancer(1)
539 t.Fatalf("failed to create new load balancer: %v", err)
544 IpAddress: tss.beIPs[0],
545 Port: int32(tss.bePorts[0]),
546 LoadBalanceToken: lbToken,
548 var bes []*lbmpb.Server
549 bes = append(bes, be)
550 sl := &lbmpb.ServerList{
553 tss.ls.sls = []*lbmpb.ServerList{sl}
554 tss.ls.intervals = []time.Duration{0}
556 lbAddrs = append(lbAddrs, tss.lbAddr)
557 lbs = append(lbs, tss.lb)
560 creds := serverNameCheckCreds{
563 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
565 resolver := &testNameResolver{
568 cc, err := grpc.DialContext(ctx, besn,
569 grpc.WithBalancer(grpc.NewGRPCLBBalancer(resolver)),
570 grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
572 t.Fatalf("Failed to dial to the backend %v", err)
575 testC := testpb.NewTestServiceClient(cc)
576 var previousTrailer string
577 trailer := metadata.MD{}
578 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
579 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
581 previousTrailer = trailer[testmdkey][0]
583 // The initial resolver update contains lbs[0] and lbs[1].
584 // When lbs[0] is stopped, lbs[1] should be used.
587 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
588 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
589 } else if trailer[testmdkey][0] != previousTrailer {
590 // A new backend server should receive the request.
591 // The trailer contains the backend address, so the trailer should be different from the previous one.
592 previousTrailer = trailer[testmdkey][0]
595 time.Sleep(100 * time.Millisecond)
597 // Inject a update to add lbs[2] to resolved addresses.
598 resolver.inject([]*naming.Update{
601 Metadata: &naming.AddrMetadataGRPCLB{
602 AddrType: naming.GRPCLB,
607 // Stop lbs[1]. Now lbs[0] and lbs[1] are all stopped. lbs[2] should be used.
610 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
611 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
612 } else if trailer[testmdkey][0] != previousTrailer {
613 // A new backend server should receive the request.
614 // The trailer contains the backend address, so the trailer should be different from the previous one.
617 time.Sleep(100 * time.Millisecond)
621 type failPreRPCCred struct{}
623 func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
624 if strings.Contains(uri[0], "failtosend") {
625 return nil, fmt.Errorf("rpc should fail to send")
630 func (failPreRPCCred) RequireTransportSecurity() bool {
634 func checkStats(stats *lbmpb.ClientStats, expected *lbmpb.ClientStats) error {
635 if !proto.Equal(stats, expected) {
636 return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
641 func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool, runRPCs func(*grpc.ClientConn)) lbmpb.ClientStats {
642 tss, cleanup, err := newLoadBalancer(3)
644 t.Fatalf("failed to create new load balancer: %v", err)
647 tss.ls.sls = []*lbmpb.ServerList{{
648 Servers: []*lbmpb.Server{{
649 IpAddress: tss.beIPs[2],
650 Port: int32(tss.bePorts[2]),
651 LoadBalanceToken: lbToken,
652 DropForLoadBalancing: dropForLoadBalancing,
653 DropForRateLimiting: dropForRateLimiting,
656 tss.ls.intervals = []time.Duration{0}
657 tss.ls.statsDura = 100 * time.Millisecond
658 creds := serverNameCheckCreds{expected: besn}
660 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
662 cc, err := grpc.DialContext(ctx, besn,
663 grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
664 grpc.WithTransportCredentials(&creds), grpc.WithPerRPCCredentials(failPreRPCCred{}),
665 grpc.WithBlock(), grpc.WithDialer(fakeNameDialer))
667 t.Fatalf("Failed to dial to the backend %v", err)
672 time.Sleep(1 * time.Second)
674 stats := tss.ls.stats
681 func TestGRPCLBStatsUnarySuccess(t *testing.T) {
682 defer leakcheck.Check(t)
683 stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) {
684 testC := testpb.NewTestServiceClient(cc)
685 // The first non-failfast RPC succeeds, all connections are up.
686 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
687 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
689 for i := 0; i < countRPC-1; i++ {
690 testC.EmptyCall(context.Background(), &testpb.Empty{})
694 if err := checkStats(&stats, &lbmpb.ClientStats{
695 NumCallsStarted: int64(countRPC),
696 NumCallsFinished: int64(countRPC),
697 NumCallsFinishedKnownReceived: int64(countRPC),
703 func TestGRPCLBStatsUnaryDropLoadBalancing(t *testing.T) {
704 defer leakcheck.Check(t)
706 stats := runAndGetStats(t, true, false, func(cc *grpc.ClientConn) {
707 testC := testpb.NewTestServiceClient(cc)
710 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
711 if strings.Contains(err.Error(), "drops requests") {
716 for i := 0; i < countRPC; i++ {
717 testC.EmptyCall(context.Background(), &testpb.Empty{})
721 if err := checkStats(&stats, &lbmpb.ClientStats{
722 NumCallsStarted: int64(countRPC + c),
723 NumCallsFinished: int64(countRPC + c),
724 NumCallsFinishedWithDropForLoadBalancing: int64(countRPC + 1),
725 NumCallsFinishedWithClientFailedToSend: int64(c - 1),
731 func TestGRPCLBStatsUnaryDropRateLimiting(t *testing.T) {
732 defer leakcheck.Check(t)
734 stats := runAndGetStats(t, false, true, func(cc *grpc.ClientConn) {
735 testC := testpb.NewTestServiceClient(cc)
738 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
739 if strings.Contains(err.Error(), "drops requests") {
744 for i := 0; i < countRPC; i++ {
745 testC.EmptyCall(context.Background(), &testpb.Empty{})
749 if err := checkStats(&stats, &lbmpb.ClientStats{
750 NumCallsStarted: int64(countRPC + c),
751 NumCallsFinished: int64(countRPC + c),
752 NumCallsFinishedWithDropForRateLimiting: int64(countRPC + 1),
753 NumCallsFinishedWithClientFailedToSend: int64(c - 1),
759 func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
760 defer leakcheck.Check(t)
761 stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) {
762 testC := testpb.NewTestServiceClient(cc)
763 // The first non-failfast RPC succeeds, all connections are up.
764 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
765 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
767 for i := 0; i < countRPC-1; i++ {
768 grpc.Invoke(context.Background(), "failtosend", &testpb.Empty{}, nil, cc)
772 if err := checkStats(&stats, &lbmpb.ClientStats{
773 NumCallsStarted: int64(countRPC),
774 NumCallsFinished: int64(countRPC),
775 NumCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
776 NumCallsFinishedKnownReceived: 1,
782 func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
783 defer leakcheck.Check(t)
784 stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) {
785 testC := testpb.NewTestServiceClient(cc)
786 // The first non-failfast RPC succeeds, all connections are up.
787 stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
789 t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
792 if _, err = stream.Recv(); err == io.EOF {
796 for i := 0; i < countRPC-1; i++ {
797 stream, err = testC.FullDuplexCall(context.Background())
799 // Wait for stream to end if err is nil.
801 if _, err = stream.Recv(); err == io.EOF {
809 if err := checkStats(&stats, &lbmpb.ClientStats{
810 NumCallsStarted: int64(countRPC),
811 NumCallsFinished: int64(countRPC),
812 NumCallsFinishedKnownReceived: int64(countRPC),
818 func TestGRPCLBStatsStreamingDropLoadBalancing(t *testing.T) {
819 defer leakcheck.Check(t)
821 stats := runAndGetStats(t, true, false, func(cc *grpc.ClientConn) {
822 testC := testpb.NewTestServiceClient(cc)
825 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
826 if strings.Contains(err.Error(), "drops requests") {
831 for i := 0; i < countRPC; i++ {
832 testC.FullDuplexCall(context.Background())
836 if err := checkStats(&stats, &lbmpb.ClientStats{
837 NumCallsStarted: int64(countRPC + c),
838 NumCallsFinished: int64(countRPC + c),
839 NumCallsFinishedWithDropForLoadBalancing: int64(countRPC + 1),
840 NumCallsFinishedWithClientFailedToSend: int64(c - 1),
846 func TestGRPCLBStatsStreamingDropRateLimiting(t *testing.T) {
847 defer leakcheck.Check(t)
849 stats := runAndGetStats(t, false, true, func(cc *grpc.ClientConn) {
850 testC := testpb.NewTestServiceClient(cc)
853 if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
854 if strings.Contains(err.Error(), "drops requests") {
859 for i := 0; i < countRPC; i++ {
860 testC.FullDuplexCall(context.Background())
864 if err := checkStats(&stats, &lbmpb.ClientStats{
865 NumCallsStarted: int64(countRPC + c),
866 NumCallsFinished: int64(countRPC + c),
867 NumCallsFinishedWithDropForRateLimiting: int64(countRPC + 1),
868 NumCallsFinishedWithClientFailedToSend: int64(c - 1),
874 func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
875 defer leakcheck.Check(t)
876 stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) {
877 testC := testpb.NewTestServiceClient(cc)
878 // The first non-failfast RPC succeeds, all connections are up.
879 stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
881 t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
884 if _, err = stream.Recv(); err == io.EOF {
888 for i := 0; i < countRPC-1; i++ {
889 grpc.NewClientStream(context.Background(), &grpc.StreamDesc{}, cc, "failtosend")
893 if err := checkStats(&stats, &lbmpb.ClientStats{
894 NumCallsStarted: int64(countRPC),
895 NumCallsFinished: int64(countRPC),
896 NumCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
897 NumCallsFinishedKnownReceived: 1,