OSDN Git Service

new repo
[bytom/vapor.git] / vendor / golang.org / x / net / http2 / writesched_priority.go
1 // Copyright 2016 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package http2
6
7 import (
8         "fmt"
9         "math"
10         "sort"
11 )
12
13 // RFC 7540, Section 5.3.5: the default weight is 16.
14 const priorityDefaultWeight = 15 // 16 = 15 + 1
15
16 // PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
17 type PriorityWriteSchedulerConfig struct {
18         // MaxClosedNodesInTree controls the maximum number of closed streams to
19         // retain in the priority tree. Setting this to zero saves a small amount
20         // of memory at the cost of performance.
21         //
22         // See RFC 7540, Section 5.3.4:
23         //   "It is possible for a stream to become closed while prioritization
24         //   information ... is in transit. ... This potentially creates suboptimal
25         //   prioritization, since the stream could be given a priority that is
26         //   different from what is intended. To avoid these problems, an endpoint
27         //   SHOULD retain stream prioritization state for a period after streams
28         //   become closed. The longer state is retained, the lower the chance that
29         //   streams are assigned incorrect or default priority values."
30         MaxClosedNodesInTree int
31
32         // MaxIdleNodesInTree controls the maximum number of idle streams to
33         // retain in the priority tree. Setting this to zero saves a small amount
34         // of memory at the cost of performance.
35         //
36         // See RFC 7540, Section 5.3.4:
37         //   Similarly, streams that are in the "idle" state can be assigned
38         //   priority or become a parent of other streams. This allows for the
39         //   creation of a grouping node in the dependency tree, which enables
40         //   more flexible expressions of priority. Idle streams begin with a
41         //   default priority (Section 5.3.5).
42         MaxIdleNodesInTree int
43
44         // ThrottleOutOfOrderWrites enables write throttling to help ensure that
45         // data is delivered in priority order. This works around a race where
46         // stream B depends on stream A and both streams are about to call Write
47         // to queue DATA frames. If B wins the race, a naive scheduler would eagerly
48         // write as much data from B as possible, but this is suboptimal because A
49         // is a higher-priority stream. With throttling enabled, we write a small
50         // amount of data from B to minimize the amount of bandwidth that B can
51         // steal from A.
52         ThrottleOutOfOrderWrites bool
53 }
54
55 // NewPriorityWriteScheduler constructs a WriteScheduler that schedules
56 // frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
57 // If cfg is nil, default options are used.
58 func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59         if cfg == nil {
60                 // For justification of these defaults, see:
61                 // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
62                 cfg = &PriorityWriteSchedulerConfig{
63                         MaxClosedNodesInTree:     10,
64                         MaxIdleNodesInTree:       10,
65                         ThrottleOutOfOrderWrites: false,
66                 }
67         }
68
69         ws := &priorityWriteScheduler{
70                 nodes:                make(map[uint32]*priorityNode),
71                 maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72                 maxIdleNodesInTree:   cfg.MaxIdleNodesInTree,
73                 enableWriteThrottle:  cfg.ThrottleOutOfOrderWrites,
74         }
75         ws.nodes[0] = &ws.root
76         if cfg.ThrottleOutOfOrderWrites {
77                 ws.writeThrottleLimit = 1024
78         } else {
79                 ws.writeThrottleLimit = math.MaxInt32
80         }
81         return ws
82 }
83
84 type priorityNodeState int
85
86 const (
87         priorityNodeOpen priorityNodeState = iota
88         priorityNodeClosed
89         priorityNodeIdle
90 )
91
92 // priorityNode is a node in an HTTP/2 priority tree.
93 // Each node is associated with a single stream ID.
94 // See RFC 7540, Section 5.3.
95 type priorityNode struct {
96         q            writeQueue        // queue of pending frames to write
97         id           uint32            // id of the stream, or 0 for the root of the tree
98         weight       uint8             // the actual weight is weight+1, so the value is in [1,256]
99         state        priorityNodeState // open | closed | idle
100         bytes        int64             // number of bytes written by this node, or 0 if closed
101         subtreeBytes int64             // sum(node.bytes) of all nodes in this subtree
102
103         // These links form the priority tree.
104         parent     *priorityNode
105         kids       *priorityNode // start of the kids list
106         prev, next *priorityNode // doubly-linked list of siblings
107 }
108
109 func (n *priorityNode) setParent(parent *priorityNode) {
110         if n == parent {
111                 panic("setParent to self")
112         }
113         if n.parent == parent {
114                 return
115         }
116         // Unlink from current parent.
117         if parent := n.parent; parent != nil {
118                 if n.prev == nil {
119                         parent.kids = n.next
120                 } else {
121                         n.prev.next = n.next
122                 }
123                 if n.next != nil {
124                         n.next.prev = n.prev
125                 }
126         }
127         // Link to new parent.
128         // If parent=nil, remove n from the tree.
129         // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
130         n.parent = parent
131         if parent == nil {
132                 n.next = nil
133                 n.prev = nil
134         } else {
135                 n.next = parent.kids
136                 n.prev = nil
137                 if n.next != nil {
138                         n.next.prev = n
139                 }
140                 parent.kids = n
141         }
142 }
143
144 func (n *priorityNode) addBytes(b int64) {
145         n.bytes += b
146         for ; n != nil; n = n.parent {
147                 n.subtreeBytes += b
148         }
149 }
150
151 // walkReadyInOrder iterates over the tree in priority order, calling f for each node
152 // with a non-empty write queue. When f returns true, this funcion returns true and the
153 // walk halts. tmp is used as scratch space for sorting.
154 //
155 // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
156 // if any ancestor p of n is still open (ignoring the root node).
157 func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
158         if !n.q.empty() && f(n, openParent) {
159                 return true
160         }
161         if n.kids == nil {
162                 return false
163         }
164
165         // Don't consider the root "open" when updating openParent since
166         // we can't send data frames on the root stream (only control frames).
167         if n.id != 0 {
168                 openParent = openParent || (n.state == priorityNodeOpen)
169         }
170
171         // Common case: only one kid or all kids have the same weight.
172         // Some clients don't use weights; other clients (like web browsers)
173         // use mostly-linear priority trees.
174         w := n.kids.weight
175         needSort := false
176         for k := n.kids.next; k != nil; k = k.next {
177                 if k.weight != w {
178                         needSort = true
179                         break
180                 }
181         }
182         if !needSort {
183                 for k := n.kids; k != nil; k = k.next {
184                         if k.walkReadyInOrder(openParent, tmp, f) {
185                                 return true
186                         }
187                 }
188                 return false
189         }
190
191         // Uncommon case: sort the child nodes. We remove the kids from the parent,
192         // then re-insert after sorting so we can reuse tmp for future sort calls.
193         *tmp = (*tmp)[:0]
194         for n.kids != nil {
195                 *tmp = append(*tmp, n.kids)
196                 n.kids.setParent(nil)
197         }
198         sort.Sort(sortPriorityNodeSiblings(*tmp))
199         for i := len(*tmp) - 1; i >= 0; i-- {
200                 (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
201         }
202         for k := n.kids; k != nil; k = k.next {
203                 if k.walkReadyInOrder(openParent, tmp, f) {
204                         return true
205                 }
206         }
207         return false
208 }
209
210 type sortPriorityNodeSiblings []*priorityNode
211
212 func (z sortPriorityNodeSiblings) Len() int      { return len(z) }
213 func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214 func (z sortPriorityNodeSiblings) Less(i, k int) bool {
215         // Prefer the subtree that has sent fewer bytes relative to its weight.
216         // See sections 5.3.2 and 5.3.4.
217         wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
218         wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
219         if bi == 0 && bk == 0 {
220                 return wi >= wk
221         }
222         if bk == 0 {
223                 return false
224         }
225         return bi/bk <= wi/wk
226 }
227
228 type priorityWriteScheduler struct {
229         // root is the root of the priority tree, where root.id = 0.
230         // The root queues control frames that are not associated with any stream.
231         root priorityNode
232
233         // nodes maps stream ids to priority tree nodes.
234         nodes map[uint32]*priorityNode
235
236         // maxID is the maximum stream id in nodes.
237         maxID uint32
238
239         // lists of nodes that have been closed or are idle, but are kept in
240         // the tree for improved prioritization. When the lengths exceed either
241         // maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
242         closedNodes, idleNodes []*priorityNode
243
244         // From the config.
245         maxClosedNodesInTree int
246         maxIdleNodesInTree   int
247         writeThrottleLimit   int32
248         enableWriteThrottle  bool
249
250         // tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
251         tmp []*priorityNode
252
253         // pool of empty queues for reuse.
254         queuePool writeQueuePool
255 }
256
257 func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
258         // The stream may be currently idle but cannot be opened or closed.
259         if curr := ws.nodes[streamID]; curr != nil {
260                 if curr.state != priorityNodeIdle {
261                         panic(fmt.Sprintf("stream %d already opened", streamID))
262                 }
263                 curr.state = priorityNodeOpen
264                 return
265         }
266
267         // RFC 7540, Section 5.3.5:
268         //  "All streams are initially assigned a non-exclusive dependency on stream 0x0.
269         //  Pushed streams initially depend on their associated stream. In both cases,
270         //  streams are assigned a default weight of 16."
271         parent := ws.nodes[options.PusherID]
272         if parent == nil {
273                 parent = &ws.root
274         }
275         n := &priorityNode{
276                 q:      *ws.queuePool.get(),
277                 id:     streamID,
278                 weight: priorityDefaultWeight,
279                 state:  priorityNodeOpen,
280         }
281         n.setParent(parent)
282         ws.nodes[streamID] = n
283         if streamID > ws.maxID {
284                 ws.maxID = streamID
285         }
286 }
287
288 func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
289         if streamID == 0 {
290                 panic("violation of WriteScheduler interface: cannot close stream 0")
291         }
292         if ws.nodes[streamID] == nil {
293                 panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
294         }
295         if ws.nodes[streamID].state != priorityNodeOpen {
296                 panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
297         }
298
299         n := ws.nodes[streamID]
300         n.state = priorityNodeClosed
301         n.addBytes(-n.bytes)
302
303         q := n.q
304         ws.queuePool.put(&q)
305         n.q.s = nil
306         if ws.maxClosedNodesInTree > 0 {
307                 ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
308         } else {
309                 ws.removeNode(n)
310         }
311 }
312
313 func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
314         if streamID == 0 {
315                 panic("adjustPriority on root")
316         }
317
318         // If streamID does not exist, there are two cases:
319         // - A closed stream that has been removed (this will have ID <= maxID)
320         // - An idle stream that is being used for "grouping" (this will have ID > maxID)
321         n := ws.nodes[streamID]
322         if n == nil {
323                 if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
324                         return
325                 }
326                 ws.maxID = streamID
327                 n = &priorityNode{
328                         q:      *ws.queuePool.get(),
329                         id:     streamID,
330                         weight: priorityDefaultWeight,
331                         state:  priorityNodeIdle,
332                 }
333                 n.setParent(&ws.root)
334                 ws.nodes[streamID] = n
335                 ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
336         }
337
338         // Section 5.3.1: A dependency on a stream that is not currently in the tree
339         // results in that stream being given a default priority (Section 5.3.5).
340         parent := ws.nodes[priority.StreamDep]
341         if parent == nil {
342                 n.setParent(&ws.root)
343                 n.weight = priorityDefaultWeight
344                 return
345         }
346
347         // Ignore if the client tries to make a node its own parent.
348         if n == parent {
349                 return
350         }
351
352         // Section 5.3.3:
353         //   "If a stream is made dependent on one of its own dependencies, the
354         //   formerly dependent stream is first moved to be dependent on the
355         //   reprioritized stream's previous parent. The moved dependency retains
356         //   its weight."
357         //
358         // That is: if parent depends on n, move parent to depend on n.parent.
359         for x := parent.parent; x != nil; x = x.parent {
360                 if x == n {
361                         parent.setParent(n.parent)
362                         break
363                 }
364         }
365
366         // Section 5.3.3: The exclusive flag causes the stream to become the sole
367         // dependency of its parent stream, causing other dependencies to become
368         // dependent on the exclusive stream.
369         if priority.Exclusive {
370                 k := parent.kids
371                 for k != nil {
372                         next := k.next
373                         if k != n {
374                                 k.setParent(n)
375                         }
376                         k = next
377                 }
378         }
379
380         n.setParent(parent)
381         n.weight = priority.Weight
382 }
383
384 func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
385         var n *priorityNode
386         if id := wr.StreamID(); id == 0 {
387                 n = &ws.root
388         } else {
389                 n = ws.nodes[id]
390                 if n == nil {
391                         // id is an idle or closed stream. wr should not be a HEADERS or
392                         // DATA frame. However, wr can be a RST_STREAM. In this case, we
393                         // push wr onto the root, rather than creating a new priorityNode,
394                         // since RST_STREAM is tiny and the stream's priority is unknown
395                         // anyway. See issue #17919.
396                         if wr.DataSize() > 0 {
397                                 panic("add DATA on non-open stream")
398                         }
399                         n = &ws.root
400                 }
401         }
402         n.q.push(wr)
403 }
404
405 func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
406         ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
407                 limit := int32(math.MaxInt32)
408                 if openParent {
409                         limit = ws.writeThrottleLimit
410                 }
411                 wr, ok = n.q.consume(limit)
412                 if !ok {
413                         return false
414                 }
415                 n.addBytes(int64(wr.DataSize()))
416                 // If B depends on A and B continuously has data available but A
417                 // does not, gradually increase the throttling limit to allow B to
418                 // steal more and more bandwidth from A.
419                 if openParent {
420                         ws.writeThrottleLimit += 1024
421                         if ws.writeThrottleLimit < 0 {
422                                 ws.writeThrottleLimit = math.MaxInt32
423                         }
424                 } else if ws.enableWriteThrottle {
425                         ws.writeThrottleLimit = 1024
426                 }
427                 return true
428         })
429         return wr, ok
430 }
431
432 func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
433         if maxSize == 0 {
434                 return
435         }
436         if len(*list) == maxSize {
437                 // Remove the oldest node, then shift left.
438                 ws.removeNode((*list)[0])
439                 x := (*list)[1:]
440                 copy(*list, x)
441                 *list = (*list)[:len(x)]
442         }
443         *list = append(*list, n)
444 }
445
446 func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
447         for k := n.kids; k != nil; k = k.next {
448                 k.setParent(n.parent)
449         }
450         n.setParent(nil)
451         delete(ws.nodes, n.id)
452 }