3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Package latency provides wrappers for net.Conn, net.Listener, and
20 // net.Dialers, designed to interoperate to inject real-world latency into
21 // network connections.
32 "golang.org/x/net/context"
35 // Dialer is a function matching the signature of net.Dial.
// It is the function type that (*Network).Dialer accepts and returns.
36 type Dialer func(network, address string) (net.Conn, error)
38 // TimeoutDialer is a function matching the signature of net.DialTimeout.
// It is the function type that (*Network).TimeoutDialer accepts and returns.
39 type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
41 // ContextDialer is a function matching the signature of
42 // net.Dialer.DialContext.
// It is the function type that (*Network).ContextDialer accepts and returns.
// NOTE(review): context here is the golang.org/x/net/context import used by
// this file; confirm whether it can be switched to the standard "context"
// package.
43 type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
45 // Network represents a network with the given bandwidth, latency, and MTU
46 // (Maximum Transmission Unit) configuration, and can produce wrappers of
47 // net.Listeners, net.Conn, and various forms of dialing functions. The
48 // Listeners and Dialers/Conns on both sides of connections must come from this
49 // package, but need not be created from the same Network. Latency is computed
50 // when sending (in Write), and is injected when receiving (in Read). This
51 // allows senders' Write calls to be non-blocking, as in real-world applications.
54 // Note: Latency is injected by the sender specifying the absolute time data
55 // should be available, and the reader delaying until that time arrives to
56 // provide the data. This package attempts to counter-act the effects of clock
57 // drift and existing network latency by measuring the delay between the
58 // sender's transmission time and the receiver's reception time during startup.
59 // No attempt is made to measure the existing bandwidth of the connection.
61 Kbps int // Kilobits per second; if non-positive, infinite
62 Latency time.Duration // One-way latency (sending); if non-positive, no delay
63 MTU int // Bytes per packet; if non-positive, infinite
67 // Local simulates a local network: unlimited bandwidth, no added latency, and no MTU limit.
68 Local = Network{0, 0, 0}
69 // LAN simulates a local area network.
70 LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
71 // WAN simulates a wide area network.
72 WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
73 // Longhaul simulates a long-haul network with high one-way latency.
74 Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
77 // Conn returns a net.Conn that wraps c and injects n's latency into that
78 // connection. This function also imposes latency for connection creation.
79 // If n's Latency is lower than the measured latency in c, an error is returned.
81 func (n *Network) Conn(c net.Conn) (net.Conn, error) {
83 nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
84 if err := nc.sync(); err != nil {
87 sleep(start.Add(nc.delay).Sub(now()))
95 readBuf *bytes.Buffer // one packet worth of data received
96 lastSendEnd time.Time // time the previous Write should be fully on the wire
97 delay time.Duration // desired latency - measured latency
100 // header is sent before all data transmitted by the application.
102 ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
103 Sz int32 // Size of the data in the packet
106 func (c *conn) Write(p []byte) (n int, err error) {
108 if c.lastSendEnd.Before(tNow) {
113 if c.network.MTU > 0 && len(pkt) > c.network.MTU {
114 pkt = pkt[:c.network.MTU]
115 p = p[c.network.MTU:]
119 if c.network.Kbps > 0 {
120 if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
121 // The network is full; sleep until this packet can be sent.
123 tNow = tNow.Add(congestion)
126 c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
127 hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
128 if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
131 x, err := c.Conn.Write(pkt)
140 func (c *conn) Read(p []byte) (n int, err error) {
141 if c.readBuf.Len() == 0 {
143 if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
146 defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
148 if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
152 // Read from readBuf.
153 return c.readBuf.Read(p)
156 // sync does a handshake and then measures the latency on the network in
157 // coordination with the other side.
158 func (c *conn) sync() error {
161 warmup = 10 // minimum number of iterations to measure latency
162 giveUp = 50 // maximum number of iterations to measure latency
163 accuracy = time.Millisecond // req'd accuracy to stop early
164 goodRun = 3 // stop early if latency within accuracy this many times
167 type syncMsg struct {
168 SendT int64 // Time sent. If zero, stop.
169 RecvT int64 // Time received. If zero, fill in and respond.
172 // A trivial handshake
173 if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
177 if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
179 } else if string(ping[:]) != pingMsg {
180 return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
183 // Both sides are alive and syncing. Calculate network delay / clock skew.
186 var latency time.Duration
187 localDone, remoteDone := false, false
189 for !localDone || !remoteDone {
191 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
198 // Block until we get a syncMsg
200 if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
205 // Message initiated from other side.
211 m.RecvT = now().UnixNano()
212 if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
218 lag := time.Duration(m.RecvT - m.SendT)
220 avgLatency := latency / time.Duration(att)
221 if e := lag - avgLatency; e > -accuracy && e < accuracy {
226 if att < giveUp && (att < warmup || good < goodRun) {
232 // Tell the other side we're done.
233 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
237 if c.network.Latency <= 0 {
240 c.delay = c.network.Latency - latency
242 return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
247 // Listener returns a net.Listener that wraps l and injects n's latency in its connections.
249 func (n *Network) Listener(l net.Listener) net.Listener {
250 return &listener{Listener: l, network: n}
253 type listener struct {
258 func (l *listener) Accept() (net.Conn, error) {
259 c, err := l.Listener.Accept()
263 return l.network.Conn(c)
266 // Dialer returns a Dialer that wraps d and injects n's latency in its
267 // connections. n's Latency is also injected to the connection's creation.
268 func (n *Network) Dialer(d Dialer) Dialer {
269 return func(network, address string) (net.Conn, error) {
270 conn, err := d(network, address)
278 // TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
279 // in its connections. n's Latency is also injected to the connection's creation.
281 func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
282 return func(network, address string, timeout time.Duration) (net.Conn, error) {
283 conn, err := d(network, address, timeout)
291 // ContextDialer returns a ContextDialer that wraps d and injects n's latency
292 // in its connections. n's Latency is also injected to the connection's creation.
294 func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
295 return func(ctx context.Context, network, address string) (net.Conn, error) {
296 conn, err := d(ctx, network, address)
304 // pktTime returns the time it takes to transmit one packet of data of size b in bytes.
306 func (n *Network) pktTime(b int) time.Duration {
308 return time.Duration(0)
310 return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
313 // Wrappers for testing
// sleep holds time.Sleep in a package-level variable; Conn and Read pause
// through it rather than calling time.Sleep directly.
316 var sleep = time.Sleep