3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "golang.org/x/net/context"
30 "google.golang.org/grpc/codes"
31 _ "google.golang.org/grpc/grpclog/glogger"
32 "google.golang.org/grpc/naming"
33 "google.golang.org/grpc/test/leakcheck"
35 // V1 balancer tests use passthrough resolver instead of dns.
36 // TODO(bar) remove this when removing v1 balancer entirely.
37 _ "google.golang.org/grpc/resolver/passthrough"
40 type testWatcher struct {
41 // the channel to receive name resolution updates
42 update chan *naming.Update
43 // the side channel to get to know how many updates in a batch
45 // the channel to notify the update injector that the update reading is done
49 func (w *testWatcher) Next() (updates []*naming.Update, err error) {
52 return nil, fmt.Errorf("w.side is closed")
54 for i := 0; i < n; i++ {
57 updates = append(updates, u)
64 func (w *testWatcher) Close() {
68 // Inject naming resolution updates to the testWatcher.
69 func (w *testWatcher) inject(updates []*naming.Update) {
70 w.side <- len(updates)
71 for _, u := range updates {
77 type testNameResolver struct {
82 func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
84 update: make(chan *naming.Update, 1),
85 side: make(chan int, 1),
86 readDone: make(chan int),
89 r.w.update <- &naming.Update{
99 func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
100 var servers []*server
101 for i := 0; i < numServers; i++ {
103 servers = append(servers, s)
104 go s.start(t, 0, maxStreams)
105 s.wait(t, 2*time.Second)
107 // Point to server[0]
108 addr := "localhost:" + servers[0].port
109 return servers, &testNameResolver{
112 for i := 0; i < numServers; i++ {
118 func TestNameDiscovery(t *testing.T) {
119 defer leakcheck.Check(t)
120 // Start 2 servers on 2 ports.
122 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
124 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
126 t.Fatalf("Failed to create ClientConn: %v", err)
131 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
132 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
134 // Inject the name resolution change to remove servers[0] and add servers[1].
135 var updates []*naming.Update
136 updates = append(updates, &naming.Update{
138 Addr: "localhost:" + servers[0].port,
140 updates = append(updates, &naming.Update{
142 Addr: "localhost:" + servers[1].port,
145 // Loop until the RPCs in flight talk to servers[1].
147 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
150 time.Sleep(10 * time.Millisecond)
154 func TestEmptyAddrs(t *testing.T) {
155 defer leakcheck.Check(t)
156 servers, r, cleanup := startServers(t, 1, math.MaxUint32)
158 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
160 t.Fatalf("Failed to create ClientConn: %v", err)
164 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
165 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
167 // Inject name resolution change to remove the server so that there is no address
168 // available after that.
171 Addr: "localhost:" + servers[0].port,
173 r.w.inject([]*naming.Update{u})
174 // Loop until the above updates apply.
176 time.Sleep(10 * time.Millisecond)
177 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
178 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
186 func TestRoundRobin(t *testing.T) {
187 defer leakcheck.Check(t)
188 // Start 3 servers on 3 ports.
190 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
192 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
194 t.Fatalf("Failed to create ClientConn: %v", err)
197 // Add servers[1] to the service discovery.
200 Addr: "localhost:" + servers[1].port,
202 r.w.inject([]*naming.Update{u})
205 // Loop until servers[1] is up
207 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
210 time.Sleep(10 * time.Millisecond)
212 // Add servers[2] to the service discovery.
215 Addr: "localhost:" + servers[2].port,
217 r.w.inject([]*naming.Update{u})
218 // Loop until servers[2] is up.
220 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
223 time.Sleep(10 * time.Millisecond)
225 // Check the incoming RPCs served in a round-robin manner.
226 for i := 0; i < 10; i++ {
227 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port {
228 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
233 func TestCloseWithPendingRPC(t *testing.T) {
234 defer leakcheck.Check(t)
235 servers, r, cleanup := startServers(t, 1, math.MaxUint32)
237 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
239 t.Fatalf("Failed to create ClientConn: %v", err)
243 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
244 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
246 // Remove the server.
247 updates := []*naming.Update{{
249 Addr: "localhost:" + servers[0].port,
252 // Loop until the above update applies.
254 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
255 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
259 time.Sleep(10 * time.Millisecond)
262 // Issue 2 RPCs which should be completed with error status once cc is closed.
263 var wg sync.WaitGroup
268 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
269 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
275 time.Sleep(5 * time.Millisecond)
276 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
277 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
280 time.Sleep(5 * time.Millisecond)
285 func TestGetOnWaitChannel(t *testing.T) {
286 defer leakcheck.Check(t)
287 servers, r, cleanup := startServers(t, 1, math.MaxUint32)
289 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
291 t.Fatalf("Failed to create ClientConn: %v", err)
294 // Remove all servers so that all upcoming RPCs will block on waitCh.
295 updates := []*naming.Update{{
297 Addr: "localhost:" + servers[0].port,
302 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
303 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
308 time.Sleep(10 * time.Millisecond)
310 var wg sync.WaitGroup
315 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
316 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
319 // Add a connected server to get the above RPC through.
320 updates = []*naming.Update{{
322 Addr: "localhost:" + servers[0].port,
325 // Wait until the above RPC succeeds.
329 func TestOneServerDown(t *testing.T) {
330 defer leakcheck.Check(t)
333 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
335 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
337 t.Fatalf("Failed to create ClientConn: %v", err)
340 // Add servers[1] to the service discovery.
341 var updates []*naming.Update
342 updates = append(updates, &naming.Update{
344 Addr: "localhost:" + servers[1].port,
349 // Loop until servers[1] is up
351 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
354 time.Sleep(10 * time.Millisecond)
357 var wg sync.WaitGroup
359 sleepDuration := 10 * time.Millisecond
362 time.Sleep(sleepDuration)
363 // After sleepDuration, kill server[0].
368 // All non-failfast RPCs should not block because there's at least one connection available.
369 for i := 0; i < numRPC; i++ {
372 time.Sleep(sleepDuration)
373 // After sleepDuration, invoke RPC.
374 // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
375 Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
382 func TestOneAddressRemoval(t *testing.T) {
383 defer leakcheck.Check(t)
386 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
388 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
390 t.Fatalf("Failed to create ClientConn: %v", err)
393 // Add servers[1] to the service discovery.
394 var updates []*naming.Update
395 updates = append(updates, &naming.Update{
397 Addr: "localhost:" + servers[1].port,
402 // Loop until servers[1] is up
404 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
407 time.Sleep(10 * time.Millisecond)
410 var wg sync.WaitGroup
412 sleepDuration := 10 * time.Millisecond
415 time.Sleep(sleepDuration)
416 // After sleepDuration, delete server[0].
417 var updates []*naming.Update
418 updates = append(updates, &naming.Update{
420 Addr: "localhost:" + servers[0].port,
426 // All non-failfast RPCs should not fail because there's at least one connection available.
427 for i := 0; i < numRPC; i++ {
431 time.Sleep(sleepDuration)
432 // After sleepDuration, invoke RPC.
433 // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
434 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
435 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
443 func checkServerUp(t *testing.T, currentServer *server) {
445 port := currentServer.port
446 cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
448 t.Fatalf("Failed to create ClientConn: %v", err)
453 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port {
456 time.Sleep(10 * time.Millisecond)
460 func TestPickFirstEmptyAddrs(t *testing.T) {
461 defer leakcheck.Check(t)
462 servers, r, cleanup := startServers(t, 1, math.MaxUint32)
464 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
466 t.Fatalf("Failed to create ClientConn: %v", err)
470 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
471 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
473 // Inject name resolution change to remove the server so that there is no address
474 // available after that.
477 Addr: "localhost:" + servers[0].port,
479 r.w.inject([]*naming.Update{u})
480 // Loop until the above updates apply.
482 time.Sleep(10 * time.Millisecond)
483 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
484 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
492 func TestPickFirstCloseWithPendingRPC(t *testing.T) {
493 defer leakcheck.Check(t)
494 servers, r, cleanup := startServers(t, 1, math.MaxUint32)
496 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
498 t.Fatalf("Failed to create ClientConn: %v", err)
502 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
503 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
505 // Remove the server.
506 updates := []*naming.Update{{
508 Addr: "localhost:" + servers[0].port,
511 // Loop until the above update applies.
513 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
514 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
518 time.Sleep(10 * time.Millisecond)
521 // Issue 2 RPCs which should be completed with error status once cc is closed.
522 var wg sync.WaitGroup
527 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
528 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
534 time.Sleep(5 * time.Millisecond)
535 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
536 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
539 time.Sleep(5 * time.Millisecond)
544 func TestPickFirstOrderAllServerUp(t *testing.T) {
545 defer leakcheck.Check(t)
546 // Start 3 servers on 3 ports.
548 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
550 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
552 t.Fatalf("Failed to create ClientConn: %v", err)
555 // Add servers[1] and [2] to the service discovery.
558 Addr: "localhost:" + servers[1].port,
560 r.w.inject([]*naming.Update{u})
564 Addr: "localhost:" + servers[2].port,
566 r.w.inject([]*naming.Update{u})
568 // Loop until all 3 servers are up
569 checkServerUp(t, servers[0])
570 checkServerUp(t, servers[1])
571 checkServerUp(t, servers[2])
573 // Check the incoming RPCs served in server[0]
576 for i := 0; i < 20; i++ {
577 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
578 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
580 time.Sleep(10 * time.Millisecond)
583 // Delete server[0] in the balancer, the incoming RPCs served in server[1]
584 // For test addrconn, close server[0] instead
587 Addr: "localhost:" + servers[0].port,
589 r.w.inject([]*naming.Update{u})
590 // Loop until it changes to server[1]
592 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
595 time.Sleep(10 * time.Millisecond)
597 for i := 0; i < 20; i++ {
598 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
599 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
601 time.Sleep(10 * time.Millisecond)
604 // Add server[0] back to the balancer, the incoming RPCs served in server[1]
605 // Add is an append operation; the order of Notify is now {server[1].port server[2].port server[0].port}
608 Addr: "localhost:" + servers[0].port,
610 r.w.inject([]*naming.Update{u})
611 for i := 0; i < 20; i++ {
612 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
613 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
615 time.Sleep(10 * time.Millisecond)
618 // Delete server[1] in the balancer, the incoming RPCs served in server[2]
621 Addr: "localhost:" + servers[1].port,
623 r.w.inject([]*naming.Update{u})
625 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
628 time.Sleep(1 * time.Second)
630 for i := 0; i < 20; i++ {
631 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
632 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
634 time.Sleep(10 * time.Millisecond)
637 // Delete server[2] in the balancer, the incoming RPCs served in server[0]
640 Addr: "localhost:" + servers[2].port,
642 r.w.inject([]*naming.Update{u})
644 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
647 time.Sleep(1 * time.Second)
649 for i := 0; i < 20; i++ {
650 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
651 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
653 time.Sleep(10 * time.Millisecond)
657 func TestPickFirstOrderOneServerDown(t *testing.T) {
658 defer leakcheck.Check(t)
659 // Start 3 servers on 3 ports.
661 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
663 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
665 t.Fatalf("Failed to create ClientConn: %v", err)
668 // Add servers[1] and [2] to the service discovery.
671 Addr: "localhost:" + servers[1].port,
673 r.w.inject([]*naming.Update{u})
677 Addr: "localhost:" + servers[2].port,
679 r.w.inject([]*naming.Update{u})
681 // Loop until all 3 servers are up
682 checkServerUp(t, servers[0])
683 checkServerUp(t, servers[1])
684 checkServerUp(t, servers[2])
686 // Check the incoming RPCs served in server[0]
689 for i := 0; i < 20; i++ {
690 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
691 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
693 time.Sleep(10 * time.Millisecond)
696 // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
697 // {server[0] server[1] server[2]}
699 // Loop until it changes to server[1]
701 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
704 time.Sleep(10 * time.Millisecond)
706 for i := 0; i < 20; i++ {
707 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
708 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
710 time.Sleep(10 * time.Millisecond)
713 // Bring server[0] back up; the incoming RPCs are still served by server[1].
714 p, _ := strconv.Atoi(servers[0].port)
715 servers[0] = newTestServer()
716 go servers[0].start(t, p, math.MaxUint32)
717 defer servers[0].stop()
718 servers[0].wait(t, 2*time.Second)
719 checkServerUp(t, servers[0])
721 for i := 0; i < 20; i++ {
722 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
723 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
725 time.Sleep(10 * time.Millisecond)
728 // Delete server[1] in the balancer, the incoming RPCs served in server[0]
731 Addr: "localhost:" + servers[1].port,
733 r.w.inject([]*naming.Update{u})
735 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
738 time.Sleep(1 * time.Second)
740 for i := 0; i < 20; i++ {
741 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
742 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
744 time.Sleep(10 * time.Millisecond)
748 func TestPickFirstOneAddressRemoval(t *testing.T) {
749 defer leakcheck.Check(t)
752 servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
754 cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
756 t.Fatalf("Failed to create ClientConn: %v", err)
759 // Add servers[1] to the service discovery.
760 var updates []*naming.Update
761 updates = append(updates, &naming.Update{
763 Addr: "localhost:" + servers[1].port,
767 // Create a new cc to Loop until servers[1] is up
768 checkServerUp(t, servers[0])
769 checkServerUp(t, servers[1])
771 var wg sync.WaitGroup
773 sleepDuration := 10 * time.Millisecond
776 time.Sleep(sleepDuration)
777 // After sleepDuration, delete server[0].
778 var updates []*naming.Update
779 updates = append(updates, &naming.Update{
781 Addr: "localhost:" + servers[0].port,
787 // All non-failfast RPCs should not fail because there's at least one connection available.
788 for i := 0; i < numRPC; i++ {
792 time.Sleep(sleepDuration)
793 // After sleepDuration, invoke RPC.
794 // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
795 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
796 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)