3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
25 "golang.org/x/net/context"
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/connectivity"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/resolver"
// balancerWrapperBuilder holds a v1 Balancer in its field b. Its Build method
// starts that balancer and constructs a balancerWrapper for it.
33 type balancerWrapperBuilder struct {
34 b Balancer // The v1 balancer.
// Build starts the wrapped v1 balancer for the target reported by cc and
// creates a balancerWrapper for it.
//
// If the target contains ":///", the segment that follows the first
// occurrence (up to any further ":///") is used as the address; otherwise the
// target is used unchanged. The wrapper starts in the Idle state and is
// handed to cc.UpdateBalancerState together with that state.
// NOTE(review): some lines of this function (remaining struct fields, the
// return statement) are not visible in this excerpt.
37 func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
38 targetAddr := cc.Target()
// Strip the scheme: keep what follows the ":///" separator, if present.
39 targetSplitted := strings.Split(targetAddr, ":///")
40 if len(targetSplitted) >= 2 {
41 targetAddr = targetSplitted[1]
// Start the v1 balancer with the scheme-less target and the dial credentials
// taken from the build options.
44 bwb.b.Start(targetAddr, BalancerConfig{
45 DialCreds: opts.DialCreds,
// pickfirst is true when the v1 balancer's concrete type is *pickFirst.
48 _, pickfirst := bwb.b.(*pickFirst)
49 bw := &balancerWrapper{
53 targetAddr: targetAddr,
54 startCh: make(chan struct{}),
55 conns: make(map[resolver.Address]balancer.SubConn),
56 connSt: make(map[balancer.SubConn]*scState),
57 csEvltr: &connectivityStateEvaluator{},
58 state: connectivity.Idle,
// Report the initial Idle state to cc, passing the wrapper as the second argument.
60 cc.UpdateBalancerState(connectivity.Idle, bw)
// Name returns a string identifying this builder.
// NOTE(review): the returned value is not visible in this excerpt.
65 func (bwb *balancerWrapperBuilder) Name() string {
// addr is the v1 address recorded for a SubConn; HandleSubConnStateChange
// passes it to the v1 balancer's Up when the SubConn becomes Ready.
// NOTE(review): the enclosing type declaration is not visible in this excerpt.
70 addr Address // The v1 address type.
// balancerWrapper wraps a v1 Balancer. It keeps the SubConns created on cc,
// per-SubConn bookkeeping, and an aggregated connectivity state.
// NOTE(review): some fields of this struct are not visible in this excerpt.
75 type balancerWrapper struct {
76 balancer Balancer // The v1 balancer.
79 cc balancer.ClientConn
80 targetAddr string // Target without the scheme.
82 // To aggregate the connectivity state.
83 csEvltr *connectivityStateEvaluator
84 state connectivity.State
// conns maps a resolver address to the SubConn created for it.
87 conns map[resolver.Address]balancer.SubConn
// connSt maps each SubConn to its scState bookkeeping entry.
88 connSt map[balancer.SubConn]*scState
89 // This channel is closed when handling the first resolver result.
90 // lbWatcher blocks until this is closed, to avoid a race between
91 // - a SubConn being created, with cc wanting to notify the balancer of state changes;
92 // - Build not having returned yet, so cc doesn't have access to the balancer.
96 // lbWatcher watches the Notify channel of the balancer and manages
97 // connections accordingly.
//
// Each value received from the Notify channel is an address list. Two ways of
// applying it are visible below: one keeps a single SubConn stored under the
// zero resolver.Address key and replaces its address list; the other keeps
// one SubConn per address, creating SubConns for the addresses in add and
// removing the SubConns in del.
// NOTE(review): several lines of this function, including the branch
// conditions, are not visible in this excerpt — confirm against the full file.
98 func (bw *balancerWrapper) lbWatcher() {
100 notifyCh := bw.balancer.Notify()
102 // There's no resolver in the balancer. Connect directly.
103 a := resolver.Address{
105 Type: resolver.Backend,
107 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
// A failure to create the SubConn is logged as a warning.
109 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
// Record the new SubConn with the target as its v1 address, starting in Idle.
113 bw.connSt[sc] = &scState{
114 addr: Address{Addr: bw.targetAddr},
115 s: connectivity.Idle,
// Process every address update published by the v1 balancer.
123 for addrs := range notifyCh {
124 grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
127 oldA resolver.Address
128 oldSC balancer.SubConn
// Load an existing entry of bw.conns, if any, into oldA/oldSC.
// NOTE(review): the loop body is not visible; presumably it stops after the
// first entry.
131 for oldA, oldSC = range bw.conns {
// Tear down the old SubConn: drop it from both maps and ask cc to remove it.
139 delete(bw.conns, oldA)
140 delete(bw.connSt, oldSC)
142 bw.cc.RemoveSubConn(oldSC)
// Convert the v1 addresses into resolver.Address values.
147 var newAddrs []resolver.Address
148 for _, a := range addrs {
149 newAddr := resolver.Address{
151 Type: resolver.Backend, // All addresses from the balancer are backends.
153 Metadata: a.Metadata,
155 newAddrs = append(newAddrs, newAddr)
// Create a single SubConn covering the whole converted address list.
159 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
161 grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
164 // For pickfirst, there should be only one SubConn, so the
165 // address doesn't matter. All state updates (up and down)
166 // and picking should happen on that single SubConn.
167 bw.conns[resolver.Address{}] = sc
168 bw.connSt[sc] = &scState{
169 addr: addrs[0], // Use the first address.
170 s: connectivity.Idle,
// Reuse the existing SubConn: give it the new address list and record the
// first notified address as its v1 address.
176 oldSC.UpdateAddresses(newAddrs)
178 bw.connSt[oldSC].addr = addrs[0]
183 add []resolver.Address // Addresses that need connections set up.
184 del []balancer.SubConn // Connections that need to be torn down.
// resAddrs is keyed by the resolver.Address form of each notified address.
186 resAddrs := make(map[resolver.Address]Address)
187 for _, a := range addrs {
188 resAddrs[resolver.Address{
190 Type: resolver.Backend, // All addresses from the balancer are backends.
192 Metadata: a.Metadata,
// Notified addresses that have no SubConn yet.
196 for a := range resAddrs {
197 if _, ok := bw.conns[a]; !ok {
// Existing SubConns whose address is no longer in the notified set.
201 for a, c := range bw.conns {
202 if _, ok := resAddrs[a]; !ok {
205 // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
// Create one SubConn per newly added address.
209 for _, a := range add {
210 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
212 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
216 bw.connSt[sc] = &scState{
218 s: connectivity.Idle,
// Ask cc to remove every SubConn collected in del.
224 for _, c := range del {
225 bw.cc.RemoveSubConn(c)
// HandleSubConnStateChange processes the connectivity state s reported for sc.
//
// When the SubConn enters Ready, the v1 balancer's Up is called with the
// SubConn's v1 address and the returned function is stored as its down
// callback. When it leaves Ready, that callback (if set) is invoked with
// errConnClosing. The transition is recorded in csEvltr, bw.state is reported
// to cc, and the SubConn's connSt entry is deleted once it reaches Shutdown.
// NOTE(review): some lines are not visible in this excerpt, including the
// assignment of oldS (presumably the state previously recorded for sc).
231 func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
// Look up the bookkeeping entry for this SubConn.
234 scSt, ok := bw.connSt[sc]
// NOTE(review): the body of this Idle branch is not visible in this excerpt.
238 if s == connectivity.Idle {
// Notify the v1 balancer when the SubConn goes up or down.
243 if oldS != connectivity.Ready && s == connectivity.Ready {
244 scSt.down = bw.balancer.Up(scSt.addr)
245 } else if oldS == connectivity.Ready && s != connectivity.Ready {
246 if scSt.down != nil {
247 scSt.down(errConnClosing)
// Feed the transition into the aggregate state evaluator.
250 sa := bw.csEvltr.recordTransition(oldS, s)
254 bw.cc.UpdateBalancerState(bw.state, bw)
255 if s == connectivity.Shutdown {
256 // Remove state for this sc.
257 delete(bw.connSt, sc)
// HandleResolvedAddrs receives a resolver result. Both parameters are unnamed,
// so their values cannot be referenced in the body.
// NOTE(review): the lines between the signature and the comment below are not
// visible in this excerpt.
262 func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
270 // There should be a resolver inside the balancer.
271 // All updates here, if any, are ignored.
// Close takes no arguments and returns nothing.
// NOTE(review): its body is not visible in this excerpt.
275 func (bw *balancerWrapper) Close() {
287 // The picker is the balancerWrapper itself.
288 // Pick should never return ErrNoSubConnAvailable.
289 // It either blocks or returns error, consistent with v1 balancer Get().
//
// Pick asks the v1 balancer for an address via Get and maps it to a SubConn
// in bw.conns. Get's BlockingWait option is the negation of failfast, which
// defaults to true and is overridden by the RPC info found in ctx, if any.
// For a failfast RPC, a SubConn that has no connSt entry or is not Ready
// results in a codes.Unavailable error.
// NOTE(review): several lines (error handling after Get, branch conditions,
// the final return) are not visible in this excerpt.
290 func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
291 failfast := true // Default failfast is true.
292 if ss, ok := rpcInfoFromContext(ctx); ok {
293 failfast = ss.failfast
// A non-failfast RPC asks the v1 balancer to block until an address is available.
295 a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
299 var done func(balancer.DoneInfo)
// Adapt the v1 put function p to the done callback shape; the DoneInfo
// argument is ignored.
301 done = func(i balancer.DoneInfo) { p() }
303 var sc balancer.SubConn
307 // Get the first sc in conns.
308 for _, sc = range bw.conns {
// Look up the SubConn by the resolver.Address form of the returned address.
313 sc, ok = bw.conns[resolver.Address{
315 Type: resolver.Backend,
317 Metadata: a.Metadata,
320 return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
322 if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
323 // If the returned sc is not ready and RPC is failfast,
324 // return error, and this RPC will fail.
325 return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
332 // connectivityStateEvaluator gets updated by addrConns when their
333 // states transition, based on which it evaluates the aggregated state.
// The counters below are adjusted by recordTransition.
335 type connectivityStateEvaluator struct {
337 numReady uint64 // Number of addrConns in ready state.
338 numConnecting uint64 // Number of addrConns in connecting state.
339 numTransientFailure uint64 // Number of addrConns in transientFailure.
342 // recordTransition records state change happening in every subConn and based on
343 // that it evaluates what aggregated state should be.
344 // It can only transition between Ready, Connecting and TransientFailure. Other states,
345 // Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
346 // before any subConn is created ClientConn is in idle state. In the end when ClientConn
347 // closes it is in Shutdown state.
348 // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
//
// The return value is Ready if any counted subConn is Ready, otherwise
// Connecting if any is Connecting, otherwise TransientFailure.
349 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// cse.mu is released when the function returns.
// NOTE(review): the matching Lock call is not visible in this excerpt.
351 defer cse.mu.Unlock()
// idx is 0 for oldState and 1 for newState.
354 for idx, state := range []connectivity.State{oldState, newState} {
// For idx 0 the expression wraps around to the maximum uint64, so adding it
// to a uint64 counter has the same effect as subtracting 1.
355 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
// Only these three states have counters among the cases visible here.
357 case connectivity.Ready:
358 cse.numReady += updateVal
359 case connectivity.Connecting:
360 cse.numConnecting += updateVal
361 case connectivity.TransientFailure:
362 cse.numTransientFailure += updateVal
// Aggregate: Ready takes precedence over Connecting, which takes precedence
// over TransientFailure.
367 if cse.numReady > 0 {
368 return connectivity.Ready
370 if cse.numConnecting > 0 {
371 return connectivity.Connecting
373 return connectivity.TransientFailure