3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 "golang.org/x/net/http2"
29 "golang.org/x/net/http2/hpack"
33 // The default value of flow control window size in HTTP2 spec.
34 defaultWindowSize = 65535
35 // The initial window size for flow control.
36 initialWindowSize = defaultWindowSize // for an RPC
37 infinity = time.Duration(math.MaxInt64)
38 defaultClientKeepaliveTime = infinity
39 defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
40 defaultMaxStreamsClient = 100
41 defaultMaxConnectionIdle = infinity
42 defaultMaxConnectionAge = infinity
43 defaultMaxConnectionAgeGrace = infinity
44 defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
45 defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
46 defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
47 // max window limit set by HTTP2 Specs.
48 maxWindowSize = math.MaxInt32
49 // defaultLocalSendQuota sets is default value for number of data
50 // bytes that each stream can schedule before some of it being
52 defaultLocalSendQuota = 64 * 1024
55 // The following defines various control items which could flow through
56 // the control buffer of transport. They represent different aspects of
57 // control tasks, e.g., flow control, settings, streaming resetting, etc.
59 type headerFrame struct {
61 hf []hpack.HeaderField
65 func (*headerFrame) item() {}
67 type continuationFrame struct {
70 headerBlockFragment []byte
73 type dataFrame struct {
80 func (*dataFrame) item() {}
82 func (*continuationFrame) item() {}
84 type windowUpdate struct {
89 func (*windowUpdate) item() {}
91 type settings struct {
96 func (*settings) item() {}
98 type resetStream struct {
103 func (*resetStream) item() {}
112 func (*goAway) item() {}
114 type flushIO struct {
117 func (*flushIO) item() {}
124 func (*ping) item() {}
126 // quotaPool is a pool which accumulates the quota and sends it to acquire()
127 // when it is available.
128 type quotaPool struct {
136 // newQuotaPool creates a quotaPool which has quota q available to consume.
137 func newQuotaPool(q int) *quotaPool {
139 c: make(chan int, 1),
149 // add cancels the pending quota sent on acquired, incremented by v and sends
150 // it back on acquire.
151 func (qb *quotaPool) add(v int) {
157 func (qb *quotaPool) lockedAdd(v int) {
167 // After the pool has been created, this is the only place that sends on
168 // the channel. Since mu is held at this point and any quota that was sent
169 // on the channel has been retrieved, we know that this code will always
170 // place any positive quota value on the channel.
172 case qb.c <- qb.quota:
178 func (qb *quotaPool) addAndUpdate(v int) {
182 // Update the version only after having added to the quota
183 // so that if acquireWithVesrion sees the new vesrion it is
184 // guaranteed to have seen the updated quota.
185 // Also, still keep this inside of the lock, so that when
186 // compareAndExecute is processing, this function doesn't
187 // get executed partially (quota gets updated but the version
189 atomic.AddUint32(&(qb.version), 1)
192 func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
193 return qb.c, atomic.LoadUint32(&(qb.version))
196 func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
199 if version == atomic.LoadUint32(&(qb.version)) {
207 // acquire returns the channel on which available quota amounts are sent.
208 func (qb *quotaPool) acquire() <-chan int {
212 // inFlow deals with inbound flow control
215 // The inbound flow control limit for pending data.
217 // pendingData is the overall data which have been received but not been
218 // consumed by applications.
220 // The amount of data the application has consumed but grpc has not sent
221 // window update for them. Used to reduce window update frequency.
223 // delta is the extra window update given by receiver when an application
224 // is reading data bigger in size than the inFlow limit.
228 // newLimit updates the inflow window to a new value n.
229 // It assumes that n is always greater than the old limit.
230 func (f *inFlow) newLimit(n uint32) uint32 {
238 func (f *inFlow) maybeAdjust(n uint32) uint32 {
239 if n > uint32(math.MaxInt32) {
240 n = uint32(math.MaxInt32)
244 // estSenderQuota is the receiver's view of the maximum number of bytes the sender
245 // can send without a window update.
246 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
247 // estUntransmittedData is the maximum number of bytes the sends might not have put
248 // on the wire yet. A value of 0 or less means that we have already received all or
249 // more bytes than the application is requesting to read.
250 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
251 // This implies that unless we send a window update, the sender won't be able to send all the bytes
252 // for this message. Therefore we must send an update over the limit since there's an active read
253 // request from the application.
254 if estUntransmittedData > estSenderQuota {
255 // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
256 if f.limit+n > maxWindowSize {
257 f.delta = maxWindowSize - f.limit
259 // Send a window update for the whole message and not just the difference between
260 // estUntransmittedData and estSenderQuota. This will be helpful in case the message
261 // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
269 // onData is invoked when some data frame is received. It updates pendingData.
270 func (f *inFlow) onData(n uint32) error {
274 if f.pendingData+f.pendingUpdate > f.limit+f.delta {
275 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
280 // onRead is invoked when the application reads the data. It returns the window size
281 // to be sent to the peer.
282 func (f *inFlow) onRead(n uint32) uint32 {
285 if f.pendingData == 0 {
297 if f.pendingUpdate >= f.limit/4 {
298 wu := f.pendingUpdate
305 func (f *inFlow) resetPendingUpdate() uint32 {