OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / balancer_conn_wrappers.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "sync"
23
24         "google.golang.org/grpc/balancer"
25         "google.golang.org/grpc/connectivity"
26         "google.golang.org/grpc/grpclog"
27         "google.golang.org/grpc/resolver"
28 )
29
30 // scStateUpdate contains the subConn and the new state it changed to.
31 type scStateUpdate struct {
32         sc    balancer.SubConn
33         state connectivity.State
34 }
35
36 // scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
37 // TODO make a general purpose buffer that uses interface{}.
38 type scStateUpdateBuffer struct {
39         c       chan *scStateUpdate
40         mu      sync.Mutex
41         backlog []*scStateUpdate
42 }
43
44 func newSCStateUpdateBuffer() *scStateUpdateBuffer {
45         return &scStateUpdateBuffer{
46                 c: make(chan *scStateUpdate, 1),
47         }
48 }
49
50 func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
51         b.mu.Lock()
52         defer b.mu.Unlock()
53         if len(b.backlog) == 0 {
54                 select {
55                 case b.c <- t:
56                         return
57                 default:
58                 }
59         }
60         b.backlog = append(b.backlog, t)
61 }
62
63 func (b *scStateUpdateBuffer) load() {
64         b.mu.Lock()
65         defer b.mu.Unlock()
66         if len(b.backlog) > 0 {
67                 select {
68                 case b.c <- b.backlog[0]:
69                         b.backlog[0] = nil
70                         b.backlog = b.backlog[1:]
71                 default:
72                 }
73         }
74 }
75
76 // get returns the channel that the scStateUpdate will be sent to.
77 //
78 // Upon receiving, the caller should call load to send another
79 // scStateChangeTuple onto the channel if there is any.
80 func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
81         return b.c
82 }
83
84 // resolverUpdate contains the new resolved addresses or error if there's
85 // any.
86 type resolverUpdate struct {
87         addrs []resolver.Address
88         err   error
89 }
90
91 // ccBalancerWrapper is a wrapper on top of cc for balancers.
92 // It implements balancer.ClientConn interface.
93 type ccBalancerWrapper struct {
94         cc               *ClientConn
95         balancer         balancer.Balancer
96         stateChangeQueue *scStateUpdateBuffer
97         resolverUpdateCh chan *resolverUpdate
98         done             chan struct{}
99
100         subConns map[*acBalancerWrapper]struct{}
101 }
102
103 func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
104         ccb := &ccBalancerWrapper{
105                 cc:               cc,
106                 stateChangeQueue: newSCStateUpdateBuffer(),
107                 resolverUpdateCh: make(chan *resolverUpdate, 1),
108                 done:             make(chan struct{}),
109                 subConns:         make(map[*acBalancerWrapper]struct{}),
110         }
111         go ccb.watcher()
112         ccb.balancer = b.Build(ccb, bopts)
113         return ccb
114 }
115
116 // watcher balancer functions sequencially, so the balancer can be implemeneted
117 // lock-free.
118 func (ccb *ccBalancerWrapper) watcher() {
119         for {
120                 select {
121                 case t := <-ccb.stateChangeQueue.get():
122                         ccb.stateChangeQueue.load()
123                         select {
124                         case <-ccb.done:
125                                 ccb.balancer.Close()
126                                 return
127                         default:
128                         }
129                         ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
130                 case t := <-ccb.resolverUpdateCh:
131                         select {
132                         case <-ccb.done:
133                                 ccb.balancer.Close()
134                                 return
135                         default:
136                         }
137                         ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
138                 case <-ccb.done:
139                 }
140
141                 select {
142                 case <-ccb.done:
143                         ccb.balancer.Close()
144                         for acbw := range ccb.subConns {
145                                 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
146                         }
147                         return
148                 default:
149                 }
150         }
151 }
152
153 func (ccb *ccBalancerWrapper) close() {
154         close(ccb.done)
155 }
156
157 func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
158         // When updating addresses for a SubConn, if the address in use is not in
159         // the new addresses, the old ac will be tearDown() and a new ac will be
160         // created. tearDown() generates a state change with Shutdown state, we
161         // don't want the balancer to receive this state change. So before
162         // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
163         // this function will be called with (nil, Shutdown). We don't need to call
164         // balancer method in this case.
165         if sc == nil {
166                 return
167         }
168         ccb.stateChangeQueue.put(&scStateUpdate{
169                 sc:    sc,
170                 state: s,
171         })
172 }
173
174 func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
175         select {
176         case <-ccb.resolverUpdateCh:
177         default:
178         }
179         ccb.resolverUpdateCh <- &resolverUpdate{
180                 addrs: addrs,
181                 err:   err,
182         }
183 }
184
185 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
186         ac, err := ccb.cc.newAddrConn(addrs)
187         if err != nil {
188                 return nil, err
189         }
190         acbw := &acBalancerWrapper{ac: ac}
191         acbw.ac.mu.Lock()
192         ac.acbw = acbw
193         acbw.ac.mu.Unlock()
194         ccb.subConns[acbw] = struct{}{}
195         return acbw, nil
196 }
197
198 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
199         acbw, ok := sc.(*acBalancerWrapper)
200         if !ok {
201                 return
202         }
203         delete(ccb.subConns, acbw)
204         ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
205 }
206
207 func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
208         ccb.cc.csMgr.updateState(s)
209         ccb.cc.blockingpicker.updatePicker(p)
210 }
211
212 func (ccb *ccBalancerWrapper) Target() string {
213         return ccb.cc.target
214 }
215
216 // acBalancerWrapper is a wrapper on top of ac for balancers.
217 // It implements balancer.SubConn interface.
218 type acBalancerWrapper struct {
219         mu sync.Mutex
220         ac *addrConn
221 }
222
223 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
224         acbw.mu.Lock()
225         defer acbw.mu.Unlock()
226         if !acbw.ac.tryUpdateAddrs(addrs) {
227                 cc := acbw.ac.cc
228                 acbw.ac.mu.Lock()
229                 // Set old ac.acbw to nil so the Shutdown state update will be ignored
230                 // by balancer.
231                 //
232                 // TODO(bar) the state transition could be wrong when tearDown() old ac
233                 // and creating new ac, fix the transition.
234                 acbw.ac.acbw = nil
235                 acbw.ac.mu.Unlock()
236                 acState := acbw.ac.getState()
237                 acbw.ac.tearDown(errConnDrain)
238
239                 if acState == connectivity.Shutdown {
240                         return
241                 }
242
243                 ac, err := cc.newAddrConn(addrs)
244                 if err != nil {
245                         grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
246                         return
247                 }
248                 acbw.ac = ac
249                 ac.acbw = acbw
250                 if acState != connectivity.Idle {
251                         ac.connect()
252                 }
253         }
254 }
255
256 func (acbw *acBalancerWrapper) Connect() {
257         acbw.mu.Lock()
258         defer acbw.mu.Unlock()
259         acbw.ac.connect()
260 }
261
262 func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
263         acbw.mu.Lock()
264         defer acbw.mu.Unlock()
265         return acbw.ac
266 }