OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / consul / instancer.go
1 package consul
2
3 import (
4         "fmt"
5         "io"
6
7         consul "github.com/hashicorp/consul/api"
8
9         "github.com/go-kit/kit/log"
10         "github.com/go-kit/kit/sd"
11         "github.com/go-kit/kit/sd/internal/instance"
12 )
13
14 const defaultIndex = 0
15
16 // Instancer yields instances for a service in Consul.
17 type Instancer struct {
18         cache       *instance.Cache
19         client      Client
20         logger      log.Logger
21         service     string
22         tags        []string
23         passingOnly bool
24         quitc       chan struct{}
25 }
26
27 // NewInstancer returns a Consul instancer that publishes instances for the
28 // requested service. It only returns instances for which all of the passed tags
29 // are present.
30 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
31         s := &Instancer{
32                 cache:       instance.NewCache(),
33                 client:      client,
34                 logger:      log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
35                 service:     service,
36                 tags:        tags,
37                 passingOnly: passingOnly,
38                 quitc:       make(chan struct{}),
39         }
40
41         instances, index, err := s.getInstances(defaultIndex, nil)
42         if err == nil {
43                 s.logger.Log("instances", len(instances))
44         } else {
45                 s.logger.Log("err", err)
46         }
47
48         s.cache.Update(sd.Event{Instances: instances, Err: err})
49         go s.loop(index)
50         return s
51 }
52
53 // Stop terminates the instancer.
54 func (s *Instancer) Stop() {
55         close(s.quitc)
56 }
57
58 func (s *Instancer) loop(lastIndex uint64) {
59         var (
60                 instances []string
61                 err       error
62         )
63         for {
64                 instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
65                 switch {
66                 case err == io.EOF:
67                         return // stopped via quitc
68                 case err != nil:
69                         s.logger.Log("err", err)
70                         s.cache.Update(sd.Event{Err: err})
71                 default:
72                         s.cache.Update(sd.Event{Instances: instances})
73                 }
74         }
75 }
76
77 func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
78         tag := ""
79         if len(s.tags) > 0 {
80                 tag = s.tags[0]
81         }
82
83         // Consul doesn't support more than one tag in its service query method.
84         // https://github.com/hashicorp/consul/issues/294
85         // Hashi suggest prepared queries, but they don't support blocking.
86         // https://www.consul.io/docs/agent/http/query.html#execute
87         // If we want blocking for efficiency, we must filter tags manually.
88
89         type response struct {
90                 instances []string
91                 index     uint64
92         }
93
94         var (
95                 errc = make(chan error, 1)
96                 resc = make(chan response, 1)
97         )
98
99         go func() {
100                 entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
101                         WaitIndex: lastIndex,
102                 })
103                 if err != nil {
104                         errc <- err
105                         return
106                 }
107                 if len(s.tags) > 1 {
108                         entries = filterEntries(entries, s.tags[1:]...)
109                 }
110                 resc <- response{
111                         instances: makeInstances(entries),
112                         index:     meta.LastIndex,
113                 }
114         }()
115
116         select {
117         case err := <-errc:
118                 return nil, 0, err
119         case res := <-resc:
120                 return res.instances, res.index, nil
121         case <-interruptc:
122                 return nil, 0, io.EOF
123         }
124 }
125
126 // Register implements Instancer.
127 func (s *Instancer) Register(ch chan<- sd.Event) {
128         s.cache.Register(ch)
129 }
130
131 // Deregister implements Instancer.
132 func (s *Instancer) Deregister(ch chan<- sd.Event) {
133         s.cache.Deregister(ch)
134 }
135
136 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
137         var es []*consul.ServiceEntry
138
139 ENTRIES:
140         for _, entry := range entries {
141                 ts := make(map[string]struct{}, len(entry.Service.Tags))
142                 for _, tag := range entry.Service.Tags {
143                         ts[tag] = struct{}{}
144                 }
145
146                 for _, tag := range tags {
147                         if _, ok := ts[tag]; !ok {
148                                 continue ENTRIES
149                         }
150                 }
151                 es = append(es, entry)
152         }
153
154         return es
155 }
156
157 func makeInstances(entries []*consul.ServiceEntry) []string {
158         instances := make([]string, len(entries))
159         for i, entry := range entries {
160                 addr := entry.Node.Address
161                 if entry.Service.Address != "" {
162                         addr = entry.Service.Address
163                 }
164                 instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
165         }
166         return instances
167 }