OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / test / bufconn / bufconn.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package bufconn provides a net.Conn implemented by a buffer and related
20 // dialing and listening functionality.
21 package bufconn
22
23 import (
24         "fmt"
25         "io"
26         "net"
27         "sync"
28         "time"
29 )
30
31 // Listener implements a net.Listener that creates local, buffered net.Conns
32 // via its Accept and Dial method.
33 type Listener struct {
34         mu   sync.Mutex
35         sz   int
36         ch   chan net.Conn
37         done chan struct{}
38 }
39
40 var errClosed = fmt.Errorf("Closed")
41
42 // Listen returns a Listener that can only be contacted by its own Dialers and
43 // creates buffered connections between the two.
44 func Listen(sz int) *Listener {
45         return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
46 }
47
48 // Accept blocks until Dial is called, then returns a net.Conn for the server
49 // half of the connection.
50 func (l *Listener) Accept() (net.Conn, error) {
51         select {
52         case <-l.done:
53                 return nil, errClosed
54         case c := <-l.ch:
55                 return c, nil
56         }
57 }
58
59 // Close stops the listener.
60 func (l *Listener) Close() error {
61         l.mu.Lock()
62         defer l.mu.Unlock()
63         select {
64         case <-l.done:
65                 // Already closed.
66                 break
67         default:
68                 close(l.done)
69         }
70         return nil
71 }
72
73 // Addr reports the address of the listener.
74 func (l *Listener) Addr() net.Addr { return addr{} }
75
76 // Dial creates an in-memory full-duplex network connection, unblocks Accept by
77 // providing it the server half of the connection, and returns the client half
78 // of the connection.
79 func (l *Listener) Dial() (net.Conn, error) {
80         p1, p2 := newPipe(l.sz), newPipe(l.sz)
81         select {
82         case <-l.done:
83                 return nil, errClosed
84         case l.ch <- &conn{p1, p2}:
85                 return &conn{p2, p1}, nil
86         }
87 }
88
89 type pipe struct {
90         mu sync.Mutex
91
92         // buf contains the data in the pipe.  It is a ring buffer of fixed capacity,
93         // with r and w pointing to the offset to read and write, respsectively.
94         //
95         // Data is read between [r, w) and written to [w, r), wrapping around the end
96         // of the slice if necessary.
97         //
98         // The buffer is empty if r == len(buf), otherwise if r == w, it is full.
99         //
100         // w and r are always in the range [0, cap(buf)) and [0, len(buf)].
101         buf  []byte
102         w, r int
103
104         wwait  sync.Cond
105         rwait  sync.Cond
106         closed bool
107 }
108
109 func newPipe(sz int) *pipe {
110         p := &pipe{buf: make([]byte, 0, sz)}
111         p.wwait.L = &p.mu
112         p.rwait.L = &p.mu
113         return p
114 }
115
116 func (p *pipe) empty() bool {
117         return p.r == len(p.buf)
118 }
119
120 func (p *pipe) full() bool {
121         return p.r < len(p.buf) && p.r == p.w
122 }
123
124 func (p *pipe) Read(b []byte) (n int, err error) {
125         p.mu.Lock()
126         defer p.mu.Unlock()
127         // Block until p has data.
128         for {
129                 if p.closed {
130                         return 0, io.ErrClosedPipe
131                 }
132                 if !p.empty() {
133                         break
134                 }
135                 p.rwait.Wait()
136         }
137         wasFull := p.full()
138
139         n = copy(b, p.buf[p.r:len(p.buf)])
140         p.r += n
141         if p.r == cap(p.buf) {
142                 p.r = 0
143                 p.buf = p.buf[:p.w]
144         }
145
146         // Signal a blocked writer, if any
147         if wasFull {
148                 p.wwait.Signal()
149         }
150
151         return n, nil
152 }
153
154 func (p *pipe) Write(b []byte) (n int, err error) {
155         p.mu.Lock()
156         defer p.mu.Unlock()
157         if p.closed {
158                 return 0, io.ErrClosedPipe
159         }
160         for len(b) > 0 {
161                 // Block until p is not full.
162                 for {
163                         if p.closed {
164                                 return 0, io.ErrClosedPipe
165                         }
166                         if !p.full() {
167                                 break
168                         }
169                         p.wwait.Wait()
170                 }
171                 wasEmpty := p.empty()
172
173                 end := cap(p.buf)
174                 if p.w < p.r {
175                         end = p.r
176                 }
177                 x := copy(p.buf[p.w:end], b)
178                 b = b[x:]
179                 n += x
180                 p.w += x
181                 if p.w > len(p.buf) {
182                         p.buf = p.buf[:p.w]
183                 }
184                 if p.w == cap(p.buf) {
185                         p.w = 0
186                 }
187
188                 // Signal a blocked reader, if any.
189                 if wasEmpty {
190                         p.rwait.Signal()
191                 }
192         }
193         return n, nil
194 }
195
196 func (p *pipe) Close() error {
197         p.mu.Lock()
198         defer p.mu.Unlock()
199         p.closed = true
200         // Signal all blocked readers and writers to return an error.
201         p.rwait.Broadcast()
202         p.wwait.Broadcast()
203         return nil
204 }
205
206 type conn struct {
207         io.ReadCloser
208         io.WriteCloser
209 }
210
211 func (c *conn) Close() error {
212         err1 := c.ReadCloser.Close()
213         err2 := c.WriteCloser.Close()
214         if err1 != nil {
215                 return err1
216         }
217         return err2
218 }
219
220 func (*conn) LocalAddr() net.Addr                  { return addr{} }
221 func (*conn) RemoteAddr() net.Addr                 { return addr{} }
222 func (c *conn) SetDeadline(t time.Time) error      { return fmt.Errorf("unsupported") }
223 func (c *conn) SetReadDeadline(t time.Time) error  { return fmt.Errorf("unsupported") }
224 func (c *conn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
225
226 type addr struct{}
227
228 func (addr) Network() string { return "bufconn" }
229 func (addr) String() string  { return "bufconn" }