OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / clientconn.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "errors"
23         "fmt"
24         "math"
25         "net"
26         "reflect"
27         "strings"
28         "sync"
29         "time"
30
31         "golang.org/x/net/context"
32         "golang.org/x/net/trace"
33         "google.golang.org/grpc/balancer"
34         _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35         "google.golang.org/grpc/connectivity"
36         "google.golang.org/grpc/credentials"
37         "google.golang.org/grpc/grpclog"
38         "google.golang.org/grpc/keepalive"
39         "google.golang.org/grpc/resolver"
40         _ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
41         _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
42         "google.golang.org/grpc/stats"
43         "google.golang.org/grpc/transport"
44 )
45
46 var (
47         // ErrClientConnClosing indicates that the operation is illegal because
48         // the ClientConn is closing.
49         ErrClientConnClosing = errors.New("grpc: the client connection is closing")
50         // ErrClientConnTimeout indicates that the ClientConn cannot establish the
51         // underlying connections within the specified timeout.
52         // DEPRECATED: Please use context.DeadlineExceeded instead.
53         ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
54
55         // errNoTransportSecurity indicates that there is no transport security
56         // being set for ClientConn. Users should either set one or explicitly
57         // call WithInsecure DialOption to disable security.
58         errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
59         // errTransportCredentialsMissing indicates that users want to transmit security
60         // information (e.g., oauth2 token) which requires secure connection on an insecure
61         // connection.
62         errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
63         // errCredentialsConflict indicates that grpc.WithTransportCredentials()
64         // and grpc.WithInsecure() are both called for a connection.
65         errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
66         // errNetworkIO indicates that the connection is down due to some network I/O error.
67         errNetworkIO = errors.New("grpc: failed with network I/O error")
68         // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69         errConnDrain = errors.New("grpc: the connection is drained")
70         // errConnClosing indicates that the connection is closing.
71         errConnClosing = errors.New("grpc: the connection is closing")
72         // errConnUnavailable indicates that the connection is unavailable.
73         errConnUnavailable = errors.New("grpc: the connection is unavailable")
74         // errBalancerClosed indicates that the balancer is closed.
75         errBalancerClosed = errors.New("grpc: balancer is closed")
76         // minimum time to give a connection to complete
77         minConnectTimeout = 20 * time.Second
78 )
79
80 // dialOptions configure a Dial call. dialOptions are set by the DialOption
81 // values passed to Dial.
82 type dialOptions struct {
83         unaryInt    UnaryClientInterceptor
84         streamInt   StreamClientInterceptor
85         codec       Codec
86         cp          Compressor
87         dc          Decompressor
88         bs          backoffStrategy
89         block       bool
90         insecure    bool
91         timeout     time.Duration
92         scChan      <-chan ServiceConfig
93         copts       transport.ConnectOptions
94         callOptions []CallOption
95         // This is to support v1 balancer.
96         balancerBuilder balancer.Builder
97 }
98
99 const (
100         defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
101         defaultClientMaxSendMessageSize    = math.MaxInt32
102 )
103
104 // DialOption configures how we set up the connection.
105 type DialOption func(*dialOptions)
106
107 // WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
108 // before doing a write on the wire.
109 func WithWriteBufferSize(s int) DialOption {
110         return func(o *dialOptions) {
111                 o.copts.WriteBufferSize = s
112         }
113 }
114
115 // WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
116 // for each read syscall.
117 func WithReadBufferSize(s int) DialOption {
118         return func(o *dialOptions) {
119                 o.copts.ReadBufferSize = s
120         }
121 }
122
123 // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
124 // The lower bound for window size is 64K and any value smaller than that will be ignored.
125 func WithInitialWindowSize(s int32) DialOption {
126         return func(o *dialOptions) {
127                 o.copts.InitialWindowSize = s
128         }
129 }
130
131 // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
132 // The lower bound for window size is 64K and any value smaller than that will be ignored.
133 func WithInitialConnWindowSize(s int32) DialOption {
134         return func(o *dialOptions) {
135                 o.copts.InitialConnWindowSize = s
136         }
137 }
138
139 // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
140 func WithMaxMsgSize(s int) DialOption {
141         return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
142 }
143
144 // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
145 func WithDefaultCallOptions(cos ...CallOption) DialOption {
146         return func(o *dialOptions) {
147                 o.callOptions = append(o.callOptions, cos...)
148         }
149 }
150
151 // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
152 func WithCodec(c Codec) DialOption {
153         return func(o *dialOptions) {
154                 o.codec = c
155         }
156 }
157
158 // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
159 // compressor.
160 func WithCompressor(cp Compressor) DialOption {
161         return func(o *dialOptions) {
162                 o.cp = cp
163         }
164 }
165
166 // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
167 // message decompressor.
168 func WithDecompressor(dc Decompressor) DialOption {
169         return func(o *dialOptions) {
170                 o.dc = dc
171         }
172 }
173
174 // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
175 // Name resolver will be ignored if this DialOption is specified.
176 // Deprecated: use the new balancer APIs in balancer package instead.
177 func WithBalancer(b Balancer) DialOption {
178         return func(o *dialOptions) {
179                 o.balancerBuilder = &balancerWrapperBuilder{
180                         b: b,
181                 }
182         }
183 }
184
185 // WithBalancerBuilder is for testing only. Users using custom balancers should
186 // register their balancer and use service config to choose the balancer to use.
187 func WithBalancerBuilder(b balancer.Builder) DialOption {
188         // TODO(bar) remove this when switching balancer is done.
189         return func(o *dialOptions) {
190                 o.balancerBuilder = b
191         }
192 }
193
194 // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
195 // DEPRECATED: service config should be received through name resolver, as specified here.
196 // https://github.com/grpc/grpc/blob/master/doc/service_config.md
197 func WithServiceConfig(c <-chan ServiceConfig) DialOption {
198         return func(o *dialOptions) {
199                 o.scChan = c
200         }
201 }
202
203 // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
204 // when backing off after failed connection attempts.
205 func WithBackoffMaxDelay(md time.Duration) DialOption {
206         return WithBackoffConfig(BackoffConfig{MaxDelay: md})
207 }
208
209 // WithBackoffConfig configures the dialer to use the provided backoff
210 // parameters after connection failures.
211 //
212 // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
213 // for use.
214 func WithBackoffConfig(b BackoffConfig) DialOption {
215         // Set defaults to ensure that provided BackoffConfig is valid and
216         // unexported fields get default values.
217         setDefaults(&b)
218         return withBackoff(b)
219 }
220
221 // withBackoff sets the backoff strategy used for retries after a
222 // failed connection attempt.
223 //
224 // This can be exported if arbitrary backoff strategies are allowed by gRPC.
225 func withBackoff(bs backoffStrategy) DialOption {
226         return func(o *dialOptions) {
227                 o.bs = bs
228         }
229 }
230
231 // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
232 // connection is up. Without this, Dial returns immediately and connecting the server
233 // happens in background.
234 func WithBlock() DialOption {
235         return func(o *dialOptions) {
236                 o.block = true
237         }
238 }
239
240 // WithInsecure returns a DialOption which disables transport security for this ClientConn.
241 // Note that transport security is required unless WithInsecure is set.
242 func WithInsecure() DialOption {
243         return func(o *dialOptions) {
244                 o.insecure = true
245         }
246 }
247
248 // WithTransportCredentials returns a DialOption which configures a
249 // connection level security credentials (e.g., TLS/SSL).
250 func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
251         return func(o *dialOptions) {
252                 o.copts.TransportCredentials = creds
253         }
254 }
255
256 // WithPerRPCCredentials returns a DialOption which sets
257 // credentials and places auth state on each outbound RPC.
258 func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
259         return func(o *dialOptions) {
260                 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
261         }
262 }
263
264 // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
265 // initially. This is valid if and only if WithBlock() is present.
266 // Deprecated: use DialContext and context.WithTimeout instead.
267 func WithTimeout(d time.Duration) DialOption {
268         return func(o *dialOptions) {
269                 o.timeout = d
270         }
271 }
272
273 // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
274 // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
275 // Temporary() method to decide if it should try to reconnect to the network address.
276 func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
277         return func(o *dialOptions) {
278                 o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
279                         if deadline, ok := ctx.Deadline(); ok {
280                                 return f(addr, deadline.Sub(time.Now()))
281                         }
282                         return f(addr, 0)
283                 }
284         }
285 }
286
287 // WithStatsHandler returns a DialOption that specifies the stats handler
288 // for all the RPCs and underlying network connections in this ClientConn.
289 func WithStatsHandler(h stats.Handler) DialOption {
290         return func(o *dialOptions) {
291                 o.copts.StatsHandler = h
292         }
293 }
294
295 // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
296 // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
297 // address and won't try to reconnect.
298 // The default value of FailOnNonTempDialError is false.
299 // This is an EXPERIMENTAL API.
300 func FailOnNonTempDialError(f bool) DialOption {
301         return func(o *dialOptions) {
302                 o.copts.FailOnNonTempDialError = f
303         }
304 }
305
306 // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
307 func WithUserAgent(s string) DialOption {
308         return func(o *dialOptions) {
309                 o.copts.UserAgent = s
310         }
311 }
312
313 // WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
314 func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
315         return func(o *dialOptions) {
316                 o.copts.KeepaliveParams = kp
317         }
318 }
319
320 // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
321 func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
322         return func(o *dialOptions) {
323                 o.unaryInt = f
324         }
325 }
326
327 // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
328 func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
329         return func(o *dialOptions) {
330                 o.streamInt = f
331         }
332 }
333
334 // WithAuthority returns a DialOption that specifies the value to be used as
335 // the :authority pseudo-header. This value only works with WithInsecure and
336 // has no effect if TransportCredentials are present.
337 func WithAuthority(a string) DialOption {
338         return func(o *dialOptions) {
339                 o.copts.Authority = a
340         }
341 }
342
343 // Dial creates a client connection to the given target.
344 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
345         return DialContext(context.Background(), target, opts...)
346 }
347
348 // DialContext creates a client connection to the given target. ctx can be used to
349 // cancel or expire the pending connection. Once this function returns, the
350 // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
351 // to terminate all the pending operations after this function returns.
352 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
353         cc := &ClientConn{
354                 target: target,
355                 csMgr:  &connectivityStateManager{},
356                 conns:  make(map[*addrConn]struct{}),
357
358                 blockingpicker: newPickerWrapper(),
359         }
360         cc.ctx, cc.cancel = context.WithCancel(context.Background())
361
362         for _, opt := range opts {
363                 opt(&cc.dopts)
364         }
365
366         if !cc.dopts.insecure {
367                 if cc.dopts.copts.TransportCredentials == nil {
368                         return nil, errNoTransportSecurity
369                 }
370         } else {
371                 if cc.dopts.copts.TransportCredentials != nil {
372                         return nil, errCredentialsConflict
373                 }
374                 for _, cd := range cc.dopts.copts.PerRPCCredentials {
375                         if cd.RequireTransportSecurity() {
376                                 return nil, errTransportCredentialsMissing
377                         }
378                 }
379         }
380
381         cc.mkp = cc.dopts.copts.KeepaliveParams
382
383         if cc.dopts.copts.Dialer == nil {
384                 cc.dopts.copts.Dialer = newProxyDialer(
385                         func(ctx context.Context, addr string) (net.Conn, error) {
386                                 return dialContext(ctx, "tcp", addr)
387                         },
388                 )
389         }
390
391         if cc.dopts.copts.UserAgent != "" {
392                 cc.dopts.copts.UserAgent += " " + grpcUA
393         } else {
394                 cc.dopts.copts.UserAgent = grpcUA
395         }
396
397         if cc.dopts.timeout > 0 {
398                 var cancel context.CancelFunc
399                 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
400                 defer cancel()
401         }
402
403         defer func() {
404                 select {
405                 case <-ctx.Done():
406                         conn, err = nil, ctx.Err()
407                 default:
408                 }
409
410                 if err != nil {
411                         cc.Close()
412                 }
413         }()
414
415         scSet := false
416         if cc.dopts.scChan != nil {
417                 // Try to get an initial service config.
418                 select {
419                 case sc, ok := <-cc.dopts.scChan:
420                         if ok {
421                                 cc.sc = sc
422                                 scSet = true
423                         }
424                 default:
425                 }
426         }
427         // Set defaults.
428         if cc.dopts.codec == nil {
429                 cc.dopts.codec = protoCodec{}
430         }
431         if cc.dopts.bs == nil {
432                 cc.dopts.bs = DefaultBackoffConfig
433         }
434         cc.parsedTarget = parseTarget(cc.target)
435         creds := cc.dopts.copts.TransportCredentials
436         if creds != nil && creds.Info().ServerName != "" {
437                 cc.authority = creds.Info().ServerName
438         } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
439                 cc.authority = cc.dopts.copts.Authority
440         } else {
441                 // Use endpoint from "scheme://authority/endpoint" as the default
442                 // authority for ClientConn.
443                 cc.authority = cc.parsedTarget.Endpoint
444         }
445
446         if cc.dopts.scChan != nil && !scSet {
447                 // Blocking wait for the initial service config.
448                 select {
449                 case sc, ok := <-cc.dopts.scChan:
450                         if ok {
451                                 cc.sc = sc
452                         }
453                 case <-ctx.Done():
454                         return nil, ctx.Err()
455                 }
456         }
457         if cc.dopts.scChan != nil {
458                 go cc.scWatcher()
459         }
460
461         var credsClone credentials.TransportCredentials
462         if creds := cc.dopts.copts.TransportCredentials; creds != nil {
463                 credsClone = creds.Clone()
464         }
465         cc.balancerBuildOpts = balancer.BuildOptions{
466                 DialCreds: credsClone,
467                 Dialer:    cc.dopts.copts.Dialer,
468         }
469
470         if cc.dopts.balancerBuilder != nil {
471                 cc.customBalancer = true
472                 // Build should not take long time. So it's ok to not have a goroutine for it.
473                 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
474         }
475
476         // Build the resolver.
477         cc.resolverWrapper, err = newCCResolverWrapper(cc)
478         if err != nil {
479                 return nil, fmt.Errorf("failed to build resolver: %v", err)
480         }
481
482         // A blocking dial blocks until the clientConn is ready.
483         if cc.dopts.block {
484                 for {
485                         s := cc.GetState()
486                         if s == connectivity.Ready {
487                                 break
488                         }
489                         if !cc.WaitForStateChange(ctx, s) {
490                                 // ctx got timeout or canceled.
491                                 return nil, ctx.Err()
492                         }
493                 }
494         }
495
496         return cc, nil
497 }
498
499 // connectivityStateManager keeps the connectivity.State of ClientConn.
500 // This struct will eventually be exported so the balancers can access it.
501 type connectivityStateManager struct {
502         mu         sync.Mutex
503         state      connectivity.State
504         notifyChan chan struct{}
505 }
506
507 // updateState updates the connectivity.State of ClientConn.
508 // If there's a change it notifies goroutines waiting on state change to
509 // happen.
510 func (csm *connectivityStateManager) updateState(state connectivity.State) {
511         csm.mu.Lock()
512         defer csm.mu.Unlock()
513         if csm.state == connectivity.Shutdown {
514                 return
515         }
516         if csm.state == state {
517                 return
518         }
519         csm.state = state
520         if csm.notifyChan != nil {
521                 // There are other goroutines waiting on this channel.
522                 close(csm.notifyChan)
523                 csm.notifyChan = nil
524         }
525 }
526
527 func (csm *connectivityStateManager) getState() connectivity.State {
528         csm.mu.Lock()
529         defer csm.mu.Unlock()
530         return csm.state
531 }
532
533 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
534         csm.mu.Lock()
535         defer csm.mu.Unlock()
536         if csm.notifyChan == nil {
537                 csm.notifyChan = make(chan struct{})
538         }
539         return csm.notifyChan
540 }
541
542 // ClientConn represents a client connection to an RPC server.
543 type ClientConn struct {
544         ctx    context.Context
545         cancel context.CancelFunc
546
547         target       string
548         parsedTarget resolver.Target
549         authority    string
550         dopts        dialOptions
551         csMgr        *connectivityStateManager
552
553         customBalancer    bool // If this is true, switching balancer will be disabled.
554         balancerBuildOpts balancer.BuildOptions
555         resolverWrapper   *ccResolverWrapper
556         blockingpicker    *pickerWrapper
557
558         mu    sync.RWMutex
559         sc    ServiceConfig
560         scRaw string
561         conns map[*addrConn]struct{}
562         // Keepalive parameter can be updated if a GoAway is received.
563         mkp             keepalive.ClientParameters
564         curBalancerName string
565         curAddresses    []resolver.Address
566         balancerWrapper *ccBalancerWrapper
567 }
568
569 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
570 // ctx expires. A true value is returned in former case and false in latter.
571 // This is an EXPERIMENTAL API.
572 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
573         ch := cc.csMgr.getNotifyChan()
574         if cc.csMgr.getState() != sourceState {
575                 return true
576         }
577         select {
578         case <-ctx.Done():
579                 return false
580         case <-ch:
581                 return true
582         }
583 }
584
585 // GetState returns the connectivity.State of ClientConn.
586 // This is an EXPERIMENTAL API.
587 func (cc *ClientConn) GetState() connectivity.State {
588         return cc.csMgr.getState()
589 }
590
591 func (cc *ClientConn) scWatcher() {
592         for {
593                 select {
594                 case sc, ok := <-cc.dopts.scChan:
595                         if !ok {
596                                 return
597                         }
598                         cc.mu.Lock()
599                         // TODO: load balance policy runtime change is ignored.
600                         // We may revist this decision in the future.
601                         cc.sc = sc
602                         cc.scRaw = ""
603                         cc.mu.Unlock()
604                 case <-cc.ctx.Done():
605                         return
606                 }
607         }
608 }
609
610 func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
611         cc.mu.Lock()
612         defer cc.mu.Unlock()
613         if cc.conns == nil {
614                 return
615         }
616
617         // TODO(bar switching) when grpclb is submitted, check address type and start grpclb.
618         if !cc.customBalancer && cc.balancerWrapper == nil {
619                 // No customBalancer was specified by DialOption, and this is the first
620                 // time handling resolved addresses, create a pickfirst balancer.
621                 builder := newPickfirstBuilder()
622                 cc.curBalancerName = builder.Name()
623                 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
624         }
625
626         // TODO(bar switching) compare addresses, if there's no update, don't notify balancer.
627         cc.curAddresses = addrs
628         cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
629 }
630
631 // switchBalancer starts the switching from current balancer to the balancer with name.
632 func (cc *ClientConn) switchBalancer(name string) {
633         cc.mu.Lock()
634         defer cc.mu.Unlock()
635         if cc.conns == nil {
636                 return
637         }
638         grpclog.Infof("ClientConn switching balancer to %q", name)
639
640         if cc.customBalancer {
641                 grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead")
642                 return
643         }
644
645         if cc.curBalancerName == name {
646                 return
647         }
648
649         // TODO(bar switching) change this to two steps: drain and close.
650         // Keep track of sc in wrapper.
651         cc.balancerWrapper.close()
652
653         builder := balancer.Get(name)
654         if builder == nil {
655                 grpclog.Infof("failed to get balancer builder for: %v (this should never happen...)", name)
656                 builder = newPickfirstBuilder()
657         }
658         cc.curBalancerName = builder.Name()
659         cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
660         cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
661 }
662
663 func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
664         cc.mu.Lock()
665         if cc.conns == nil {
666                 cc.mu.Unlock()
667                 return
668         }
669         // TODO(bar switching) send updates to all balancer wrappers when balancer
670         // gracefully switching is supported.
671         cc.balancerWrapper.handleSubConnStateChange(sc, s)
672         cc.mu.Unlock()
673 }
674
675 // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
676 func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
677         ac := &addrConn{
678                 cc:    cc,
679                 addrs: addrs,
680                 dopts: cc.dopts,
681         }
682         ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
683         // Track ac in cc. This needs to be done before any getTransport(...) is called.
684         cc.mu.Lock()
685         if cc.conns == nil {
686                 cc.mu.Unlock()
687                 return nil, ErrClientConnClosing
688         }
689         cc.conns[ac] = struct{}{}
690         cc.mu.Unlock()
691         return ac, nil
692 }
693
694 // removeAddrConn removes the addrConn in the subConn from clientConn.
695 // It also tears down the ac with the given error.
696 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
697         cc.mu.Lock()
698         if cc.conns == nil {
699                 cc.mu.Unlock()
700                 return
701         }
702         delete(cc.conns, ac)
703         cc.mu.Unlock()
704         ac.tearDown(err)
705 }
706
707 // connect starts to creating transport and also starts the transport monitor
708 // goroutine for this ac.
709 // It does nothing if the ac is not IDLE.
710 // TODO(bar) Move this to the addrConn section.
711 // This was part of resetAddrConn, keep it here to make the diff look clean.
712 func (ac *addrConn) connect() error {
713         ac.mu.Lock()
714         if ac.state == connectivity.Shutdown {
715                 ac.mu.Unlock()
716                 return errConnClosing
717         }
718         if ac.state != connectivity.Idle {
719                 ac.mu.Unlock()
720                 return nil
721         }
722         ac.state = connectivity.Connecting
723         ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
724         ac.mu.Unlock()
725
726         // Start a goroutine connecting to the server asynchronously.
727         go func() {
728                 if err := ac.resetTransport(); err != nil {
729                         grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
730                         if err != errConnClosing {
731                                 // Keep this ac in cc.conns, to get the reason it's torn down.
732                                 ac.tearDown(err)
733                         }
734                         return
735                 }
736                 ac.transportMonitor()
737         }()
738         return nil
739 }
740
741 // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
742 //
743 // It checks whether current connected address of ac is in the new addrs list.
744 //  - If true, it updates ac.addrs and returns true. The ac will keep using
745 //    the existing connection.
746 //  - If false, it does nothing and returns false.
747 func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
748         ac.mu.Lock()
749         defer ac.mu.Unlock()
750         grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
751         if ac.state == connectivity.Shutdown {
752                 ac.addrs = addrs
753                 return true
754         }
755
756         var curAddrFound bool
757         for _, a := range addrs {
758                 if reflect.DeepEqual(ac.curAddr, a) {
759                         curAddrFound = true
760                         break
761                 }
762         }
763         grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
764         if curAddrFound {
765                 ac.addrs = addrs
766         }
767
768         return curAddrFound
769 }
770
771 // GetMethodConfig gets the method config of the input method.
772 // If there's an exact match for input method (i.e. /service/method), we return
773 // the corresponding MethodConfig.
774 // If there isn't an exact match for the input method, we look for the default config
775 // under the service (i.e /service/). If there is a default MethodConfig for
776 // the serivce, we return it.
777 // Otherwise, we return an empty MethodConfig.
778 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
779         // TODO: Avoid the locking here.
780         cc.mu.RLock()
781         defer cc.mu.RUnlock()
782         m, ok := cc.sc.Methods[method]
783         if !ok {
784                 i := strings.LastIndex(method, "/")
785                 m, _ = cc.sc.Methods[method[:i+1]]
786         }
787         return m
788 }
789
790 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
791         t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
792         if err != nil {
793                 return nil, nil, toRPCErr(err)
794         }
795         return t, done, nil
796 }
797
798 // handleServiceConfig parses the service config string in JSON format to Go native
799 // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
800 func (cc *ClientConn) handleServiceConfig(js string) error {
801         sc, err := parseServiceConfig(js)
802         if err != nil {
803                 return err
804         }
805         cc.mu.Lock()
806         cc.scRaw = js
807         cc.sc = sc
808         cc.mu.Unlock()
809         return nil
810 }
811
812 // Close tears down the ClientConn and all underlying connections.
813 func (cc *ClientConn) Close() error {
814         cc.cancel()
815
816         cc.mu.Lock()
817         if cc.conns == nil {
818                 cc.mu.Unlock()
819                 return ErrClientConnClosing
820         }
821         conns := cc.conns
822         cc.conns = nil
823         cc.csMgr.updateState(connectivity.Shutdown)
824
825         rWrapper := cc.resolverWrapper
826         cc.resolverWrapper = nil
827         bWrapper := cc.balancerWrapper
828         cc.balancerWrapper = nil
829         cc.mu.Unlock()
830         cc.blockingpicker.close()
831         if rWrapper != nil {
832                 rWrapper.close()
833         }
834         if bWrapper != nil {
835                 bWrapper.close()
836         }
837         for ac := range conns {
838                 ac.tearDown(ErrClientConnClosing)
839         }
840         return nil
841 }
842
843 // addrConn is a network connection to a given address.
844 type addrConn struct {
845         ctx    context.Context
846         cancel context.CancelFunc
847
848         cc      *ClientConn
849         curAddr resolver.Address
850         addrs   []resolver.Address
851         dopts   dialOptions
852         events  trace.EventLog
853         acbw    balancer.SubConn
854
855         mu    sync.Mutex
856         state connectivity.State
857         // ready is closed and becomes nil when a new transport is up or failed
858         // due to timeout.
859         ready     chan struct{}
860         transport transport.ClientTransport
861
862         // The reason this addrConn is torn down.
863         tearDownErr error
864 }
865
866 // adjustParams updates parameters used to create transports upon
867 // receiving a GoAway.
868 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
869         switch r {
870         case transport.TooManyPings:
871                 v := 2 * ac.dopts.copts.KeepaliveParams.Time
872                 ac.cc.mu.Lock()
873                 if v > ac.cc.mkp.Time {
874                         ac.cc.mkp.Time = v
875                 }
876                 ac.cc.mu.Unlock()
877         }
878 }
879
880 // printf records an event in ac's event log, unless ac has been closed.
881 // REQUIRES ac.mu is held.
882 func (ac *addrConn) printf(format string, a ...interface{}) {
883         if ac.events != nil {
884                 ac.events.Printf(format, a...)
885         }
886 }
887
888 // errorf records an error in ac's event log, unless ac has been closed.
889 // REQUIRES ac.mu is held.
890 func (ac *addrConn) errorf(format string, a ...interface{}) {
891         if ac.events != nil {
892                 ac.events.Errorf(format, a...)
893         }
894 }
895
896 // resetTransport recreates a transport to the address for ac.  The old
897 // transport will close itself on error or when the clientconn is closed.
898 //
899 // TODO(bar) make sure all state transitions are valid.
900 func (ac *addrConn) resetTransport() error {
901         ac.mu.Lock()
902         if ac.state == connectivity.Shutdown {
903                 ac.mu.Unlock()
904                 return errConnClosing
905         }
906         if ac.ready != nil {
907                 close(ac.ready)
908                 ac.ready = nil
909         }
910         ac.transport = nil
911         ac.curAddr = resolver.Address{}
912         ac.mu.Unlock()
913         ac.cc.mu.RLock()
914         ac.dopts.copts.KeepaliveParams = ac.cc.mkp
915         ac.cc.mu.RUnlock()
916         for retries := 0; ; retries++ {
917                 sleepTime := ac.dopts.bs.backoff(retries)
918                 timeout := minConnectTimeout
919                 ac.mu.Lock()
920                 if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
921                         timeout = time.Duration(int(sleepTime) / len(ac.addrs))
922                 }
923                 connectTime := time.Now()
924                 if ac.state == connectivity.Shutdown {
925                         ac.mu.Unlock()
926                         return errConnClosing
927                 }
928                 ac.printf("connecting")
929                 if ac.state != connectivity.Connecting {
930                         ac.state = connectivity.Connecting
931                         ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
932                 }
933                 // copy ac.addrs in case of race
934                 addrsIter := make([]resolver.Address, len(ac.addrs))
935                 copy(addrsIter, ac.addrs)
936                 copts := ac.dopts.copts
937                 ac.mu.Unlock()
938                 for _, addr := range addrsIter {
939                         ac.mu.Lock()
940                         if ac.state == connectivity.Shutdown {
941                                 // ac.tearDown(...) has been invoked.
942                                 ac.mu.Unlock()
943                                 return errConnClosing
944                         }
945                         ac.mu.Unlock()
946                         sinfo := transport.TargetInfo{
947                                 Addr:      addr.Addr,
948                                 Metadata:  addr.Metadata,
949                                 Authority: ac.cc.authority,
950                         }
951                         newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
952                         if err != nil {
953                                 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
954                                         return err
955                                 }
956                                 grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
957                                 ac.mu.Lock()
958                                 if ac.state == connectivity.Shutdown {
959                                         // ac.tearDown(...) has been invoked.
960                                         ac.mu.Unlock()
961                                         return errConnClosing
962                                 }
963                                 ac.mu.Unlock()
964                                 continue
965                         }
966                         ac.mu.Lock()
967                         ac.printf("ready")
968                         if ac.state == connectivity.Shutdown {
969                                 // ac.tearDown(...) has been invoked.
970                                 ac.mu.Unlock()
971                                 newTransport.Close()
972                                 return errConnClosing
973                         }
974                         ac.state = connectivity.Ready
975                         ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
976                         t := ac.transport
977                         ac.transport = newTransport
978                         if t != nil {
979                                 t.Close()
980                         }
981                         ac.curAddr = addr
982                         if ac.ready != nil {
983                                 close(ac.ready)
984                                 ac.ready = nil
985                         }
986                         ac.mu.Unlock()
987                         return nil
988                 }
989                 ac.mu.Lock()
990                 ac.state = connectivity.TransientFailure
991                 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
992                 if ac.ready != nil {
993                         close(ac.ready)
994                         ac.ready = nil
995                 }
996                 ac.mu.Unlock()
997                 timer := time.NewTimer(sleepTime - time.Since(connectTime))
998                 select {
999                 case <-timer.C:
1000                 case <-ac.ctx.Done():
1001                         timer.Stop()
1002                         return ac.ctx.Err()
1003                 }
1004                 timer.Stop()
1005         }
1006 }
1007
1008 // Run in a goroutine to track the error in transport and create the
1009 // new transport if an error happens. It returns when the channel is closing.
1010 func (ac *addrConn) transportMonitor() {
1011         for {
1012                 ac.mu.Lock()
1013                 t := ac.transport
1014                 ac.mu.Unlock()
1015                 // Block until we receive a goaway or an error occurs.
1016                 select {
1017                 case <-t.GoAway():
1018                 case <-t.Error():
1019                 }
1020                 // If a GoAway happened, regardless of error, adjust our keepalive
1021                 // parameters as appropriate.
1022                 select {
1023                 case <-t.GoAway():
1024                         ac.adjustParams(t.GetGoAwayReason())
1025                 default:
1026                 }
1027                 ac.mu.Lock()
1028                 // Set connectivity state to TransientFailure before calling
1029                 // resetTransport. Transition READY->CONNECTING is not valid.
1030                 ac.state = connectivity.TransientFailure
1031                 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1032                 ac.curAddr = resolver.Address{}
1033                 ac.mu.Unlock()
1034                 if err := ac.resetTransport(); err != nil {
1035                         ac.mu.Lock()
1036                         ac.printf("transport exiting: %v", err)
1037                         ac.mu.Unlock()
1038                         grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
1039                         if err != errConnClosing {
1040                                 // Keep this ac in cc.conns, to get the reason it's torn down.
1041                                 ac.tearDown(err)
1042                         }
1043                         return
1044                 }
1045         }
1046 }
1047
1048 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
1049 // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
1050 func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
1051         for {
1052                 ac.mu.Lock()
1053                 switch {
1054                 case ac.state == connectivity.Shutdown:
1055                         if failfast || !hasBalancer {
1056                                 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
1057                                 err := ac.tearDownErr
1058                                 ac.mu.Unlock()
1059                                 return nil, err
1060                         }
1061                         ac.mu.Unlock()
1062                         return nil, errConnClosing
1063                 case ac.state == connectivity.Ready:
1064                         ct := ac.transport
1065                         ac.mu.Unlock()
1066                         return ct, nil
1067                 case ac.state == connectivity.TransientFailure:
1068                         if failfast || hasBalancer {
1069                                 ac.mu.Unlock()
1070                                 return nil, errConnUnavailable
1071                         }
1072                 }
1073                 ready := ac.ready
1074                 if ready == nil {
1075                         ready = make(chan struct{})
1076                         ac.ready = ready
1077                 }
1078                 ac.mu.Unlock()
1079                 select {
1080                 case <-ctx.Done():
1081                         return nil, toRPCErr(ctx.Err())
1082                 // Wait until the new transport is ready or failed.
1083                 case <-ready:
1084                 }
1085         }
1086 }
1087
1088 // getReadyTransport returns the transport if ac's state is READY.
1089 // Otherwise it returns nil, false.
1090 // If ac's state is IDLE, it will trigger ac to connect.
1091 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1092         ac.mu.Lock()
1093         if ac.state == connectivity.Ready {
1094                 t := ac.transport
1095                 ac.mu.Unlock()
1096                 return t, true
1097         }
1098         var idle bool
1099         if ac.state == connectivity.Idle {
1100                 idle = true
1101         }
1102         ac.mu.Unlock()
1103         // Trigger idle ac to connect.
1104         if idle {
1105                 ac.connect()
1106         }
1107         return nil, false
1108 }
1109
1110 // tearDown starts to tear down the addrConn.
1111 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1112 // some edge cases (e.g., the caller opens and closes many addrConn's in a
1113 // tight loop.
1114 // tearDown doesn't remove ac from ac.cc.conns.
1115 func (ac *addrConn) tearDown(err error) {
1116         ac.cancel()
1117         ac.mu.Lock()
1118         defer ac.mu.Unlock()
1119         ac.curAddr = resolver.Address{}
1120         if err == errConnDrain && ac.transport != nil {
1121                 // GracefulClose(...) may be executed multiple times when
1122                 // i) receiving multiple GoAway frames from the server; or
1123                 // ii) there are concurrent name resolver/Balancer triggered
1124                 // address removal and GoAway.
1125                 ac.transport.GracefulClose()
1126         }
1127         if ac.state == connectivity.Shutdown {
1128                 return
1129         }
1130         ac.state = connectivity.Shutdown
1131         ac.tearDownErr = err
1132         ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1133         if ac.events != nil {
1134                 ac.events.Finish()
1135                 ac.events = nil
1136         }
1137         if ac.ready != nil {
1138                 close(ac.ready)
1139                 ac.ready = nil
1140         }
1141         return
1142 }
1143
1144 func (ac *addrConn) getState() connectivity.State {
1145         ac.mu.Lock()
1146         defer ac.mu.Unlock()
1147         return ac.state
1148 }