OSDN Git Service

new repo
[bytom/vapor.git] / vendor / golang.org / x / net / http2 / writesched.go
1 // Copyright 2014 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package http2
6
7 import "fmt"
8
9 // WriteScheduler is the interface implemented by HTTP/2 write schedulers.
10 // Methods are never called concurrently.
11 type WriteScheduler interface {
12         // OpenStream opens a new stream in the write scheduler.
13         // It is illegal to call this with streamID=0 or with a streamID that is
14         // already open -- the call may panic.
15         OpenStream(streamID uint32, options OpenStreamOptions)
16
17         // CloseStream closes a stream in the write scheduler. Any frames queued on
18         // this stream should be discarded. It is illegal to call this on a stream
19         // that is not open -- the call may panic.
20         CloseStream(streamID uint32)
21
22         // AdjustStream adjusts the priority of the given stream. This may be called
23         // on a stream that has not yet been opened or has been closed. Note that
24         // RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
25         // https://tools.ietf.org/html/rfc7540#section-5.1
26         AdjustStream(streamID uint32, priority PriorityParam)
27
28         // Push queues a frame in the scheduler. In most cases, this will not be
29         // called with wr.StreamID()!=0 unless that stream is currently open. The one
30         // exception is RST_STREAM frames, which may be sent on idle or closed streams.
31         Push(wr FrameWriteRequest)
32
33         // Pop dequeues the next frame to write. Returns false if no frames can
34         // be written. Frames with a given wr.StreamID() are Pop'd in the same
35         // order they are Push'd.
36         Pop() (wr FrameWriteRequest, ok bool)
37 }
38
39 // OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
40 type OpenStreamOptions struct {
41         // PusherID is zero if the stream was initiated by the client. Otherwise,
42         // PusherID names the stream that pushed the newly opened stream.
43         PusherID uint32
44 }
45
46 // FrameWriteRequest is a request to write a frame.
47 type FrameWriteRequest struct {
48         // write is the interface value that does the writing, once the
49         // WriteScheduler has selected this frame to write. The write
50         // functions are all defined in write.go.
51         write writeFramer
52
53         // stream is the stream on which this frame will be written.
54         // nil for non-stream frames like PING and SETTINGS.
55         stream *stream
56
57         // done, if non-nil, must be a buffered channel with space for
58         // 1 message and is sent the return value from write (or an
59         // earlier error) when the frame has been written.
60         done chan error
61 }
62
63 // StreamID returns the id of the stream this frame will be written to.
64 // 0 is used for non-stream frames such as PING and SETTINGS.
65 func (wr FrameWriteRequest) StreamID() uint32 {
66         if wr.stream == nil {
67                 if se, ok := wr.write.(StreamError); ok {
68                         // (*serverConn).resetStream doesn't set
69                         // stream because it doesn't necessarily have
70                         // one. So special case this type of write
71                         // message.
72                         return se.StreamID
73                 }
74                 return 0
75         }
76         return wr.stream.id
77 }
78
79 // DataSize returns the number of flow control bytes that must be consumed
80 // to write this entire frame. This is 0 for non-DATA frames.
81 func (wr FrameWriteRequest) DataSize() int {
82         if wd, ok := wr.write.(*writeData); ok {
83                 return len(wd.p)
84         }
85         return 0
86 }
87
88 // Consume consumes min(n, available) bytes from this frame, where available
89 // is the number of flow control bytes available on the stream. Consume returns
90 // 0, 1, or 2 frames, where the integer return value gives the number of frames
91 // returned.
92 //
93 // If flow control prevents consuming any bytes, this returns (_, _, 0). If
94 // the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
95 // returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
96 // 'rest' contains the remaining bytes. The consumed bytes are deducted from the
97 // underlying stream's flow control budget.
98 func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
99         var empty FrameWriteRequest
100
101         // Non-DATA frames are always consumed whole.
102         wd, ok := wr.write.(*writeData)
103         if !ok || len(wd.p) == 0 {
104                 return wr, empty, 1
105         }
106
107         // Might need to split after applying limits.
108         allowed := wr.stream.flow.available()
109         if n < allowed {
110                 allowed = n
111         }
112         if wr.stream.sc.maxFrameSize < allowed {
113                 allowed = wr.stream.sc.maxFrameSize
114         }
115         if allowed <= 0 {
116                 return empty, empty, 0
117         }
118         if len(wd.p) > int(allowed) {
119                 wr.stream.flow.take(allowed)
120                 consumed := FrameWriteRequest{
121                         stream: wr.stream,
122                         write: &writeData{
123                                 streamID: wd.streamID,
124                                 p:        wd.p[:allowed],
125                                 // Even if the original had endStream set, there
126                                 // are bytes remaining because len(wd.p) > allowed,
127                                 // so we know endStream is false.
128                                 endStream: false,
129                         },
130                         // Our caller is blocking on the final DATA frame, not
131                         // this intermediate frame, so no need to wait.
132                         done: nil,
133                 }
134                 rest := FrameWriteRequest{
135                         stream: wr.stream,
136                         write: &writeData{
137                                 streamID:  wd.streamID,
138                                 p:         wd.p[allowed:],
139                                 endStream: wd.endStream,
140                         },
141                         done: wr.done,
142                 }
143                 return consumed, rest, 2
144         }
145
146         // The frame is consumed whole.
147         // NB: This cast cannot overflow because allowed is <= math.MaxInt32.
148         wr.stream.flow.take(int32(len(wd.p)))
149         return wr, empty, 1
150 }
151
152 // String is for debugging only.
153 func (wr FrameWriteRequest) String() string {
154         var des string
155         if s, ok := wr.write.(fmt.Stringer); ok {
156                 des = s.String()
157         } else {
158                 des = fmt.Sprintf("%T", wr.write)
159         }
160         return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
161 }
162
163 // replyToWriter sends err to wr.done and panics if the send must block
164 // This does nothing if wr.done is nil.
165 func (wr *FrameWriteRequest) replyToWriter(err error) {
166         if wr.done == nil {
167                 return
168         }
169         select {
170         case wr.done <- err:
171         default:
172                 panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
173         }
174         wr.write = nil // prevent use (assume it's tainted after wr.done send)
175 }
176
177 // writeQueue is used by implementations of WriteScheduler.
178 type writeQueue struct {
179         s []FrameWriteRequest
180 }
181
182 func (q *writeQueue) empty() bool { return len(q.s) == 0 }
183
184 func (q *writeQueue) push(wr FrameWriteRequest) {
185         q.s = append(q.s, wr)
186 }
187
188 func (q *writeQueue) shift() FrameWriteRequest {
189         if len(q.s) == 0 {
190                 panic("invalid use of queue")
191         }
192         wr := q.s[0]
193         // TODO: less copy-happy queue.
194         copy(q.s, q.s[1:])
195         q.s[len(q.s)-1] = FrameWriteRequest{}
196         q.s = q.s[:len(q.s)-1]
197         return wr
198 }
199
200 // consume consumes up to n bytes from q.s[0]. If the frame is
201 // entirely consumed, it is removed from the queue. If the frame
202 // is partially consumed, the frame is kept with the consumed
203 // bytes removed. Returns true iff any bytes were consumed.
204 func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
205         if len(q.s) == 0 {
206                 return FrameWriteRequest{}, false
207         }
208         consumed, rest, numresult := q.s[0].Consume(n)
209         switch numresult {
210         case 0:
211                 return FrameWriteRequest{}, false
212         case 1:
213                 q.shift()
214         case 2:
215                 q.s[0] = rest
216         }
217         return consumed, true
218 }
219
220 type writeQueuePool []*writeQueue
221
222 // put inserts an unused writeQueue into the pool.
223 func (p *writeQueuePool) put(q *writeQueue) {
224         for i := range q.s {
225                 q.s[i] = FrameWriteRequest{}
226         }
227         q.s = q.s[:0]
228         *p = append(*p, q)
229 }
230
231 // get returns an empty writeQueue.
232 func (p *writeQueuePool) get() *writeQueue {
233         ln := len(*p)
234         if ln == 0 {
235                 return new(writeQueue)
236         }
237         x := ln - 1
238         q := (*p)[x]
239         (*p)[x] = nil
240         *p = (*p)[:x]
241         return q
242 }