3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 package roundrobin_test
28 "golang.org/x/net/context"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/balancer"
31 "google.golang.org/grpc/codes"
32 _ "google.golang.org/grpc/grpclog/glogger"
33 "google.golang.org/grpc/peer"
34 "google.golang.org/grpc/resolver"
35 "google.golang.org/grpc/resolver/manual"
36 testpb "google.golang.org/grpc/test/grpc_testing"
37 "google.golang.org/grpc/test/leakcheck"
// rr is the balancer builder looked up under the name "roundrobin"; every
// test in this file passes it to grpc.WithBalancerBuilder when dialing.
40 var rr = balancer.Get("roundrobin")
// testServer is the service implementation registered on every backend
// started by startTestServers. It embeds testpb.TestServiceServer and
// provides its own EmptyCall and FullDuplexCall methods.
42 type testServer struct {
43 testpb.TestServiceServer
// EmptyCall always answers with an empty message and a nil error; ctx and in
// are not used.
46 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
47 return &testpb.Empty{}, nil
// FullDuplexCall handles the FullDuplexCall RPC for testServer: it is handed
// the server side of the stream and reports the outcome as an error.
50 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
// servers holds the gRPC servers belonging to this fixture; startTestServers
// appends to it and cleanup ranges over it.
55 servers []*grpc.Server
// cleanup visits every server recorded in t.servers.
// NOTE(review): presumably each server is stopped here — confirm against the
// loop body.
59 func (t *test) cleanup() {
60 for _, s := range t.servers {
// startTestServers sets up count test backends and returns them as a *test
// fixture. Each backend listens on "localhost:0" (an automatically chosen TCP
// port), has a testServer registered on its gRPC server, and is recorded in
// t.servers together with its listen address in t.addresses. If listening
// fails, a nil fixture and a "Failed to listen" error are returned.
//
// The error result is named (err).
// NOTE(review): presumably the named result lets the first loop over
// t.servers tear down already-started servers on failure — confirm.
65 func startTestServers(count int) (_ *test, err error) {
70 for _, s := range t.servers {
75 for i := 0; i < count; i++ {
76 lis, err := net.Listen("tcp", "localhost:0")
78 return nil, fmt.Errorf("Failed to listen %v", err)
82 testpb.RegisterTestServiceServer(s, &testServer{})
83 t.servers = append(t.servers, s)
84 t.addresses = append(t.addresses, lis.Addr().String())
// The server and its listener are handed to the goroutine as arguments
// rather than captured from the loop.
86 go func(s *grpc.Server, l net.Listener) {
// TestOneBackend exercises the round-robin balancer with a single backend.
// An RPC issued before the manual resolver has reported any address must fail
// with DeadlineExceeded (1ms timeout); after the backend's address is pushed
// with r.NewAddress, an RPC must succeed.
94 func TestOneBackend(t *testing.T) {
95 defer leakcheck.Check(t)
96 r, cleanup := manual.GenerateAndRegisterManualResolver()
99 test, err := startTestServers(1)
101 t.Fatalf("failed to start servers: %v", err)
// Dial using the manual resolver's scheme and the round-robin balancer
// builder; no address has been pushed to the resolver yet.
105 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
107 t.Fatalf("failed to dial: %v", err)
110 testc := testpb.NewTestServiceClient(cc)
111 // The first RPC should fail because there's no address.
112 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
114 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
115 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
// Publish the single backend's address through the manual resolver.
118 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
119 // The second RPC should succeed.
120 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
121 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
// TestBackendsRoundRobin checks that RPCs are spread over several backends in
// round-robin order. It starts backendCount servers, verifies that an RPC
// fails with DeadlineExceeded while no address is known, pushes all backend
// addresses through the manual resolver, waits until every backend has served
// at least one RPC, and then requires 3*backendCount consecutive RPCs to reach
// the backends in address order, wrapping around.
125 func TestBackendsRoundRobin(t *testing.T) {
126 defer leakcheck.Check(t)
127 r, cleanup := manual.GenerateAndRegisterManualResolver()
131 test, err := startTestServers(backendCount)
133 t.Fatalf("failed to start servers: %v", err)
137 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
139 t.Fatalf("failed to dial: %v", err)
142 testc := testpb.NewTestServiceClient(cc)
143 // The first RPC should fail because there's no address.
144 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
146 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
147 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
// Build one resolver.Address per started backend and publish them together.
150 var resolvedAddrs []resolver.Address
151 for i := 0; i < backendCount; i++ {
152 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
155 r.NewAddress(resolvedAddrs)
157 // Make sure connections to all servers are up.
158 for si := 0; si < backendCount; si++ {
// Poll up to 1000 times, 1ms apart, until an RPC is served by
// addresses[si]; the peer of each RPC is captured through grpc.Peer.
160 for i := 0; i < 1000; i++ {
161 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
162 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
164 if p.Addr.String() == test.addresses[si] {
168 time.Sleep(time.Millisecond)
171 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
// RPC number i must be served by addresses[i%backendCount].
175 for i := 0; i < 3*backendCount; i++ {
176 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
177 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
179 if p.Addr.String() != test.addresses[i%backendCount] {
180 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
// TestAddressesRemoved checks what happens when the resolver withdraws every
// address. After confirming that an RPC succeeds against a single backend, it
// pushes an empty address list and then issues non-fail-fast RPCs with a 50ms
// timeout, up to 1000 times, looking for one that ends with DeadlineExceeded.
// The test fails if no such RPC is observed.
185 func TestAddressesRemoved(t *testing.T) {
186 defer leakcheck.Check(t)
187 r, cleanup := manual.GenerateAndRegisterManualResolver()
190 test, err := startTestServers(1)
192 t.Fatalf("failed to start servers: %v", err)
196 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
198 t.Fatalf("failed to dial: %v", err)
201 testc := testpb.NewTestServiceClient(cc)
202 // The first RPC should fail because there's no address.
203 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
205 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
206 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
209 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
210 // The second RPC should succeed.
211 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
212 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
// Withdraw all addresses by publishing an empty list.
215 r.NewAddress([]resolver.Address{})
216 for i := 0; i < 1000; i++ {
217 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
// FailFast(false) is passed, so the condition being looked for is the
// 50ms deadline expiring.
219 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
222 time.Sleep(time.Millisecond)
224 t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
// TestCloseWithPendingRPC checks that RPCs pending on the ClientConn are
// released when the connection is closed. It issues three RPCs, each with a
// 10s deadline, that are expected to block until cc is closed, and reports an
// error for any RPC that ends with DeadlineExceeded instead.
// NOTE(review): the sync.WaitGroup suggests the three RPCs run concurrently —
// confirm.
227 func TestCloseWithPendingRPC(t *testing.T) {
228 defer leakcheck.Check(t)
229 r, cleanup := manual.GenerateAndRegisterManualResolver()
232 test, err := startTestServers(1)
234 t.Fatalf("failed to start servers: %v", err)
238 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
240 t.Fatalf("failed to dial: %v", err)
242 testc := testpb.NewTestServiceClient(cc)
244 var wg sync.WaitGroup
245 for i := 0; i < 3; i++ {
249 // This RPC blocks until cc is closed.
250 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// Only a DeadlineExceeded result is treated as a failure; any other
// outcome is accepted.
251 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
252 t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
// TestNewAddressWhileBlocking checks that RPCs blocked for lack of an address
// proceed once the resolver publishes one. After an initial
// DeadlineExceeded failure and a successful RPC (within 2s) against a single
// backend, the test withdraws all addresses, issues three RPCs that are
// expected to block, sleeps 50ms, and then publishes the backend address
// again.
261 func TestNewAddressWhileBlocking(t *testing.T) {
262 defer leakcheck.Check(t)
263 r, cleanup := manual.GenerateAndRegisterManualResolver()
266 test, err := startTestServers(1)
268 t.Fatalf("failed to start servers: %v", err)
272 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
274 t.Fatalf("failed to dial: %v", err)
277 testc := testpb.NewTestServiceClient(cc)
278 // The first RPC should fail because there's no address.
279 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
281 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
282 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
285 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
286 // The second RPC should succeed.
287 ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
289 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
290 t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
// Withdraw all addresses so the RPCs issued next have nothing to pick.
293 r.NewAddress([]resolver.Address{})
295 var wg sync.WaitGroup
296 for i := 0; i < 3; i++ {
300 // This RPC blocks until NewAddress is called.
// The RPC's result is deliberately not inspected.
301 testc.EmptyCall(context.Background(), &testpb.Empty{})
// Wait 50ms, then publish the backend address again.
304 time.Sleep(50 * time.Millisecond)
305 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// TestOneServerDown checks that the balancer keeps round-robining over the
// remaining backends after one backend is stopped. It first establishes the
// normal round-robin order over all backends, stops one server, polls (up to
// 1000 RPCs, tolerating failures) until server[backendCount-1] has been seen
// twice without the stopped server appearing, and finally requires
// 3*backendCount consecutive RPCs to follow addresses[i%backendCount].
309 func TestOneServerDown(t *testing.T) {
310 defer leakcheck.Check(t)
311 r, cleanup := manual.GenerateAndRegisterManualResolver()
315 test, err := startTestServers(backendCount)
317 t.Fatalf("failed to start servers: %v", err)
321 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
323 t.Fatalf("failed to dial: %v", err)
326 testc := testpb.NewTestServiceClient(cc)
327 // The first RPC should fail because there's no address.
328 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
330 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
331 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
// Build one resolver.Address per started backend and publish them together.
334 var resolvedAddrs []resolver.Address
335 for i := 0; i < backendCount; i++ {
336 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
339 r.NewAddress(resolvedAddrs)
341 // Make sure connections to all servers are up.
342 for si := 0; si < backendCount; si++ {
// Poll up to 1000 times, 1ms apart, until an RPC is served by
// addresses[si].
344 for i := 0; i < 1000; i++ {
345 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
346 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
348 if p.Addr.String() == test.addresses[si] {
352 time.Sleep(time.Millisecond)
355 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
// Baseline: RPC number i must be served by addresses[i%backendCount].
359 for i := 0; i < 3*backendCount; i++ {
360 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
361 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
363 if p.Addr.String() != test.addresses[i%backendCount] {
364 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
368 // Stop one server, RPCs should roundrobin among the remaining servers.
// NOTE(review): servers[backendCount] is the stopped server, so backendCount
// must already have been reduced to the number of remaining servers at this
// point (otherwise the index is out of range) — confirm.
370 test.servers[backendCount].Stop()
371 // Loop until we see server[backendCount-1] twice without seeing server[backendCount].
373 for i := 0; i < 1000; i++ {
374 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
// Logged rather than fatal: see the race described below.
375 t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
376 // Due to a race, this RPC could possibly get the connection that
377 // was closing, and this RPC may fail. Keep trying when this happens.
381 switch p.Addr.String() {
382 case test.addresses[backendCount-1]:
384 case test.addresses[backendCount]:
385 // Reset targetSeen if peer is server[backendCount].
388 // Break to make sure the last picked address is server[backendCount-1], so the following for loop won't be flaky.
394 t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
// Mismatches here are reported with Errorf, so every index is checked
// rather than stopping at the first wrong peer.
396 for i := 0; i < 3*backendCount; i++ {
397 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
398 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
400 if p.Addr.String() != test.addresses[i%backendCount] {
401 t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
// TestAllServersDown checks that RPCs fail with Unavailable once every
// backend is gone. It first establishes the normal round-robin order over all
// backends, stops every server, sleeps 100ms, and then issues RPCs (up to
// 1000, 1ms apart) looking for one that fails with codes.Unavailable. The
// test fails if none does.
406 func TestAllServersDown(t *testing.T) {
407 defer leakcheck.Check(t)
408 r, cleanup := manual.GenerateAndRegisterManualResolver()
412 test, err := startTestServers(backendCount)
414 t.Fatalf("failed to start servers: %v", err)
418 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
420 t.Fatalf("failed to dial: %v", err)
423 testc := testpb.NewTestServiceClient(cc)
424 // The first RPC should fail because there's no address.
425 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
427 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
428 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
// Build one resolver.Address per started backend and publish them together.
431 var resolvedAddrs []resolver.Address
432 for i := 0; i < backendCount; i++ {
433 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
436 r.NewAddress(resolvedAddrs)
438 // Make sure connections to all servers are up.
439 for si := 0; si < backendCount; si++ {
// Poll up to 1000 times, 1ms apart, until an RPC is served by
// addresses[si].
441 for i := 0; i < 1000; i++ {
442 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
443 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
445 if p.Addr.String() == test.addresses[si] {
449 time.Sleep(time.Millisecond)
452 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
// Baseline: RPC number i must be served by addresses[i%backendCount].
456 for i := 0; i < 3*backendCount; i++ {
457 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
458 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
460 if p.Addr.String() != test.addresses[i%backendCount] {
461 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
465 // All servers are stopped, failfast RPC should fail with unavailable.
466 for i := 0; i < backendCount; i++ {
467 test.servers[i].Stop()
469 time.Sleep(100 * time.Millisecond)
// Poll up to 1000 times, 1ms apart, for an RPC whose status code is
// codes.Unavailable.
470 for i := 0; i < 1000; i++ {
471 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable {
474 time.Sleep(time.Millisecond)
476 t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")