/* * * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ // Package roundrobin defines a roundrobin balancer. Roundrobin balancer is // installed as one of the default balancers in gRPC, users don't need to // explicitly install this balancer. package roundrobin import ( "sync" "golang.org/x/net/context" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" ) // newBuilder creates a new roundrobin balancer builder. func newBuilder() balancer.Builder { return &rrBuilder{} } func init() { balancer.Register(newBuilder()) } type rrBuilder struct{} func (*rrBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { return &rrBalancer{ cc: cc, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &connectivityStateEvaluator{}, // Initialize picker to a picker that always return // ErrNoSubConnAvailable, because when state of a SubConn changes, we // may call UpdateBalancerState with this picker. picker: newPicker([]balancer.SubConn{}, nil), } } func (*rrBuilder) Name() string { return "roundrobin" } type rrBalancer struct { cc balancer.ClientConn csEvltr *connectivityStateEvaluator state connectivity.State subConns map[resolver.Address]balancer.SubConn scStates map[balancer.SubConn]connectivity.State picker *picker } func (b *rrBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { if err != nil { grpclog.Infof("roundrobin.rrBalancer: HandleResolvedAddrs called with error %v", err) return } grpclog.Infoln("roundrobin.rrBalancer: got new resolved addresses: ", addrs) // addrsSet is the set converted from addrs, it's used for quick lookup of an address. addrsSet := make(map[resolver.Address]struct{}) for _, a := range addrs { addrsSet[a] = struct{}{} if _, ok := b.subConns[a]; !ok { // a is a new address (not existing in b.subConns). sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) if err != nil { grpclog.Warningf("roundrobin.rrBalancer: failed to create new SubConn: %v", err) continue } b.subConns[a] = sc b.scStates[sc] = connectivity.Idle sc.Connect() } } for a, sc := range b.subConns { // a was removed by resolver. if _, ok := addrsSet[a]; !ok { b.cc.RemoveSubConn(sc) delete(b.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in HandleSubConnStateChange. } } } // regeneratePicker takes a snapshot of the balancer, and generates a picker // from it. The picker // - always returns ErrTransientFailure if the balancer is in TransientFailure, // - or does round robin selection of all READY SubConns otherwise. func (b *rrBalancer) regeneratePicker() { if b.state == connectivity.TransientFailure { b.picker = newPicker(nil, balancer.ErrTransientFailure) return } var readySCs []balancer.SubConn for sc, st := range b.scStates { if st == connectivity.Ready { readySCs = append(readySCs, sc) } } b.picker = newPicker(readySCs, nil) } func (b *rrBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { grpclog.Infof("roundrobin.rrBalancer: handle SubConn state change: %p, %v", sc, s) oldS, ok := b.scStates[sc] if !ok { grpclog.Infof("roundrobin.rrBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) return } b.scStates[sc] = s switch s { case connectivity.Idle: sc.Connect() case connectivity.Shutdown: // When an address was removed by resolver, b called RemoveSubConn but // kept the sc's state in scStates. Remove state for this sc here. delete(b.scStates, sc) } oldAggrState := b.state b.state = b.csEvltr.recordTransition(oldS, s) // Regenerate picker when one of the following happens: // - this sc became ready from not-ready // - this sc became not-ready from ready // - the aggregated state of balancer became TransientFailure from non-TransientFailure // - the aggregated state of balancer became non-TransientFailure from TransientFailure if (s == connectivity.Ready) != (oldS == connectivity.Ready) || (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { b.regeneratePicker() } b.cc.UpdateBalancerState(b.state, b.picker) return } // Close is a nop because roundrobin balancer doesn't internal state to clean // up, and it doesn't need to call RemoveSubConn for the SubConns. func (b *rrBalancer) Close() { } type picker struct { // If err is not nil, Pick always returns this err. It's immutable after // picker is created. err error // subConns is the snapshot of the roundrobin balancer when this picker was // created. The slice is immutable. Each Get() will do a round robin // selection from it and return the selected SubConn. subConns []balancer.SubConn mu sync.Mutex next int } func newPicker(scs []balancer.SubConn, err error) *picker { grpclog.Infof("roundrobinPicker: newPicker called with scs: %v, %v", scs, err) if err != nil { return &picker{err: err} } return &picker{ subConns: scs, } } func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { if p.err != nil { return nil, nil, p.err } if len(p.subConns) <= 0 { return nil, nil, balancer.ErrNoSubConnAvailable } p.mu.Lock() sc := p.subConns[p.next] p.next = (p.next + 1) % len(p.subConns) p.mu.Unlock() return sc, nil, nil } // connectivityStateEvaluator gets updated by addrConns when their // states transition, based on which it evaluates the state of // ClientConn. type connectivityStateEvaluator struct { numReady uint64 // Number of addrConns in ready state. numConnecting uint64 // Number of addrConns in connecting state. numTransientFailure uint64 // Number of addrConns in transientFailure. } // recordTransition records state change happening in every subConn and based on // that it evaluates what aggregated state should be. // It can only transition between Ready, Connecting and TransientFailure. Other states, // Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection // before any subConn is created ClientConn is in idle state. In the end when ClientConn // closes it is in Shutdown state. // // recordTransition should only be called synchronously from the same goroutine. func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { // Update counters. for idx, state := range []connectivity.State{oldState, newState} { updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. switch state { case connectivity.Ready: cse.numReady += updateVal case connectivity.Connecting: cse.numConnecting += updateVal case connectivity.TransientFailure: cse.numTransientFailure += updateVal } } // Evaluate. if cse.numReady > 0 { return connectivity.Ready } if cse.numConnecting > 0 { return connectivity.Connecting } return connectivity.TransientFailure }