OSDN Git Service

modify ci
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / consul / instancer.go
diff --git a/vendor/github.com/go-kit/kit/sd/consul/instancer.go b/vendor/github.com/go-kit/kit/sd/consul/instancer.go
deleted file mode 100644 (file)
index 38b18f0..0000000
+++ /dev/null
@@ -1,167 +0,0 @@
-package consul
-
-import (
-       "fmt"
-       "io"
-
-       consul "github.com/hashicorp/consul/api"
-
-       "github.com/go-kit/kit/log"
-       "github.com/go-kit/kit/sd"
-       "github.com/go-kit/kit/sd/internal/instance"
-)
-
-const defaultIndex = 0
-
-// Instancer yields instances for a service in Consul.
-type Instancer struct {
-       cache       *instance.Cache
-       client      Client
-       logger      log.Logger
-       service     string
-       tags        []string
-       passingOnly bool
-       quitc       chan struct{}
-}
-
-// NewInstancer returns a Consul instancer that publishes instances for the
-// requested service. It only returns instances for which all of the passed tags
-// are present.
-func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
-       s := &Instancer{
-               cache:       instance.NewCache(),
-               client:      client,
-               logger:      log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
-               service:     service,
-               tags:        tags,
-               passingOnly: passingOnly,
-               quitc:       make(chan struct{}),
-       }
-
-       instances, index, err := s.getInstances(defaultIndex, nil)
-       if err == nil {
-               s.logger.Log("instances", len(instances))
-       } else {
-               s.logger.Log("err", err)
-       }
-
-       s.cache.Update(sd.Event{Instances: instances, Err: err})
-       go s.loop(index)
-       return s
-}
-
-// Stop terminates the instancer.
-func (s *Instancer) Stop() {
-       close(s.quitc)
-}
-
-func (s *Instancer) loop(lastIndex uint64) {
-       var (
-               instances []string
-               err       error
-       )
-       for {
-               instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
-               switch {
-               case err == io.EOF:
-                       return // stopped via quitc
-               case err != nil:
-                       s.logger.Log("err", err)
-                       s.cache.Update(sd.Event{Err: err})
-               default:
-                       s.cache.Update(sd.Event{Instances: instances})
-               }
-       }
-}
-
-func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
-       tag := ""
-       if len(s.tags) > 0 {
-               tag = s.tags[0]
-       }
-
-       // Consul doesn't support more than one tag in its service query method.
-       // https://github.com/hashicorp/consul/issues/294
-       // Hashi suggest prepared queries, but they don't support blocking.
-       // https://www.consul.io/docs/agent/http/query.html#execute
-       // If we want blocking for efficiency, we must filter tags manually.
-
-       type response struct {
-               instances []string
-               index     uint64
-       }
-
-       var (
-               errc = make(chan error, 1)
-               resc = make(chan response, 1)
-       )
-
-       go func() {
-               entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
-                       WaitIndex: lastIndex,
-               })
-               if err != nil {
-                       errc <- err
-                       return
-               }
-               if len(s.tags) > 1 {
-                       entries = filterEntries(entries, s.tags[1:]...)
-               }
-               resc <- response{
-                       instances: makeInstances(entries),
-                       index:     meta.LastIndex,
-               }
-       }()
-
-       select {
-       case err := <-errc:
-               return nil, 0, err
-       case res := <-resc:
-               return res.instances, res.index, nil
-       case <-interruptc:
-               return nil, 0, io.EOF
-       }
-}
-
-// Register implements Instancer.
-func (s *Instancer) Register(ch chan<- sd.Event) {
-       s.cache.Register(ch)
-}
-
-// Deregister implements Instancer.
-func (s *Instancer) Deregister(ch chan<- sd.Event) {
-       s.cache.Deregister(ch)
-}
-
-func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
-       var es []*consul.ServiceEntry
-
-ENTRIES:
-       for _, entry := range entries {
-               ts := make(map[string]struct{}, len(entry.Service.Tags))
-               for _, tag := range entry.Service.Tags {
-                       ts[tag] = struct{}{}
-               }
-
-               for _, tag := range tags {
-                       if _, ok := ts[tag]; !ok {
-                               continue ENTRIES
-                       }
-               }
-               es = append(es, entry)
-       }
-
-       return es
-}
-
-func makeInstances(entries []*consul.ServiceEntry) []string {
-       instances := make([]string, len(entries))
-       for i, entry := range entries {
-               addr := entry.Node.Address
-               if entry.Service.Address != "" {
-                       addr = entry.Service.Address
-               }
-               instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
-       }
-       return instances
-}