6 "github.com/hudl/fargo"
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/internal/instance"
13 // Instancer yields instances stored in the Eureka registry for the given app.
14 // Changes in that app are watched and will update the subscribers.
15 type Instancer struct {
20 quitc chan chan struct{}
23 // NewInstancer returns a Eureka Instancer. It will start watching the given
24 // app string for changes, and update the subscribers accordingly.
25 func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
26 logger = log.With(logger, "app", app)
29 cache: instance.NewCache(),
33 quitc: make(chan chan struct{}),
36 done := make(chan struct{})
37 updates := conn.ScheduleAppUpdates(app, true, done)
39 go s.loop(updates, done)
43 // Stop terminates the Instancer.
44 func (s *Instancer) Stop() {
45 q := make(chan struct{})
51 func (s *Instancer) consume(update fargo.AppUpdate) {
52 if update.Err != nil {
53 s.logger.Log("during", "Update", "err", update.Err)
54 s.cache.Update(sd.Event{Err: update.Err})
57 instances := convertFargoAppToInstances(update.App)
58 s.logger.Log("instances", len(instances))
59 s.cache.Update(sd.Event{Instances: instances})
62 func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
67 case update := <-updates:
76 func (s *Instancer) getInstances() ([]string, error) {
77 app, err := s.conn.GetApp(s.app)
81 return convertFargoAppToInstances(app), nil
84 func convertFargoAppToInstances(app *fargo.Application) []string {
85 instances := make([]string, len(app.Instances))
86 for i, inst := range app.Instances {
87 instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
92 // Register implements Instancer.
93 func (s *Instancer) Register(ch chan<- sd.Event) {
97 // Deregister implements Instancer.
98 func (s *Instancer) Deregister(ch chan<- sd.Event) {
99 s.cache.Deregister(ch)
102 // state returns the current state of instance.Cache, only for testing
103 func (s *Instancer) state() sd.Event {
104 return s.cache.State()