OSDN Git Service

new repo
[bytom/vapor.git] / vendor / golang.org / x / net / http2 / pipe.go
1 // Copyright 2014 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package http2
6
7 import (
8         "errors"
9         "io"
10         "sync"
11 )
12
13 // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14 // io.Pipe except there are no PipeReader/PipeWriter halves, and the
15 // underlying buffer is an interface. (io.Pipe is always unbuffered)
16 type pipe struct {
17         mu       sync.Mutex
18         c        sync.Cond     // c.L lazily initialized to &p.mu
19         b        pipeBuffer    // nil when done reading
20         err      error         // read error once empty. non-nil means closed.
21         breakErr error         // immediate read error (caller doesn't see rest of b)
22         donec    chan struct{} // closed on error
23         readFn   func()        // optional code to run in Read before error
24 }
25
26 type pipeBuffer interface {
27         Len() int
28         io.Writer
29         io.Reader
30 }
31
32 func (p *pipe) Len() int {
33         p.mu.Lock()
34         defer p.mu.Unlock()
35         if p.b == nil {
36                 return 0
37         }
38         return p.b.Len()
39 }
40
41 // Read waits until data is available and copies bytes
42 // from the buffer into p.
43 func (p *pipe) Read(d []byte) (n int, err error) {
44         p.mu.Lock()
45         defer p.mu.Unlock()
46         if p.c.L == nil {
47                 p.c.L = &p.mu
48         }
49         for {
50                 if p.breakErr != nil {
51                         return 0, p.breakErr
52                 }
53                 if p.b != nil && p.b.Len() > 0 {
54                         return p.b.Read(d)
55                 }
56                 if p.err != nil {
57                         if p.readFn != nil {
58                                 p.readFn()     // e.g. copy trailers
59                                 p.readFn = nil // not sticky like p.err
60                         }
61                         p.b = nil
62                         return 0, p.err
63                 }
64                 p.c.Wait()
65         }
66 }
67
68 var errClosedPipeWrite = errors.New("write on closed buffer")
69
70 // Write copies bytes from p into the buffer and wakes a reader.
71 // It is an error to write more data than the buffer can hold.
72 func (p *pipe) Write(d []byte) (n int, err error) {
73         p.mu.Lock()
74         defer p.mu.Unlock()
75         if p.c.L == nil {
76                 p.c.L = &p.mu
77         }
78         defer p.c.Signal()
79         if p.err != nil {
80                 return 0, errClosedPipeWrite
81         }
82         if p.breakErr != nil {
83                 return len(d), nil // discard when there is no reader
84         }
85         return p.b.Write(d)
86 }
87
88 // CloseWithError causes the next Read (waking up a current blocked
89 // Read if needed) to return the provided err after all data has been
90 // read.
91 //
92 // The error must be non-nil.
93 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
94
95 // BreakWithError causes the next Read (waking up a current blocked
96 // Read if needed) to return the provided err immediately, without
97 // waiting for unread data.
98 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
99
100 // closeWithErrorAndCode is like CloseWithError but also sets some code to run
101 // in the caller's goroutine before returning the error.
102 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
103
104 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
105         if err == nil {
106                 panic("err must be non-nil")
107         }
108         p.mu.Lock()
109         defer p.mu.Unlock()
110         if p.c.L == nil {
111                 p.c.L = &p.mu
112         }
113         defer p.c.Signal()
114         if *dst != nil {
115                 // Already been done.
116                 return
117         }
118         p.readFn = fn
119         if dst == &p.breakErr {
120                 p.b = nil
121         }
122         *dst = err
123         p.closeDoneLocked()
124 }
125
126 // requires p.mu be held.
127 func (p *pipe) closeDoneLocked() {
128         if p.donec == nil {
129                 return
130         }
131         // Close if unclosed. This isn't racy since we always
132         // hold p.mu while closing.
133         select {
134         case <-p.donec:
135         default:
136                 close(p.donec)
137         }
138 }
139
140 // Err returns the error (if any) first set by BreakWithError or CloseWithError.
141 func (p *pipe) Err() error {
142         p.mu.Lock()
143         defer p.mu.Unlock()
144         if p.breakErr != nil {
145                 return p.breakErr
146         }
147         return p.err
148 }
149
150 // Done returns a channel which is closed if and when this pipe is closed
151 // with CloseWithError.
152 func (p *pipe) Done() <-chan struct{} {
153         p.mu.Lock()
154         defer p.mu.Unlock()
155         if p.donec == nil {
156                 p.donec = make(chan struct{})
157                 if p.err != nil || p.breakErr != nil {
158                         // Already hit an error.
159                         p.closeDoneLocked()
160                 }
161         }
162         return p.donec
163 }