OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / go-kit / kit / metrics / dogstatsd / dogstatsd.go
1 // Package dogstatsd provides a DogStatsD backend for package metrics. It's very
2 // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
3 // kit's label values. So, while label values are no-ops in StatsD, they are
4 // supported here. For more details, see the documentation at
5 // http://docs.datadoghq.com/guides/dogstatsd/.
6 //
7 // This package batches observations and emits them on some schedule to the
8 // remote server. This is useful even if you connect to your DogStatsD server
9 // over UDP. Emitting one network packet per observation can quickly overwhelm
10 // even the fastest internal network.
11 package dogstatsd
12
13 import (
14         "fmt"
15         "io"
16         "strings"
17         "sync"
18         "sync/atomic"
19         "time"
20
21         "github.com/go-kit/kit/log"
22         "github.com/go-kit/kit/metrics"
23         "github.com/go-kit/kit/metrics/generic"
24         "github.com/go-kit/kit/metrics/internal/lv"
25         "github.com/go-kit/kit/metrics/internal/ratemap"
26         "github.com/go-kit/kit/util/conn"
27 )
28
29 // Dogstatsd receives metrics observations and forwards them to a DogStatsD
30 // server. Create a Dogstatsd object, use it to create metrics, and pass those
31 // metrics as dependencies to the components that will use them.
32 //
33 // All metrics are buffered until WriteTo is called. Counters and gauges are
34 // aggregated into a single observation per timeseries per write. Timings and
35 // histograms are buffered but not aggregated.
36 //
37 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
38 // To send to a DogStatsD server, use the SendLoop helper method.
39 type Dogstatsd struct {
40         mtx        sync.RWMutex
41         prefix     string
42         rates      *ratemap.RateMap
43         counters   *lv.Space
44         gauges     map[string]*gaugeNode
45         timings    *lv.Space
46         histograms *lv.Space
47         logger     log.Logger
48         lvs        lv.LabelValues
49 }
50
51 // New returns a Dogstatsd object that may be used to create metrics. Prefix is
52 // applied to all created metrics. Callers must ensure that regular calls to
53 // WriteTo are performed, either manually or with one of the helper methods.
54 func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd {
55         if len(lvs)%2 != 0 {
56                 panic("odd number of LabelValues; programmer error!")
57         }
58         return &Dogstatsd{
59                 prefix:     prefix,
60                 rates:      ratemap.New(),
61                 counters:   lv.NewSpace(),
62                 gauges:     map[string]*gaugeNode{},
63                 timings:    lv.NewSpace(),
64                 histograms: lv.NewSpace(),
65                 logger:     logger,
66                 lvs:        lvs,
67         }
68 }
69
70 // NewCounter returns a counter, sending observations to this Dogstatsd object.
71 func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
72         d.rates.Set(name, sampleRate)
73         return &Counter{
74                 name: name,
75                 obs:  d.counters.Observe,
76         }
77 }
78
79 // NewGauge returns a gauge, sending observations to this Dogstatsd object.
80 func (d *Dogstatsd) NewGauge(name string) *Gauge {
81         d.mtx.Lock()
82         n, ok := d.gauges[name]
83         if !ok {
84                 n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}}
85                 d.gauges[name] = n
86         }
87         d.mtx.Unlock()
88         return n.gauge
89 }
90
91 // NewTiming returns a histogram whose observations are interpreted as
92 // millisecond durations, and are forwarded to this Dogstatsd object.
93 func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
94         d.rates.Set(name, sampleRate)
95         return &Timing{
96                 name: name,
97                 obs:  d.timings.Observe,
98         }
99 }
100
101 // NewHistogram returns a histogram whose observations are of an unspecified
102 // unit, and are forwarded to this Dogstatsd object.
103 func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
104         d.rates.Set(name, sampleRate)
105         return &Histogram{
106                 name: name,
107                 obs:  d.histograms.Observe,
108         }
109 }
110
111 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
112 // time the passed channel fires. This method blocks until the channel is
113 // closed, so clients probably want to run it in its own goroutine. For typical
114 // usage, create a time.Ticker and pass its C channel to this method.
115 func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
116         for range c {
117                 if _, err := d.WriteTo(w); err != nil {
118                         d.logger.Log("during", "WriteTo", "err", err)
119                 }
120         }
121 }
122
123 // SendLoop is a helper method that wraps WriteLoop, passing a managed
124 // connection to the network and address. Like WriteLoop, this method blocks
125 // until the channel is closed, so clients probably want to start it in its own
126 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
127 // this method.
128 func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
129         d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
130 }
131
132 // WriteTo flushes the buffered content of the metrics to the writer, in
133 // DogStatsD format. WriteTo abides best-effort semantics, so observations are
134 // lost if there is a problem with the write. Clients should be sure to call
135 // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
136 func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
137         var n int
138
139         d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
140                 n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs))
141                 if err != nil {
142                         return false
143                 }
144                 count += int64(n)
145                 return true
146         })
147         if err != nil {
148                 return count, err
149         }
150
151         d.mtx.RLock()
152         for _, root := range d.gauges {
153                 root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
154                         n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs))
155                         if err != nil {
156                                 return false
157                         }
158                         count += int64(n)
159                         return true
160                 })
161         }
162         d.mtx.RUnlock()
163
164         d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
165                 sampleRate := d.rates.Get(name)
166                 for _, value := range values {
167                         n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
168                         if err != nil {
169                                 return false
170                         }
171                         count += int64(n)
172                 }
173                 return true
174         })
175         if err != nil {
176                 return count, err
177         }
178
179         d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
180                 sampleRate := d.rates.Get(name)
181                 for _, value := range values {
182                         n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
183                         if err != nil {
184                                 return false
185                         }
186                         count += int64(n)
187                 }
188                 return true
189         })
190         if err != nil {
191                 return count, err
192         }
193
194         return count, err
195 }
196
197 func sum(a []float64) float64 {
198         var v float64
199         for _, f := range a {
200                 v += f
201         }
202         return v
203 }
204
205 func last(a []float64) float64 {
206         return a[len(a)-1]
207 }
208
209 func sampling(r float64) string {
210         var sv string
211         if r < 1.0 {
212                 sv = fmt.Sprintf("|@%f", r)
213         }
214         return sv
215 }
216
217 func (d *Dogstatsd) tagValues(labelValues []string) string {
218         if len(labelValues) == 0 && len(d.lvs) == 0 {
219                 return ""
220         }
221         if len(labelValues)%2 != 0 {
222                 panic("tagValues received a labelValues with an odd number of strings")
223         }
224         pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
225         for i := 0; i < len(d.lvs); i += 2 {
226                 pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1])
227         }
228         for i := 0; i < len(labelValues); i += 2 {
229                 pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
230         }
231         return "|#" + strings.Join(pairs, ",")
232 }
233
234 type observeFunc func(name string, lvs lv.LabelValues, value float64)
235
236 // Counter is a DogStatsD counter. Observations are forwarded to a Dogstatsd
237 // object, and aggregated (summed) per timeseries.
238 type Counter struct {
239         name string
240         lvs  lv.LabelValues
241         obs  observeFunc
242 }
243
244 // With implements metrics.Counter.
245 func (c *Counter) With(labelValues ...string) metrics.Counter {
246         return &Counter{
247                 name: c.name,
248                 lvs:  c.lvs.With(labelValues...),
249                 obs:  c.obs,
250         }
251 }
252
253 // Add implements metrics.Counter.
254 func (c *Counter) Add(delta float64) {
255         c.obs(c.name, c.lvs, delta)
256 }
257
258 // Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
259 // object, and aggregated (the last observation selected) per timeseries.
260 type Gauge struct {
261         g    *generic.Gauge
262         ddog *Dogstatsd
263         set  int32
264 }
265
266 // With implements metrics.Gauge.
267 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
268         g.ddog.mtx.RLock()
269         node := g.ddog.gauges[g.g.Name]
270         g.ddog.mtx.RUnlock()
271
272         ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog}
273         return node.addGauge(ga, ga.g.LabelValues())
274 }
275
276 // Set implements metrics.Gauge.
277 func (g *Gauge) Set(value float64) {
278         g.g.Set(value)
279         g.touch()
280 }
281
282 // Add implements metrics.Gauge.
283 func (g *Gauge) Add(delta float64) {
284         g.g.Add(delta)
285         g.touch()
286 }
287
288 // Timing is a DogStatsD timing, or metrics.Histogram. Observations are
289 // forwarded to a Dogstatsd object, and collected (but not aggregated) per
290 // timeseries.
291 type Timing struct {
292         name string
293         lvs  lv.LabelValues
294         obs  observeFunc
295 }
296
297 // With implements metrics.Timing.
298 func (t *Timing) With(labelValues ...string) metrics.Histogram {
299         return &Timing{
300                 name: t.name,
301                 lvs:  t.lvs.With(labelValues...),
302                 obs:  t.obs,
303         }
304 }
305
306 // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
307 func (t *Timing) Observe(value float64) {
308         t.obs(t.name, t.lvs, value)
309 }
310
311 // Histogram is a DogStatsD histrogram. Observations are forwarded to a
312 // Dogstatsd object, and collected (but not aggregated) per timeseries.
313 type Histogram struct {
314         name string
315         lvs  lv.LabelValues
316         obs  observeFunc
317 }
318
319 // With implements metrics.Histogram.
320 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
321         return &Histogram{
322                 name: h.name,
323                 lvs:  h.lvs.With(labelValues...),
324                 obs:  h.obs,
325         }
326 }
327
328 // Observe implements metrics.Histogram.
329 func (h *Histogram) Observe(value float64) {
330         h.obs(h.name, h.lvs, value)
331 }
332
333 type pair struct{ label, value string }
334
335 type gaugeNode struct {
336         mtx      sync.RWMutex
337         gauge    *Gauge
338         children map[pair]*gaugeNode
339 }
340
341 func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
342         n.mtx.Lock()
343         defer n.mtx.Unlock()
344         if len(lvs) == 0 {
345                 if n.gauge == nil {
346                         n.gauge = g
347                 }
348                 return n.gauge
349         }
350         if len(lvs) < 2 {
351                 panic("too few LabelValues; programmer error!")
352         }
353         head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
354         if n.children == nil {
355                 n.children = map[pair]*gaugeNode{}
356         }
357         child, ok := n.children[head]
358         if !ok {
359                 child = &gaugeNode{}
360                 n.children[head] = child
361         }
362         return child.addGauge(g, tail)
363 }
364
365 func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
366         n.mtx.RLock()
367         defer n.mtx.RUnlock()
368         if n.gauge != nil {
369                 value, ok := n.gauge.read()
370                 if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
371                         return false
372                 }
373         }
374         for _, child := range n.children {
375                 if !child.walk(fn) {
376                         return false
377                 }
378         }
379         return true
380 }
381
382 func (g *Gauge) touch() {
383         atomic.StoreInt32(&(g.set), 1)
384 }
385
386 func (g *Gauge) read() (float64, bool) {
387         set := atomic.SwapInt32(&(g.set), 0)
388         return g.g.Value(), set != 0
389 }