OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / tendermint / tmlibs / flowrate / flowrate.go
1 //
2 // Written by Maxim Khitrov (November 2012)
3 //
4
5 // Package flowrate provides the tools for monitoring and limiting the flow rate
6 // of an arbitrary data stream.
7 package flowrate
8
9 import (
10         "math"
11         "sync"
12         "time"
13 )
14
15 // Monitor monitors and limits the transfer rate of a data stream.
16 type Monitor struct {
17         mu      sync.Mutex    // Mutex guarding access to all internal fields
18         active  bool          // Flag indicating an active transfer
19         start   time.Duration // Transfer start time (clock() value)
20         bytes   int64         // Total number of bytes transferred
21         samples int64         // Total number of samples taken
22
23         rSample float64 // Most recent transfer rate sample (bytes per second)
24         rEMA    float64 // Exponential moving average of rSample
25         rPeak   float64 // Peak transfer rate (max of all rSamples)
26         rWindow float64 // rEMA window (seconds)
27
28         sBytes int64         // Number of bytes transferred since sLast
29         sLast  time.Duration // Most recent sample time (stop time when inactive)
30         sRate  time.Duration // Sampling rate
31
32         tBytes int64         // Number of bytes expected in the current transfer
33         tLast  time.Duration // Time of the most recent transfer of at least 1 byte
34 }
35
36 // New creates a new flow control monitor. Instantaneous transfer rate is
37 // measured and updated for each sampleRate interval. windowSize determines the
38 // weight of each sample in the exponential moving average (EMA) calculation.
39 // The exact formulas are:
40 //
41 //      sampleTime = currentTime - prevSampleTime
42 //      sampleRate = byteCount / sampleTime
43 //      weight     = 1 - exp(-sampleTime/windowSize)
44 //      newRate    = weight*sampleRate + (1-weight)*oldRate
45 //
46 // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
47 // respectively.
48 func New(sampleRate, windowSize time.Duration) *Monitor {
49         if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
50                 sampleRate = 5 * clockRate
51         }
52         if windowSize <= 0 {
53                 windowSize = 1 * time.Second
54         }
55         now := clock()
56         return &Monitor{
57                 active:  true,
58                 start:   now,
59                 rWindow: windowSize.Seconds(),
60                 sLast:   now,
61                 sRate:   sampleRate,
62                 tLast:   now,
63         }
64 }
65
66 // Update records the transfer of n bytes and returns n. It should be called
67 // after each Read/Write operation, even if n is 0.
68 func (m *Monitor) Update(n int) int {
69         m.mu.Lock()
70         m.update(n)
71         m.mu.Unlock()
72         return n
73 }
74
75 // Hack to set the current rEMA.
76 func (m *Monitor) SetREMA(rEMA float64) {
77         m.mu.Lock()
78         m.rEMA = rEMA
79         m.samples++
80         m.mu.Unlock()
81 }
82
83 // IO is a convenience method intended to wrap io.Reader and io.Writer method
84 // execution. It calls m.Update(n) and then returns (n, err) unmodified.
85 func (m *Monitor) IO(n int, err error) (int, error) {
86         return m.Update(n), err
87 }
88
89 // Done marks the transfer as finished and prevents any further updates or
90 // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
91 // Limit methods become NOOPs. It returns the total number of bytes transferred.
92 func (m *Monitor) Done() int64 {
93         m.mu.Lock()
94         if now := m.update(0); m.sBytes > 0 {
95                 m.reset(now)
96         }
97         m.active = false
98         m.tLast = 0
99         n := m.bytes
100         m.mu.Unlock()
101         return n
102 }
103
104 // timeRemLimit is the maximum Status.TimeRem value.
105 const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
106
107 // Status represents the current Monitor status. All transfer rates are in bytes
108 // per second rounded to the nearest byte.
109 type Status struct {
110         Active   bool          // Flag indicating an active transfer
111         Start    time.Time     // Transfer start time
112         Duration time.Duration // Time period covered by the statistics
113         Idle     time.Duration // Time since the last transfer of at least 1 byte
114         Bytes    int64         // Total number of bytes transferred
115         Samples  int64         // Total number of samples taken
116         InstRate int64         // Instantaneous transfer rate
117         CurRate  int64         // Current transfer rate (EMA of InstRate)
118         AvgRate  int64         // Average transfer rate (Bytes / Duration)
119         PeakRate int64         // Maximum instantaneous transfer rate
120         BytesRem int64         // Number of bytes remaining in the transfer
121         TimeRem  time.Duration // Estimated time to completion
122         Progress Percent       // Overall transfer progress
123 }
124
125 // Status returns current transfer status information. The returned value
126 // becomes static after a call to Done.
127 func (m *Monitor) Status() Status {
128         m.mu.Lock()
129         now := m.update(0)
130         s := Status{
131                 Active:   m.active,
132                 Start:    clockToTime(m.start),
133                 Duration: m.sLast - m.start,
134                 Idle:     now - m.tLast,
135                 Bytes:    m.bytes,
136                 Samples:  m.samples,
137                 PeakRate: round(m.rPeak),
138                 BytesRem: m.tBytes - m.bytes,
139                 Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
140         }
141         if s.BytesRem < 0 {
142                 s.BytesRem = 0
143         }
144         if s.Duration > 0 {
145                 rAvg := float64(s.Bytes) / s.Duration.Seconds()
146                 s.AvgRate = round(rAvg)
147                 if s.Active {
148                         s.InstRate = round(m.rSample)
149                         s.CurRate = round(m.rEMA)
150                         if s.BytesRem > 0 {
151                                 if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
152                                         ns := float64(s.BytesRem) / tRate * 1e9
153                                         if ns > float64(timeRemLimit) {
154                                                 ns = float64(timeRemLimit)
155                                         }
156                                         s.TimeRem = clockRound(time.Duration(ns))
157                                 }
158                         }
159                 }
160         }
161         m.mu.Unlock()
162         return s
163 }
164
165 // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
166 // second. It returns the maximum number of bytes (0 <= n <= want) that may be
167 // transferred immediately without exceeding the limit. If block == true, the
168 // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
169 // or the transfer is inactive (after a call to Done).
170 //
171 // At least one byte is always allowed to be transferred in any given sampling
172 // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
173 // is 10 bytes per second.
174 //
175 // For usage examples, see the implementation of Reader and Writer in io.go.
176 func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
177         if want < 1 || rate < 1 {
178                 return want
179         }
180         m.mu.Lock()
181
182         // Determine the maximum number of bytes that can be sent in one sample
183         limit := round(float64(rate) * m.sRate.Seconds())
184         if limit <= 0 {
185                 limit = 1
186         }
187
188         // If block == true, wait until m.sBytes < limit
189         if now := m.update(0); block {
190                 for m.sBytes >= limit && m.active {
191                         now = m.waitNextSample(now)
192                 }
193         }
194
195         // Make limit <= want (unlimited if the transfer is no longer active)
196         if limit -= m.sBytes; limit > int64(want) || !m.active {
197                 limit = int64(want)
198         }
199         m.mu.Unlock()
200
201         if limit < 0 {
202                 limit = 0
203         }
204         return int(limit)
205 }
206
207 // SetTransferSize specifies the total size of the data transfer, which allows
208 // the Monitor to calculate the overall progress and time to completion.
209 func (m *Monitor) SetTransferSize(bytes int64) {
210         if bytes < 0 {
211                 bytes = 0
212         }
213         m.mu.Lock()
214         m.tBytes = bytes
215         m.mu.Unlock()
216 }
217
218 // update accumulates the transferred byte count for the current sample until
219 // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
220 // sample is done.
221 func (m *Monitor) update(n int) (now time.Duration) {
222         if !m.active {
223                 return
224         }
225         if now = clock(); n > 0 {
226                 m.tLast = now
227         }
228         m.sBytes += int64(n)
229         if sTime := now - m.sLast; sTime >= m.sRate {
230                 t := sTime.Seconds()
231                 if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
232                         m.rPeak = m.rSample
233                 }
234
235                 // Exponential moving average using a method similar to *nix load
236                 // average calculation. Longer sampling periods carry greater weight.
237                 if m.samples > 0 {
238                         w := math.Exp(-t / m.rWindow)
239                         m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
240                 } else {
241                         m.rEMA = m.rSample
242                 }
243                 m.reset(now)
244         }
245         return
246 }
247
248 // reset clears the current sample state in preparation for the next sample.
249 func (m *Monitor) reset(sampleTime time.Duration) {
250         m.bytes += m.sBytes
251         m.samples++
252         m.sBytes = 0
253         m.sLast = sampleTime
254 }
255
256 // waitNextSample sleeps for the remainder of the current sample. The lock is
257 // released and reacquired during the actual sleep period, so it's possible for
258 // the transfer to be inactive when this method returns.
259 func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
260         const minWait = 5 * time.Millisecond
261         current := m.sLast
262
263         // sleep until the last sample time changes (ideally, just one iteration)
264         for m.sLast == current && m.active {
265                 d := current + m.sRate - now
266                 m.mu.Unlock()
267                 if d < minWait {
268                         d = minWait
269                 }
270                 time.Sleep(d)
271                 m.mu.Lock()
272                 now = m.update(0)
273         }
274         return now
275 }