2 // Written by Maxim Khitrov (November 2012)
5 // Package flowrate provides the tools for monitoring and limiting the flow rate
6 // of an arbitrary data stream.
15 // Monitor monitors and limits the transfer rate of a data stream.
17 mu sync.Mutex // Mutex guarding access to all internal fields
18 active bool // Flag indicating an active transfer
19 start time.Duration // Transfer start time (clock() value)
20 bytes int64 // Total number of bytes transferred
21 samples int64 // Total number of samples taken
23 rSample float64 // Most recent transfer rate sample (bytes per second)
24 rEMA float64 // Exponential moving average of rSample
25 rPeak float64 // Peak transfer rate (max of all rSamples)
26 rWindow float64 // rEMA window (seconds)
28 sBytes int64 // Number of bytes transferred since sLast
29 sLast time.Duration // Most recent sample time (stop time when inactive)
30 sRate time.Duration // Sampling rate
32 tBytes int64 // Number of bytes expected in the current transfer
33 tLast time.Duration // Time of the most recent transfer of at least 1 byte
36 // New creates a new flow control monitor. Instantaneous transfer rate is
37 // measured and updated for each sampleRate interval. windowSize determines the
38 // weight of each sample in the exponential moving average (EMA) calculation.
39 // The exact formulas are:
41 // sampleTime = currentTime - prevSampleTime
42 // sampleRate = byteCount / sampleTime
43 // weight = 1 - exp(-sampleTime/windowSize)
44 // newRate = weight*sampleRate + (1-weight)*oldRate
46 // The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
48 func New(sampleRate, windowSize time.Duration) *Monitor {
49 if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
50 sampleRate = 5 * clockRate
53 windowSize = 1 * time.Second
59 rWindow: windowSize.Seconds(),
66 // Update records the transfer of n bytes and returns n. It should be called
67 // after each Read/Write operation, even if n is 0.
68 func (m *Monitor) Update(n int) int {
75 // Hack to set the current rEMA.
76 func (m *Monitor) SetREMA(rEMA float64) {
83 // IO is a convenience method intended to wrap io.Reader and io.Writer method
84 // execution. It calls m.Update(n) and then returns (n, err) unmodified.
85 func (m *Monitor) IO(n int, err error) (int, error) {
86 return m.Update(n), err
89 // Done marks the transfer as finished and prevents any further updates or
90 // limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
91 // Limit methods become NOOPs. It returns the total number of bytes transferred.
92 func (m *Monitor) Done() int64 {
94 if now := m.update(0); m.sBytes > 0 {
104 // timeRemLimit is the maximum Status.TimeRem value.
105 const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
107 // Status represents the current Monitor status. All transfer rates are in bytes
108 // per second rounded to the nearest byte.
110 Active bool // Flag indicating an active transfer
111 Start time.Time // Transfer start time
112 Duration time.Duration // Time period covered by the statistics
113 Idle time.Duration // Time since the last transfer of at least 1 byte
114 Bytes int64 // Total number of bytes transferred
115 Samples int64 // Total number of samples taken
116 InstRate int64 // Instantaneous transfer rate
117 CurRate int64 // Current transfer rate (EMA of InstRate)
118 AvgRate int64 // Average transfer rate (Bytes / Duration)
119 PeakRate int64 // Maximum instantaneous transfer rate
120 BytesRem int64 // Number of bytes remaining in the transfer
121 TimeRem time.Duration // Estimated time to completion
122 Progress Percent // Overall transfer progress
125 // Status returns current transfer status information. The returned value
126 // becomes static after a call to Done.
127 func (m *Monitor) Status() Status {
132 Start: clockToTime(m.start),
133 Duration: m.sLast - m.start,
137 PeakRate: round(m.rPeak),
138 BytesRem: m.tBytes - m.bytes,
139 Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
145 rAvg := float64(s.Bytes) / s.Duration.Seconds()
146 s.AvgRate = round(rAvg)
148 s.InstRate = round(m.rSample)
149 s.CurRate = round(m.rEMA)
151 if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
152 ns := float64(s.BytesRem) / tRate * 1e9
153 if ns > float64(timeRemLimit) {
154 ns = float64(timeRemLimit)
156 s.TimeRem = clockRound(time.Duration(ns))
165 // Limit restricts the instantaneous (per-sample) data flow to rate bytes per
166 // second. It returns the maximum number of bytes (0 <= n <= want) that may be
167 // transferred immediately without exceeding the limit. If block == true, the
168 // call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
169 // or the transfer is inactive (after a call to Done).
171 // At least one byte is always allowed to be transferred in any given sampling
172 // period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
173 // is 10 bytes per second.
175 // For usage examples, see the implementation of Reader and Writer in io.go.
176 func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
177 if want < 1 || rate < 1 {
182 // Determine the maximum number of bytes that can be sent in one sample
183 limit := round(float64(rate) * m.sRate.Seconds())
188 // If block == true, wait until m.sBytes < limit
189 if now := m.update(0); block {
190 for m.sBytes >= limit && m.active {
191 now = m.waitNextSample(now)
195 // Make limit <= want (unlimited if the transfer is no longer active)
196 if limit -= m.sBytes; limit > int64(want) || !m.active {
207 // SetTransferSize specifies the total size of the data transfer, which allows
208 // the Monitor to calculate the overall progress and time to completion.
209 func (m *Monitor) SetTransferSize(bytes int64) {
218 // update accumulates the transferred byte count for the current sample until
219 // clock() - m.sLast >= m.sRate. The monitor status is updated once the current
221 func (m *Monitor) update(n int) (now time.Duration) {
225 if now = clock(); n > 0 {
229 if sTime := now - m.sLast; sTime >= m.sRate {
231 if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
235 // Exponential moving average using a method similar to *nix load
236 // average calculation. Longer sampling periods carry greater weight.
238 w := math.Exp(-t / m.rWindow)
239 m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
248 // reset clears the current sample state in preparation for the next sample.
249 func (m *Monitor) reset(sampleTime time.Duration) {
256 // waitNextSample sleeps for the remainder of the current sample. The lock is
257 // released and reacquired during the actual sleep period, so it's possible for
258 // the transfer to be inactive when this method returns.
259 func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
260 const minWait = 5 * time.Millisecond
263 // sleep until the last sample time changes (ideally, just one iteration)
264 for m.sLast == current && m.active {
265 d := current + m.sRate - now