OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / balancer / roundrobin / roundrobin.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package roundrobin defines a roundrobin balancer. Roundrobin balancer is
20 // installed as one of the default balancers in gRPC, users don't need to
21 // explicitly install this balancer.
22 package roundrobin
23
24 import (
25         "sync"
26
27         "golang.org/x/net/context"
28         "google.golang.org/grpc/balancer"
29         "google.golang.org/grpc/connectivity"
30         "google.golang.org/grpc/grpclog"
31         "google.golang.org/grpc/resolver"
32 )
33
34 // newBuilder creates a new roundrobin balancer builder.
35 func newBuilder() balancer.Builder {
36         return &rrBuilder{}
37 }
38
39 func init() {
40         balancer.Register(newBuilder())
41 }
42
43 type rrBuilder struct{}
44
45 func (*rrBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
46         return &rrBalancer{
47                 cc:       cc,
48                 subConns: make(map[resolver.Address]balancer.SubConn),
49                 scStates: make(map[balancer.SubConn]connectivity.State),
50                 csEvltr:  &connectivityStateEvaluator{},
51                 // Initialize picker to a picker that always return
52                 // ErrNoSubConnAvailable, because when state of a SubConn changes, we
53                 // may call UpdateBalancerState with this picker.
54                 picker: newPicker([]balancer.SubConn{}, nil),
55         }
56 }
57
58 func (*rrBuilder) Name() string {
59         return "roundrobin"
60 }
61
62 type rrBalancer struct {
63         cc balancer.ClientConn
64
65         csEvltr *connectivityStateEvaluator
66         state   connectivity.State
67
68         subConns map[resolver.Address]balancer.SubConn
69         scStates map[balancer.SubConn]connectivity.State
70         picker   *picker
71 }
72
73 func (b *rrBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
74         if err != nil {
75                 grpclog.Infof("roundrobin.rrBalancer: HandleResolvedAddrs called with error %v", err)
76                 return
77         }
78         grpclog.Infoln("roundrobin.rrBalancer: got new resolved addresses: ", addrs)
79         // addrsSet is the set converted from addrs, it's used for quick lookup of an address.
80         addrsSet := make(map[resolver.Address]struct{})
81         for _, a := range addrs {
82                 addrsSet[a] = struct{}{}
83                 if _, ok := b.subConns[a]; !ok {
84                         // a is a new address (not existing in b.subConns).
85                         sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
86                         if err != nil {
87                                 grpclog.Warningf("roundrobin.rrBalancer: failed to create new SubConn: %v", err)
88                                 continue
89                         }
90                         b.subConns[a] = sc
91                         b.scStates[sc] = connectivity.Idle
92                         sc.Connect()
93                 }
94         }
95         for a, sc := range b.subConns {
96                 // a was removed by resolver.
97                 if _, ok := addrsSet[a]; !ok {
98                         b.cc.RemoveSubConn(sc)
99                         delete(b.subConns, a)
100                         // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
101                         // The entry will be deleted in HandleSubConnStateChange.
102                 }
103         }
104 }
105
106 // regeneratePicker takes a snapshot of the balancer, and generates a picker
107 // from it. The picker
108 //  - always returns ErrTransientFailure if the balancer is in TransientFailure,
109 //  - or does round robin selection of all READY SubConns otherwise.
110 func (b *rrBalancer) regeneratePicker() {
111         if b.state == connectivity.TransientFailure {
112                 b.picker = newPicker(nil, balancer.ErrTransientFailure)
113                 return
114         }
115         var readySCs []balancer.SubConn
116         for sc, st := range b.scStates {
117                 if st == connectivity.Ready {
118                         readySCs = append(readySCs, sc)
119                 }
120         }
121         b.picker = newPicker(readySCs, nil)
122 }
123
124 func (b *rrBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
125         grpclog.Infof("roundrobin.rrBalancer: handle SubConn state change: %p, %v", sc, s)
126         oldS, ok := b.scStates[sc]
127         if !ok {
128                 grpclog.Infof("roundrobin.rrBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
129                 return
130         }
131         b.scStates[sc] = s
132         switch s {
133         case connectivity.Idle:
134                 sc.Connect()
135         case connectivity.Shutdown:
136                 // When an address was removed by resolver, b called RemoveSubConn but
137                 // kept the sc's state in scStates. Remove state for this sc here.
138                 delete(b.scStates, sc)
139         }
140
141         oldAggrState := b.state
142         b.state = b.csEvltr.recordTransition(oldS, s)
143
144         // Regenerate picker when one of the following happens:
145         //  - this sc became ready from not-ready
146         //  - this sc became not-ready from ready
147         //  - the aggregated state of balancer became TransientFailure from non-TransientFailure
148         //  - the aggregated state of balancer became non-TransientFailure from TransientFailure
149         if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
150                 (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
151                 b.regeneratePicker()
152         }
153
154         b.cc.UpdateBalancerState(b.state, b.picker)
155         return
156 }
157
158 // Close is a nop because roundrobin balancer doesn't internal state to clean
159 // up, and it doesn't need to call RemoveSubConn for the SubConns.
160 func (b *rrBalancer) Close() {
161 }
162
163 type picker struct {
164         // If err is not nil, Pick always returns this err. It's immutable after
165         // picker is created.
166         err error
167
168         // subConns is the snapshot of the roundrobin balancer when this picker was
169         // created. The slice is immutable. Each Get() will do a round robin
170         // selection from it and return the selected SubConn.
171         subConns []balancer.SubConn
172
173         mu   sync.Mutex
174         next int
175 }
176
177 func newPicker(scs []balancer.SubConn, err error) *picker {
178         grpclog.Infof("roundrobinPicker: newPicker called with scs: %v, %v", scs, err)
179         if err != nil {
180                 return &picker{err: err}
181         }
182         return &picker{
183                 subConns: scs,
184         }
185 }
186
187 func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
188         if p.err != nil {
189                 return nil, nil, p.err
190         }
191         if len(p.subConns) <= 0 {
192                 return nil, nil, balancer.ErrNoSubConnAvailable
193         }
194
195         p.mu.Lock()
196         sc := p.subConns[p.next]
197         p.next = (p.next + 1) % len(p.subConns)
198         p.mu.Unlock()
199         return sc, nil, nil
200 }
201
202 // connectivityStateEvaluator gets updated by addrConns when their
203 // states transition, based on which it evaluates the state of
204 // ClientConn.
205 type connectivityStateEvaluator struct {
206         numReady            uint64 // Number of addrConns in ready state.
207         numConnecting       uint64 // Number of addrConns in connecting state.
208         numTransientFailure uint64 // Number of addrConns in transientFailure.
209 }
210
211 // recordTransition records state change happening in every subConn and based on
212 // that it evaluates what aggregated state should be.
213 // It can only transition between Ready, Connecting and TransientFailure. Other states,
214 // Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
215 // before any subConn is created ClientConn is in idle state. In the end when ClientConn
216 // closes it is in Shutdown state.
217 //
218 // recordTransition should only be called synchronously from the same goroutine.
219 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
220         // Update counters.
221         for idx, state := range []connectivity.State{oldState, newState} {
222                 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
223                 switch state {
224                 case connectivity.Ready:
225                         cse.numReady += updateVal
226                 case connectivity.Connecting:
227                         cse.numConnecting += updateVal
228                 case connectivity.TransientFailure:
229                         cse.numTransientFailure += updateVal
230                 }
231         }
232
233         // Evaluate.
234         if cse.numReady > 0 {
235                 return connectivity.Ready
236         }
237         if cse.numConnecting > 0 {
238                 return connectivity.Connecting
239         }
240         return connectivity.TransientFailure
241 }