9 "github.com/hudl/fargo"
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/sd"
15 // Matches official Netflix Java client default.
16 const defaultRenewalInterval = 30 * time.Second
18 // The methods of fargo.Connection used in this package.
19 type fargoConnection interface {
20 RegisterInstance(instance *fargo.Instance) error
21 DeregisterInstance(instance *fargo.Instance) error
22 ReregisterInstance(instance *fargo.Instance) error
23 HeartBeatInstance(instance *fargo.Instance) error
24 ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25 GetApp(name string) (*fargo.Application, error)
28 type fargoUnsuccessfulHTTPResponse struct {
33 func (u *fargoUnsuccessfulHTTPResponse) Error() string {
34 return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
37 // Registrar maintains service instance liveness information in Eureka.
38 type Registrar struct {
40 instance *fargo.Instance
42 quitc chan chan struct{}
46 var _ sd.Registrar = (*Registrar)(nil)
48 // NewRegistrar returns an Eureka Registrar acting on behalf of the provided
49 // Fargo connection and instance. See the integration test for usage examples.
50 func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
54 logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
58 // Register implements sd.Registrar.
59 func (r *Registrar) Register() {
64 return // Already in the registration loop.
67 if err := r.conn.RegisterInstance(r.instance); err != nil {
68 r.logger.Log("during", "Register", "err", err)
71 r.quitc = make(chan chan struct{})
75 // Deregister implements sd.Registrar.
76 func (r *Registrar) Deregister() {
81 return // Already deregistered.
84 q := make(chan struct{})
90 func (r *Registrar) loop() {
91 var renewalInterval time.Duration
92 if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
93 renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
95 renewalInterval = defaultRenewalInterval
97 ticker := time.NewTicker(renewalInterval)
103 if err := r.heartbeat(); err != nil {
104 r.logger.Log("during", "heartbeat", "err", err)
108 if err := r.conn.DeregisterInstance(r.instance); err != nil {
109 r.logger.Log("during", "Deregister", "err", err)
117 func httpResponseStatusCode(err error) (code int, present bool) {
118 if code, ok := fargo.HTTPResponseStatusCode(err); ok {
121 // Allow injection of errors for testing.
122 if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
123 return u.statusCode, true
128 func isNotFound(err error) bool {
129 code, ok := httpResponseStatusCode(err)
130 return ok && code == http.StatusNotFound
133 func (r *Registrar) heartbeat() error {
134 err := r.conn.HeartBeatInstance(r.instance)
139 // Instance expired (e.g. network partition). Re-register.
140 return r.conn.ReregisterInstance(r.instance)