3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Package bufconn provides a net.Conn implemented by a buffer and related
20 // dialing and listening functionality.
31 // Listener implements a net.Listener that creates local, buffered net.Conns
32 // via its Accept and Dial methods.
33 type Listener struct {
// errClosed is a package-level error value whose message is "Closed".
// NOTE(review): presumably returned by Listener operations once the listener
// has been closed — confirm at the use sites.
var (
	errClosed = fmt.Errorf("Closed")
)
42 // Listen returns a Listener that can only be contacted by its own Dialers and
43 // creates buffered connections between the two.
44 func Listen(sz int) *Listener {
45 return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
48 // Accept blocks until Dial is called, then returns a net.Conn for the server
49 // half of the connection.
50 func (l *Listener) Accept() (net.Conn, error) {
59 // Close stops the listener.
60 func (l *Listener) Close() error {
73 // Addr reports the address of the listener.
74 func (l *Listener) Addr() net.Addr { return addr{} }
76 // Dial creates an in-memory full-duplex network connection, unblocks Accept by
77 // providing it the server half of the connection, and returns the client half
79 func (l *Listener) Dial() (net.Conn, error) {
80 p1, p2 := newPipe(l.sz), newPipe(l.sz)
84 case l.ch <- &conn{p1, p2}:
85 return &conn{p2, p1}, nil
92 // buf contains the data in the pipe. It is a ring buffer of fixed capacity,
93 // with r and w pointing to the offset to read and write, respectively.
95 // Data is read between [r, w) and written to [w, r), wrapping around the end
96 // of the slice if necessary.
98 // The buffer is empty if r == len(buf), otherwise if r == w, it is full.
100 // w and r are always in the ranges [0, cap(buf)) and [0, len(buf)], respectively.
109 func newPipe(sz int) *pipe {
110 p := &pipe{buf: make([]byte, 0, sz)}
116 func (p *pipe) empty() bool {
117 return p.r == len(p.buf)
120 func (p *pipe) full() bool {
121 return p.r < len(p.buf) && p.r == p.w
124 func (p *pipe) Read(b []byte) (n int, err error) {
127 // Block until p has data.
130 return 0, io.ErrClosedPipe
139 n = copy(b, p.buf[p.r:len(p.buf)])
141 if p.r == cap(p.buf) {
146 // Signal a blocked writer, if any
154 func (p *pipe) Write(b []byte) (n int, err error) {
158 return 0, io.ErrClosedPipe
161 // Block until p is not full.
164 return 0, io.ErrClosedPipe
171 wasEmpty := p.empty()
177 x := copy(p.buf[p.w:end], b)
181 if p.w > len(p.buf) {
184 if p.w == cap(p.buf) {
188 // Signal a blocked reader, if any.
196 func (p *pipe) Close() error {
200 // Signal all blocked readers and writers to return an error.
211 func (c *conn) Close() error {
212 err1 := c.ReadCloser.Close()
213 err2 := c.WriteCloser.Close()
220 func (*conn) LocalAddr() net.Addr { return addr{} }
221 func (*conn) RemoteAddr() net.Addr { return addr{} }
222 func (c *conn) SetDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
223 func (c *conn) SetReadDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
224 func (c *conn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
228 func (addr) Network() string { return "bufconn" }
229 func (addr) String() string { return "bufconn" }