package eureka import ( "fmt" "net/http" "sync" "time" "github.com/hudl/fargo" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" ) // Matches official Netflix Java client default. const defaultRenewalInterval = 30 * time.Second // The methods of fargo.Connection used in this package. type fargoConnection interface { RegisterInstance(instance *fargo.Instance) error DeregisterInstance(instance *fargo.Instance) error ReregisterInstance(instance *fargo.Instance) error HeartBeatInstance(instance *fargo.Instance) error ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate GetApp(name string) (*fargo.Application, error) } type fargoUnsuccessfulHTTPResponse struct { statusCode int messagePrefix string } func (u *fargoUnsuccessfulHTTPResponse) Error() string { return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode) } // Registrar maintains service instance liveness information in Eureka. type Registrar struct { conn fargoConnection instance *fargo.Instance logger log.Logger quitc chan chan struct{} sync.Mutex } var _ sd.Registrar = (*Registrar)(nil) // NewRegistrar returns an Eureka Registrar acting on behalf of the provided // Fargo connection and instance. See the integration test for usage examples. func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar { return &Registrar{ conn: conn, instance: instance, logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)), } } // Register implements sd.Registrar. func (r *Registrar) Register() { r.Lock() defer r.Unlock() if r.quitc != nil { return // Already in the registration loop. } if err := r.conn.RegisterInstance(r.instance); err != nil { r.logger.Log("during", "Register", "err", err) } r.quitc = make(chan chan struct{}) go r.loop() } // Deregister implements sd.Registrar. func (r *Registrar) Deregister() { r.Lock() defer r.Unlock() if r.quitc == nil { return // Already deregistered. } q := make(chan struct{}) r.quitc <- q <-q r.quitc = nil } func (r *Registrar) loop() { var renewalInterval time.Duration if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second } else { renewalInterval = defaultRenewalInterval } ticker := time.NewTicker(renewalInterval) defer ticker.Stop() for { select { case <-ticker.C: if err := r.heartbeat(); err != nil { r.logger.Log("during", "heartbeat", "err", err) } case q := <-r.quitc: if err := r.conn.DeregisterInstance(r.instance); err != nil { r.logger.Log("during", "Deregister", "err", err) } close(q) return } } } func httpResponseStatusCode(err error) (code int, present bool) { if code, ok := fargo.HTTPResponseStatusCode(err); ok { return code, true } // Allow injection of errors for testing. if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok { return u.statusCode, true } return 0, false } func isNotFound(err error) bool { code, ok := httpResponseStatusCode(err) return ok && code == http.StatusNotFound } func (r *Registrar) heartbeat() error { err := r.conn.HeartBeatInstance(r.instance) if err == nil { return nil } if isNotFound(err) { // Instance expired (e.g. network partition). Re-register. return r.conn.ReregisterInstance(r.instance) } return err }