OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / endpoint_cache_test.go
1 package sd
2
3 import (
4         "errors"
5         "io"
6         "testing"
7         "time"
8
9         "github.com/go-kit/kit/endpoint"
10         "github.com/go-kit/kit/log"
11 )
12
13 func TestEndpointCache(t *testing.T) {
14         var (
15                 ca    = make(closer)
16                 cb    = make(closer)
17                 c     = map[string]io.Closer{"a": ca, "b": cb}
18                 f     = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
19                 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
20         )
21
22         // Populate
23         cache.Update(Event{Instances: []string{"a", "b"}})
24         select {
25         case <-ca:
26                 t.Errorf("endpoint a closed, not good")
27         case <-cb:
28                 t.Errorf("endpoint b closed, not good")
29         case <-time.After(time.Millisecond):
30                 t.Logf("no closures yet, good")
31         }
32         assertEndpointsLen(t, cache, 2)
33
34         // Duplicate, should be no-op
35         cache.Update(Event{Instances: []string{"a", "b"}})
36         select {
37         case <-ca:
38                 t.Errorf("endpoint a closed, not good")
39         case <-cb:
40                 t.Errorf("endpoint b closed, not good")
41         case <-time.After(time.Millisecond):
42                 t.Logf("no closures yet, good")
43         }
44         assertEndpointsLen(t, cache, 2)
45
46         // Error, should continue returning old endpoints
47         cache.Update(Event{Err: errors.New("sd error")})
48         select {
49         case <-ca:
50                 t.Errorf("endpoint a closed, not good")
51         case <-cb:
52                 t.Errorf("endpoint b closed, not good")
53         case <-time.After(time.Millisecond):
54                 t.Logf("no closures yet, good")
55         }
56         assertEndpointsLen(t, cache, 2)
57
58         // Delete b
59         go cache.Update(Event{Instances: []string{"a"}})
60         select {
61         case <-ca:
62                 t.Errorf("endpoint a closed, not good")
63         case <-cb:
64                 t.Logf("endpoint b closed, good")
65         case <-time.After(time.Second):
66                 t.Errorf("didn't close the deleted instance in time")
67         }
68         assertEndpointsLen(t, cache, 1)
69
70         // Delete a
71         go cache.Update(Event{Instances: []string{}})
72         select {
73         // case <-cb: will succeed, as it's closed
74         case <-ca:
75                 t.Logf("endpoint a closed, good")
76         case <-time.After(time.Second):
77                 t.Errorf("didn't close the deleted instance in time")
78         }
79         assertEndpointsLen(t, cache, 0)
80 }
81
82 func TestEndpointCacheErrorAndTimeout(t *testing.T) {
83         var (
84                 ca      = make(closer)
85                 cb      = make(closer)
86                 c       = map[string]io.Closer{"a": ca, "b": cb}
87                 f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
88                 timeOut = 100 * time.Millisecond
89                 cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
90                         invalidateOnError: true,
91                         invalidateTimeout: timeOut,
92                 })
93         )
94
95         timeNow := time.Now()
96         cache.timeNow = func() time.Time { return timeNow }
97
98         // Populate
99         cache.Update(Event{Instances: []string{"a"}})
100         select {
101         case <-ca:
102                 t.Errorf("endpoint a closed, not good")
103         case <-time.After(time.Millisecond):
104                 t.Logf("no closures yet, good")
105         }
106         assertEndpointsLen(t, cache, 1)
107
108         // Send error, keep time still.
109         cache.Update(Event{Err: errors.New("sd error")})
110         select {
111         case <-ca:
112                 t.Errorf("endpoint a closed, not good")
113         case <-time.After(time.Millisecond):
114                 t.Logf("no closures yet, good")
115         }
116         assertEndpointsLen(t, cache, 1)
117
118         // Move the time, but less than the timeout
119         timeNow = timeNow.Add(timeOut / 2)
120         assertEndpointsLen(t, cache, 1)
121         select {
122         case <-ca:
123                 t.Errorf("endpoint a closed, not good")
124         case <-time.After(time.Millisecond):
125                 t.Logf("no closures yet, good")
126         }
127
128         // Move the time past the timeout
129         timeNow = timeNow.Add(timeOut)
130         assertEndpointsError(t, cache, "sd error")
131         select {
132         case <-ca:
133                 t.Logf("endpoint a closed, good")
134         case <-time.After(time.Millisecond):
135                 t.Errorf("didn't close the deleted instance in time")
136         }
137
138         // Send another error
139         cache.Update(Event{Err: errors.New("another sd error")})
140         assertEndpointsError(t, cache, "sd error") // expect original error
141 }
142
143 func TestBadFactory(t *testing.T) {
144         cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
145                 return nil, nil, errors.New("bad factory")
146         }, log.NewNopLogger(), endpointerOptions{})
147
148         cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
149         assertEndpointsLen(t, cache, 0)
150 }
151
152 func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
153         endpoints, err := cache.Endpoints()
154         if err != nil {
155                 t.Errorf("unexpected error %v", err)
156                 return
157         }
158         if want, have := l, len(endpoints); want != have {
159                 t.Errorf("want %d, have %d", want, have)
160         }
161 }
162
163 func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
164         endpoints, err := cache.Endpoints()
165         if err == nil {
166                 t.Errorf("expecting error, not good")
167                 return
168         }
169         if want, have := wantErr, err.Error(); want != have {
170                 t.Errorf("want %s, have %s", want, have)
171                 return
172         }
173         if want, have := 0, len(endpoints); want != have {
174                 t.Errorf("want %d, have %d", want, have)
175         }
176 }
177
178 type closer chan struct{}
179
180 func (c closer) Close() error { close(c); return nil }