4 "github.com/samuel/go-zookeeper/zk"
6 "github.com/go-kit/kit/log"
7 "github.com/go-kit/kit/sd"
8 "github.com/go-kit/kit/sd/internal/instance"
11 // Instancer yields instances stored in a certain ZooKeeper path. Any kind of
12 // change in that path is watched and will update the subscribers.
13 type Instancer struct {
21 // NewInstancer returns a ZooKeeper Instancer. ZooKeeper will start watching
22 // the given path for changes and update the Instancer endpoints.
23 func NewInstancer(c Client, path string, logger log.Logger) (*Instancer, error) {
25 cache: instance.NewCache(),
29 quitc: make(chan struct{}),
32 err := s.client.CreateParentNodes(s.path)
37 instances, eventc, err := s.client.GetEntries(s.path)
39 logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
40 // other implementations continue here, but we exit because we don't know if eventc is valid
43 logger.Log("path", s.path, "instances", len(instances))
44 s.cache.Update(sd.Event{Instances: instances})
51 func (s *Instancer) loop(eventc <-chan zk.Event) {
59 // We received a path update notification. Call GetEntries to
60 // retrieve child node data, and set a new watch, as ZK watches are
62 instances, eventc, err = s.client.GetEntries(s.path)
64 s.logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
65 s.cache.Update(sd.Event{Err: err})
68 s.logger.Log("path", s.path, "instances", len(instances))
69 s.cache.Update(sd.Event{Instances: instances})
77 // Stop terminates the Instancer.
78 func (s *Instancer) Stop() {
82 // Register implements Instancer.
83 func (s *Instancer) Register(ch chan<- sd.Event) {
87 // Deregister implements Instancer.
88 func (s *Instancer) Deregister(ch chan<- sd.Event) {
89 s.cache.Deregister(ch)
92 // state returns the current state of the instance.Cache; it exists only for testing.
93 func (s *Instancer) state() sd.Event {
94 return s.cache.State()