OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / eureka / registrar.go
1 package eureka
2
3 import (
4         "fmt"
5         "net/http"
6         "sync"
7         "time"
8
9         "github.com/hudl/fargo"
10
11         "github.com/go-kit/kit/log"
12         "github.com/go-kit/kit/sd"
13 )
14
15 // Matches official Netflix Java client default.
16 const defaultRenewalInterval = 30 * time.Second
17
18 // The methods of fargo.Connection used in this package.
19 type fargoConnection interface {
20         RegisterInstance(instance *fargo.Instance) error
21         DeregisterInstance(instance *fargo.Instance) error
22         ReregisterInstance(instance *fargo.Instance) error
23         HeartBeatInstance(instance *fargo.Instance) error
24         ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25         GetApp(name string) (*fargo.Application, error)
26 }
27
28 type fargoUnsuccessfulHTTPResponse struct {
29         statusCode    int
30         messagePrefix string
31 }
32
33 func (u *fargoUnsuccessfulHTTPResponse) Error() string {
34         return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
35 }
36
37 // Registrar maintains service instance liveness information in Eureka.
38 type Registrar struct {
39         conn     fargoConnection
40         instance *fargo.Instance
41         logger   log.Logger
42         quitc    chan chan struct{}
43         sync.Mutex
44 }
45
46 var _ sd.Registrar = (*Registrar)(nil)
47
48 // NewRegistrar returns an Eureka Registrar acting on behalf of the provided
49 // Fargo connection and instance. See the integration test for usage examples.
50 func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
51         return &Registrar{
52                 conn:     conn,
53                 instance: instance,
54                 logger:   log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
55         }
56 }
57
58 // Register implements sd.Registrar.
59 func (r *Registrar) Register() {
60         r.Lock()
61         defer r.Unlock()
62
63         if r.quitc != nil {
64                 return // Already in the registration loop.
65         }
66
67         if err := r.conn.RegisterInstance(r.instance); err != nil {
68                 r.logger.Log("during", "Register", "err", err)
69         }
70
71         r.quitc = make(chan chan struct{})
72         go r.loop()
73 }
74
75 // Deregister implements sd.Registrar.
76 func (r *Registrar) Deregister() {
77         r.Lock()
78         defer r.Unlock()
79
80         if r.quitc == nil {
81                 return // Already deregistered.
82         }
83
84         q := make(chan struct{})
85         r.quitc <- q
86         <-q
87         r.quitc = nil
88 }
89
90 func (r *Registrar) loop() {
91         var renewalInterval time.Duration
92         if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
93                 renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
94         } else {
95                 renewalInterval = defaultRenewalInterval
96         }
97         ticker := time.NewTicker(renewalInterval)
98         defer ticker.Stop()
99
100         for {
101                 select {
102                 case <-ticker.C:
103                         if err := r.heartbeat(); err != nil {
104                                 r.logger.Log("during", "heartbeat", "err", err)
105                         }
106
107                 case q := <-r.quitc:
108                         if err := r.conn.DeregisterInstance(r.instance); err != nil {
109                                 r.logger.Log("during", "Deregister", "err", err)
110                         }
111                         close(q)
112                         return
113                 }
114         }
115 }
116
117 func httpResponseStatusCode(err error) (code int, present bool) {
118         if code, ok := fargo.HTTPResponseStatusCode(err); ok {
119                 return code, true
120         }
121         // Allow injection of errors for testing.
122         if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
123                 return u.statusCode, true
124         }
125         return 0, false
126 }
127
128 func isNotFound(err error) bool {
129         code, ok := httpResponseStatusCode(err)
130         return ok && code == http.StatusNotFound
131 }
132
133 func (r *Registrar) heartbeat() error {
134         err := r.conn.HeartBeatInstance(r.instance)
135         if err == nil {
136                 return nil
137         }
138         if isNotFound(err) {
139                 // Instance expired (e.g. network partition). Re-register.
140                 return r.conn.ReregisterInstance(r.instance)
141         }
142         return err
143 }