package eureka import ( "fmt" "github.com/hudl/fargo" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" "github.com/go-kit/kit/sd/internal/instance" ) // Instancer yields instances stored in the Eureka registry for the given app. // Changes in that app are watched and will update the subscribers. type Instancer struct { cache *instance.Cache conn fargoConnection app string logger log.Logger quitc chan chan struct{} } // NewInstancer returns a Eureka Instancer. It will start watching the given // app string for changes, and update the subscribers accordingly. func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer { logger = log.With(logger, "app", app) s := &Instancer{ cache: instance.NewCache(), conn: conn, app: app, logger: logger, quitc: make(chan chan struct{}), } done := make(chan struct{}) updates := conn.ScheduleAppUpdates(app, true, done) s.consume(<-updates) go s.loop(updates, done) return s } // Stop terminates the Instancer. func (s *Instancer) Stop() { q := make(chan struct{}) s.quitc <- q <-q s.quitc = nil } func (s *Instancer) consume(update fargo.AppUpdate) { if update.Err != nil { s.logger.Log("during", "Update", "err", update.Err) s.cache.Update(sd.Event{Err: update.Err}) return } instances := convertFargoAppToInstances(update.App) s.logger.Log("instances", len(instances)) s.cache.Update(sd.Event{Instances: instances}) } func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) { defer close(done) for { select { case update := <-updates: s.consume(update) case q := <-s.quitc: close(q) return } } } func (s *Instancer) getInstances() ([]string, error) { app, err := s.conn.GetApp(s.app) if err != nil { return nil, err } return convertFargoAppToInstances(app), nil } func convertFargoAppToInstances(app *fargo.Application) []string { instances := make([]string, len(app.Instances)) for i, inst := range app.Instances { instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port) } return instances } // Register implements Instancer. func (s *Instancer) Register(ch chan<- sd.Event) { s.cache.Register(ch) } // Deregister implements Instancer. func (s *Instancer) Deregister(ch chan<- sd.Event) { s.cache.Deregister(ch) } // state returns the current state of instance.Cache, only for testing func (s *Instancer) state() sd.Event { return s.cache.State() }