OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / benchmark.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
20
21 /*
22 Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
23 */
24 package benchmark
25
26 import (
27         "fmt"
28         "io"
29         "net"
30         "sync"
31         "testing"
32         "time"
33
34         "golang.org/x/net/context"
35         "google.golang.org/grpc"
36         testpb "google.golang.org/grpc/benchmark/grpc_testing"
37         "google.golang.org/grpc/benchmark/latency"
38         "google.golang.org/grpc/benchmark/stats"
39         "google.golang.org/grpc/grpclog"
40 )
41
42 // AddOne add 1 to the features slice
43 func AddOne(features []int, featuresMaxPosition []int) {
44         for i := len(features) - 1; i >= 0; i-- {
45                 features[i] = (features[i] + 1)
46                 if features[i]/featuresMaxPosition[i] == 0 {
47                         break
48                 }
49                 features[i] = features[i] % featuresMaxPosition[i]
50         }
51 }
52
53 // Allows reuse of the same testpb.Payload object.
54 func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
55         if size < 0 {
56                 grpclog.Fatalf("Requested a response with invalid length %d", size)
57         }
58         body := make([]byte, size)
59         switch t {
60         case testpb.PayloadType_COMPRESSABLE:
61         case testpb.PayloadType_UNCOMPRESSABLE:
62                 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
63         default:
64                 grpclog.Fatalf("Unsupported payload type: %d", t)
65         }
66         p.Type = t
67         p.Body = body
68         return
69 }
70
71 func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
72         p := new(testpb.Payload)
73         setPayload(p, t, size)
74         return p
75 }
76
77 type testServer struct {
78 }
79
80 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
81         return &testpb.SimpleResponse{
82                 Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
83         }, nil
84 }
85
86 func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
87         response := &testpb.SimpleResponse{
88                 Payload: new(testpb.Payload),
89         }
90         in := new(testpb.SimpleRequest)
91         for {
92                 // use ServerStream directly to reuse the same testpb.SimpleRequest object
93                 err := stream.(grpc.ServerStream).RecvMsg(in)
94                 if err == io.EOF {
95                         // read done.
96                         return nil
97                 }
98                 if err != nil {
99                         return err
100                 }
101                 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
102                 if err := stream.Send(response); err != nil {
103                         return err
104                 }
105         }
106 }
107
108 // byteBufServer is a gRPC server that sends and receives byte buffer.
109 // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
110 type byteBufServer struct {
111         respSize int32
112 }
113
114 // UnaryCall is an empty function and is not used for benchmark.
115 // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
116 func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
117         return &testpb.SimpleResponse{}, nil
118 }
119
120 func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
121         for {
122                 var in []byte
123                 err := stream.(grpc.ServerStream).RecvMsg(&in)
124                 if err == io.EOF {
125                         return nil
126                 }
127                 if err != nil {
128                         return err
129                 }
130                 out := make([]byte, s.respSize)
131                 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
132                         return err
133                 }
134         }
135 }
136
137 // ServerInfo contains the information to create a gRPC benchmark server.
138 type ServerInfo struct {
139         // Addr is the address of the server.
140         Addr string
141
142         // Type is the type of the server.
143         // It should be "protobuf" or "bytebuf".
144         Type string
145
146         // Metadata is an optional configuration.
147         // For "protobuf", it's ignored.
148         // For "bytebuf", it should be an int representing response size.
149         Metadata interface{}
150
151         // Network can simulate latency
152         Network *latency.Network
153 }
154
155 // StartServer starts a gRPC server serving a benchmark service according to info.
156 // It returns its listen address and a function to stop the server.
157 func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
158         lis, err := net.Listen("tcp", info.Addr)
159         if err != nil {
160                 grpclog.Fatalf("Failed to listen: %v", err)
161         }
162         nw := info.Network
163         if nw != nil {
164                 lis = nw.Listener(lis)
165         }
166         opts = append(opts, grpc.WriteBufferSize(128*1024))
167         opts = append(opts, grpc.ReadBufferSize(128*1024))
168         s := grpc.NewServer(opts...)
169         switch info.Type {
170         case "protobuf":
171                 testpb.RegisterBenchmarkServiceServer(s, &testServer{})
172         case "bytebuf":
173                 respSize, ok := info.Metadata.(int32)
174                 if !ok {
175                         grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
176                 }
177                 testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
178         default:
179                 grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
180         }
181         go s.Serve(lis)
182         return lis.Addr().String(), func() {
183                 s.Stop()
184         }
185 }
186
187 // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
188 func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
189         pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
190         req := &testpb.SimpleRequest{
191                 ResponseType: pl.Type,
192                 ResponseSize: int32(respSize),
193                 Payload:      pl,
194         }
195         if _, err := tc.UnaryCall(context.Background(), req); err != nil {
196                 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
197         }
198         return nil
199 }
200
201 // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
202 func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
203         pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
204         req := &testpb.SimpleRequest{
205                 ResponseType: pl.Type,
206                 ResponseSize: int32(respSize),
207                 Payload:      pl,
208         }
209         if err := stream.Send(req); err != nil {
210                 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
211         }
212         if _, err := stream.Recv(); err != nil {
213                 // EOF is a valid error here.
214                 if err == io.EOF {
215                         return nil
216                 }
217                 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
218         }
219         return nil
220 }
221
222 // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
223 func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
224         out := make([]byte, reqSize)
225         if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
226                 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
227         }
228         var in []byte
229         if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
230                 // EOF is a valid error here.
231                 if err == io.EOF {
232                         return nil
233                 }
234                 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
235         }
236         return nil
237 }
238
239 // NewClientConn creates a gRPC client connection to addr.
240 func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
241         opts = append(opts, grpc.WithWriteBufferSize(128*1024))
242         opts = append(opts, grpc.WithReadBufferSize(128*1024))
243         conn, err := grpc.Dial(addr, opts...)
244         if err != nil {
245                 grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
246         }
247         return conn
248 }
249
250 func runUnary(b *testing.B, benchFeatures stats.Features) {
251         s := stats.AddStats(b, 38)
252         nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
253         target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
254         defer stopper()
255         conn := NewClientConn(
256                 target, grpc.WithInsecure(),
257                 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
258                         return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
259                 }),
260         )
261         tc := testpb.NewBenchmarkServiceClient(conn)
262
263         // Warm up connection.
264         for i := 0; i < 10; i++ {
265                 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
266         }
267         ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
268         var (
269                 mu sync.Mutex
270                 wg sync.WaitGroup
271         )
272         wg.Add(benchFeatures.MaxConcurrentCalls)
273
274         // Distribute the b.N calls over maxConcurrentCalls workers.
275         for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
276                 go func() {
277                         for range ch {
278                                 start := time.Now()
279                                 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
280                                 elapse := time.Since(start)
281                                 mu.Lock()
282                                 s.Add(elapse)
283                                 mu.Unlock()
284                         }
285                         wg.Done()
286                 }()
287         }
288         b.ResetTimer()
289         for i := 0; i < b.N; i++ {
290                 ch <- i
291         }
292         close(ch)
293         wg.Wait()
294         b.StopTimer()
295         conn.Close()
296 }
297
298 func runStream(b *testing.B, benchFeatures stats.Features) {
299         s := stats.AddStats(b, 38)
300         nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
301         target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
302         defer stopper()
303         conn := NewClientConn(
304                 target, grpc.WithInsecure(),
305                 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
306                         return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
307                 }),
308         )
309         tc := testpb.NewBenchmarkServiceClient(conn)
310
311         // Warm up connection.
312         stream, err := tc.StreamingCall(context.Background())
313         if err != nil {
314                 b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
315         }
316         for i := 0; i < 10; i++ {
317                 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
318         }
319
320         ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
321         var (
322                 mu sync.Mutex
323                 wg sync.WaitGroup
324         )
325         wg.Add(benchFeatures.MaxConcurrentCalls)
326
327         // Distribute the b.N calls over maxConcurrentCalls workers.
328         for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
329                 stream, err := tc.StreamingCall(context.Background())
330                 if err != nil {
331                         b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
332                 }
333                 go func() {
334                         for range ch {
335                                 start := time.Now()
336                                 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
337                                 elapse := time.Since(start)
338                                 mu.Lock()
339                                 s.Add(elapse)
340                                 mu.Unlock()
341                         }
342                         wg.Done()
343                 }()
344         }
345         b.ResetTimer()
346         for i := 0; i < b.N; i++ {
347                 ch <- struct{}{}
348         }
349         close(ch)
350         wg.Wait()
351         b.StopTimer()
352         conn.Close()
353 }
354 func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
355         if err := DoUnaryCall(client, reqSize, respSize); err != nil {
356                 grpclog.Fatalf("DoUnaryCall failed: %v", err)
357         }
358 }
359
360 func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
361         if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
362                 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
363         }
364 }