OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / balancer / roundrobin / roundrobin_test.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package roundrobin_test
20
21 import (
22         "fmt"
23         "net"
24         "sync"
25         "testing"
26         "time"
27
28         "golang.org/x/net/context"
29         "google.golang.org/grpc"
30         "google.golang.org/grpc/balancer"
31         "google.golang.org/grpc/codes"
32         _ "google.golang.org/grpc/grpclog/glogger"
33         "google.golang.org/grpc/peer"
34         "google.golang.org/grpc/resolver"
35         "google.golang.org/grpc/resolver/manual"
36         testpb "google.golang.org/grpc/test/grpc_testing"
37         "google.golang.org/grpc/test/leakcheck"
38 )
39
40 var rr = balancer.Get("roundrobin")
41
42 type testServer struct {
43         testpb.TestServiceServer
44 }
45
46 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
47         return &testpb.Empty{}, nil
48 }
49
50 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
51         return nil
52 }
53
54 type test struct {
55         servers   []*grpc.Server
56         addresses []string
57 }
58
59 func (t *test) cleanup() {
60         for _, s := range t.servers {
61                 s.Stop()
62         }
63 }
64
65 func startTestServers(count int) (_ *test, err error) {
66         t := &test{}
67
68         defer func() {
69                 if err != nil {
70                         for _, s := range t.servers {
71                                 s.Stop()
72                         }
73                 }
74         }()
75         for i := 0; i < count; i++ {
76                 lis, err := net.Listen("tcp", "localhost:0")
77                 if err != nil {
78                         return nil, fmt.Errorf("Failed to listen %v", err)
79                 }
80
81                 s := grpc.NewServer()
82                 testpb.RegisterTestServiceServer(s, &testServer{})
83                 t.servers = append(t.servers, s)
84                 t.addresses = append(t.addresses, lis.Addr().String())
85
86                 go func(s *grpc.Server, l net.Listener) {
87                         s.Serve(l)
88                 }(s, lis)
89         }
90
91         return t, nil
92 }
93
94 func TestOneBackend(t *testing.T) {
95         defer leakcheck.Check(t)
96         r, cleanup := manual.GenerateAndRegisterManualResolver()
97         defer cleanup()
98
99         test, err := startTestServers(1)
100         if err != nil {
101                 t.Fatalf("failed to start servers: %v", err)
102         }
103         defer test.cleanup()
104
105         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
106         if err != nil {
107                 t.Fatalf("failed to dial: %v", err)
108         }
109         defer cc.Close()
110         testc := testpb.NewTestServiceClient(cc)
111         // The first RPC should fail because there's no address.
112         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
113         defer cancel()
114         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
115                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
116         }
117
118         r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
119         // The second RPC should succeed.
120         if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
121                 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
122         }
123 }
124
125 func TestBackendsRoundRobin(t *testing.T) {
126         defer leakcheck.Check(t)
127         r, cleanup := manual.GenerateAndRegisterManualResolver()
128         defer cleanup()
129
130         backendCount := 5
131         test, err := startTestServers(backendCount)
132         if err != nil {
133                 t.Fatalf("failed to start servers: %v", err)
134         }
135         defer test.cleanup()
136
137         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
138         if err != nil {
139                 t.Fatalf("failed to dial: %v", err)
140         }
141         defer cc.Close()
142         testc := testpb.NewTestServiceClient(cc)
143         // The first RPC should fail because there's no address.
144         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
145         defer cancel()
146         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
147                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
148         }
149
150         var resolvedAddrs []resolver.Address
151         for i := 0; i < backendCount; i++ {
152                 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
153         }
154
155         r.NewAddress(resolvedAddrs)
156         var p peer.Peer
157         // Make sure connections to all servers are up.
158         for si := 0; si < backendCount; si++ {
159                 var connected bool
160                 for i := 0; i < 1000; i++ {
161                         if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
162                                 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
163                         }
164                         if p.Addr.String() == test.addresses[si] {
165                                 connected = true
166                                 break
167                         }
168                         time.Sleep(time.Millisecond)
169                 }
170                 if !connected {
171                         t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
172                 }
173         }
174
175         for i := 0; i < 3*backendCount; i++ {
176                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
177                         t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
178                 }
179                 if p.Addr.String() != test.addresses[i%backendCount] {
180                         t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
181                 }
182         }
183 }
184
185 func TestAddressesRemoved(t *testing.T) {
186         defer leakcheck.Check(t)
187         r, cleanup := manual.GenerateAndRegisterManualResolver()
188         defer cleanup()
189
190         test, err := startTestServers(1)
191         if err != nil {
192                 t.Fatalf("failed to start servers: %v", err)
193         }
194         defer test.cleanup()
195
196         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
197         if err != nil {
198                 t.Fatalf("failed to dial: %v", err)
199         }
200         defer cc.Close()
201         testc := testpb.NewTestServiceClient(cc)
202         // The first RPC should fail because there's no address.
203         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
204         defer cancel()
205         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
206                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
207         }
208
209         r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
210         // The second RPC should succeed.
211         if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
212                 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
213         }
214
215         r.NewAddress([]resolver.Address{})
216         for i := 0; i < 1000; i++ {
217                 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
218                 defer cancel()
219                 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
220                         return
221                 }
222                 time.Sleep(time.Millisecond)
223         }
224         t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
225 }
226
227 func TestCloseWithPendingRPC(t *testing.T) {
228         defer leakcheck.Check(t)
229         r, cleanup := manual.GenerateAndRegisterManualResolver()
230         defer cleanup()
231
232         test, err := startTestServers(1)
233         if err != nil {
234                 t.Fatalf("failed to start servers: %v", err)
235         }
236         defer test.cleanup()
237
238         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
239         if err != nil {
240                 t.Fatalf("failed to dial: %v", err)
241         }
242         testc := testpb.NewTestServiceClient(cc)
243
244         var wg sync.WaitGroup
245         for i := 0; i < 3; i++ {
246                 wg.Add(1)
247                 go func() {
248                         defer wg.Done()
249                         // This RPC blocks until cc is closed.
250                         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
251                         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
252                                 t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
253                         }
254                         cancel()
255                 }()
256         }
257         cc.Close()
258         wg.Wait()
259 }
260
261 func TestNewAddressWhileBlocking(t *testing.T) {
262         defer leakcheck.Check(t)
263         r, cleanup := manual.GenerateAndRegisterManualResolver()
264         defer cleanup()
265
266         test, err := startTestServers(1)
267         if err != nil {
268                 t.Fatalf("failed to start servers: %v", err)
269         }
270         defer test.cleanup()
271
272         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
273         if err != nil {
274                 t.Fatalf("failed to dial: %v", err)
275         }
276         defer cc.Close()
277         testc := testpb.NewTestServiceClient(cc)
278         // The first RPC should fail because there's no address.
279         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
280         defer cancel()
281         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
282                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
283         }
284
285         r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
286         // The second RPC should succeed.
287         ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
288         defer cancel()
289         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
290                 t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
291         }
292
293         r.NewAddress([]resolver.Address{})
294
295         var wg sync.WaitGroup
296         for i := 0; i < 3; i++ {
297                 wg.Add(1)
298                 go func() {
299                         defer wg.Done()
300                         // This RPC blocks until NewAddress is called.
301                         testc.EmptyCall(context.Background(), &testpb.Empty{})
302                 }()
303         }
304         time.Sleep(50 * time.Millisecond)
305         r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
306         wg.Wait()
307 }
308
309 func TestOneServerDown(t *testing.T) {
310         defer leakcheck.Check(t)
311         r, cleanup := manual.GenerateAndRegisterManualResolver()
312         defer cleanup()
313
314         backendCount := 3
315         test, err := startTestServers(backendCount)
316         if err != nil {
317                 t.Fatalf("failed to start servers: %v", err)
318         }
319         defer test.cleanup()
320
321         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
322         if err != nil {
323                 t.Fatalf("failed to dial: %v", err)
324         }
325         defer cc.Close()
326         testc := testpb.NewTestServiceClient(cc)
327         // The first RPC should fail because there's no address.
328         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
329         defer cancel()
330         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
331                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
332         }
333
334         var resolvedAddrs []resolver.Address
335         for i := 0; i < backendCount; i++ {
336                 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
337         }
338
339         r.NewAddress(resolvedAddrs)
340         var p peer.Peer
341         // Make sure connections to all servers are up.
342         for si := 0; si < backendCount; si++ {
343                 var connected bool
344                 for i := 0; i < 1000; i++ {
345                         if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
346                                 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
347                         }
348                         if p.Addr.String() == test.addresses[si] {
349                                 connected = true
350                                 break
351                         }
352                         time.Sleep(time.Millisecond)
353                 }
354                 if !connected {
355                         t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
356                 }
357         }
358
359         for i := 0; i < 3*backendCount; i++ {
360                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
361                         t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
362                 }
363                 if p.Addr.String() != test.addresses[i%backendCount] {
364                         t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
365                 }
366         }
367
368         // Stop one server, RPCs should roundrobin among the remaining servers.
369         backendCount--
370         test.servers[backendCount].Stop()
371         // Loop until see server[backendCount-1] twice without seeing server[backendCount].
372         var targetSeen int
373         for i := 0; i < 1000; i++ {
374                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
375                         t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
376                         // Due to a race, this RPC could possibly get the connection that
377                         // was closing, and this RPC may fail. Keep trying when this
378                         // happens.
379                         continue
380                 }
381                 switch p.Addr.String() {
382                 case test.addresses[backendCount-1]:
383                         targetSeen++
384                 case test.addresses[backendCount]:
385                         // Reset targetSeen if peer is server[backendCount].
386                         targetSeen = 0
387                 }
388                 // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky.
389                 if targetSeen >= 2 {
390                         break
391                 }
392         }
393         if targetSeen != 2 {
394                 t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
395         }
396         for i := 0; i < 3*backendCount; i++ {
397                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
398                         t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
399                 }
400                 if p.Addr.String() != test.addresses[i%backendCount] {
401                         t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
402                 }
403         }
404 }
405
406 func TestAllServersDown(t *testing.T) {
407         defer leakcheck.Check(t)
408         r, cleanup := manual.GenerateAndRegisterManualResolver()
409         defer cleanup()
410
411         backendCount := 3
412         test, err := startTestServers(backendCount)
413         if err != nil {
414                 t.Fatalf("failed to start servers: %v", err)
415         }
416         defer test.cleanup()
417
418         cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
419         if err != nil {
420                 t.Fatalf("failed to dial: %v", err)
421         }
422         defer cc.Close()
423         testc := testpb.NewTestServiceClient(cc)
424         // The first RPC should fail because there's no address.
425         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
426         defer cancel()
427         if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
428                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
429         }
430
431         var resolvedAddrs []resolver.Address
432         for i := 0; i < backendCount; i++ {
433                 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
434         }
435
436         r.NewAddress(resolvedAddrs)
437         var p peer.Peer
438         // Make sure connections to all servers are up.
439         for si := 0; si < backendCount; si++ {
440                 var connected bool
441                 for i := 0; i < 1000; i++ {
442                         if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
443                                 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
444                         }
445                         if p.Addr.String() == test.addresses[si] {
446                                 connected = true
447                                 break
448                         }
449                         time.Sleep(time.Millisecond)
450                 }
451                 if !connected {
452                         t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
453                 }
454         }
455
456         for i := 0; i < 3*backendCount; i++ {
457                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
458                         t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
459                 }
460                 if p.Addr.String() != test.addresses[i%backendCount] {
461                         t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
462                 }
463         }
464
465         // All servers are stopped, failfast RPC should fail with unavailable.
466         for i := 0; i < backendCount; i++ {
467                 test.servers[i].Stop()
468         }
469         time.Sleep(100 * time.Millisecond)
470         for i := 0; i < 1000; i++ {
471                 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable {
472                         return
473                 }
474                 time.Sleep(time.Millisecond)
475         }
476         t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
477 }