OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / eureka / instancer.go
1 package eureka
2
3 import (
4         "fmt"
5
6         "github.com/hudl/fargo"
7
8         "github.com/go-kit/kit/log"
9         "github.com/go-kit/kit/sd"
10         "github.com/go-kit/kit/sd/internal/instance"
11 )
12
13 // Instancer yields instances stored in the Eureka registry for the given app.
14 // Changes in that app are watched and will update the subscribers.
15 type Instancer struct {
16         cache  *instance.Cache
17         conn   fargoConnection
18         app    string
19         logger log.Logger
20         quitc  chan chan struct{}
21 }
22
23 // NewInstancer returns a Eureka Instancer. It will start watching the given
24 // app string for changes, and update the subscribers accordingly.
25 func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
26         logger = log.With(logger, "app", app)
27
28         s := &Instancer{
29                 cache:  instance.NewCache(),
30                 conn:   conn,
31                 app:    app,
32                 logger: logger,
33                 quitc:  make(chan chan struct{}),
34         }
35
36         done := make(chan struct{})
37         updates := conn.ScheduleAppUpdates(app, true, done)
38         s.consume(<-updates)
39         go s.loop(updates, done)
40         return s
41 }
42
43 // Stop terminates the Instancer.
44 func (s *Instancer) Stop() {
45         q := make(chan struct{})
46         s.quitc <- q
47         <-q
48         s.quitc = nil
49 }
50
51 func (s *Instancer) consume(update fargo.AppUpdate) {
52         if update.Err != nil {
53                 s.logger.Log("during", "Update", "err", update.Err)
54                 s.cache.Update(sd.Event{Err: update.Err})
55                 return
56         }
57         instances := convertFargoAppToInstances(update.App)
58         s.logger.Log("instances", len(instances))
59         s.cache.Update(sd.Event{Instances: instances})
60 }
61
62 func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
63         defer close(done)
64
65         for {
66                 select {
67                 case update := <-updates:
68                         s.consume(update)
69                 case q := <-s.quitc:
70                         close(q)
71                         return
72                 }
73         }
74 }
75
76 func (s *Instancer) getInstances() ([]string, error) {
77         app, err := s.conn.GetApp(s.app)
78         if err != nil {
79                 return nil, err
80         }
81         return convertFargoAppToInstances(app), nil
82 }
83
84 func convertFargoAppToInstances(app *fargo.Application) []string {
85         instances := make([]string, len(app.Instances))
86         for i, inst := range app.Instances {
87                 instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
88         }
89         return instances
90 }
91
92 // Register implements Instancer.
93 func (s *Instancer) Register(ch chan<- sd.Event) {
94         s.cache.Register(ch)
95 }
96
97 // Deregister implements Instancer.
98 func (s *Instancer) Deregister(ch chan<- sd.Event) {
99         s.cache.Deregister(ch)
100 }
101
102 // state returns the current state of instance.Cache, only for testing
103 func (s *Instancer) state() sd.Event {
104         return s.cache.State()
105 }