3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 //go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
22 Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
34 "golang.org/x/net/context"
35 "google.golang.org/grpc"
36 testpb "google.golang.org/grpc/benchmark/grpc_testing"
37 "google.golang.org/grpc/benchmark/latency"
38 "google.golang.org/grpc/benchmark/stats"
39 "google.golang.org/grpc/grpclog"
42 // AddOne add 1 to the features slice
43 func AddOne(features []int, featuresMaxPosition []int) {
44 for i := len(features) - 1; i >= 0; i-- {
45 features[i] = (features[i] + 1)
46 if features[i]/featuresMaxPosition[i] == 0 {
49 features[i] = features[i] % featuresMaxPosition[i]
53 // Allows reuse of the same testpb.Payload object.
54 func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
56 grpclog.Fatalf("Requested a response with invalid length %d", size)
58 body := make([]byte, size)
60 case testpb.PayloadType_COMPRESSABLE:
61 case testpb.PayloadType_UNCOMPRESSABLE:
62 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
64 grpclog.Fatalf("Unsupported payload type: %d", t)
71 func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
72 p := new(testpb.Payload)
73 setPayload(p, t, size)
77 type testServer struct {
80 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
81 return &testpb.SimpleResponse{
82 Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
86 func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
87 response := &testpb.SimpleResponse{
88 Payload: new(testpb.Payload),
90 in := new(testpb.SimpleRequest)
92 // use ServerStream directly to reuse the same testpb.SimpleRequest object
93 err := stream.(grpc.ServerStream).RecvMsg(in)
101 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
102 if err := stream.Send(response); err != nil {
108 // byteBufServer is a gRPC server that sends and receives byte buffer.
109 // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
110 type byteBufServer struct {
114 // UnaryCall is an empty function and is not used for benchmark.
115 // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
116 func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
117 return &testpb.SimpleResponse{}, nil
120 func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
123 err := stream.(grpc.ServerStream).RecvMsg(&in)
130 out := make([]byte, s.respSize)
131 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
137 // ServerInfo contains the information to create a gRPC benchmark server.
138 type ServerInfo struct {
139 // Addr is the address of the server.
142 // Type is the type of the server.
143 // It should be "protobuf" or "bytebuf".
146 // Metadata is an optional configuration.
147 // For "protobuf", it's ignored.
148 // For "bytebuf", it should be an int representing response size.
151 // Network can simulate latency
152 Network *latency.Network
155 // StartServer starts a gRPC server serving a benchmark service according to info.
156 // It returns its listen address and a function to stop the server.
157 func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
158 lis, err := net.Listen("tcp", info.Addr)
160 grpclog.Fatalf("Failed to listen: %v", err)
164 lis = nw.Listener(lis)
166 opts = append(opts, grpc.WriteBufferSize(128*1024))
167 opts = append(opts, grpc.ReadBufferSize(128*1024))
168 s := grpc.NewServer(opts...)
171 testpb.RegisterBenchmarkServiceServer(s, &testServer{})
173 respSize, ok := info.Metadata.(int32)
175 grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
177 testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
179 grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
182 return lis.Addr().String(), func() {
187 // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
188 func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
189 pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
190 req := &testpb.SimpleRequest{
191 ResponseType: pl.Type,
192 ResponseSize: int32(respSize),
195 if _, err := tc.UnaryCall(context.Background(), req); err != nil {
196 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
201 // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
202 func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
203 pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
204 req := &testpb.SimpleRequest{
205 ResponseType: pl.Type,
206 ResponseSize: int32(respSize),
209 if err := stream.Send(req); err != nil {
210 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
212 if _, err := stream.Recv(); err != nil {
213 // EOF is a valid error here.
217 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
222 // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
223 func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
224 out := make([]byte, reqSize)
225 if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
226 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
229 if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
230 // EOF is a valid error here.
234 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
239 // NewClientConn creates a gRPC client connection to addr.
240 func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
241 opts = append(opts, grpc.WithWriteBufferSize(128*1024))
242 opts = append(opts, grpc.WithReadBufferSize(128*1024))
243 conn, err := grpc.Dial(addr, opts...)
245 grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
250 func runUnary(b *testing.B, benchFeatures stats.Features) {
251 s := stats.AddStats(b, 38)
252 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
253 target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
255 conn := NewClientConn(
256 target, grpc.WithInsecure(),
257 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
258 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
261 tc := testpb.NewBenchmarkServiceClient(conn)
263 // Warm up connection.
264 for i := 0; i < 10; i++ {
265 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
267 ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
272 wg.Add(benchFeatures.MaxConcurrentCalls)
274 // Distribute the b.N calls over maxConcurrentCalls workers.
275 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
279 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
280 elapse := time.Since(start)
289 for i := 0; i < b.N; i++ {
298 func runStream(b *testing.B, benchFeatures stats.Features) {
299 s := stats.AddStats(b, 38)
300 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
301 target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
303 conn := NewClientConn(
304 target, grpc.WithInsecure(),
305 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
306 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
309 tc := testpb.NewBenchmarkServiceClient(conn)
311 // Warm up connection.
312 stream, err := tc.StreamingCall(context.Background())
314 b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
316 for i := 0; i < 10; i++ {
317 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
320 ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
325 wg.Add(benchFeatures.MaxConcurrentCalls)
327 // Distribute the b.N calls over maxConcurrentCalls workers.
328 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
329 stream, err := tc.StreamingCall(context.Background())
331 b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
336 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
337 elapse := time.Since(start)
346 for i := 0; i < b.N; i++ {
354 func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
355 if err := DoUnaryCall(client, reqSize, respSize); err != nil {
356 grpclog.Fatalf("DoUnaryCall failed: %v", err)
360 func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
361 if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
362 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)