7 consul "github.com/hashicorp/consul/api"
9 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/sd"
11 "github.com/go-kit/kit/sd/internal/instance"
// defaultIndex is the index NewInstancer passes to its first getInstances
// call, before any index has been obtained from a query.
14 const defaultIndex = 0
16 // Instancer yields instances for a service in Consul.
//
// NOTE(review): the field list is not visible in this excerpt. From their uses
// elsewhere in the file, the struct carries at least cache, client, logger,
// service, tags, passingOnly and quitc — confirm against the full declaration.
17 type Instancer struct {
27 // NewInstancer returns a Consul instancer that publishes instances for the
28 // requested service. It only returns instances for which all of the passed tags
30 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
32 cache: instance.NewCache(),
// Attach the service name and the printed tag list to the logger stored on
// the instancer.
34 logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
37 passingOnly: passingOnly,
// quitc is handed to getInstances as its interrupt channel by loop.
38 quitc: make(chan struct{}),
// Initial fetch: start from defaultIndex and pass a nil interrupt channel.
41 instances, index, err := s.getInstances(defaultIndex, nil)
43 s.logger.Log("instances", len(instances))
45 s.logger.Log("err", err)
// Publish the outcome of the initial fetch (instances and err together) to
// the cache.
48 s.cache.Update(sd.Event{Instances: instances, Err: err})
53 // Stop terminates the instancer.
//
// NOTE(review): the body is not visible in this excerpt. loop has a return
// path annotated "stopped via quitc", so Stop presumably closes or signals
// quitc — confirm.
54 func (s *Instancer) Stop() {
// loop calls getInstances with the most recent index and s.quitc as the
// interrupt channel, then publishes the outcome to the cache: an error-only
// event after logging the error, or an instances-only event otherwise. It
// returns once it has been stopped via quitc.
//
// NOTE(review): the surrounding for/if statements are not visible in this
// excerpt; the description above assumes the usual poll-forever shape — confirm.
58 func (s *Instancer) loop(lastIndex uint64) {
// NOTE(review): lastIndex is overwritten on every call, including calls that
// return an error — confirm which index getInstances returns on failure and
// whether restarting from it is intended.
64 instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
67 return // stopped via quitc
69 s.logger.Log("err", err)
70 s.cache.Update(sd.Event{Err: err})
72 s.cache.Update(sd.Event{Instances: instances})
// getInstances queries the Consul client for s.service, using lastIndex as the
// query's WaitIndex, and returns the resulting instance strings together with
// the LastIndex reported in the query metadata.
//
// NOTE(review): the goroutine and select that connect errc, resc and
// interruptc are not visible in this excerpt. The io.EOF return is presumably
// the path taken when interruptc fires — confirm.
77 func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
83 // Consul doesn't support more than one tag in its service query method.
84 // https://github.com/hashicorp/consul/issues/294
85 // Hashi suggest prepared queries, but they don't support blocking.
86 // https://www.consul.io/docs/agent/http/query.html#execute
87 // If we want blocking for efficiency, we must filter tags manually.
89 type response struct {
// Both channels have capacity 1. NOTE(review): presumably so the sender never
// blocks if the receiver has already returned — confirm.
95 errc = make(chan error, 1)
96 resc = make(chan response, 1)
100 entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
101 WaitIndex: lastIndex,
// Every tag after the first is matched manually here; only a single tag
// value is passed to the Service call above.
108 entries = filterEntries(entries, s.tags[1:]...)
111 instances: makeInstances(entries),
112 index: meta.LastIndex,
120 return res.instances, res.index, nil
122 return nil, 0, io.EOF
126 // Register implements Instancer.
//
// NOTE(review): the body is not visible in this excerpt; by symmetry with
// Deregister it presumably forwards ch to s.cache.Register — confirm.
127 func (s *Instancer) Register(ch chan<- sd.Event) {
131 // Deregister implements Instancer.
// It forwards ch to the Deregister method of the instancer's cache.
132 func (s *Instancer) Deregister(ch chan<- sd.Event) {
133 s.cache.Deregister(ch)
// filterEntries returns the entries it accepts from the input, collected into a
// new slice. For each entry it builds a set from entry.Service.Tags and looks
// up every requested tag in that set.
//
// NOTE(review): the branch taken when a requested tag is absent is not visible
// in this excerpt; presumably the entry is skipped, so only entries carrying
// all of the tags are kept — confirm.
136 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
// es stays nil unless at least one entry is appended.
137 var es []*consul.ServiceEntry
140 for _, entry := range entries {
// Set of this entry's tags, pre-sized to the number of tags it has.
141 ts := make(map[string]struct{}, len(entry.Service.Tags))
142 for _, tag := range entry.Service.Tags {
146 for _, tag := range tags {
147 if _, ok := ts[tag]; !ok {
151 es = append(es, entry)
// makeInstances builds one "address:port" string per entry, stored at the same
// index as the entry. The address is the service's own address when that is
// non-empty, and the node's address otherwise; the port is always the
// service's port.
//
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the instances slice is returned — confirm.
157 func makeInstances(entries []*consul.ServiceEntry) []string {
158 instances := make([]string, len(entries))
159 for i, entry := range entries {
// Default to the node address; a service-level address overrides it.
160 addr := entry.Node.Address
161 if entry.Service.Address != "" {
162 addr = entry.Service.Address
164 instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)