bytom/vapor.git: vendor/google.golang.org/grpc/balancer_v1_wrapper.go
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"strings"
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/resolver"
)

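// balancerWrapperBuilder wraps a v1 Balancer so that it can be built and used
// through the v2 balancer.Builder interface.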
type balancerWrapperBuilder struct {
	b Balancer // The v1 balancer.
}

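// Build starts the wrapped v1 balancer with the target address (scheme
// stripped) and returns a balancerWrapper that drives it.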
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	targetAddr := cc.Target()
	targetSplitted := strings.Split(targetAddr, ":///")
	if len(targetSplitted) >= 2 {
		targetAddr = targetSplitted[1]
	}

	bwb.b.Start(targetAddr, BalancerConfig{
		DialCreds: opts.DialCreds,
		Dialer:    opts.Dialer,
	})
	_, pickfirst := bwb.b.(*pickFirst)
	bw := &balancerWrapper{
		balancer:   bwb.b,
		pickfirst:  pickfirst,
		cc:         cc,
		targetAddr: targetAddr,
		startCh:    make(chan struct{}),
		conns:      make(map[resolver.Address]balancer.SubConn),
		connSt:     make(map[balancer.SubConn]*scState),
		csEvltr:    &connectivityStateEvaluator{},
		state:      connectivity.Idle,
	}
	cc.UpdateBalancerState(connectivity.Idle, bw)
	go bw.lbWatcher()
	return bw
}

func (bwb *balancerWrapperBuilder) Name() string {
	return "wrapper"
}

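// scState keeps the per-SubConn state: the v1 address it maps to, its last
// known connectivity state, and the down callback returned by Balancer.Up.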
type scState struct {
	addr Address // The v1 address type.
	s    connectivity.State
	down func(error)
}

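// balancerWrapper exposes a v1 Balancer as a v2 balancer.Balancer and also
// serves as the balancer.Picker reported to cc.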
type balancerWrapper struct {
	balancer  Balancer // The v1 balancer.
	pickfirst bool

	cc         balancer.ClientConn
	targetAddr string // Target without the scheme.

	// To aggregate the connectivity state.
	csEvltr *connectivityStateEvaluator
	state   connectivity.State

	mu     sync.Mutex
	conns  map[resolver.Address]balancer.SubConn
	connSt map[balancer.SubConn]*scState
	// startCh is closed when the first resolver result is handled. lbWatcher
	// blocks until it is closed, to avoid a race where a SubConn is created
	// and cc wants to notify the balancer of state changes before Build has
	// returned, when cc does not yet have access to the balancer.
	startCh chan struct{}
}

// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
	<-bw.startCh
	notifyCh := bw.balancer.Notify()
	if notifyCh == nil {
		// There's no resolver in the balancer. Connect directly.
		a := resolver.Address{
			Addr: bw.targetAddr,
			Type: resolver.Backend,
		}
		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
		if err != nil {
			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
		} else {
			bw.mu.Lock()
			bw.conns[a] = sc
			bw.connSt[sc] = &scState{
				addr: Address{Addr: bw.targetAddr},
				s:    connectivity.Idle,
			}
			bw.mu.Unlock()
			sc.Connect()
		}
		return
	}

	for addrs := range notifyCh {
		grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
		if bw.pickfirst {
			var (
				oldA  resolver.Address
				oldSC balancer.SubConn
			)
			bw.mu.Lock()
			for oldA, oldSC = range bw.conns {
				break
			}
			bw.mu.Unlock()
			if len(addrs) <= 0 {
				if oldSC != nil {
					// Teardown old sc.
					bw.mu.Lock()
					delete(bw.conns, oldA)
					delete(bw.connSt, oldSC)
					bw.mu.Unlock()
					bw.cc.RemoveSubConn(oldSC)
				}
				continue
			}

			var newAddrs []resolver.Address
			for _, a := range addrs {
				newAddr := resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from the balancer are backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}
				newAddrs = append(newAddrs, newAddr)
			}
			if oldSC == nil {
				// Create new sc.
				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
				} else {
					bw.mu.Lock()
					// For pickfirst, there should be only one SubConn, so the
					// address doesn't matter. All state updates (up and down)
					// and picking happen on that single SubConn.
					bw.conns[resolver.Address{}] = sc
					bw.connSt[sc] = &scState{
						addr: addrs[0], // Use the first address.
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			} else {
				oldSC.UpdateAddresses(newAddrs)
				bw.mu.Lock()
				bw.connSt[oldSC].addr = addrs[0]
				bw.mu.Unlock()
			}
		} else {
			var (
				add []resolver.Address // Addresses that need a new connection.
				del []balancer.SubConn // Connections that need to be torn down.
			)
			resAddrs := make(map[resolver.Address]Address)
			for _, a := range addrs {
				resAddrs[resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from the balancer are backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}] = a
			}
			bw.mu.Lock()
			for a := range resAddrs {
				if _, ok := bw.conns[a]; !ok {
					add = append(add, a)
				}
			}
			for a, c := range bw.conns {
				if _, ok := resAddrs[a]; !ok {
					del = append(del, c)
					delete(bw.conns, a)
					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
				}
			}
			bw.mu.Unlock()
			for _, a := range add {
				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
				} else {
					bw.mu.Lock()
					bw.conns[a] = sc
					bw.connSt[sc] = &scState{
						addr: resAddrs[a],
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			}
			for _, c := range del {
				bw.cc.RemoveSubConn(c)
			}
		}
	}
}

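// HandleSubConnStateChange records the new connectivity state of sc, calls Up
// or the stored down function on the wrapped v1 balancer when sc enters or
// leaves Ready, and reports the re-aggregated state to cc.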
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	bw.mu.Lock()
	defer bw.mu.Unlock()
	scSt, ok := bw.connSt[sc]
	if !ok {
		return
	}
	if s == connectivity.Idle {
		sc.Connect()
	}
	oldS := scSt.s
	scSt.s = s
	if oldS != connectivity.Ready && s == connectivity.Ready {
		scSt.down = bw.balancer.Up(scSt.addr)
	} else if oldS == connectivity.Ready && s != connectivity.Ready {
		if scSt.down != nil {
			scSt.down(errConnClosing)
		}
	}
	sa := bw.csEvltr.recordTransition(oldS, s)
	if bw.state != sa {
		bw.state = sa
	}
	bw.cc.UpdateBalancerState(bw.state, bw)
	if s == connectivity.Shutdown {
		// Remove state for this sc.
		delete(bw.connSt, sc)
	}
	return
}

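// HandleResolvedAddrs unblocks lbWatcher on the first call; the resolved
// addresses themselves are ignored because the v1 balancer does its own
// resolving.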
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
	bw.mu.Lock()
	defer bw.mu.Unlock()
	select {
	case <-bw.startCh:
	default:
		close(bw.startCh)
	}
	// There should be a resolver inside the balancer.
	// All updates here, if any, are ignored.
	return
}

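// Close unblocks lbWatcher if it is still waiting on startCh and closes the
// wrapped v1 balancer.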
func (bw *balancerWrapper) Close() {
	bw.mu.Lock()
	defer bw.mu.Unlock()
	select {
	case <-bw.startCh:
	default:
		close(bw.startCh)
	}
	bw.balancer.Close()
	return
}

// The picker is the balancerWrapper itself.
// Pick should never return ErrNoSubConnAvailable.
// It either blocks or returns an error, consistent with v1 balancer Get().
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	failfast := true // Default failfast is true.
	if ss, ok := rpcInfoFromContext(ctx); ok {
		failfast = ss.failfast
	}
	a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
	if err != nil {
		return nil, nil, err
	}
	var done func(balancer.DoneInfo)
	if p != nil {
		done = func(i balancer.DoneInfo) { p() }
	}
	var sc balancer.SubConn
	bw.mu.Lock()
	defer bw.mu.Unlock()
	if bw.pickfirst {
		// Get the first sc in conns.
		for _, sc = range bw.conns {
			break
		}
	} else {
		var ok bool
		sc, ok = bw.conns[resolver.Address{
			Addr:       a.Addr,
			Type:       resolver.Backend,
			ServerName: "",
			Metadata:   a.Metadata,
		}]
		if !ok && failfast {
			return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
		}
		if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
			// If the returned sc is not ready and the RPC is failfast,
			// return an error so this RPC will fail.
			return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
		}
	}

	return sc, done, nil
}

// connectivityStateEvaluator gets updated by addrConns when their states
// transition, based on which it evaluates the aggregated state of the
// ClientConn.
type connectivityStateEvaluator struct {
	mu                  sync.Mutex
	numReady            uint64 // Number of addrConns in ready state.
	numConnecting       uint64 // Number of addrConns in connecting state.
	numTransientFailure uint64 // Number of addrConns in transientFailure.
}

// recordTransition records a state change happening in a subConn and, based on
// that, evaluates what the aggregated state should be.
// The aggregated state can only transition between Ready, Connecting and
// TransientFailure. The other states, Idle and Shutdown, are entered by the
// ClientConn itself: before any subConn is created the ClientConn is in the
// Idle state, and when the ClientConn closes it is in the Shutdown state.
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
	cse.mu.Lock()
	defer cse.mu.Unlock()

	// Update counters.
	for idx, state := range []connectivity.State{oldState, newState} {
		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new; the idx==0 case relies on uint64 wraparound to act as a decrement.
		switch state {
		case connectivity.Ready:
			cse.numReady += updateVal
		case connectivity.Connecting:
			cse.numConnecting += updateVal
		case connectivity.TransientFailure:
			cse.numTransientFailure += updateVal
		}
	}

	// Evaluate.
	if cse.numReady > 0 {
		return connectivity.Ready
	}
	if cse.numConnecting > 0 {
		return connectivity.Connecting
	}
	return connectivity.TransientFailure
}