3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 "google.golang.org/grpc/balancer"
25 "google.golang.org/grpc/connectivity"
26 "google.golang.org/grpc/grpclog"
27 "google.golang.org/grpc/resolver"
30 // scStateUpdate contains the subConn and the new state it changed to.
31 type scStateUpdate struct {
33 state connectivity.State
36 // scStateUpdateBuffer is an unbounded channel for scStateUpdate.
37 // TODO make a general purpose buffer that uses interface{}.
38 type scStateUpdateBuffer struct {
41 backlog []*scStateUpdate
44 func newSCStateUpdateBuffer() *scStateUpdateBuffer {
45 return &scStateUpdateBuffer{
46 c: make(chan *scStateUpdate, 1),
50 func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
53 if len(b.backlog) == 0 {
60 b.backlog = append(b.backlog, t)
63 func (b *scStateUpdateBuffer) load() {
66 if len(b.backlog) > 0 {
68 case b.c <- b.backlog[0]:
70 b.backlog = b.backlog[1:]
76 // get returns the channel that the scStateUpdate will be sent to.
78 // Upon receiving, the caller should call load to send another
79 // scStateUpdate onto the channel if there is any.
80 func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
84 // resolverUpdate contains the new resolved addresses or error if there's
86 type resolverUpdate struct {
87 addrs []resolver.Address
91 // ccBalancerWrapper is a wrapper on top of cc for balancers.
92 // It implements balancer.ClientConn interface.
93 type ccBalancerWrapper struct {
95 balancer balancer.Balancer
96 stateChangeQueue *scStateUpdateBuffer
97 resolverUpdateCh chan *resolverUpdate
100 subConns map[*acBalancerWrapper]struct{}
103 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
104 ccb := &ccBalancerWrapper{
106 stateChangeQueue: newSCStateUpdateBuffer(),
107 resolverUpdateCh: make(chan *resolverUpdate, 1),
108 done: make(chan struct{}),
109 subConns: make(map[*acBalancerWrapper]struct{}),
112 ccb.balancer = b.Build(ccb, bopts)
116 // watcher calls balancer functions sequentially, so the balancer can be implemented
118 func (ccb *ccBalancerWrapper) watcher() {
121 case t := <-ccb.stateChangeQueue.get():
122 ccb.stateChangeQueue.load()
129 ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
130 case t := <-ccb.resolverUpdateCh:
137 ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
144 for acbw := range ccb.subConns {
145 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
153 func (ccb *ccBalancerWrapper) close() {
157 func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
158 // When updating addresses for a SubConn, if the address in use is not in
159 // the new addresses, tearDown() will be called on the old ac and a new ac will be
160 // created. tearDown() generates a state change with Shutdown state, we
161 // don't want the balancer to receive this state change. So before
162 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
163 // this function will be called with (nil, Shutdown). We don't need to call
164 // balancer method in this case.
168 ccb.stateChangeQueue.put(&scStateUpdate{
174 func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
176 case <-ccb.resolverUpdateCh:
179 ccb.resolverUpdateCh <- &resolverUpdate{
185 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
186 ac, err := ccb.cc.newAddrConn(addrs)
190 acbw := &acBalancerWrapper{ac: ac}
194 ccb.subConns[acbw] = struct{}{}
198 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
199 acbw, ok := sc.(*acBalancerWrapper)
203 delete(ccb.subConns, acbw)
204 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
207 func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
208 ccb.cc.csMgr.updateState(s)
209 ccb.cc.blockingpicker.updatePicker(p)
212 func (ccb *ccBalancerWrapper) Target() string {
216 // acBalancerWrapper is a wrapper on top of ac for balancers.
217 // It implements balancer.SubConn interface.
218 type acBalancerWrapper struct {
223 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
225 defer acbw.mu.Unlock()
226 if !acbw.ac.tryUpdateAddrs(addrs) {
229 // Set old ac.acbw to nil so the Shutdown state update will be ignored
232 // TODO(bar) the state transition could be wrong when tearDown() old ac
233 // and creating new ac, fix the transition.
236 acState := acbw.ac.getState()
237 acbw.ac.tearDown(errConnDrain)
239 if acState == connectivity.Shutdown {
243 ac, err := cc.newAddrConn(addrs)
245 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
250 if acState != connectivity.Idle {
256 func (acbw *acBalancerWrapper) Connect() {
258 defer acbw.mu.Unlock()
262 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
264 defer acbw.mu.Unlock()