OSDN Git Service

new repo
[bytom/vapor.git] / vendor / golang.org / x / net / internal / timeseries / timeseries.go
1 // Copyright 2015 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 // Package timeseries implements a time series structure for stats collection.
6 package timeseries // import "golang.org/x/net/internal/timeseries"
7
8 import (
9         "fmt"
10         "log"
11         "time"
12 )
13
14 const (
15         timeSeriesNumBuckets       = 64
16         minuteHourSeriesNumBuckets = 60
17 )
18
19 var timeSeriesResolutions = []time.Duration{
20         1 * time.Second,
21         10 * time.Second,
22         1 * time.Minute,
23         10 * time.Minute,
24         1 * time.Hour,
25         6 * time.Hour,
26         24 * time.Hour,          // 1 day
27         7 * 24 * time.Hour,      // 1 week
28         4 * 7 * 24 * time.Hour,  // 4 weeks
29         16 * 7 * 24 * time.Hour, // 16 weeks
30 }
31
32 var minuteHourSeriesResolutions = []time.Duration{
33         1 * time.Second,
34         1 * time.Minute,
35 }
36
37 // An Observable is a kind of data that can be aggregated in a time series.
38 type Observable interface {
39         Multiply(ratio float64)    // Multiplies the data in self by a given ratio
40         Add(other Observable)      // Adds the data from a different observation to self
41         Clear()                    // Clears the observation so it can be reused.
42         CopyFrom(other Observable) // Copies the contents of a given observation to self
43 }
44
45 // Float attaches the methods of Observable to a float64.
46 type Float float64
47
48 // NewFloat returns a Float.
49 func NewFloat() Observable {
50         f := Float(0)
51         return &f
52 }
53
54 // String returns the float as a string.
55 func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
56
57 // Value returns the float's value.
58 func (f *Float) Value() float64 { return float64(*f) }
59
60 func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
61
62 func (f *Float) Add(other Observable) {
63         o := other.(*Float)
64         *f += *o
65 }
66
67 func (f *Float) Clear() { *f = 0 }
68
69 func (f *Float) CopyFrom(other Observable) {
70         o := other.(*Float)
71         *f = *o
72 }
73
74 // A Clock tells the current time.
75 type Clock interface {
76         Time() time.Time
77 }
78
79 type defaultClock int
80
81 var defaultClockInstance defaultClock
82
83 func (defaultClock) Time() time.Time { return time.Now() }
84
85 // Information kept per level. Each level consists of a circular list of
86 // observations. The start of the level may be derived from end and the
87 // len(buckets) * sizeInMillis.
88 type tsLevel struct {
89         oldest   int               // index to oldest bucketed Observable
90         newest   int               // index to newest bucketed Observable
91         end      time.Time         // end timestamp for this level
92         size     time.Duration     // duration of the bucketed Observable
93         buckets  []Observable      // collections of observations
94         provider func() Observable // used for creating new Observable
95 }
96
97 func (l *tsLevel) Clear() {
98         l.oldest = 0
99         l.newest = len(l.buckets) - 1
100         l.end = time.Time{}
101         for i := range l.buckets {
102                 if l.buckets[i] != nil {
103                         l.buckets[i].Clear()
104                         l.buckets[i] = nil
105                 }
106         }
107 }
108
109 func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
110         l.size = size
111         l.provider = f
112         l.buckets = make([]Observable, numBuckets)
113 }
114
115 // Keeps a sequence of levels. Each level is responsible for storing data at
116 // a given resolution. For example, the first level stores data at a one
117 // minute resolution while the second level stores data at a one hour
118 // resolution.
119
120 // Each level is represented by a sequence of buckets. Each bucket spans an
121 // interval equal to the resolution of the level. New observations are added
122 // to the last bucket.
123 type timeSeries struct {
124         provider    func() Observable // make more Observable
125         numBuckets  int               // number of buckets in each level
126         levels      []*tsLevel        // levels of bucketed Observable
127         lastAdd     time.Time         // time of last Observable tracked
128         total       Observable        // convenient aggregation of all Observable
129         clock       Clock             // Clock for getting current time
130         pending     Observable        // observations not yet bucketed
131         pendingTime time.Time         // what time are we keeping in pending
132         dirty       bool              // if there are pending observations
133 }
134
135 // init initializes a level according to the supplied criteria.
136 func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
137         ts.provider = f
138         ts.numBuckets = numBuckets
139         ts.clock = clock
140         ts.levels = make([]*tsLevel, len(resolutions))
141
142         for i := range resolutions {
143                 if i > 0 && resolutions[i-1] >= resolutions[i] {
144                         log.Print("timeseries: resolutions must be monotonically increasing")
145                         break
146                 }
147                 newLevel := new(tsLevel)
148                 newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
149                 ts.levels[i] = newLevel
150         }
151
152         ts.Clear()
153 }
154
155 // Clear removes all observations from the time series.
156 func (ts *timeSeries) Clear() {
157         ts.lastAdd = time.Time{}
158         ts.total = ts.resetObservation(ts.total)
159         ts.pending = ts.resetObservation(ts.pending)
160         ts.pendingTime = time.Time{}
161         ts.dirty = false
162
163         for i := range ts.levels {
164                 ts.levels[i].Clear()
165         }
166 }
167
168 // Add records an observation at the current time.
169 func (ts *timeSeries) Add(observation Observable) {
170         ts.AddWithTime(observation, ts.clock.Time())
171 }
172
173 // AddWithTime records an observation at the specified time.
174 func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
175
176         smallBucketDuration := ts.levels[0].size
177
178         if t.After(ts.lastAdd) {
179                 ts.lastAdd = t
180         }
181
182         if t.After(ts.pendingTime) {
183                 ts.advance(t)
184                 ts.mergePendingUpdates()
185                 ts.pendingTime = ts.levels[0].end
186                 ts.pending.CopyFrom(observation)
187                 ts.dirty = true
188         } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
189                 // The observation is close enough to go into the pending bucket.
190                 // This compensates for clock skewing and small scheduling delays
191                 // by letting the update stay in the fast path.
192                 ts.pending.Add(observation)
193                 ts.dirty = true
194         } else {
195                 ts.mergeValue(observation, t)
196         }
197 }
198
199 // mergeValue inserts the observation at the specified time in the past into all levels.
200 func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
201         for _, level := range ts.levels {
202                 index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
203                 if 0 <= index && index < ts.numBuckets {
204                         bucketNumber := (level.oldest + index) % ts.numBuckets
205                         if level.buckets[bucketNumber] == nil {
206                                 level.buckets[bucketNumber] = level.provider()
207                         }
208                         level.buckets[bucketNumber].Add(observation)
209                 }
210         }
211         ts.total.Add(observation)
212 }
213
214 // mergePendingUpdates applies the pending updates into all levels.
215 func (ts *timeSeries) mergePendingUpdates() {
216         if ts.dirty {
217                 ts.mergeValue(ts.pending, ts.pendingTime)
218                 ts.pending = ts.resetObservation(ts.pending)
219                 ts.dirty = false
220         }
221 }
222
223 // advance cycles the buckets at each level until the latest bucket in
224 // each level can hold the time specified.
225 func (ts *timeSeries) advance(t time.Time) {
226         if !t.After(ts.levels[0].end) {
227                 return
228         }
229         for i := 0; i < len(ts.levels); i++ {
230                 level := ts.levels[i]
231                 if !level.end.Before(t) {
232                         break
233                 }
234
235                 // If the time is sufficiently far, just clear the level and advance
236                 // directly.
237                 if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
238                         for _, b := range level.buckets {
239                                 ts.resetObservation(b)
240                         }
241                         level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
242                 }
243
244                 for t.After(level.end) {
245                         level.end = level.end.Add(level.size)
246                         level.newest = level.oldest
247                         level.oldest = (level.oldest + 1) % ts.numBuckets
248                         ts.resetObservation(level.buckets[level.newest])
249                 }
250
251                 t = level.end
252         }
253 }
254
255 // Latest returns the sum of the num latest buckets from the level.
256 func (ts *timeSeries) Latest(level, num int) Observable {
257         now := ts.clock.Time()
258         if ts.levels[0].end.Before(now) {
259                 ts.advance(now)
260         }
261
262         ts.mergePendingUpdates()
263
264         result := ts.provider()
265         l := ts.levels[level]
266         index := l.newest
267
268         for i := 0; i < num; i++ {
269                 if l.buckets[index] != nil {
270                         result.Add(l.buckets[index])
271                 }
272                 if index == 0 {
273                         index = ts.numBuckets
274                 }
275                 index--
276         }
277
278         return result
279 }
280
281 // LatestBuckets returns a copy of the num latest buckets from level.
282 func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
283         if level < 0 || level > len(ts.levels) {
284                 log.Print("timeseries: bad level argument: ", level)
285                 return nil
286         }
287         if num < 0 || num >= ts.numBuckets {
288                 log.Print("timeseries: bad num argument: ", num)
289                 return nil
290         }
291
292         results := make([]Observable, num)
293         now := ts.clock.Time()
294         if ts.levels[0].end.Before(now) {
295                 ts.advance(now)
296         }
297
298         ts.mergePendingUpdates()
299
300         l := ts.levels[level]
301         index := l.newest
302
303         for i := 0; i < num; i++ {
304                 result := ts.provider()
305                 results[i] = result
306                 if l.buckets[index] != nil {
307                         result.CopyFrom(l.buckets[index])
308                 }
309
310                 if index == 0 {
311                         index = ts.numBuckets
312                 }
313                 index -= 1
314         }
315         return results
316 }
317
318 // ScaleBy updates observations by scaling by factor.
319 func (ts *timeSeries) ScaleBy(factor float64) {
320         for _, l := range ts.levels {
321                 for i := 0; i < ts.numBuckets; i++ {
322                         l.buckets[i].Multiply(factor)
323                 }
324         }
325
326         ts.total.Multiply(factor)
327         ts.pending.Multiply(factor)
328 }
329
330 // Range returns the sum of observations added over the specified time range.
331 // If start or finish times don't fall on bucket boundaries of the same
332 // level, then return values are approximate answers.
333 func (ts *timeSeries) Range(start, finish time.Time) Observable {
334         return ts.ComputeRange(start, finish, 1)[0]
335 }
336
337 // Recent returns the sum of observations from the last delta.
338 func (ts *timeSeries) Recent(delta time.Duration) Observable {
339         now := ts.clock.Time()
340         return ts.Range(now.Add(-delta), now)
341 }
342
343 // Total returns the total of all observations.
344 func (ts *timeSeries) Total() Observable {
345         ts.mergePendingUpdates()
346         return ts.total
347 }
348
349 // ComputeRange computes a specified number of values into a slice using
350 // the observations recorded over the specified time period. The return
351 // values are approximate if the start or finish times don't fall on the
352 // bucket boundaries at the same level or if the number of buckets spanning
353 // the range is not an integral multiple of num.
354 func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
355         if start.After(finish) {
356                 log.Printf("timeseries: start > finish, %v>%v", start, finish)
357                 return nil
358         }
359
360         if num < 0 {
361                 log.Printf("timeseries: num < 0, %v", num)
362                 return nil
363         }
364
365         results := make([]Observable, num)
366
367         for _, l := range ts.levels {
368                 if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
369                         ts.extract(l, start, finish, num, results)
370                         return results
371                 }
372         }
373
374         // Failed to find a level that covers the desired range. So just
375         // extract from the last level, even if it doesn't cover the entire
376         // desired range.
377         ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
378
379         return results
380 }
381
382 // RecentList returns the specified number of values in slice over the most
383 // recent time period of the specified range.
384 func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
385         if delta < 0 {
386                 return nil
387         }
388         now := ts.clock.Time()
389         return ts.ComputeRange(now.Add(-delta), now, num)
390 }
391
392 // extract returns a slice of specified number of observations from a given
393 // level over a given range.
394 func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
395         ts.mergePendingUpdates()
396
397         srcInterval := l.size
398         dstInterval := finish.Sub(start) / time.Duration(num)
399         dstStart := start
400         srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
401
402         srcIndex := 0
403
404         // Where should scanning start?
405         if dstStart.After(srcStart) {
406                 advance := dstStart.Sub(srcStart) / srcInterval
407                 srcIndex += int(advance)
408                 srcStart = srcStart.Add(advance * srcInterval)
409         }
410
411         // The i'th value is computed as show below.
412         // interval = (finish/start)/num
413         // i'th value = sum of observation in range
414         //   [ start + i       * interval,
415         //     start + (i + 1) * interval )
416         for i := 0; i < num; i++ {
417                 results[i] = ts.resetObservation(results[i])
418                 dstEnd := dstStart.Add(dstInterval)
419                 for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
420                         srcEnd := srcStart.Add(srcInterval)
421                         if srcEnd.After(ts.lastAdd) {
422                                 srcEnd = ts.lastAdd
423                         }
424
425                         if !srcEnd.Before(dstStart) {
426                                 srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
427                                 if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
428                                         // dst completely contains src.
429                                         if srcValue != nil {
430                                                 results[i].Add(srcValue)
431                                         }
432                                 } else {
433                                         // dst partially overlaps src.
434                                         overlapStart := maxTime(srcStart, dstStart)
435                                         overlapEnd := minTime(srcEnd, dstEnd)
436                                         base := srcEnd.Sub(srcStart)
437                                         fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
438
439                                         used := ts.provider()
440                                         if srcValue != nil {
441                                                 used.CopyFrom(srcValue)
442                                         }
443                                         used.Multiply(fraction)
444                                         results[i].Add(used)
445                                 }
446
447                                 if srcEnd.After(dstEnd) {
448                                         break
449                                 }
450                         }
451                         srcIndex++
452                         srcStart = srcStart.Add(srcInterval)
453                 }
454                 dstStart = dstStart.Add(dstInterval)
455         }
456 }
457
458 // resetObservation clears the content so the struct may be reused.
459 func (ts *timeSeries) resetObservation(observation Observable) Observable {
460         if observation == nil {
461                 observation = ts.provider()
462         } else {
463                 observation.Clear()
464         }
465         return observation
466 }
467
468 // TimeSeries tracks data at granularities from 1 second to 16 weeks.
469 type TimeSeries struct {
470         timeSeries
471 }
472
473 // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
474 func NewTimeSeries(f func() Observable) *TimeSeries {
475         return NewTimeSeriesWithClock(f, defaultClockInstance)
476 }
477
478 // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
479 // assigning timestamps.
480 func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
481         ts := new(TimeSeries)
482         ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
483         return ts
484 }
485
486 // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
487 type MinuteHourSeries struct {
488         timeSeries
489 }
490
491 // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
492 func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
493         return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
494 }
495
496 // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
497 // assigning timestamps.
498 func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
499         ts := new(MinuteHourSeries)
500         ts.timeSeries.init(minuteHourSeriesResolutions, f,
501                 minuteHourSeriesNumBuckets, clock)
502         return ts
503 }
504
505 func (ts *MinuteHourSeries) Minute() Observable {
506         return ts.timeSeries.Latest(0, 60)
507 }
508
509 func (ts *MinuteHourSeries) Hour() Observable {
510         return ts.timeSeries.Latest(1, 60)
511 }
512
513 func minTime(a, b time.Time) time.Time {
514         if a.Before(b) {
515                 return a
516         }
517         return b
518 }
519
520 func maxTime(a, b time.Time) time.Time {
521         if a.After(b) {
522                 return a
523         }
524         return b
525 }