1 // Package influx provides an InfluxDB implementation for metrics. The model is
2 // similar to other push-based instrumentation systems. Observations are
3 // aggregated locally and emitted to the Influx server on regular intervals.
9 influxdb "github.com/influxdata/influxdb/client/v2"
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/metrics/generic"
14 "github.com/go-kit/kit/metrics/internal/lv"
17 // Influx is a store for metrics that will be emitted to an Influx database.
19 // Influx is a general purpose time-series database, and has no native concepts
20 // of counters, gauges, or histograms. Counters are modeled as a timeseries with
21 // one data point per flush, with a "count" field that reflects all adds since
22 // the last flush. Gauges are modeled as a timeseries with one data point per
23 // flush, with a "value" field that reflects the current state of the gauge.
24 // Histograms are modeled as a timeseries with one data point per combination of tags,
25 // with a set of quantile fields that reflects the p50, p90, p95 & p99.
27 // Influx tags are attached to the Influx object, can be given to each
28 // metric at construction and can be updated anytime via the With function. Go kit
29 // label values are mapped directly to Influx tags by this collector. Actual metric
30 // values are provided as fields with specific names depending on the metric.
32 // All observations are collected in memory locally, and flushed on demand.
37 tags map[string]string
38 conf influxdb.BatchPointsConfig
42 // New returns an Influx, ready to create metrics and collect observations. Tags
43 // are applied to all metrics created from this object. The BatchPointsConfig is
44 // used during flushing.
45 func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
47 counters: lv.NewSpace(),
48 gauges: lv.NewSpace(),
49 histograms: lv.NewSpace(),
56 // NewCounter returns an Influx counter.
57 func (in *Influx) NewCounter(name string) *Counter {
60 obs: in.counters.Observe,
64 // NewGauge returns an Influx gauge.
65 func (in *Influx) NewGauge(name string) *Gauge {
68 obs: in.gauges.Observe,
73 // NewHistogram returns an Influx histogram.
74 func (in *Influx) NewHistogram(name string) *Histogram {
77 obs: in.histograms.Observe,
81 // BatchPointsWriter captures a subset of the influxdb.Client methods necessary
82 // for emitting metrics observations.
83 type BatchPointsWriter interface {
84 Write(influxdb.BatchPoints) error
87 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
88 // time the passed channel fires. This method blocks until the channel is
89 // closed, so clients probably want to run it in its own goroutine. For typical
90 // usage, create a time.Ticker and pass its C channel to this method.
91 func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
93 if err := in.WriteTo(w); err != nil {
94 in.logger.Log("during", "WriteTo", "err", err)
99 // WriteTo flushes the buffered content of the metrics to the writer, in an
100 // Influx BatchPoints format. WriteTo abides by best-effort semantics, so
101 // observations are lost if there is a problem with the write. Clients should be
102 // sure to call WriteTo regularly, ideally through the WriteLoop helper method.
103 func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
104 bp, err := influxdb.NewBatchPoints(in.conf)
111 in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
112 tags := mergeTags(in.tags, lvs)
113 var p *influxdb.Point
114 fields := map[string]interface{}{"count": sum(values)}
115 p, err = influxdb.NewPoint(name, tags, fields, now)
126 in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
127 tags := mergeTags(in.tags, lvs)
128 var p *influxdb.Point
129 fields := map[string]interface{}{"value": last(values)}
130 p, err = influxdb.NewPoint(name, tags, fields, now)
141 in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
142 histogram := generic.NewHistogram(name, 50)
143 tags := mergeTags(in.tags, lvs)
144 var p *influxdb.Point
145 for _, v := range values {
148 fields := map[string]interface{}{
149 "p50": histogram.Quantile(0.50),
150 "p90": histogram.Quantile(0.90),
151 "p95": histogram.Quantile(0.95),
152 "p99": histogram.Quantile(0.99),
154 p, err = influxdb.NewPoint(name, tags, fields, now)
168 func mergeTags(tags map[string]string, labelValues []string) map[string]string {
169 if len(labelValues)%2 != 0 {
170 panic("mergeTags received a labelValues with an odd number of strings")
172 ret := make(map[string]string, len(tags)+len(labelValues)/2)
173 for k, v := range tags {
176 for i := 0; i < len(labelValues); i += 2 {
177 ret[labelValues[i]] = labelValues[i+1]
182 func sum(a []float64) float64 {
184 for _, f := range a {
190 func last(a []float64) float64 {
194 type observeFunc func(name string, lvs lv.LabelValues, value float64)
196 // Counter is an Influx counter. Observations are forwarded to an Influx
197 // object, and aggregated (summed) per timeseries.
198 type Counter struct {
204 // With implements metrics.Counter.
205 func (c *Counter) With(labelValues ...string) metrics.Counter {
208 lvs: c.lvs.With(labelValues...),
213 // Add implements metrics.Counter.
214 func (c *Counter) Add(delta float64) {
215 c.obs(c.name, c.lvs, delta)
218 // Gauge is an Influx gauge. Observations are forwarded to an Influx
219 // object, and aggregated (the last observation selected) per timeseries.
227 // With implements metrics.Gauge.
228 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
231 lvs: g.lvs.With(labelValues...),
237 // Set implements metrics.Gauge.
238 func (g *Gauge) Set(value float64) {
239 g.obs(g.name, g.lvs, value)
242 // Add implements metrics.Gauge.
243 func (g *Gauge) Add(delta float64) {
244 g.add(g.name, g.lvs, delta)
247 // Histogram is an Influx histogram. Observations are aggregated into a
248 // generic.Histogram and emitted as p50, p90, p95 and p99 quantile fields to the Influx server.
249 type Histogram struct {
255 // With implements metrics.Histogram.
256 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
259 lvs: h.lvs.With(labelValues...),
264 // Observe implements metrics.Histogram.
265 func (h *Histogram) Observe(value float64) {
266 h.obs(h.name, h.lvs, value)