3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "golang.org/x/net/context"
30 "google.golang.org/grpc/codes"
31 lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/metadata"
34 "google.golang.org/grpc/naming"
37 // Client API for LoadBalancer service.
38 // Mostly copied from the generated pb.go file
39 // to avoid a circular dependency.
40 type loadBalancerClient struct {
// BalanceLoad starts a client stream for the
// "/grpc.lb.v1.LoadBalancer/BalanceLoad" method on c.cc via NewClientStream
// and wraps the resulting stream in a balanceLoadClientStream.
44 func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
46 StreamName: "BalanceLoad",
50 stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
54 x := &balanceLoadClientStream{stream}
// balanceLoadClientStream wraps the stream created by BalanceLoad and exposes
// Send and Recv methods typed for LoadBalanceRequest and LoadBalanceResponse.
58 type balanceLoadClientStream struct {
// Send writes the request m to the underlying ClientStream using SendMsg and
// returns whatever error SendMsg reports.
62 func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
63 return x.ClientStream.SendMsg(m)
// Recv allocates a new LoadBalanceResponse and fills it from the underlying
// ClientStream using RecvMsg.
66 func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
67 m := new(lbmpb.LoadBalanceResponse)
68 if err := x.ClientStream.RecvMsg(m); err != nil {
74 // NewGRPCLBBalancer creates a grpclb load balancer.
// The returned Balancer is backed by a *grpclbBalancer.
// NOTE(review): r is presumably the resolver later used by Start (b.r) — confirm.
75 func NewGRPCLBBalancer(r naming.Resolver) Balancer {
76 return &grpclbBalancer{
// remoteBalancerInfo identifies a remote load balancer that the client can
// dial.
81 type remoteBalancerInfo struct {
83 // The server name used for authentication with the remote LB server.
87 // grpclbAddrInfo holds the information about a backend server.
88 type grpclbAddrInfo struct {
91 // dropForRateLimiting indicates whether this particular request should be
92 // dropped by the client for rate limiting.
93 dropForRateLimiting bool
94 // dropForLoadBalancing indicates whether this particular request should be
95 // dropped by the client for load balancing.
96 dropForLoadBalancing bool
// grpclbBalancer is the Balancer implementation returned by NewGRPCLBBalancer.
99 type grpclbBalancer struct {
103 seq int // a sequence number to make sure addrCh does not get stale addresses.
// addrCh carries address lists; it is created in Start.
105 addrCh chan []Address
// rbs is the list of known remote balancers, maintained by watchAddrUpdates.
106 rbs []remoteBalancerInfo
// addrs is the list of backend entries consulted by Get, Up and down.
107 addrs []*grpclbAddrInfo
// clientStats accumulates per-call counters; sendLoadReport copies them,
// clears them and sends the copy to the remote balancer.
113 clientStats lbmpb.ClientStats
// watchAddrUpdates reads the next batch of updates from w and applies each one
// to b.rbs. Existing entries are compared by address before a new balancer is
// appended, and a matching entry is removed from the slice on deletion.
// Unexpected metadata, address types and operations are logged.
// ErrClientConnClosing is returned on one of its exit paths.
// NOTE(review): the resulting list is presumably published on ch — confirm.
116 func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
117 updates, err := w.Next()
119 grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
125 return ErrClientConnClosing
127 for _, update := range updates {
131 for _, v := range b.rbs {
132 // TODO: Is the same addr with different server name a different balancer?
133 if update.Addr == v.addr {
// The resolver is expected to attach grpclb-specific metadata to each address.
141 md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
143 // TODO: Revisit the handling here and may introduce some fallback mechanism.
144 grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
149 // TODO: Revisit the handling here and may introduce some fallback mechanism.
150 grpclog.Errorf("The name resolution does not give grpclb addresses")
153 b.rbs = append(b.rbs, remoteBalancerInfo{
158 grpclog.Errorf("Received unknow address type %d", md.AddrType)
162 for i, v := range b.rbs {
163 if update.Addr == v.addr {
// Remove entry i by shifting the tail left one slot and truncating.
164 copy(b.rbs[i:], b.rbs[i+1:])
165 b.rbs = b.rbs[:len(b.rbs)-1]
170 grpclog.Errorf("Unknown update.Op %v", update.Op)
173 // TODO: Fall back to the basic round-robin load balancing if the resulting address is
174 // not a load balancer.
// convertDuration converts the duration message d, expressed as Seconds plus
// Nanos, into a time.Duration.
183 func convertDuration(d *lbmpb.Duration) time.Duration {
187 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
// processServerList converts the servers in l into backend entries. For each
// server it attaches the server's LoadBalanceToken as "lb-token" metadata,
// formats an "ip:port" address from IpAddress and Port, and records the
// server's drop flags. b.done and seq are checked against b.seq before the
// server list is replaced.
// NOTE(review): a stale seq presumably causes the list to be ignored — confirm.
190 func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
194 servers := l.GetServers()
199 for _, s := range servers {
200 md := metadata.Pairs("lb-token", s.LoadBalanceToken)
201 ip := net.IP(s.IpAddress)
204 // Add square brackets to ipv6 addresses, otherwise net.Dial() and
205 // net.SplitHostPort() will return too many colons error.
206 ipStr = fmt.Sprintf("[%s]", ipStr)
209 Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
212 sl = append(sl, &grpclbAddrInfo{
214 dropForRateLimiting: s.DropForRateLimiting,
215 dropForLoadBalancing: s.DropForLoadBalancing,
217 addrs = append(addrs, addr)
221 if b.done || seq < b.seq {
225 // Reset b.next to 0 when replacing the server list.
// sendLoadReport reports client stats to the remote balancer over s, driven by
// a ticker that fires every interval. For a report, the accumulated
// b.clientStats is copied and cleared, stamped with a timestamp, and sent as a
// LoadBalanceRequest_ClientStats request. A failed send is logged.
// NOTE(review): done presumably stops the reporting loop when closed — confirm.
233 func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
234 ticker := time.NewTicker(interval)
// Take a snapshot of the counters so the next report starts from zero.
243 stats := b.clientStats
244 b.clientStats = lbmpb.ClientStats{} // Clear the stats.
247 stats.Timestamp = &lbmpb.Timestamp{
249 Nanos: int32(t.Nanosecond()),
251 if err := s.Send(&lbmpb.LoadBalanceRequest{
252 LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
256 grpclog.Errorf("grpclb: failed to send load report: %v", err)
// callRemoteBalancer runs one BalanceLoad stream against the remote balancer
// behind lbc. It sends an initial request, reads a reply that must carry an
// initial response, starts sendLoadReport when that response has a positive
// ClientStatsReportInterval, and then receives further replies, handing any
// server list to processServerList. Delegation to another balancer is not
// supported. Failures along the way are logged.
// NOTE(review): retry presumably tells the caller whether to call again —
// confirm against the caller.
262 func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
263 ctx, cancel := context.WithCancel(context.Background())
265 stream, err := lbc.BalanceLoad(ctx)
267 grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
276 initReq := &lbmpb.LoadBalanceRequest{
277 LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
278 InitialRequest: &lbmpb.InitialLoadBalanceRequest{
283 if err := stream.Send(initReq); err != nil {
284 grpclog.Errorf("grpclb: failed to send init request: %v", err)
285 // TODO: backoff on retry?
288 reply, err := stream.Recv()
290 grpclog.Errorf("grpclb: failed to recv init response: %v", err)
291 // TODO: backoff on retry?
294 initResp := reply.GetInitialResponse()
296 grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
299 // TODO: Support delegation.
300 if initResp.LoadBalancerDelegate != "" {
302 grpclog.Errorf("TODO: Delegation is not supported yet.")
// streamDone is closed when this function returns; it is handed to
// sendLoadReport below.
305 streamDone := make(chan struct{})
306 defer close(streamDone)
308 b.clientStats = lbmpb.ClientStats{} // Clear client stats.
// Only report load when the balancer asked for a positive report interval.
310 if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
311 go b.sendLoadReport(stream, d, streamDone)
313 // Retrieve the server list.
315 reply, err := stream.Recv()
317 grpclog.Errorf("grpclb: failed to recv server list: %v", err)
321 if b.done || seq < b.seq {
325 b.seq++ // tick when receiving a new list of servers.
328 if serverList := reply.GetServerList(); serverList != nil {
329 b.processServerList(serverList, seq)
// Start initializes the balancer for target. It seeds b.rand from the current
// time, creates b.addrCh, resolves target through b.r, and spawns goroutines
// that watch name-resolution updates and talk to the remote load balancer.
// An error is returned when no name resolver is installed, and
// ErrClientConnClosing is returned on another early-exit path.
335 func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
336 b.rand = rand.New(rand.NewSource(time.Now().Unix()))
337 // TODO: Fall back to the basic direct connection if there is no name resolver.
339 return errors.New("there is no name resolver installed")
345 return ErrClientConnClosing
347 b.addrCh = make(chan []Address)
348 w, err := b.r.Resolve(target)
351 grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
356 balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
357 // Spawn a goroutine to monitor the name resolution of the remote load balancer.
360 if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
361 grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
362 close(balancerAddrsCh)
367 // Spawn a goroutine to talk to the remote load balancer.
371 // ccError is closed when there is an error in the current cc.
372 // A new rb should be picked from rbs and connected.
373 ccError chan struct{}
374 rb *remoteBalancerInfo
375 rbs []remoteBalancerInfo
395 case rbs, ok = <-balancerAddrsCh:
401 for i, trb := range rbs {
410 // Move the address in use to the beginning of the list.
411 b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
414 continue // If found, don't dial new cc.
415 } else if len(rbs) > 0 {
416 // Pick a random one from the list, instead of always using the first one.
417 if l := len(rbs); l > 1 && rb != nil {
418 tmpIdx := b.rand.Intn(l - 1)
419 b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
424 // foundIdx < 0 && len(rbs) <= 0.
429 if rbIdx < len(rbs)-1 {
444 // Talk to the remote load balancer to get the server list.
449 if creds := config.DialCreds; creds != nil {
451 if err := creds.OverrideServerName(rb.name); err != nil {
452 grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
456 dopts = append(dopts, WithTransportCredentials(creds))
458 dopts = append(dopts, WithInsecure())
460 if dialer := config.Dialer; dialer != nil {
461 // WithDialer takes a different type of function, so we instead use a special DialOption here.
462 dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
// Dial with WithBlock under a context that times out after one second.
464 dopts = append(dopts, WithBlock())
465 ccError = make(chan struct{})
466 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
467 cc, err = DialContext(ctx, rb.addr, dopts...)
470 grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
475 b.seq++ // tick when getting a new balancer address
// Run the balancer RPC for this connection in its own goroutine.
479 go func(cc *ClientConn, ccError chan struct{}) {
480 lbc := &loadBalancerClient{cc}
481 b.callRemoteBalancer(lbc, seq)
// down is called with the address that went down and the associated error; it
// scans b.addrs for the affected entry.
// NOTE(review): presumably marks the matching entry as disconnected — confirm.
494 func (b *grpclbBalancer) down(addr Address, err error) {
497 for _, a := range b.addrs {
// Up is called when addr becomes connected. It walks b.addrs, counting the
// entries that are connected and not flagged for dropping; when addr is the
// only such entry and b.waitCh is set, callers blocked in Get are notified.
// It returns a func(error) callback.
// NOTE(review): the callback presumably reports addr going down via b.down —
// confirm.
505 func (b *grpclbBalancer) Up(addr Address) func(error) {
512 for _, a := range b.addrs {
519 if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
523 // addr is the only one which is connected. Notify the Get() callers who are blocking.
524 if cnt == 1 && b.waitCh != nil {
528 return func(err error) {
// Get picks a backend address for an RPC by stepping through b.addrs in order
// (wrapping around at the end) and updates b.clientStats along the way.
// An entry flagged for dropping yields a codes.Unavailable error when
// opts.BlockingWait is false, and the drop is counted in the stats. When no
// address is usable, a non-blocking call fails with codes.Unavailable, while a
// blocking call waits on b.waitCh and then repeats the selection.
// ErrClientConnClosing is returned when the balancer is closing.
// NOTE(review): the returned put callback presumably records the finished-call
// counters using the rpcInfo from ctx — confirm.
533 func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
538 err = ErrClientConnClosing
548 s, ok := rpcInfoFromContext(ctx)
554 if b.done || seq < b.seq {
557 b.clientStats.NumCallsFinished++
559 b.clientStats.NumCallsFinishedWithClientFailedToSend++
560 } else if s.bytesReceived {
561 b.clientStats.NumCallsFinishedKnownReceived++
566 b.clientStats.NumCallsStarted++
567 if len(b.addrs) > 0 {
568 if b.next >= len(b.addrs) {
574 next = (next + 1) % len(b.addrs)
576 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
582 if !opts.BlockingWait {
584 if a.dropForLoadBalancing {
585 b.clientStats.NumCallsFinished++
586 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
587 } else if a.dropForRateLimiting {
588 b.clientStats.NumCallsFinished++
589 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
592 err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
597 // Has iterated over all the possible addresses, but none is connected.
602 if !opts.BlockingWait {
603 b.clientStats.NumCallsFinished++
604 b.clientStats.NumCallsFinishedWithClientFailedToSend++
606 err = Errorf(codes.Unavailable, "there is no address available")
609 // Wait on b.waitCh for non-failfast RPCs.
611 ch = make(chan struct{})
621 b.clientStats.NumCallsFinished++
622 b.clientStats.NumCallsFinishedWithClientFailedToSend++
629 b.clientStats.NumCallsFinished++
630 b.clientStats.NumCallsFinishedWithClientFailedToSend++
632 err = ErrClientConnClosing
// Same selection logic as above, repeated after waiting.
636 if len(b.addrs) > 0 {
637 if b.next >= len(b.addrs) {
643 next = (next + 1) % len(b.addrs)
645 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
651 if !opts.BlockingWait {
653 if a.dropForLoadBalancing {
654 b.clientStats.NumCallsFinished++
655 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
656 } else if a.dropForRateLimiting {
657 b.clientStats.NumCallsFinished++
658 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
661 err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
666 // Has iterated over all the possible addresses, but none is connected.
671 // The newly added addr got removed by Down() again.
673 ch = make(chan struct{})
// Notify returns a receive-only channel of address lists.
// NOTE(review): presumably this is b.addrCh — confirm.
683 func (b *grpclbBalancer) Notify() <-chan []Address {
// Close shuts the balancer down. errBalancerClosed is returned on one of its
// paths.
// NOTE(review): presumably when the balancer has already been closed — confirm.
687 func (b *grpclbBalancer) Close() error {
691 return errBalancerClosed