OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / grpclb.go
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "errors"
23         "fmt"
24         "math/rand"
25         "net"
26         "sync"
27         "time"
28
29         "golang.org/x/net/context"
30         "google.golang.org/grpc/codes"
31         lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
32         "google.golang.org/grpc/grpclog"
33         "google.golang.org/grpc/metadata"
34         "google.golang.org/grpc/naming"
35 )
36
37 // Client API for LoadBalancer service.
38 // Mostly copied from generated pb.go file.
39 // To avoid circular dependency.
40 type loadBalancerClient struct {
41         cc *ClientConn
42 }
43
44 func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
45         desc := &StreamDesc{
46                 StreamName:    "BalanceLoad",
47                 ServerStreams: true,
48                 ClientStreams: true,
49         }
50         stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
51         if err != nil {
52                 return nil, err
53         }
54         x := &balanceLoadClientStream{stream}
55         return x, nil
56 }
57
58 type balanceLoadClientStream struct {
59         ClientStream
60 }
61
62 func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
63         return x.ClientStream.SendMsg(m)
64 }
65
66 func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
67         m := new(lbmpb.LoadBalanceResponse)
68         if err := x.ClientStream.RecvMsg(m); err != nil {
69                 return nil, err
70         }
71         return m, nil
72 }
73
74 // NewGRPCLBBalancer creates a grpclb load balancer.
75 func NewGRPCLBBalancer(r naming.Resolver) Balancer {
76         return &grpclbBalancer{
77                 r: r,
78         }
79 }
80
81 type remoteBalancerInfo struct {
82         addr string
83         // the server name used for authentication with the remote LB server.
84         name string
85 }
86
87 // grpclbAddrInfo consists of the information of a backend server.
88 type grpclbAddrInfo struct {
89         addr      Address
90         connected bool
91         // dropForRateLimiting indicates whether this particular request should be
92         // dropped by the client for rate limiting.
93         dropForRateLimiting bool
94         // dropForLoadBalancing indicates whether this particular request should be
95         // dropped by the client for load balancing.
96         dropForLoadBalancing bool
97 }
98
99 type grpclbBalancer struct {
100         r      naming.Resolver
101         target string
102         mu     sync.Mutex
103         seq    int // a sequence number to make sure addrCh does not get stale addresses.
104         w      naming.Watcher
105         addrCh chan []Address
106         rbs    []remoteBalancerInfo
107         addrs  []*grpclbAddrInfo
108         next   int
109         waitCh chan struct{}
110         done   bool
111         rand   *rand.Rand
112
113         clientStats lbmpb.ClientStats
114 }
115
116 func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
117         updates, err := w.Next()
118         if err != nil {
119                 grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
120                 return err
121         }
122         b.mu.Lock()
123         defer b.mu.Unlock()
124         if b.done {
125                 return ErrClientConnClosing
126         }
127         for _, update := range updates {
128                 switch update.Op {
129                 case naming.Add:
130                         var exist bool
131                         for _, v := range b.rbs {
132                                 // TODO: Is the same addr with different server name a different balancer?
133                                 if update.Addr == v.addr {
134                                         exist = true
135                                         break
136                                 }
137                         }
138                         if exist {
139                                 continue
140                         }
141                         md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
142                         if !ok {
143                                 // TODO: Revisit the handling here and may introduce some fallback mechanism.
144                                 grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
145                                 continue
146                         }
147                         switch md.AddrType {
148                         case naming.Backend:
149                                 // TODO: Revisit the handling here and may introduce some fallback mechanism.
150                                 grpclog.Errorf("The name resolution does not give grpclb addresses")
151                                 continue
152                         case naming.GRPCLB:
153                                 b.rbs = append(b.rbs, remoteBalancerInfo{
154                                         addr: update.Addr,
155                                         name: md.ServerName,
156                                 })
157                         default:
158                                 grpclog.Errorf("Received unknow address type %d", md.AddrType)
159                                 continue
160                         }
161                 case naming.Delete:
162                         for i, v := range b.rbs {
163                                 if update.Addr == v.addr {
164                                         copy(b.rbs[i:], b.rbs[i+1:])
165                                         b.rbs = b.rbs[:len(b.rbs)-1]
166                                         break
167                                 }
168                         }
169                 default:
170                         grpclog.Errorf("Unknown update.Op %v", update.Op)
171                 }
172         }
173         // TODO: Fall back to the basic round-robin load balancing if the resulting address is
174         // not a load balancer.
175         select {
176         case <-ch:
177         default:
178         }
179         ch <- b.rbs
180         return nil
181 }
182
183 func convertDuration(d *lbmpb.Duration) time.Duration {
184         if d == nil {
185                 return 0
186         }
187         return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
188 }
189
190 func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
191         if l == nil {
192                 return
193         }
194         servers := l.GetServers()
195         var (
196                 sl    []*grpclbAddrInfo
197                 addrs []Address
198         )
199         for _, s := range servers {
200                 md := metadata.Pairs("lb-token", s.LoadBalanceToken)
201                 ip := net.IP(s.IpAddress)
202                 ipStr := ip.String()
203                 if ip.To4() == nil {
204                         // Add square brackets to ipv6 addresses, otherwise net.Dial() and
205                         // net.SplitHostPort() will return too many colons error.
206                         ipStr = fmt.Sprintf("[%s]", ipStr)
207                 }
208                 addr := Address{
209                         Addr:     fmt.Sprintf("%s:%d", ipStr, s.Port),
210                         Metadata: &md,
211                 }
212                 sl = append(sl, &grpclbAddrInfo{
213                         addr:                 addr,
214                         dropForRateLimiting:  s.DropForRateLimiting,
215                         dropForLoadBalancing: s.DropForLoadBalancing,
216                 })
217                 addrs = append(addrs, addr)
218         }
219         b.mu.Lock()
220         defer b.mu.Unlock()
221         if b.done || seq < b.seq {
222                 return
223         }
224         if len(sl) > 0 {
225                 // reset b.next to 0 when replacing the server list.
226                 b.next = 0
227                 b.addrs = sl
228                 b.addrCh <- addrs
229         }
230         return
231 }
232
233 func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
234         ticker := time.NewTicker(interval)
235         defer ticker.Stop()
236         for {
237                 select {
238                 case <-ticker.C:
239                 case <-done:
240                         return
241                 }
242                 b.mu.Lock()
243                 stats := b.clientStats
244                 b.clientStats = lbmpb.ClientStats{} // Clear the stats.
245                 b.mu.Unlock()
246                 t := time.Now()
247                 stats.Timestamp = &lbmpb.Timestamp{
248                         Seconds: t.Unix(),
249                         Nanos:   int32(t.Nanosecond()),
250                 }
251                 if err := s.Send(&lbmpb.LoadBalanceRequest{
252                         LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
253                                 ClientStats: &stats,
254                         },
255                 }); err != nil {
256                         grpclog.Errorf("grpclb: failed to send load report: %v", err)
257                         return
258                 }
259         }
260 }
261
262 func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
263         ctx, cancel := context.WithCancel(context.Background())
264         defer cancel()
265         stream, err := lbc.BalanceLoad(ctx)
266         if err != nil {
267                 grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
268                 return
269         }
270         b.mu.Lock()
271         if b.done {
272                 b.mu.Unlock()
273                 return
274         }
275         b.mu.Unlock()
276         initReq := &lbmpb.LoadBalanceRequest{
277                 LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
278                         InitialRequest: &lbmpb.InitialLoadBalanceRequest{
279                                 Name: b.target,
280                         },
281                 },
282         }
283         if err := stream.Send(initReq); err != nil {
284                 grpclog.Errorf("grpclb: failed to send init request: %v", err)
285                 // TODO: backoff on retry?
286                 return true
287         }
288         reply, err := stream.Recv()
289         if err != nil {
290                 grpclog.Errorf("grpclb: failed to recv init response: %v", err)
291                 // TODO: backoff on retry?
292                 return true
293         }
294         initResp := reply.GetInitialResponse()
295         if initResp == nil {
296                 grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
297                 return
298         }
299         // TODO: Support delegation.
300         if initResp.LoadBalancerDelegate != "" {
301                 // delegation
302                 grpclog.Errorf("TODO: Delegation is not supported yet.")
303                 return
304         }
305         streamDone := make(chan struct{})
306         defer close(streamDone)
307         b.mu.Lock()
308         b.clientStats = lbmpb.ClientStats{} // Clear client stats.
309         b.mu.Unlock()
310         if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
311                 go b.sendLoadReport(stream, d, streamDone)
312         }
313         // Retrieve the server list.
314         for {
315                 reply, err := stream.Recv()
316                 if err != nil {
317                         grpclog.Errorf("grpclb: failed to recv server list: %v", err)
318                         break
319                 }
320                 b.mu.Lock()
321                 if b.done || seq < b.seq {
322                         b.mu.Unlock()
323                         return
324                 }
325                 b.seq++ // tick when receiving a new list of servers.
326                 seq = b.seq
327                 b.mu.Unlock()
328                 if serverList := reply.GetServerList(); serverList != nil {
329                         b.processServerList(serverList, seq)
330                 }
331         }
332         return true
333 }
334
335 func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
336         b.rand = rand.New(rand.NewSource(time.Now().Unix()))
337         // TODO: Fall back to the basic direct connection if there is no name resolver.
338         if b.r == nil {
339                 return errors.New("there is no name resolver installed")
340         }
341         b.target = target
342         b.mu.Lock()
343         if b.done {
344                 b.mu.Unlock()
345                 return ErrClientConnClosing
346         }
347         b.addrCh = make(chan []Address)
348         w, err := b.r.Resolve(target)
349         if err != nil {
350                 b.mu.Unlock()
351                 grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
352                 return err
353         }
354         b.w = w
355         b.mu.Unlock()
356         balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
357         // Spawn a goroutine to monitor the name resolution of remote load balancer.
358         go func() {
359                 for {
360                         if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
361                                 grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
362                                 close(balancerAddrsCh)
363                                 return
364                         }
365                 }
366         }()
367         // Spawn a goroutine to talk to the remote load balancer.
368         go func() {
369                 var (
370                         cc *ClientConn
371                         // ccError is closed when there is an error in the current cc.
372                         // A new rb should be picked from rbs and connected.
373                         ccError chan struct{}
374                         rb      *remoteBalancerInfo
375                         rbs     []remoteBalancerInfo
376                         rbIdx   int
377                 )
378
379                 defer func() {
380                         if ccError != nil {
381                                 select {
382                                 case <-ccError:
383                                 default:
384                                         close(ccError)
385                                 }
386                         }
387                         if cc != nil {
388                                 cc.Close()
389                         }
390                 }()
391
392                 for {
393                         var ok bool
394                         select {
395                         case rbs, ok = <-balancerAddrsCh:
396                                 if !ok {
397                                         return
398                                 }
399                                 foundIdx := -1
400                                 if rb != nil {
401                                         for i, trb := range rbs {
402                                                 if trb == *rb {
403                                                         foundIdx = i
404                                                         break
405                                                 }
406                                         }
407                                 }
408                                 if foundIdx >= 0 {
409                                         if foundIdx >= 1 {
410                                                 // Move the address in use to the beginning of the list.
411                                                 b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
412                                                 rbIdx = 0
413                                         }
414                                         continue // If found, don't dial new cc.
415                                 } else if len(rbs) > 0 {
416                                         // Pick a random one from the list, instead of always using the first one.
417                                         if l := len(rbs); l > 1 && rb != nil {
418                                                 tmpIdx := b.rand.Intn(l - 1)
419                                                 b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
420                                         }
421                                         rbIdx = 0
422                                         rb = &rbs[0]
423                                 } else {
424                                         // foundIdx < 0 && len(rbs) <= 0.
425                                         rb = nil
426                                 }
427                         case <-ccError:
428                                 ccError = nil
429                                 if rbIdx < len(rbs)-1 {
430                                         rbIdx++
431                                         rb = &rbs[rbIdx]
432                                 } else {
433                                         rb = nil
434                                 }
435                         }
436
437                         if rb == nil {
438                                 continue
439                         }
440
441                         if cc != nil {
442                                 cc.Close()
443                         }
444                         // Talk to the remote load balancer to get the server list.
445                         var (
446                                 err   error
447                                 dopts []DialOption
448                         )
449                         if creds := config.DialCreds; creds != nil {
450                                 if rb.name != "" {
451                                         if err := creds.OverrideServerName(rb.name); err != nil {
452                                                 grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
453                                                 continue
454                                         }
455                                 }
456                                 dopts = append(dopts, WithTransportCredentials(creds))
457                         } else {
458                                 dopts = append(dopts, WithInsecure())
459                         }
460                         if dialer := config.Dialer; dialer != nil {
461                                 // WithDialer takes a different type of function, so we instead use a special DialOption here.
462                                 dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
463                         }
464                         dopts = append(dopts, WithBlock())
465                         ccError = make(chan struct{})
466                         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
467                         cc, err = DialContext(ctx, rb.addr, dopts...)
468                         cancel()
469                         if err != nil {
470                                 grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
471                                 close(ccError)
472                                 continue
473                         }
474                         b.mu.Lock()
475                         b.seq++ // tick when getting a new balancer address
476                         seq := b.seq
477                         b.next = 0
478                         b.mu.Unlock()
479                         go func(cc *ClientConn, ccError chan struct{}) {
480                                 lbc := &loadBalancerClient{cc}
481                                 b.callRemoteBalancer(lbc, seq)
482                                 cc.Close()
483                                 select {
484                                 case <-ccError:
485                                 default:
486                                         close(ccError)
487                                 }
488                         }(cc, ccError)
489                 }
490         }()
491         return nil
492 }
493
494 func (b *grpclbBalancer) down(addr Address, err error) {
495         b.mu.Lock()
496         defer b.mu.Unlock()
497         for _, a := range b.addrs {
498                 if addr == a.addr {
499                         a.connected = false
500                         break
501                 }
502         }
503 }
504
505 func (b *grpclbBalancer) Up(addr Address) func(error) {
506         b.mu.Lock()
507         defer b.mu.Unlock()
508         if b.done {
509                 return nil
510         }
511         var cnt int
512         for _, a := range b.addrs {
513                 if a.addr == addr {
514                         if a.connected {
515                                 return nil
516                         }
517                         a.connected = true
518                 }
519                 if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
520                         cnt++
521                 }
522         }
523         // addr is the only one which is connected. Notify the Get() callers who are blocking.
524         if cnt == 1 && b.waitCh != nil {
525                 close(b.waitCh)
526                 b.waitCh = nil
527         }
528         return func(err error) {
529                 b.down(addr, err)
530         }
531 }
532
533 func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
534         var ch chan struct{}
535         b.mu.Lock()
536         if b.done {
537                 b.mu.Unlock()
538                 err = ErrClientConnClosing
539                 return
540         }
541         seq := b.seq
542
543         defer func() {
544                 if err != nil {
545                         return
546                 }
547                 put = func() {
548                         s, ok := rpcInfoFromContext(ctx)
549                         if !ok {
550                                 return
551                         }
552                         b.mu.Lock()
553                         defer b.mu.Unlock()
554                         if b.done || seq < b.seq {
555                                 return
556                         }
557                         b.clientStats.NumCallsFinished++
558                         if !s.bytesSent {
559                                 b.clientStats.NumCallsFinishedWithClientFailedToSend++
560                         } else if s.bytesReceived {
561                                 b.clientStats.NumCallsFinishedKnownReceived++
562                         }
563                 }
564         }()
565
566         b.clientStats.NumCallsStarted++
567         if len(b.addrs) > 0 {
568                 if b.next >= len(b.addrs) {
569                         b.next = 0
570                 }
571                 next := b.next
572                 for {
573                         a := b.addrs[next]
574                         next = (next + 1) % len(b.addrs)
575                         if a.connected {
576                                 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
577                                         addr = a.addr
578                                         b.next = next
579                                         b.mu.Unlock()
580                                         return
581                                 }
582                                 if !opts.BlockingWait {
583                                         b.next = next
584                                         if a.dropForLoadBalancing {
585                                                 b.clientStats.NumCallsFinished++
586                                                 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
587                                         } else if a.dropForRateLimiting {
588                                                 b.clientStats.NumCallsFinished++
589                                                 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
590                                         }
591                                         b.mu.Unlock()
592                                         err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
593                                         return
594                                 }
595                         }
596                         if next == b.next {
597                                 // Has iterated all the possible address but none is connected.
598                                 break
599                         }
600                 }
601         }
602         if !opts.BlockingWait {
603                 b.clientStats.NumCallsFinished++
604                 b.clientStats.NumCallsFinishedWithClientFailedToSend++
605                 b.mu.Unlock()
606                 err = Errorf(codes.Unavailable, "there is no address available")
607                 return
608         }
609         // Wait on b.waitCh for non-failfast RPCs.
610         if b.waitCh == nil {
611                 ch = make(chan struct{})
612                 b.waitCh = ch
613         } else {
614                 ch = b.waitCh
615         }
616         b.mu.Unlock()
617         for {
618                 select {
619                 case <-ctx.Done():
620                         b.mu.Lock()
621                         b.clientStats.NumCallsFinished++
622                         b.clientStats.NumCallsFinishedWithClientFailedToSend++
623                         b.mu.Unlock()
624                         err = ctx.Err()
625                         return
626                 case <-ch:
627                         b.mu.Lock()
628                         if b.done {
629                                 b.clientStats.NumCallsFinished++
630                                 b.clientStats.NumCallsFinishedWithClientFailedToSend++
631                                 b.mu.Unlock()
632                                 err = ErrClientConnClosing
633                                 return
634                         }
635
636                         if len(b.addrs) > 0 {
637                                 if b.next >= len(b.addrs) {
638                                         b.next = 0
639                                 }
640                                 next := b.next
641                                 for {
642                                         a := b.addrs[next]
643                                         next = (next + 1) % len(b.addrs)
644                                         if a.connected {
645                                                 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
646                                                         addr = a.addr
647                                                         b.next = next
648                                                         b.mu.Unlock()
649                                                         return
650                                                 }
651                                                 if !opts.BlockingWait {
652                                                         b.next = next
653                                                         if a.dropForLoadBalancing {
654                                                                 b.clientStats.NumCallsFinished++
655                                                                 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
656                                                         } else if a.dropForRateLimiting {
657                                                                 b.clientStats.NumCallsFinished++
658                                                                 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
659                                                         }
660                                                         b.mu.Unlock()
661                                                         err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
662                                                         return
663                                                 }
664                                         }
665                                         if next == b.next {
666                                                 // Has iterated all the possible address but none is connected.
667                                                 break
668                                         }
669                                 }
670                         }
671                         // The newly added addr got removed by Down() again.
672                         if b.waitCh == nil {
673                                 ch = make(chan struct{})
674                                 b.waitCh = ch
675                         } else {
676                                 ch = b.waitCh
677                         }
678                         b.mu.Unlock()
679                 }
680         }
681 }
682
683 func (b *grpclbBalancer) Notify() <-chan []Address {
684         return b.addrCh
685 }
686
687 func (b *grpclbBalancer) Close() error {
688         b.mu.Lock()
689         defer b.mu.Unlock()
690         if b.done {
691                 return errBalancerClosed
692         }
693         b.done = true
694         if b.waitCh != nil {
695                 close(b.waitCh)
696         }
697         if b.addrCh != nil {
698                 close(b.addrCh)
699         }
700         if b.w != nil {
701                 b.w.Close()
702         }
703         return nil
704 }