OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / balancer_test.go
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "fmt"
23         "math"
24         "strconv"
25         "sync"
26         "testing"
27         "time"
28
29         "golang.org/x/net/context"
30         "google.golang.org/grpc/codes"
31         _ "google.golang.org/grpc/grpclog/glogger"
32         "google.golang.org/grpc/naming"
33         "google.golang.org/grpc/test/leakcheck"
34
35         // V1 balancer tests use passthrough resolver instead of dns.
36         // TODO(bar) remove this when removing v1 balaner entirely.
37         _ "google.golang.org/grpc/resolver/passthrough"
38 )
39
40 type testWatcher struct {
41         // the channel to receives name resolution updates
42         update chan *naming.Update
43         // the side channel to get to know how many updates in a batch
44         side chan int
45         // the channel to notifiy update injector that the update reading is done
46         readDone chan int
47 }
48
49 func (w *testWatcher) Next() (updates []*naming.Update, err error) {
50         n := <-w.side
51         if n == 0 {
52                 return nil, fmt.Errorf("w.side is closed")
53         }
54         for i := 0; i < n; i++ {
55                 u := <-w.update
56                 if u != nil {
57                         updates = append(updates, u)
58                 }
59         }
60         w.readDone <- 0
61         return
62 }
63
64 func (w *testWatcher) Close() {
65         close(w.side)
66 }
67
68 // Inject naming resolution updates to the testWatcher.
69 func (w *testWatcher) inject(updates []*naming.Update) {
70         w.side <- len(updates)
71         for _, u := range updates {
72                 w.update <- u
73         }
74         <-w.readDone
75 }
76
77 type testNameResolver struct {
78         w    *testWatcher
79         addr string
80 }
81
82 func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
83         r.w = &testWatcher{
84                 update:   make(chan *naming.Update, 1),
85                 side:     make(chan int, 1),
86                 readDone: make(chan int),
87         }
88         r.w.side <- 1
89         r.w.update <- &naming.Update{
90                 Op:   naming.Add,
91                 Addr: r.addr,
92         }
93         go func() {
94                 <-r.w.readDone
95         }()
96         return r.w, nil
97 }
98
99 func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
100         var servers []*server
101         for i := 0; i < numServers; i++ {
102                 s := newTestServer()
103                 servers = append(servers, s)
104                 go s.start(t, 0, maxStreams)
105                 s.wait(t, 2*time.Second)
106         }
107         // Point to server[0]
108         addr := "localhost:" + servers[0].port
109         return servers, &testNameResolver{
110                         addr: addr,
111                 }, func() {
112                         for i := 0; i < numServers; i++ {
113                                 servers[i].stop()
114                         }
115                 }
116 }
117
118 func TestNameDiscovery(t *testing.T) {
119         defer leakcheck.Check(t)
120         // Start 2 servers on 2 ports.
121         numServers := 2
122         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
123         defer cleanup()
124         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
125         if err != nil {
126                 t.Fatalf("Failed to create ClientConn: %v", err)
127         }
128         defer cc.Close()
129         req := "port"
130         var reply string
131         if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
132                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
133         }
134         // Inject the name resolution change to remove servers[0] and add servers[1].
135         var updates []*naming.Update
136         updates = append(updates, &naming.Update{
137                 Op:   naming.Delete,
138                 Addr: "localhost:" + servers[0].port,
139         })
140         updates = append(updates, &naming.Update{
141                 Op:   naming.Add,
142                 Addr: "localhost:" + servers[1].port,
143         })
144         r.w.inject(updates)
145         // Loop until the rpcs in flight talks to servers[1].
146         for {
147                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
148                         break
149                 }
150                 time.Sleep(10 * time.Millisecond)
151         }
152 }
153
154 func TestEmptyAddrs(t *testing.T) {
155         defer leakcheck.Check(t)
156         servers, r, cleanup := startServers(t, 1, math.MaxUint32)
157         defer cleanup()
158         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
159         if err != nil {
160                 t.Fatalf("Failed to create ClientConn: %v", err)
161         }
162         defer cc.Close()
163         var reply string
164         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
165                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
166         }
167         // Inject name resolution change to remove the server so that there is no address
168         // available after that.
169         u := &naming.Update{
170                 Op:   naming.Delete,
171                 Addr: "localhost:" + servers[0].port,
172         }
173         r.w.inject([]*naming.Update{u})
174         // Loop until the above updates apply.
175         for {
176                 time.Sleep(10 * time.Millisecond)
177                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
178                 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
179                         cancel()
180                         break
181                 }
182                 cancel()
183         }
184 }
185
186 func TestRoundRobin(t *testing.T) {
187         defer leakcheck.Check(t)
188         // Start 3 servers on 3 ports.
189         numServers := 3
190         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
191         defer cleanup()
192         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
193         if err != nil {
194                 t.Fatalf("Failed to create ClientConn: %v", err)
195         }
196         defer cc.Close()
197         // Add servers[1] to the service discovery.
198         u := &naming.Update{
199                 Op:   naming.Add,
200                 Addr: "localhost:" + servers[1].port,
201         }
202         r.w.inject([]*naming.Update{u})
203         req := "port"
204         var reply string
205         // Loop until servers[1] is up
206         for {
207                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
208                         break
209                 }
210                 time.Sleep(10 * time.Millisecond)
211         }
212         // Add server2[2] to the service discovery.
213         u = &naming.Update{
214                 Op:   naming.Add,
215                 Addr: "localhost:" + servers[2].port,
216         }
217         r.w.inject([]*naming.Update{u})
218         // Loop until both servers[2] are up.
219         for {
220                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
221                         break
222                 }
223                 time.Sleep(10 * time.Millisecond)
224         }
225         // Check the incoming RPCs served in a round-robin manner.
226         for i := 0; i < 10; i++ {
227                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port {
228                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
229                 }
230         }
231 }
232
233 func TestCloseWithPendingRPC(t *testing.T) {
234         defer leakcheck.Check(t)
235         servers, r, cleanup := startServers(t, 1, math.MaxUint32)
236         defer cleanup()
237         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
238         if err != nil {
239                 t.Fatalf("Failed to create ClientConn: %v", err)
240         }
241         defer cc.Close()
242         var reply string
243         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
244                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
245         }
246         // Remove the server.
247         updates := []*naming.Update{{
248                 Op:   naming.Delete,
249                 Addr: "localhost:" + servers[0].port,
250         }}
251         r.w.inject(updates)
252         // Loop until the above update applies.
253         for {
254                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
255                 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
256                         cancel()
257                         break
258                 }
259                 time.Sleep(10 * time.Millisecond)
260                 cancel()
261         }
262         // Issue 2 RPCs which should be completed with error status once cc is closed.
263         var wg sync.WaitGroup
264         wg.Add(2)
265         go func() {
266                 defer wg.Done()
267                 var reply string
268                 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
269                         t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
270                 }
271         }()
272         go func() {
273                 defer wg.Done()
274                 var reply string
275                 time.Sleep(5 * time.Millisecond)
276                 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
277                         t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
278                 }
279         }()
280         time.Sleep(5 * time.Millisecond)
281         cc.Close()
282         wg.Wait()
283 }
284
285 func TestGetOnWaitChannel(t *testing.T) {
286         defer leakcheck.Check(t)
287         servers, r, cleanup := startServers(t, 1, math.MaxUint32)
288         defer cleanup()
289         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
290         if err != nil {
291                 t.Fatalf("Failed to create ClientConn: %v", err)
292         }
293         defer cc.Close()
294         // Remove all servers so that all upcoming RPCs will block on waitCh.
295         updates := []*naming.Update{{
296                 Op:   naming.Delete,
297                 Addr: "localhost:" + servers[0].port,
298         }}
299         r.w.inject(updates)
300         for {
301                 var reply string
302                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
303                 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
304                         cancel()
305                         break
306                 }
307                 cancel()
308                 time.Sleep(10 * time.Millisecond)
309         }
310         var wg sync.WaitGroup
311         wg.Add(1)
312         go func() {
313                 defer wg.Done()
314                 var reply string
315                 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
316                         t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
317                 }
318         }()
319         // Add a connected server to get the above RPC through.
320         updates = []*naming.Update{{
321                 Op:   naming.Add,
322                 Addr: "localhost:" + servers[0].port,
323         }}
324         r.w.inject(updates)
325         // Wait until the above RPC succeeds.
326         wg.Wait()
327 }
328
329 func TestOneServerDown(t *testing.T) {
330         defer leakcheck.Check(t)
331         // Start 2 servers.
332         numServers := 2
333         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
334         defer cleanup()
335         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
336         if err != nil {
337                 t.Fatalf("Failed to create ClientConn: %v", err)
338         }
339         defer cc.Close()
340         // Add servers[1] to the service discovery.
341         var updates []*naming.Update
342         updates = append(updates, &naming.Update{
343                 Op:   naming.Add,
344                 Addr: "localhost:" + servers[1].port,
345         })
346         r.w.inject(updates)
347         req := "port"
348         var reply string
349         // Loop until servers[1] is up
350         for {
351                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
352                         break
353                 }
354                 time.Sleep(10 * time.Millisecond)
355         }
356
357         var wg sync.WaitGroup
358         numRPC := 100
359         sleepDuration := 10 * time.Millisecond
360         wg.Add(1)
361         go func() {
362                 time.Sleep(sleepDuration)
363                 // After sleepDuration, kill server[0].
364                 servers[0].stop()
365                 wg.Done()
366         }()
367
368         // All non-failfast RPCs should not block because there's at least one connection available.
369         for i := 0; i < numRPC; i++ {
370                 wg.Add(1)
371                 go func() {
372                         time.Sleep(sleepDuration)
373                         // After sleepDuration, invoke RPC.
374                         // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
375                         Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
376                         wg.Done()
377                 }()
378         }
379         wg.Wait()
380 }
381
382 func TestOneAddressRemoval(t *testing.T) {
383         defer leakcheck.Check(t)
384         // Start 2 servers.
385         numServers := 2
386         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
387         defer cleanup()
388         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
389         if err != nil {
390                 t.Fatalf("Failed to create ClientConn: %v", err)
391         }
392         defer cc.Close()
393         // Add servers[1] to the service discovery.
394         var updates []*naming.Update
395         updates = append(updates, &naming.Update{
396                 Op:   naming.Add,
397                 Addr: "localhost:" + servers[1].port,
398         })
399         r.w.inject(updates)
400         req := "port"
401         var reply string
402         // Loop until servers[1] is up
403         for {
404                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
405                         break
406                 }
407                 time.Sleep(10 * time.Millisecond)
408         }
409
410         var wg sync.WaitGroup
411         numRPC := 100
412         sleepDuration := 10 * time.Millisecond
413         wg.Add(1)
414         go func() {
415                 time.Sleep(sleepDuration)
416                 // After sleepDuration, delete server[0].
417                 var updates []*naming.Update
418                 updates = append(updates, &naming.Update{
419                         Op:   naming.Delete,
420                         Addr: "localhost:" + servers[0].port,
421                 })
422                 r.w.inject(updates)
423                 wg.Done()
424         }()
425
426         // All non-failfast RPCs should not fail because there's at least one connection available.
427         for i := 0; i < numRPC; i++ {
428                 wg.Add(1)
429                 go func() {
430                         var reply string
431                         time.Sleep(sleepDuration)
432                         // After sleepDuration, invoke RPC.
433                         // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
434                         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
435                                 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
436                         }
437                         wg.Done()
438                 }()
439         }
440         wg.Wait()
441 }
442
443 func checkServerUp(t *testing.T, currentServer *server) {
444         req := "port"
445         port := currentServer.port
446         cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
447         if err != nil {
448                 t.Fatalf("Failed to create ClientConn: %v", err)
449         }
450         defer cc.Close()
451         var reply string
452         for {
453                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port {
454                         break
455                 }
456                 time.Sleep(10 * time.Millisecond)
457         }
458 }
459
460 func TestPickFirstEmptyAddrs(t *testing.T) {
461         defer leakcheck.Check(t)
462         servers, r, cleanup := startServers(t, 1, math.MaxUint32)
463         defer cleanup()
464         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
465         if err != nil {
466                 t.Fatalf("Failed to create ClientConn: %v", err)
467         }
468         defer cc.Close()
469         var reply string
470         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
471                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
472         }
473         // Inject name resolution change to remove the server so that there is no address
474         // available after that.
475         u := &naming.Update{
476                 Op:   naming.Delete,
477                 Addr: "localhost:" + servers[0].port,
478         }
479         r.w.inject([]*naming.Update{u})
480         // Loop until the above updates apply.
481         for {
482                 time.Sleep(10 * time.Millisecond)
483                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
484                 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
485                         cancel()
486                         break
487                 }
488                 cancel()
489         }
490 }
491
492 func TestPickFirstCloseWithPendingRPC(t *testing.T) {
493         defer leakcheck.Check(t)
494         servers, r, cleanup := startServers(t, 1, math.MaxUint32)
495         defer cleanup()
496         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
497         if err != nil {
498                 t.Fatalf("Failed to create ClientConn: %v", err)
499         }
500         defer cc.Close()
501         var reply string
502         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
503                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
504         }
505         // Remove the server.
506         updates := []*naming.Update{{
507                 Op:   naming.Delete,
508                 Addr: "localhost:" + servers[0].port,
509         }}
510         r.w.inject(updates)
511         // Loop until the above update applies.
512         for {
513                 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
514                 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
515                         cancel()
516                         break
517                 }
518                 time.Sleep(10 * time.Millisecond)
519                 cancel()
520         }
521         // Issue 2 RPCs which should be completed with error status once cc is closed.
522         var wg sync.WaitGroup
523         wg.Add(2)
524         go func() {
525                 defer wg.Done()
526                 var reply string
527                 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
528                         t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
529                 }
530         }()
531         go func() {
532                 defer wg.Done()
533                 var reply string
534                 time.Sleep(5 * time.Millisecond)
535                 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
536                         t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
537                 }
538         }()
539         time.Sleep(5 * time.Millisecond)
540         cc.Close()
541         wg.Wait()
542 }
543
544 func TestPickFirstOrderAllServerUp(t *testing.T) {
545         defer leakcheck.Check(t)
546         // Start 3 servers on 3 ports.
547         numServers := 3
548         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
549         defer cleanup()
550         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
551         if err != nil {
552                 t.Fatalf("Failed to create ClientConn: %v", err)
553         }
554         defer cc.Close()
555         // Add servers[1] and [2] to the service discovery.
556         u := &naming.Update{
557                 Op:   naming.Add,
558                 Addr: "localhost:" + servers[1].port,
559         }
560         r.w.inject([]*naming.Update{u})
561
562         u = &naming.Update{
563                 Op:   naming.Add,
564                 Addr: "localhost:" + servers[2].port,
565         }
566         r.w.inject([]*naming.Update{u})
567
568         // Loop until all 3 servers are up
569         checkServerUp(t, servers[0])
570         checkServerUp(t, servers[1])
571         checkServerUp(t, servers[2])
572
573         // Check the incoming RPCs served in server[0]
574         req := "port"
575         var reply string
576         for i := 0; i < 20; i++ {
577                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
578                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
579                 }
580                 time.Sleep(10 * time.Millisecond)
581         }
582
583         // Delete server[0] in the balancer, the incoming RPCs served in server[1]
584         // For test addrconn, close server[0] instead
585         u = &naming.Update{
586                 Op:   naming.Delete,
587                 Addr: "localhost:" + servers[0].port,
588         }
589         r.w.inject([]*naming.Update{u})
590         // Loop until it changes to server[1]
591         for {
592                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
593                         break
594                 }
595                 time.Sleep(10 * time.Millisecond)
596         }
597         for i := 0; i < 20; i++ {
598                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
599                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
600                 }
601                 time.Sleep(10 * time.Millisecond)
602         }
603
604         // Add server[0] back to the balancer, the incoming RPCs served in server[1]
605         // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
606         u = &naming.Update{
607                 Op:   naming.Add,
608                 Addr: "localhost:" + servers[0].port,
609         }
610         r.w.inject([]*naming.Update{u})
611         for i := 0; i < 20; i++ {
612                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
613                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
614                 }
615                 time.Sleep(10 * time.Millisecond)
616         }
617
618         // Delete server[1] in the balancer, the incoming RPCs served in server[2]
619         u = &naming.Update{
620                 Op:   naming.Delete,
621                 Addr: "localhost:" + servers[1].port,
622         }
623         r.w.inject([]*naming.Update{u})
624         for {
625                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
626                         break
627                 }
628                 time.Sleep(1 * time.Second)
629         }
630         for i := 0; i < 20; i++ {
631                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
632                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
633                 }
634                 time.Sleep(10 * time.Millisecond)
635         }
636
637         // Delete server[2] in the balancer, the incoming RPCs served in server[0]
638         u = &naming.Update{
639                 Op:   naming.Delete,
640                 Addr: "localhost:" + servers[2].port,
641         }
642         r.w.inject([]*naming.Update{u})
643         for {
644                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
645                         break
646                 }
647                 time.Sleep(1 * time.Second)
648         }
649         for i := 0; i < 20; i++ {
650                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
651                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
652                 }
653                 time.Sleep(10 * time.Millisecond)
654         }
655 }
656
657 func TestPickFirstOrderOneServerDown(t *testing.T) {
658         defer leakcheck.Check(t)
659         // Start 3 servers on 3 ports.
660         numServers := 3
661         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
662         defer cleanup()
663         cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
664         if err != nil {
665                 t.Fatalf("Failed to create ClientConn: %v", err)
666         }
667         defer cc.Close()
668         // Add servers[1] and [2] to the service discovery.
669         u := &naming.Update{
670                 Op:   naming.Add,
671                 Addr: "localhost:" + servers[1].port,
672         }
673         r.w.inject([]*naming.Update{u})
674
675         u = &naming.Update{
676                 Op:   naming.Add,
677                 Addr: "localhost:" + servers[2].port,
678         }
679         r.w.inject([]*naming.Update{u})
680
681         // Loop until all 3 servers are up
682         checkServerUp(t, servers[0])
683         checkServerUp(t, servers[1])
684         checkServerUp(t, servers[2])
685
686         // Check the incoming RPCs served in server[0]
687         req := "port"
688         var reply string
689         for i := 0; i < 20; i++ {
690                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
691                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
692                 }
693                 time.Sleep(10 * time.Millisecond)
694         }
695
696         // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
697         // {server[0] server[1] server[2]}
698         servers[0].stop()
699         // Loop until it changes to server[1]
700         for {
701                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
702                         break
703                 }
704                 time.Sleep(10 * time.Millisecond)
705         }
706         for i := 0; i < 20; i++ {
707                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
708                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
709                 }
710                 time.Sleep(10 * time.Millisecond)
711         }
712
713         // up the server[0] back, the incoming RPCs served in server[1]
714         p, _ := strconv.Atoi(servers[0].port)
715         servers[0] = newTestServer()
716         go servers[0].start(t, p, math.MaxUint32)
717         defer servers[0].stop()
718         servers[0].wait(t, 2*time.Second)
719         checkServerUp(t, servers[0])
720
721         for i := 0; i < 20; i++ {
722                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
723                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
724                 }
725                 time.Sleep(10 * time.Millisecond)
726         }
727
728         // Delete server[1] in the balancer, the incoming RPCs served in server[0]
729         u = &naming.Update{
730                 Op:   naming.Delete,
731                 Addr: "localhost:" + servers[1].port,
732         }
733         r.w.inject([]*naming.Update{u})
734         for {
735                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
736                         break
737                 }
738                 time.Sleep(1 * time.Second)
739         }
740         for i := 0; i < 20; i++ {
741                 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
742                         t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
743                 }
744                 time.Sleep(10 * time.Millisecond)
745         }
746 }
747
748 func TestPickFirstOneAddressRemoval(t *testing.T) {
749         defer leakcheck.Check(t)
750         // Start 2 servers.
751         numServers := 2
752         servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
753         defer cleanup()
754         cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
755         if err != nil {
756                 t.Fatalf("Failed to create ClientConn: %v", err)
757         }
758         defer cc.Close()
759         // Add servers[1] to the service discovery.
760         var updates []*naming.Update
761         updates = append(updates, &naming.Update{
762                 Op:   naming.Add,
763                 Addr: "localhost:" + servers[1].port,
764         })
765         r.w.inject(updates)
766
767         // Create a new cc to Loop until servers[1] is up
768         checkServerUp(t, servers[0])
769         checkServerUp(t, servers[1])
770
771         var wg sync.WaitGroup
772         numRPC := 100
773         sleepDuration := 10 * time.Millisecond
774         wg.Add(1)
775         go func() {
776                 time.Sleep(sleepDuration)
777                 // After sleepDuration, delete server[0].
778                 var updates []*naming.Update
779                 updates = append(updates, &naming.Update{
780                         Op:   naming.Delete,
781                         Addr: "localhost:" + servers[0].port,
782                 })
783                 r.w.inject(updates)
784                 wg.Done()
785         }()
786
787         // All non-failfast RPCs should not fail because there's at least one connection available.
788         for i := 0; i < numRPC; i++ {
789                 wg.Add(1)
790                 go func() {
791                         var reply string
792                         time.Sleep(sleepDuration)
793                         // After sleepDuration, invoke RPC.
794                         // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
795                         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
796                                 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
797                         }
798                         wg.Done()
799                 }()
800         }
801         wg.Wait()
802 }