9 "github.com/aws/aws-sdk-go/aws"
10 "github.com/aws/aws-sdk-go/service/cloudwatch"
11 "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
13 "github.com/go-kit/kit/log"
14 "github.com/go-kit/kit/metrics"
15 "github.com/go-kit/kit/metrics/generic"
16 "github.com/go-kit/kit/metrics/internal/lv"
21 maxConcurrentRequests = 20
24 type Percentiles []struct {
29 // CloudWatch receives metrics observations and forwards them to CloudWatch.
30 // Create a CloudWatch object, use it to create metrics, and pass those metrics as
31 // dependencies to the components that will use them.
33 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
34 type CloudWatch struct {
38 svc cloudwatchiface.CloudWatchAPI
42 percentiles []float64 // percentiles to track
44 numConcurrentRequests int
47 type option func(*CloudWatch)
49 func (s *CloudWatch) apply(opt option) {
55 func WithLogger(logger log.Logger) option {
56 return func(c *CloudWatch) {
61 // WithPercentiles registers the percentiles to track, overriding the
62 // existing/default values.
63 // CloudWatch bills per metric, so tracking fewer percentiles lowers cost: for
64 // example, using 2 percentiles instead of the default 4 halves the number of metrics per histogram.
65 func WithPercentiles(percentiles ...float64) option {
66 return func(c *CloudWatch) {
67 c.percentiles = make([]float64, 0, len(percentiles))
68 for _, p := range percentiles {
70 continue // illegal entry; ignore
72 c.percentiles = append(c.percentiles, p)
77 func WithConcurrentRequests(n int) option {
78 return func(c *CloudWatch) {
79 if n > maxConcurrentRequests {
80 n = maxConcurrentRequests
82 c.numConcurrentRequests = n
86 // New returns a CloudWatch object that may be used to create metrics.
87 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
88 // Callers must ensure that regular calls to Send are performed, either
89 // manually or with one of the helper methods.
90 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
92 sem: nil, // set below
95 counters: lv.NewSpace(),
96 gauges: lv.NewSpace(),
97 histograms: lv.NewSpace(),
98 numConcurrentRequests: 10,
99 logger: log.NewLogfmtLogger(os.Stderr),
100 percentiles: []float64{0.50, 0.90, 0.95, 0.99},
103 for _, optFunc := range options {
107 cw.sem = make(chan struct{}, cw.numConcurrentRequests)
112 // NewCounter returns a counter. Observations are aggregated and emitted once
113 // per write invocation.
114 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
117 obs: cw.counters.Observe,
121 // NewGauge returns a gauge.
122 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
125 obs: cw.gauges.Observe,
130 // NewHistogram returns a histogram.
131 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
134 obs: cw.histograms.Observe,
138 // WriteLoop is a helper method that invokes Send every time the passed
139 // channel fires. This method blocks until the channel is closed, so clients
140 // probably want to run it in its own goroutine. For typical usage, create a
141 // time.Ticker and pass its C channel to this method.
142 func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
144 if err := cw.Send(); err != nil {
145 cw.logger.Log("during", "Send", "err", err)
150 // Send will fire an API request to CloudWatch with the latest stats for
151 // all metrics. It is preferred that the WriteLoop method is used.
152 func (cw *CloudWatch) Send() error {
154 defer cw.mtx.RUnlock()
157 var datums []*cloudwatch.MetricDatum
159 cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
161 datums = append(datums, &cloudwatch.MetricDatum{
162 MetricName: aws.String(name),
163 Dimensions: makeDimensions(lvs...),
164 Value: aws.Float64(value),
165 Timestamp: aws.Time(now),
170 cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
171 value := last(values)
172 datums = append(datums, &cloudwatch.MetricDatum{
173 MetricName: aws.String(name),
174 Dimensions: makeDimensions(lvs...),
175 Value: aws.Float64(value),
176 Timestamp: aws.Time(now),
181 // format a fraction in [0,1] as a percentile (p*100), using the minimum number of decimals
185 formatPerc := func(p float64) string {
186 return strconv.FormatFloat(p*100, 'f', -1, 64)
189 cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
190 histogram := generic.NewHistogram(name, 50)
192 for _, v := range values {
196 for _, perc := range cw.percentiles {
197 value := histogram.Quantile(perc)
198 datums = append(datums, &cloudwatch.MetricDatum{
199 MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
200 Dimensions: makeDimensions(lvs...),
201 Value: aws.Float64(value),
202 Timestamp: aws.Time(now),
208 var batches [][]*cloudwatch.MetricDatum
209 for len(datums) > 0 {
210 var batch []*cloudwatch.MetricDatum
211 lim := min(len(datums), maxConcurrentRequests)
212 batch, datums = datums[:lim], datums[lim:]
213 batches = append(batches, batch)
216 var errors = make(chan error, len(batches))
217 for _, batch := range batches {
218 go func(batch []*cloudwatch.MetricDatum) {
223 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
224 Namespace: aws.String(cw.namespace),
231 for i := 0; i < cap(errors); i++ {
232 if err := <-errors; err != nil && firstErr == nil { // NOTE(review): was "firstErr != nil"; assuming firstErr starts out nil (its declaration is elided here), that guard could never pass and every PutMetricData error was dropped — confirm
240 func sum(a []float64) float64 {
242 for _, f := range a {
248 func last(a []float64) float64 {
252 func min(a, b int) int {
259 type observeFunc func(name string, lvs lv.LabelValues, value float64)
261 // Counter is a counter. Observations are forwarded to a node
262 // object, and aggregated (summed) per timeseries.
263 type Counter struct {
269 // With implements metrics.Counter.
270 func (c *Counter) With(labelValues ...string) metrics.Counter {
273 lvs: c.lvs.With(labelValues...),
278 // Add implements metrics.Counter.
279 func (c *Counter) Add(delta float64) {
280 c.obs(c.name, c.lvs, delta)
283 // Gauge is a gauge. Observations are forwarded to a node
284 // object, and aggregated (the last observation selected) per timeseries.
292 // With implements metrics.Gauge.
293 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
296 lvs: g.lvs.With(labelValues...),
302 // Set implements metrics.Gauge.
303 func (g *Gauge) Set(value float64) {
304 g.obs(g.name, g.lvs, value)
307 // Add implements metrics.Gauge.
308 func (g *Gauge) Add(delta float64) {
309 g.add(g.name, g.lvs, delta)
312 // Histogram is a CloudWatch histogram. Observations are aggregated into a
313 // generic.Histogram and emitted as per-percentile metrics to CloudWatch.
314 type Histogram struct {
320 // With implements metrics.Histogram.
321 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
324 lvs: h.lvs.With(labelValues...),
329 // Observe implements metrics.Histogram.
330 func (h *Histogram) Observe(value float64) {
331 h.obs(h.name, h.lvs, value)
334 func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
335 dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
336 for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
337 dimensions[j] = &cloudwatch.Dimension{
338 Name: aws.String(labelValues[i]),
339 Value: aws.String(labelValues[i+1]),