3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 "golang.org/x/net/context"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/resolver"
30 "google.golang.org/grpc/resolver/manual"
31 "google.golang.org/grpc/test/leakcheck"
34 func TestOneBackendPickfirst(t *testing.T) {
35 defer leakcheck.Check(t)
36 r, rcleanup := manual.GenerateAndRegisterManualResolver()
40 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
43 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
45 t.Fatalf("failed to dial: %v", err)
48 // The first RPC should fail because there's no address.
49 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
53 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
54 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
57 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
58 // The second RPC should succeed.
59 for i := 0; i < 1000; i++ {
60 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
63 time.Sleep(time.Millisecond)
65 t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
68 func TestBackendsPickfirst(t *testing.T) {
69 defer leakcheck.Check(t)
70 r, rcleanup := manual.GenerateAndRegisterManualResolver()
74 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
77 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
79 t.Fatalf("failed to dial: %v", err)
82 // The first RPC should fail because there's no address.
83 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
87 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
88 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
91 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
92 // The second RPC should succeed with the first server.
93 for i := 0; i < 1000; i++ {
94 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
97 time.Sleep(time.Millisecond)
99 t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
102 func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
103 defer leakcheck.Check(t)
104 r, rcleanup := manual.GenerateAndRegisterManualResolver()
108 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
111 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
113 t.Fatalf("failed to dial: %v", err)
116 // The first RPC should fail because there's no address.
117 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
121 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
122 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
125 var wg sync.WaitGroup
126 for i := 0; i < 3; i++ {
130 // This RPC blocks until NewAddress is called.
131 Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
134 time.Sleep(50 * time.Millisecond)
135 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
139 func TestCloseWithPendingRPCPickfirst(t *testing.T) {
140 defer leakcheck.Check(t)
141 r, rcleanup := manual.GenerateAndRegisterManualResolver()
145 _, _, scleanup := startServers(t, numServers, math.MaxInt32)
148 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
150 t.Fatalf("failed to dial: %v", err)
153 // The first RPC should fail because there's no address.
154 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
158 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
159 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
162 var wg sync.WaitGroup
163 for i := 0; i < 3; i++ {
167 // This RPC blocks until NewAddress is called.
168 Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
171 time.Sleep(50 * time.Millisecond)
176 func TestOneServerDownPickfirst(t *testing.T) {
177 defer leakcheck.Check(t)
178 r, rcleanup := manual.GenerateAndRegisterManualResolver()
182 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
185 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
187 t.Fatalf("failed to dial: %v", err)
190 // The first RPC should fail because there's no address.
191 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
195 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
196 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
199 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
200 // The second RPC should succeed with the first server.
201 for i := 0; i < 1000; i++ {
202 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
205 time.Sleep(time.Millisecond)
209 for i := 0; i < 1000; i++ {
210 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
213 time.Sleep(time.Millisecond)
215 t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
218 func TestAllServersDownPickfirst(t *testing.T) {
219 defer leakcheck.Check(t)
220 r, rcleanup := manual.GenerateAndRegisterManualResolver()
224 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
227 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
229 t.Fatalf("failed to dial: %v", err)
232 // The first RPC should fail because there's no address.
233 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
237 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
238 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
241 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
242 // The second RPC should succeed with the first server.
243 for i := 0; i < 1000; i++ {
244 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
247 time.Sleep(time.Millisecond)
250 for i := 0; i < numServers; i++ {
253 for i := 0; i < 1000; i++ {
254 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); Code(err) == codes.Unavailable {
257 time.Sleep(time.Millisecond)
259 t.Fatalf("EmptyCall() = _, %v, want _, error with code unavailable", err)
262 func TestAddressesRemovedPickfirst(t *testing.T) {
263 defer leakcheck.Check(t)
264 r, rcleanup := manual.GenerateAndRegisterManualResolver()
268 servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
271 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
273 t.Fatalf("failed to dial: %v", err)
276 // The first RPC should fail because there's no address.
277 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
281 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded {
282 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
285 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}})
286 for i := 0; i < 1000; i++ {
287 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
290 time.Sleep(time.Millisecond)
292 for i := 0; i < 20; i++ {
293 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
294 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
296 time.Sleep(10 * time.Millisecond)
300 r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
301 for i := 0; i < 1000; i++ {
302 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
305 time.Sleep(time.Millisecond)
307 for i := 0; i < 20; i++ {
308 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
309 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
311 time.Sleep(10 * time.Millisecond)
314 // Append server[0], nothing should change.
315 r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}})
316 for i := 0; i < 20; i++ {
317 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
318 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
320 time.Sleep(10 * time.Millisecond)
324 r.NewAddress([]resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}})
325 for i := 0; i < 1000; i++ {
326 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
329 time.Sleep(time.Millisecond)
331 for i := 0; i < 20; i++ {
332 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
333 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
335 time.Sleep(10 * time.Millisecond)
339 r.NewAddress([]resolver.Address{{Addr: servers[0].addr}})
340 for i := 0; i < 1000; i++ {
341 if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
344 time.Sleep(time.Millisecond)
346 for i := 0; i < 20; i++ {
347 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
348 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
350 time.Sleep(10 * time.Millisecond)