OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / metrics / cloudwatch / cloudwatch.go
1 package cloudwatch
2
3 import (
4         "fmt"
5         "os"
6         "sync"
7         "time"
8
9         "github.com/aws/aws-sdk-go/aws"
10         "github.com/aws/aws-sdk-go/service/cloudwatch"
11         "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
12
13         "github.com/go-kit/kit/log"
14         "github.com/go-kit/kit/metrics"
15         "github.com/go-kit/kit/metrics/generic"
16         "github.com/go-kit/kit/metrics/internal/lv"
17         "strconv"
18 )
19
20 const (
21         maxConcurrentRequests = 20
22 )
23
24 type Percentiles []struct {
25         s string
26         f float64
27 }
28
29 // CloudWatch receives metrics observations and forwards them to CloudWatch.
30 // Create a CloudWatch object, use it to create metrics, and pass those metrics as
31 // dependencies to the components that will use them.
32 //
33 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
34 type CloudWatch struct {
35         mtx                   sync.RWMutex
36         sem                   chan struct{}
37         namespace             string
38         svc                   cloudwatchiface.CloudWatchAPI
39         counters              *lv.Space
40         gauges                *lv.Space
41         histograms            *lv.Space
42         percentiles           []float64 // percentiles to track
43         logger                log.Logger
44         numConcurrentRequests int
45 }
46
47 type option func(*CloudWatch)
48
49 func (s *CloudWatch) apply(opt option) {
50         if opt != nil {
51                 opt(s)
52         }
53 }
54
55 func WithLogger(logger log.Logger) option {
56         return func(c *CloudWatch) {
57                 c.logger = logger
58         }
59 }
60
61 // WithPercentiles registers the percentiles to track, overriding the
62 // existing/default values.
63 // Reason is that Cloudwatch makes you pay per metric, so you can save half the money
64 // by only using 2 metrics instead of the default 4.
65 func WithPercentiles(percentiles ...float64) option {
66         return func(c *CloudWatch) {
67                 c.percentiles = make([]float64, 0, len(percentiles))
68                 for _, p := range percentiles {
69                         if p < 0 || p > 1 {
70                                 continue // illegal entry; ignore
71                         }
72                         c.percentiles = append(c.percentiles, p)
73                 }
74         }
75 }
76
77 func WithConcurrentRequests(n int) option {
78         return func(c *CloudWatch) {
79                 if n > maxConcurrentRequests {
80                         n = maxConcurrentRequests
81                 }
82                 c.numConcurrentRequests = n
83         }
84 }
85
86 // New returns a CloudWatch object that may be used to create metrics.
87 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
88 // Callers must ensure that regular calls to Send are performed, either
89 // manually or with one of the helper methods.
90 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
91         cw := &CloudWatch{
92                 sem:                   nil, // set below
93                 namespace:             namespace,
94                 svc:                   svc,
95                 counters:              lv.NewSpace(),
96                 gauges:                lv.NewSpace(),
97                 histograms:            lv.NewSpace(),
98                 numConcurrentRequests: 10,
99                 logger:                log.NewLogfmtLogger(os.Stderr),
100                 percentiles:           []float64{0.50, 0.90, 0.95, 0.99},
101         }
102
103         for _, optFunc := range options {
104                 optFunc(cw)
105         }
106
107         cw.sem = make(chan struct{}, cw.numConcurrentRequests)
108
109         return cw
110 }
111
112 // NewCounter returns a counter. Observations are aggregated and emitted once
113 // per write invocation.
114 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
115         return &Counter{
116                 name: name,
117                 obs:  cw.counters.Observe,
118         }
119 }
120
121 // NewGauge returns an gauge.
122 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
123         return &Gauge{
124                 name: name,
125                 obs:  cw.gauges.Observe,
126                 add:  cw.gauges.Add,
127         }
128 }
129
130 // NewHistogram returns a histogram.
131 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
132         return &Histogram{
133                 name: name,
134                 obs:  cw.histograms.Observe,
135         }
136 }
137
138 // WriteLoop is a helper method that invokes Send every time the passed
139 // channel fires. This method blocks until the channel is closed, so clients
140 // probably want to run it in its own goroutine. For typical usage, create a
141 // time.Ticker and pass its C channel to this method.
142 func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
143         for range c {
144                 if err := cw.Send(); err != nil {
145                         cw.logger.Log("during", "Send", "err", err)
146                 }
147         }
148 }
149
150 // Send will fire an API request to CloudWatch with the latest stats for
151 // all metrics. It is preferred that the WriteLoop method is used.
152 func (cw *CloudWatch) Send() error {
153         cw.mtx.RLock()
154         defer cw.mtx.RUnlock()
155         now := time.Now()
156
157         var datums []*cloudwatch.MetricDatum
158
159         cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
160                 value := sum(values)
161                 datums = append(datums, &cloudwatch.MetricDatum{
162                         MetricName: aws.String(name),
163                         Dimensions: makeDimensions(lvs...),
164                         Value:      aws.Float64(value),
165                         Timestamp:  aws.Time(now),
166                 })
167                 return true
168         })
169
170         cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
171                 value := last(values)
172                 datums = append(datums, &cloudwatch.MetricDatum{
173                         MetricName: aws.String(name),
174                         Dimensions: makeDimensions(lvs...),
175                         Value:      aws.Float64(value),
176                         Timestamp:  aws.Time(now),
177                 })
178                 return true
179         })
180
181         // format a [0,1]-float value to a percentile value, with minimum nr of decimals
182         // 0.90 -> "90"
183         // 0.95 -> "95"
184         // 0.999 -> "99.9"
185         formatPerc := func(p float64) string {
186                 return strconv.FormatFloat(p*100, 'f', -1, 64)
187         }
188
189         cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
190                 histogram := generic.NewHistogram(name, 50)
191
192                 for _, v := range values {
193                         histogram.Observe(v)
194                 }
195
196                 for _, perc := range cw.percentiles {
197                         value := histogram.Quantile(perc)
198                         datums = append(datums, &cloudwatch.MetricDatum{
199                                 MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
200                                 Dimensions: makeDimensions(lvs...),
201                                 Value:      aws.Float64(value),
202                                 Timestamp:  aws.Time(now),
203                         })
204                 }
205                 return true
206         })
207
208         var batches [][]*cloudwatch.MetricDatum
209         for len(datums) > 0 {
210                 var batch []*cloudwatch.MetricDatum
211                 lim := min(len(datums), maxConcurrentRequests)
212                 batch, datums = datums[:lim], datums[lim:]
213                 batches = append(batches, batch)
214         }
215
216         var errors = make(chan error, len(batches))
217         for _, batch := range batches {
218                 go func(batch []*cloudwatch.MetricDatum) {
219                         cw.sem <- struct{}{}
220                         defer func() {
221                                 <-cw.sem
222                         }()
223                         _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
224                                 Namespace:  aws.String(cw.namespace),
225                                 MetricData: batch,
226                         })
227                         errors <- err
228                 }(batch)
229         }
230         var firstErr error
231         for i := 0; i < cap(errors); i++ {
232                 if err := <-errors; err != nil && firstErr != nil {
233                         firstErr = err
234                 }
235         }
236
237         return firstErr
238 }
239
240 func sum(a []float64) float64 {
241         var v float64
242         for _, f := range a {
243                 v += f
244         }
245         return v
246 }
247
248 func last(a []float64) float64 {
249         return a[len(a)-1]
250 }
251
252 func min(a, b int) int {
253         if a < b {
254                 return a
255         }
256         return b
257 }
258
259 type observeFunc func(name string, lvs lv.LabelValues, value float64)
260
261 // Counter is a counter. Observations are forwarded to a node
262 // object, and aggregated (summed) per timeseries.
263 type Counter struct {
264         name string
265         lvs  lv.LabelValues
266         obs  observeFunc
267 }
268
269 // With implements metrics.Counter.
270 func (c *Counter) With(labelValues ...string) metrics.Counter {
271         return &Counter{
272                 name: c.name,
273                 lvs:  c.lvs.With(labelValues...),
274                 obs:  c.obs,
275         }
276 }
277
278 // Add implements metrics.Counter.
279 func (c *Counter) Add(delta float64) {
280         c.obs(c.name, c.lvs, delta)
281 }
282
283 // Gauge is a gauge. Observations are forwarded to a node
284 // object, and aggregated (the last observation selected) per timeseries.
285 type Gauge struct {
286         name string
287         lvs  lv.LabelValues
288         obs  observeFunc
289         add  observeFunc
290 }
291
292 // With implements metrics.Gauge.
293 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
294         return &Gauge{
295                 name: g.name,
296                 lvs:  g.lvs.With(labelValues...),
297                 obs:  g.obs,
298                 add:  g.add,
299         }
300 }
301
302 // Set implements metrics.Gauge.
303 func (g *Gauge) Set(value float64) {
304         g.obs(g.name, g.lvs, value)
305 }
306
307 // Add implements metrics.Gauge.
308 func (g *Gauge) Add(delta float64) {
309         g.add(g.name, g.lvs, delta)
310 }
311
312 // Histogram is an Influx histrogram. Observations are aggregated into a
313 // generic.Histogram and emitted as per-quantile gauges to the Influx server.
314 type Histogram struct {
315         name string
316         lvs  lv.LabelValues
317         obs  observeFunc
318 }
319
320 // With implements metrics.Histogram.
321 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
322         return &Histogram{
323                 name: h.name,
324                 lvs:  h.lvs.With(labelValues...),
325                 obs:  h.obs,
326         }
327 }
328
329 // Observe implements metrics.Histogram.
330 func (h *Histogram) Observe(value float64) {
331         h.obs(h.name, h.lvs, value)
332 }
333
334 func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
335         dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
336         for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
337                 dimensions[j] = &cloudwatch.Dimension{
338                         Name:  aws.String(labelValues[i]),
339                         Value: aws.String(labelValues[i+1]),
340                 }
341         }
342         return dimensions
343 }