OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / pickfirst_test.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "math"
23         "sync"
24         "testing"
25         "time"
26
27         "golang.org/x/net/context"
28         "google.golang.org/grpc/codes"
29         "google.golang.org/grpc/resolver"
30         "google.golang.org/grpc/resolver/manual"
31         "google.golang.org/grpc/test/leakcheck"
32 )
33
34 func TestOneBackendPickfirst(t *testing.T) {
35         defer leakcheck.Check(t)
36         r, rcleanup := manual.GenerateAndRegisterManualResolver()
37         defer rcleanup()
38
39         numServers := 1
40         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
41         defer scleanup()
42
43         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
44         if err != nil {
45                 t.Fatalf("failed to dial: %v", err)
46         }
47         defer cc.Close()
48         // The first RPC should fail because there's no address.
49         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
50         defer cancel()
51         req := "port"
52         var reply string
53         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
54                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
55         }
56
57         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
58         // The second RPC should succeed.
59         for i := 0; i < 1000; i++ {
60                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
61                         return
62                 }
63                 time.Sleep(time.Millisecond)
64         }
65         t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
66 }
67
68 func TestBackendsPickfirst(t *testing.T) {
69         defer leakcheck.Check(t)
70         r, rcleanup := manual.GenerateAndRegisterManualResolver()
71         defer rcleanup()
72
73         numServers := 2
74         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
75         defer scleanup()
76
77         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
78         if err != nil {
79                 t.Fatalf("failed to dial: %v", err)
80         }
81         defer cc.Close()
82         // The first RPC should fail because there's no address.
83         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
84         defer cancel()
85         req := "port"
86         var reply string
87         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
88                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
89         }
90
91         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
92         // The second RPC should succeed with the first server.
93         for i := 0; i < 1000; i++ {
94                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
95                         return
96                 }
97                 time.Sleep(time.Millisecond)
98         }
99         t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
100 }
101
102 func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
103         defer leakcheck.Check(t)
104         r, rcleanup := manual.GenerateAndRegisterManualResolver()
105         defer rcleanup()
106
107         numServers := 1
108         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
109         defer scleanup()
110
111         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
112         if err != nil {
113                 t.Fatalf("failed to dial: %v", err)
114         }
115         defer cc.Close()
116         // The first RPC should fail because there's no address.
117         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
118         defer cancel()
119         req := "port"
120         var reply string
121         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
122                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
123         }
124
125         var wg sync.WaitGroup
126         for i := 0; i < 3; i++ {
127                 wg.Add(1)
128                 go func() {
129                         defer wg.Done()
130                         // This RPC blocks until NewAddress is called.
131                         Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
132                 }()
133         }
134         time.Sleep(50 * time.Millisecond)
135         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
136         wg.Wait()
137 }
138
139 func TestCloseWithPendingRPCPickfirst(t *testing.T) {
140         defer leakcheck.Check(t)
141         r, rcleanup := manual.GenerateAndRegisterManualResolver()
142         defer rcleanup()
143
144         numServers := 1
145         _, _, scleanup := startServers(t, numServers, math.MaxInt32)
146         defer scleanup()
147
148         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
149         if err != nil {
150                 t.Fatalf("failed to dial: %v", err)
151         }
152         defer cc.Close()
153         // The first RPC should fail because there's no address.
154         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
155         defer cancel()
156         req := "port"
157         var reply string
158         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
159                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
160         }
161
162         var wg sync.WaitGroup
163         for i := 0; i < 3; i++ {
164                 wg.Add(1)
165                 go func() {
166                         defer wg.Done()
167                         // This RPC blocks until NewAddress is called.
168                         Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
169                 }()
170         }
171         time.Sleep(50 * time.Millisecond)
172         cc.Close()
173         wg.Wait()
174 }
175
176 func TestOneServerDownPickfirst(t *testing.T) {
177         defer leakcheck.Check(t)
178         r, rcleanup := manual.GenerateAndRegisterManualResolver()
179         defer rcleanup()
180
181         numServers := 2
182         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
183         defer scleanup()
184
185         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
186         if err != nil {
187                 t.Fatalf("failed to dial: %v", err)
188         }
189         defer cc.Close()
190         // The first RPC should fail because there's no address.
191         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
192         defer cancel()
193         req := "port"
194         var reply string
195         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
196                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
197         }
198
199         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
200         // The second RPC should succeed with the first server.
201         for i := 0; i < 1000; i++ {
202                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
203                         break
204                 }
205                 time.Sleep(time.Millisecond)
206         }
207
208         servers[0].stop()
209         for i := 0; i < 1000; i++ {
210                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
211                         return
212                 }
213                 time.Sleep(time.Millisecond)
214         }
215         t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
216 }
217
218 func TestAllServersDownPickfirst(t *testing.T) {
219         defer leakcheck.Check(t)
220         r, rcleanup := manual.GenerateAndRegisterManualResolver()
221         defer rcleanup()
222
223         numServers := 2
224         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
225         defer scleanup()
226
227         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
228         if err != nil {
229                 t.Fatalf("failed to dial: %v", err)
230         }
231         defer cc.Close()
232         // The first RPC should fail because there's no address.
233         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
234         defer cancel()
235         req := "port"
236         var reply string
237         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
238                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
239         }
240
241         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
242         // The second RPC should succeed with the first server.
243         for i := 0; i < 1000; i++ {
244                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
245                         break
246                 }
247                 time.Sleep(time.Millisecond)
248         }
249
250         for i := 0; i < numServers; i++ {
251                 servers[i].stop()
252         }
253         for i := 0; i < 1000; i++ {
254                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); Code(err) == codes.Unavailable {
255                         return
256                 }
257                 time.Sleep(time.Millisecond)
258         }
259         t.Fatalf("EmptyCall() = _, %v, want _, error with code unavailable", err)
260 }
261
262 func TestAddressesRemovedPickfirst(t *testing.T) {
263         defer leakcheck.Check(t)
264         r, rcleanup := manual.GenerateAndRegisterManualResolver()
265         defer rcleanup()
266
267         numServers := 3
268         servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
269         defer scleanup()
270
271         cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
272         if err != nil {
273                 t.Fatalf("failed to dial: %v", err)
274         }
275         defer cc.Close()
276         // The first RPC should fail because there's no address.
277         ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
278         defer cancel()
279         req := "port"
280         var reply string
281         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
282                 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
283         }
284
285         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}})
286         for i := 0; i < 1000; i++ {
287                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
288                         break
289                 }
290                 time.Sleep(time.Millisecond)
291         }
292         for i := 0; i < 20; i++ {
293                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
294                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
295                 }
296                 time.Sleep(10 * time.Millisecond)
297         }
298
299         // Remove server[0].
300         r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
301         for i := 0; i < 1000; i++ {
302                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
303                         break
304                 }
305                 time.Sleep(time.Millisecond)
306         }
307         for i := 0; i < 20; i++ {
308                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
309                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
310                 }
311                 time.Sleep(10 * time.Millisecond)
312         }
313
314         // Append server[0], nothing should change.
315         r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}})
316         for i := 0; i < 20; i++ {
317                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
318                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
319                 }
320                 time.Sleep(10 * time.Millisecond)
321         }
322
323         // Remove server[1].
324         r.NewAddress([]resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}})
325         for i := 0; i < 1000; i++ {
326                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
327                         break
328                 }
329                 time.Sleep(time.Millisecond)
330         }
331         for i := 0; i < 20; i++ {
332                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
333                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
334                 }
335                 time.Sleep(10 * time.Millisecond)
336         }
337
338         // Remove server[2].
339         r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
340         for i := 0; i < 1000; i++ {
341                 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
342                         break
343                 }
344                 time.Sleep(time.Millisecond)
345         }
346         for i := 0; i < 20; i++ {
347                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
348                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
349                 }
350                 time.Sleep(10 * time.Millisecond)
351         }
352 }