OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / latency / latency.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package latency provides wrappers for net.Conn, net.Listener, and
20 // net.Dialers, designed to interoperate to inject real-world latency into
21 // network connections.
22 package latency
23
24 import (
25         "bytes"
26         "encoding/binary"
27         "fmt"
28         "io"
29         "net"
30         "time"
31
32         "golang.org/x/net/context"
33 )
34
35 // Dialer is a function matching the signature of net.Dial.
36 type Dialer func(network, address string) (net.Conn, error)
37
38 // TimeoutDialer is a function matching the signature of net.DialTimeout.
39 type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
40
41 // ContextDialer is a function matching the signature of
42 // net.Dialer.DialContext.
43 type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
44
45 // Network represents a network with the given bandwidth, latency, and MTU
46 // (Maximum Transmission Unit) configuration, and can produce wrappers of
47 // net.Listeners, net.Conn, and various forms of dialing functions.  The
48 // Listeners and Dialers/Conns on both sides of connections must come from this
49 // package, but need not be created from the same Network.  Latency is computed
50 // when sending (in Write), and is injected when receiving (in Read).  This
51 // allows senders' Write calls to be non-blocking, as in real-world
52 // applications.
53 //
54 // Note: Latency is injected by the sender specifying the absolute time data
55 // should be available, and the reader delaying until that time arrives to
56 // provide the data.  This package attempts to counter-act the effects of clock
57 // drift and existing network latency by measuring the delay between the
58 // sender's transmission time and the receiver's reception time during startup.
59 // No attempt is made to measure the existing bandwidth of the connection.
60 type Network struct {
61         Kbps    int           // Kilobits per second; if non-positive, infinite
62         Latency time.Duration // One-way latency (sending); if non-positive, no delay
63         MTU     int           // Bytes per packet; if non-positive, infinite
64 }
65
66 var (
67         //Local simulates local network.
68         Local = Network{0, 0, 0}
69         //LAN simulates local area network network.
70         LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
71         //WAN simulates wide area network.
72         WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
73         //Longhaul simulates bad network.
74         Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
75 )
76
77 // Conn returns a net.Conn that wraps c and injects n's latency into that
78 // connection.  This function also imposes latency for connection creation.
79 // If n's Latency is lower than the measured latency in c, an error is
80 // returned.
81 func (n *Network) Conn(c net.Conn) (net.Conn, error) {
82         start := now()
83         nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
84         if err := nc.sync(); err != nil {
85                 return nil, err
86         }
87         sleep(start.Add(nc.delay).Sub(now()))
88         return nc, nil
89 }
90
91 type conn struct {
92         net.Conn
93         network *Network
94
95         readBuf     *bytes.Buffer // one packet worth of data received
96         lastSendEnd time.Time     // time the previous Write should be fully on the wire
97         delay       time.Duration // desired latency - measured latency
98 }
99
100 // header is sent before all data transmitted by the application.
101 type header struct {
102         ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
103         Sz       int32 // Size of the data in the packet
104 }
105
106 func (c *conn) Write(p []byte) (n int, err error) {
107         tNow := now()
108         if c.lastSendEnd.Before(tNow) {
109                 c.lastSendEnd = tNow
110         }
111         for len(p) > 0 {
112                 pkt := p
113                 if c.network.MTU > 0 && len(pkt) > c.network.MTU {
114                         pkt = pkt[:c.network.MTU]
115                         p = p[c.network.MTU:]
116                 } else {
117                         p = nil
118                 }
119                 if c.network.Kbps > 0 {
120                         if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
121                                 // The network is full; sleep until this packet can be sent.
122                                 sleep(congestion)
123                                 tNow = tNow.Add(congestion)
124                         }
125                 }
126                 c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
127                 hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
128                 if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
129                         return n, err
130                 }
131                 x, err := c.Conn.Write(pkt)
132                 n += x
133                 if err != nil {
134                         return n, err
135                 }
136         }
137         return n, nil
138 }
139
140 func (c *conn) Read(p []byte) (n int, err error) {
141         if c.readBuf.Len() == 0 {
142                 var hdr header
143                 if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
144                         return 0, err
145                 }
146                 defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
147
148                 if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
149                         return 0, err
150                 }
151         }
152         // Read from readBuf.
153         return c.readBuf.Read(p)
154 }
155
156 // sync does a handshake and then measures the latency on the network in
157 // coordination with the other side.
158 func (c *conn) sync() error {
159         const (
160                 pingMsg  = "syncPing"
161                 warmup   = 10               // minimum number of iterations to measure latency
162                 giveUp   = 50               // maximum number of iterations to measure latency
163                 accuracy = time.Millisecond // req'd accuracy to stop early
164                 goodRun  = 3                // stop early if latency within accuracy this many times
165         )
166
167         type syncMsg struct {
168                 SendT int64 // Time sent.  If zero, stop.
169                 RecvT int64 // Time received.  If zero, fill in and respond.
170         }
171
172         // A trivial handshake
173         if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
174                 return err
175         }
176         var ping [8]byte
177         if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
178                 return err
179         } else if string(ping[:]) != pingMsg {
180                 return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
181         }
182
183         // Both sides are alive and syncing.  Calculate network delay / clock skew.
184         att := 0
185         good := 0
186         var latency time.Duration
187         localDone, remoteDone := false, false
188         send := true
189         for !localDone || !remoteDone {
190                 if send {
191                         if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
192                                 return err
193                         }
194                         att++
195                         send = false
196                 }
197
198                 // Block until we get a syncMsg
199                 m := syncMsg{}
200                 if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
201                         return err
202                 }
203
204                 if m.RecvT == 0 {
205                         // Message initiated from other side.
206                         if m.SendT == 0 {
207                                 remoteDone = true
208                                 continue
209                         }
210                         // Send response.
211                         m.RecvT = now().UnixNano()
212                         if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
213                                 return err
214                         }
215                         continue
216                 }
217
218                 lag := time.Duration(m.RecvT - m.SendT)
219                 latency += lag
220                 avgLatency := latency / time.Duration(att)
221                 if e := lag - avgLatency; e > -accuracy && e < accuracy {
222                         good++
223                 } else {
224                         good = 0
225                 }
226                 if att < giveUp && (att < warmup || good < goodRun) {
227                         send = true
228                         continue
229                 }
230                 localDone = true
231                 latency = avgLatency
232                 // Tell the other side we're done.
233                 if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
234                         return err
235                 }
236         }
237         if c.network.Latency <= 0 {
238                 return nil
239         }
240         c.delay = c.network.Latency - latency
241         if c.delay < 0 {
242                 return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
243         }
244         return nil
245 }
246
247 // Listener returns a net.Listener that wraps l and injects n's latency in its
248 // connections.
249 func (n *Network) Listener(l net.Listener) net.Listener {
250         return &listener{Listener: l, network: n}
251 }
252
253 type listener struct {
254         net.Listener
255         network *Network
256 }
257
258 func (l *listener) Accept() (net.Conn, error) {
259         c, err := l.Listener.Accept()
260         if err != nil {
261                 return nil, err
262         }
263         return l.network.Conn(c)
264 }
265
266 // Dialer returns a Dialer that wraps d and injects n's latency in its
267 // connections.  n's Latency is also injected to the connection's creation.
268 func (n *Network) Dialer(d Dialer) Dialer {
269         return func(network, address string) (net.Conn, error) {
270                 conn, err := d(network, address)
271                 if err != nil {
272                         return nil, err
273                 }
274                 return n.Conn(conn)
275         }
276 }
277
278 // TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
279 // in its connections.  n's Latency is also injected to the connection's
280 // creation.
281 func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
282         return func(network, address string, timeout time.Duration) (net.Conn, error) {
283                 conn, err := d(network, address, timeout)
284                 if err != nil {
285                         return nil, err
286                 }
287                 return n.Conn(conn)
288         }
289 }
290
291 // ContextDialer returns a ContextDialer that wraps d and injects n's latency
292 // in its connections.  n's Latency is also injected to the connection's
293 // creation.
294 func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
295         return func(ctx context.Context, network, address string) (net.Conn, error) {
296                 conn, err := d(ctx, network, address)
297                 if err != nil {
298                         return nil, err
299                 }
300                 return n.Conn(conn)
301         }
302 }
303
304 // pktTime returns the time it takes to transmit one packet of data of size b
305 // in bytes.
306 func (n *Network) pktTime(b int) time.Duration {
307         if n.Kbps <= 0 {
308                 return time.Duration(0)
309         }
310         return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
311 }
312
313 // Wrappers for testing
314
315 var now = time.Now
316 var sleep = time.Sleep