9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/kit/log"
13 // endpointCache collects the most recent set of instances from a service discovery
14 // system, creates endpoints for them using a factory function, and makes
15 // them available to consumers.
16 type endpointCache struct {
17 options endpointerOptions
20 cache map[string]endpointCloser
22 endpoints []endpoint.Endpoint
24 invalidateDeadline time.Time
25 timeNow func() time.Time
28 type endpointCloser struct {
33 // newEndpointCache returns a new, empty endpointCache.
34 func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
35 return &endpointCache{
38 cache: map[string]endpointCloser{},
44 // Update should be invoked by clients with a complete set of current instance
45 // strings whenever that set changes. The cache manufactures new endpoints via
46 // the factory, closes old endpoints when they disappear, and persists existing
47 // endpoints if they survive through an update.
48 func (c *endpointCache) Update(event Event) {
54 c.updateCache(event.Instances)
59 // Sad path. Something's gone wrong in sd.
60 c.logger.Log("err", event.Err)
61 if !c.options.invalidateOnError {
62 return // keep returning the last known endpoints on error
65 return // already in the error state, do nothing & keep original error
68 // set new deadline to invalidate Endpoints unless non-error Event is received
69 c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
73 func (c *endpointCache) updateCache(instances []string) {
74 // Deterministic order (for later).
75 sort.Strings(instances)
77 // Produce the current set of services.
78 cache := make(map[string]endpointCloser, len(instances))
79 for _, instance := range instances {
80 // If it already exists, just copy it over.
81 if sc, ok := c.cache[instance]; ok {
83 delete(c.cache, instance)
87 // If it doesn't exist, create it.
88 service, closer, err := c.factory(instance)
90 c.logger.Log("instance", instance, "err", err)
93 cache[instance] = endpointCloser{service, closer}
96 // Close any leftover endpoints.
97 for _, sc := range c.cache {
103 // Populate the slice of endpoints.
104 endpoints := make([]endpoint.Endpoint, 0, len(cache))
105 for _, instance := range instances {
106 // A bad factory may mean an instance is not present.
107 if _, ok := cache[instance]; !ok {
110 endpoints = append(endpoints, cache[instance].Endpoint)
113 // Swap and trigger GC for old copies.
114 c.endpoints = endpoints
118 // Endpoints yields the current set of (presumably identical) endpoints, ordered
119 // lexicographically by the corresponding instance string.
120 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
121 // in the steady state we're going to have many goroutines calling Endpoints()
122 // concurrently, so to minimize contention we use a shared R-lock.
125 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
126 defer c.mtx.RUnlock()
127 return c.endpoints, nil
132 // in case of an error, switch to an exclusive lock.
136 // re-check condition due to a race between RUnlock() and Lock().
137 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
138 return c.endpoints, nil
141 c.updateCache(nil) // close any remaining active endpoints