package etcd import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/internal/instance" ) // Instancer yields instances stored in a certain etcd keyspace. Any kind of // change in that keyspace is watched and will update the Instancer's Instancers. type Instancer struct { cache *instance.Cache client Client prefix string logger log.Logger quitc chan struct{} } // NewInstancer returns an etcd instancer. It will start watching the given // prefix for changes, and update the subscribers. func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) { s := &Instancer{ client: c, prefix: prefix, cache: instance.NewCache(), logger: logger, quitc: make(chan struct{}), } instances, err := s.client.GetEntries(s.prefix) if err == nil { logger.Log("prefix", s.prefix, "instances", len(instances)) } else { logger.Log("prefix", s.prefix, "err", err) } s.cache.Update(sd.Event{Instances: instances, Err: err}) go s.loop() return s, nil } func (s *Instancer) loop() { ch := make(chan struct{}) go s.client.WatchPrefix(s.prefix, ch) for { select { case <-ch: instances, err := s.client.GetEntries(s.prefix) if err != nil { s.logger.Log("msg", "failed to retrieve entries", "err", err) s.cache.Update(sd.Event{Err: err}) continue } s.cache.Update(sd.Event{Instances: instances}) case <-s.quitc: return } } } // Stop terminates the Instancer. func (s *Instancer) Stop() { close(s.quitc) } // Register implements Instancer. func (s *Instancer) Register(ch chan<- sd.Event) { s.cache.Register(ch) } // Deregister implements Instancer. func (s *Instancer) Deregister(ch chan<- sd.Event) { s.cache.Deregister(ch) }