OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / transport / control.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package transport
20
21 import (
22         "fmt"
23         "math"
24         "sync"
25         "sync/atomic"
26         "time"
27
28         "golang.org/x/net/http2"
29         "golang.org/x/net/http2/hpack"
30 )
31
32 const (
33         // The default value of flow control window size in HTTP2 spec.
34         defaultWindowSize = 65535
35         // The initial window size for flow control.
36         initialWindowSize             = defaultWindowSize // for an RPC
37         infinity                      = time.Duration(math.MaxInt64)
38         defaultClientKeepaliveTime    = infinity
39         defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
40         defaultMaxStreamsClient       = 100
41         defaultMaxConnectionIdle      = infinity
42         defaultMaxConnectionAge       = infinity
43         defaultMaxConnectionAgeGrace  = infinity
44         defaultServerKeepaliveTime    = time.Duration(2 * time.Hour)
45         defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
46         defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
47         // max window limit set by HTTP2 Specs.
48         maxWindowSize = math.MaxInt32
49         // defaultLocalSendQuota sets is default value for number of data
50         // bytes that each stream can schedule before some of it being
51         // flushed out.
52         defaultLocalSendQuota = 64 * 1024
53 )
54
55 // The following defines various control items which could flow through
56 // the control buffer of transport. They represent different aspects of
57 // control tasks, e.g., flow control, settings, streaming resetting, etc.
58
59 type headerFrame struct {
60         streamID  uint32
61         hf        []hpack.HeaderField
62         endStream bool
63 }
64
65 func (*headerFrame) item() {}
66
67 type continuationFrame struct {
68         streamID            uint32
69         endHeaders          bool
70         headerBlockFragment []byte
71 }
72
73 type dataFrame struct {
74         streamID  uint32
75         endStream bool
76         d         []byte
77         f         func()
78 }
79
80 func (*dataFrame) item() {}
81
82 func (*continuationFrame) item() {}
83
84 type windowUpdate struct {
85         streamID  uint32
86         increment uint32
87 }
88
89 func (*windowUpdate) item() {}
90
91 type settings struct {
92         ack bool
93         ss  []http2.Setting
94 }
95
96 func (*settings) item() {}
97
98 type resetStream struct {
99         streamID uint32
100         code     http2.ErrCode
101 }
102
103 func (*resetStream) item() {}
104
105 type goAway struct {
106         code      http2.ErrCode
107         debugData []byte
108         headsUp   bool
109         closeConn bool
110 }
111
112 func (*goAway) item() {}
113
114 type flushIO struct {
115 }
116
117 func (*flushIO) item() {}
118
119 type ping struct {
120         ack  bool
121         data [8]byte
122 }
123
124 func (*ping) item() {}
125
126 // quotaPool is a pool which accumulates the quota and sends it to acquire()
127 // when it is available.
128 type quotaPool struct {
129         c chan int
130
131         mu      sync.Mutex
132         version uint32
133         quota   int
134 }
135
136 // newQuotaPool creates a quotaPool which has quota q available to consume.
137 func newQuotaPool(q int) *quotaPool {
138         qb := &quotaPool{
139                 c: make(chan int, 1),
140         }
141         if q > 0 {
142                 qb.c <- q
143         } else {
144                 qb.quota = q
145         }
146         return qb
147 }
148
149 // add cancels the pending quota sent on acquired, incremented by v and sends
150 // it back on acquire.
151 func (qb *quotaPool) add(v int) {
152         qb.mu.Lock()
153         defer qb.mu.Unlock()
154         qb.lockedAdd(v)
155 }
156
157 func (qb *quotaPool) lockedAdd(v int) {
158         select {
159         case n := <-qb.c:
160                 qb.quota += n
161         default:
162         }
163         qb.quota += v
164         if qb.quota <= 0 {
165                 return
166         }
167         // After the pool has been created, this is the only place that sends on
168         // the channel. Since mu is held at this point and any quota that was sent
169         // on the channel has been retrieved, we know that this code will always
170         // place any positive quota value on the channel.
171         select {
172         case qb.c <- qb.quota:
173                 qb.quota = 0
174         default:
175         }
176 }
177
178 func (qb *quotaPool) addAndUpdate(v int) {
179         qb.mu.Lock()
180         defer qb.mu.Unlock()
181         qb.lockedAdd(v)
182         // Update the version only after having added to the quota
183         // so that if acquireWithVesrion sees the new vesrion it is
184         // guaranteed to have seen the updated quota.
185         // Also, still keep this inside of the lock, so that when
186         // compareAndExecute is processing, this function doesn't
187         // get executed partially (quota gets updated but the version
188         // doesn't).
189         atomic.AddUint32(&(qb.version), 1)
190 }
191
192 func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
193         return qb.c, atomic.LoadUint32(&(qb.version))
194 }
195
196 func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
197         qb.mu.Lock()
198         defer qb.mu.Unlock()
199         if version == atomic.LoadUint32(&(qb.version)) {
200                 success()
201                 return true
202         }
203         failure()
204         return false
205 }
206
207 // acquire returns the channel on which available quota amounts are sent.
208 func (qb *quotaPool) acquire() <-chan int {
209         return qb.c
210 }
211
212 // inFlow deals with inbound flow control
213 type inFlow struct {
214         mu sync.Mutex
215         // The inbound flow control limit for pending data.
216         limit uint32
217         // pendingData is the overall data which have been received but not been
218         // consumed by applications.
219         pendingData uint32
220         // The amount of data the application has consumed but grpc has not sent
221         // window update for them. Used to reduce window update frequency.
222         pendingUpdate uint32
223         // delta is the extra window update given by receiver when an application
224         // is reading data bigger in size than the inFlow limit.
225         delta uint32
226 }
227
228 // newLimit updates the inflow window to a new value n.
229 // It assumes that n is always greater than the old limit.
230 func (f *inFlow) newLimit(n uint32) uint32 {
231         f.mu.Lock()
232         defer f.mu.Unlock()
233         d := n - f.limit
234         f.limit = n
235         return d
236 }
237
238 func (f *inFlow) maybeAdjust(n uint32) uint32 {
239         if n > uint32(math.MaxInt32) {
240                 n = uint32(math.MaxInt32)
241         }
242         f.mu.Lock()
243         defer f.mu.Unlock()
244         // estSenderQuota is the receiver's view of the maximum number of bytes the sender
245         // can send without a window update.
246         estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
247         // estUntransmittedData is the maximum number of bytes the sends might not have put
248         // on the wire yet. A value of 0 or less means that we have already received all or
249         // more bytes than the application is requesting to read.
250         estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
251         // This implies that unless we send a window update, the sender won't be able to send all the bytes
252         // for this message. Therefore we must send an update over the limit since there's an active read
253         // request from the application.
254         if estUntransmittedData > estSenderQuota {
255                 // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
256                 if f.limit+n > maxWindowSize {
257                         f.delta = maxWindowSize - f.limit
258                 } else {
259                         // Send a window update for the whole message and not just the difference between
260                         // estUntransmittedData and estSenderQuota. This will be helpful in case the message
261                         // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
262                         f.delta = n
263                 }
264                 return f.delta
265         }
266         return 0
267 }
268
269 // onData is invoked when some data frame is received. It updates pendingData.
270 func (f *inFlow) onData(n uint32) error {
271         f.mu.Lock()
272         defer f.mu.Unlock()
273         f.pendingData += n
274         if f.pendingData+f.pendingUpdate > f.limit+f.delta {
275                 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
276         }
277         return nil
278 }
279
280 // onRead is invoked when the application reads the data. It returns the window size
281 // to be sent to the peer.
282 func (f *inFlow) onRead(n uint32) uint32 {
283         f.mu.Lock()
284         defer f.mu.Unlock()
285         if f.pendingData == 0 {
286                 return 0
287         }
288         f.pendingData -= n
289         if n > f.delta {
290                 n -= f.delta
291                 f.delta = 0
292         } else {
293                 f.delta -= n
294                 n = 0
295         }
296         f.pendingUpdate += n
297         if f.pendingUpdate >= f.limit/4 {
298                 wu := f.pendingUpdate
299                 f.pendingUpdate = 0
300                 return wu
301         }
302         return 0
303 }
304
305 func (f *inFlow) resetPendingUpdate() uint32 {
306         f.mu.Lock()
307         defer f.mu.Unlock()
308         n := f.pendingUpdate
309         f.pendingUpdate = 0
310         return n
311 }