6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/kit/log"
10 // Endpointer listens to a service discovery system and yields a set of
11 // identical endpoints on demand. An error indicates a problem with connectivity
12 // to the service discovery system, or within the system itself; an Endpointer
13 // may yield no endpoints without error.
14 type Endpointer interface {
15 Endpoints() ([]endpoint.Endpoint, error)
18 // FixedEndpointer yields a fixed set of endpoints.
19 type FixedEndpointer []endpoint.Endpoint
21 // Endpoints implements Endpointer.
22 func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
24 // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
25 // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
26 // keeps returning previously created Endpoints assuming they are still good, unless
27 // this behavior is disabled via InvalidateOnError option.
28 func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
29 opts := endpointerOptions{}
30 for _, opt := range options {
33 se := &DefaultEndpointer{
34 cache: newEndpointCache(f, logger, opts),
43 // EndpointerOption allows control of endpointCache behavior.
44 type EndpointerOption func(*endpointerOptions)
46 // InvalidateOnError returns EndpointerOption that controls how the Endpointer
47 // behaves when then Instancer publishes an Event containing an error.
48 // Without this option the Endpointer continues returning the last known
49 // endpoints. With this option, the Endpointer continues returning the last
50 // known endpoints until the timeout elapses, then closes all active endpoints
51 // and starts returning an error. Once the Instancer sends a new update with
52 // valid resource instances, the normal operation is resumed.
53 func InvalidateOnError(timeout time.Duration) EndpointerOption {
54 return func(opts *endpointerOptions) {
55 opts.invalidateOnError = true
56 opts.invalidateTimeout = timeout
60 type endpointerOptions struct {
61 invalidateOnError bool
62 invalidateTimeout time.Duration
65 // DefaultEndpointer implements an Endpointer interface.
66 // When created with NewEndpointer function, it automatically registers
67 // as a subscriber to events from the Instances and maintains a list
68 // of active Endpoints.
69 type DefaultEndpointer struct {
75 func (de *DefaultEndpointer) receive() {
76 for event := range de.ch {
77 de.cache.Update(event)
81 // Close deregisters DefaultEndpointer from the Instancer and stops the internal go-routine.
82 func (de *DefaultEndpointer) Close() {
83 de.instancer.Deregister(de.ch)
87 // Endpoints implements Endpointer.
88 func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
89 return de.cache.Endpoints()