OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / picker_wrapper.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "sync"
23
24         "golang.org/x/net/context"
25         "google.golang.org/grpc/balancer"
26         "google.golang.org/grpc/codes"
27         "google.golang.org/grpc/grpclog"
28         "google.golang.org/grpc/status"
29         "google.golang.org/grpc/transport"
30 )
31
32 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
33 // actions and unblock when there's a picker update.
34 type pickerWrapper struct {
35         mu         sync.Mutex
36         done       bool
37         blockingCh chan struct{}
38         picker     balancer.Picker
39 }
40
41 func newPickerWrapper() *pickerWrapper {
42         bp := &pickerWrapper{blockingCh: make(chan struct{})}
43         return bp
44 }
45
46 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
47 func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
48         bp.mu.Lock()
49         if bp.done {
50                 bp.mu.Unlock()
51                 return
52         }
53         bp.picker = p
54         // bp.blockingCh should never be nil.
55         close(bp.blockingCh)
56         bp.blockingCh = make(chan struct{})
57         bp.mu.Unlock()
58 }
59
60 // pick returns the transport that will be used for the RPC.
61 // It may block in the following cases:
62 // - there's no picker
63 // - the current picker returns ErrNoSubConnAvailable
64 // - the current picker returns other errors and failfast is false.
65 // - the subConn returned by the current picker is not READY
66 // When one of these situations happens, pick blocks until the picker gets updated.
67 func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
68         var (
69                 p  balancer.Picker
70                 ch chan struct{}
71         )
72
73         for {
74                 bp.mu.Lock()
75                 if bp.done {
76                         bp.mu.Unlock()
77                         return nil, nil, ErrClientConnClosing
78                 }
79
80                 if bp.picker == nil {
81                         ch = bp.blockingCh
82                 }
83                 if ch == bp.blockingCh {
84                         // This could happen when either:
85                         // - bp.picker is nil (the previous if condition), or
86                         // - has called pick on the current picker.
87                         bp.mu.Unlock()
88                         select {
89                         case <-ctx.Done():
90                                 return nil, nil, ctx.Err()
91                         case <-ch:
92                         }
93                         continue
94                 }
95
96                 ch = bp.blockingCh
97                 p = bp.picker
98                 bp.mu.Unlock()
99
100                 subConn, put, err := p.Pick(ctx, opts)
101
102                 if err != nil {
103                         switch err {
104                         case balancer.ErrNoSubConnAvailable:
105                                 continue
106                         case balancer.ErrTransientFailure:
107                                 if !failfast {
108                                         continue
109                                 }
110                                 return nil, nil, status.Errorf(codes.Unavailable, "%v", err)
111                         default:
112                                 // err is some other error.
113                                 return nil, nil, toRPCErr(err)
114                         }
115                 }
116
117                 acw, ok := subConn.(*acBalancerWrapper)
118                 if !ok {
119                         grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
120                         continue
121                 }
122                 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
123                         return t, put, nil
124                 }
125                 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
126                 // If ok == false, ac.state is not READY.
127                 // A valid picker always returns READY subConn. This means the state of ac
128                 // just changed, and picker will be updated shortly.
129                 // continue back to the beginning of the for loop to repick.
130         }
131 }
132
133 func (bp *pickerWrapper) close() {
134         bp.mu.Lock()
135         defer bp.mu.Unlock()
136         if bp.done {
137                 return
138         }
139         bp.done = true
140         close(bp.blockingCh)
141 }