3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "golang.org/x/net/context"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/benchmark"
32 testpb "google.golang.org/grpc/benchmark/grpc_testing"
33 "google.golang.org/grpc/benchmark/stats"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/grpclog"
37 "google.golang.org/grpc/testdata"
// caFile is the -ca_file command-line flag holding the path of the CA root
// certificate file; its default is the empty string.
40 var caFile = flag.String("ca_file", "", "The file containing the CA root cert file")
// lockingHistogram wraps a *stats.Histogram; add, swap and mergeInto are the
// methods that operate on it.
// NOTE(review): the name implies access is guarded by a lock, but no lock
// field is visible in this excerpt — confirm against the full declaration.
42 type lockingHistogram struct {
// histogram is the wrapped histogram that values are recorded into.
44 histogram *stats.Histogram
// add records value in the wrapped histogram by calling h.histogram.Add.
// NOTE(review): any locking around the call is not visible in this excerpt —
// confirm against the full method body.
47 func (h *lockingHistogram) add(value int64) {
50 h.histogram.Add(value)
53 // swap sets h.histogram to new, and returns its old value.
// getStats relies on this to set each histogram aside and install a fresh
// one before merging.
// NOTE(review): the method body is not visible in this excerpt; the contract
// above restates the existing comment — confirm against the implementation.
54 func (h *lockingHistogram) swap(new *stats.Histogram) *stats.Histogram {
// mergeInto passes the wrapped histogram to merged.Merge. getStats uses it
// on the "merge only, not reset" path.
// NOTE(review): any locking around the call is not visible in this excerpt —
// confirm against the full method body.
62 func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
65 merged.Merge(h.histogram)
// benchmarkClient holds the state of a running benchmark client: the
// histograms its RPC goroutines record into, plus the bookkeeping getStats
// uses to report elapsed wall-clock and CPU time.
// NOTE(review): startBenchmarkClient also populates stop and closeConns
// fields; their declarations are not visible in this excerpt.
68 type benchmarkClient struct {
// lastResetTime is set to time.Now() at creation and again by getStats when
// it resets; wall time elapsed is measured from it.
71 lastResetTime time.Time
// histogramOptions is passed to every stats.NewHistogram call made for this
// client.
72 histogramOptions stats.HistogramOptions
// lockingHistograms has rpcCountPerConn*len(conns) entries, one per RPC
// goroutine, indexed by connIndex*rpcCountPerConn + goroutineIndex.
73 lockingHistograms []lockingHistogram
// rusageLastReset is the resource-usage snapshot taken at creation or at the
// last reset; getStats diffs it against a fresh snapshot via cpuTimeDiff.
74 rusageLastReset *syscall.Rusage
// printClientConfig logs the fields of config through grpclog.Printf.
// Options this client does not honor (client type, async client threads,
// core list) are logged with an "(ignored)" marker; the remaining options
// are logged as-is. It only logs; it does not modify config.
77 func printClientConfig(config *testpb.ClientConfig) {
78 // Some config options are ignored:
80 // will always create sync client
81 // - async client threads.
83 grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType)
84 grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads)
85 // TODO: use cores specified by CoreList when setting list of cores is supported in go.
86 grpclog.Printf(" * core list: %v (ignored)", config.CoreList)
// The options below are the ones logged without an "(ignored)" marker.
88 grpclog.Printf(" - security params: %v", config.SecurityParams)
89 grpclog.Printf(" - core limit: %v", config.CoreLimit)
90 grpclog.Printf(" - payload config: %v", config.PayloadConfig)
91 grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
92 grpclog.Printf(" - channel number: %v", config.ClientChannels)
93 grpclog.Printf(" - load params: %v", config.LoadParams)
94 grpclog.Printf(" - rpc type: %v", config.RpcType)
95 grpclog.Printf(" - histogram params: %v", config.HistogramParams)
96 grpclog.Printf(" - server targets: %v", config.ServerTargets)
// setupClientEnv configures the Go runtime for the benchmark by setting
// GOMAXPROCS: to config.CoreLimit when that is positive, otherwise
// (presumably the else branch, whose delimiter is not visible in this
// excerpt) to runtime.NumCPU().
99 func setupClientEnv(config *testpb.ClientConfig) {
100 // Use all cpu cores available on machine by default.
101 // TODO: Revisit this for the optimal default setup.
102 if config.CoreLimit > 0 {
103 runtime.GOMAXPROCS(int(config.CoreLimit))
105 runtime.GOMAXPROCS(runtime.NumCPU())
109 // createConns creates connections according to given config.
110 // It returns the connections and corresponding function to close them.
111 // It returns non-nil error if there is anything wrong.
//
// Visible steps: validate config.ClientType, pick TLS or insecure transport
// based on config.SecurityParams, add byteBufCodec for bytebuf payloads, then
// open config.ClientChannels connections spread over config.ServerTargets.
// Validation failures are reported as codes.InvalidArgument errors.
// NOTE(review): "unknow" in the error strings below looks like a typo for
// "unknown".
112 func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
113 var opts []grpc.DialOption
115 // Sanity check for client type.
116 switch config.ClientType {
// Both known client types are accepted without further action.
117 case testpb.ClientType_SYNC_CLIENT:
118 case testpb.ClientType_ASYNC_CLIENT:
120 return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType)
123 // Check and set security options.
124 if config.SecurityParams != nil {
// NOTE(review): this overwrites the -ca_file flag value with the bundled
// testdata ca.pem; the condition guarding it is not visible in this excerpt.
126 *caFile = testdata.Path("ca.pem")
128 creds, err := credentials.NewClientTLSFromFile(*caFile, config.SecurityParams.ServerHostOverride)
130 return nil, nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
132 opts = append(opts, grpc.WithTransportCredentials(creds))
// Presumably the branch taken when no security params are given.
134 opts = append(opts, grpc.WithInsecure())
137 // Use byteBufCodec if it is required.
138 if config.PayloadConfig != nil {
139 switch config.PayloadConfig.Payload.(type) {
140 case *testpb.PayloadConfig_BytebufParams:
141 opts = append(opts, grpc.WithCodec(byteBufCodec{}))
// Simple (protobuf) payloads need no extra dial option.
142 case *testpb.PayloadConfig_SimpleParams:
144 return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
148 // Create connections.
149 connCount := int(config.ClientChannels)
150 conns := make([]*grpc.ClientConn, connCount, connCount)
151 for connIndex := 0; connIndex < connCount; connIndex++ {
// Targets are assigned round-robin: connection i uses target i modulo the
// number of targets.
// NOTE(review): if config.ServerTargets is empty and connCount > 0, the
// modulo by len(config.ServerTargets) panics (integer divide by zero); no
// guard is visible in this excerpt — confirm.
152 conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
// The returned closure iterates over every connection created above.
155 return conns, func() {
156 for _, conn := range conns {
// performRPCs validates the payload, load and RPC-type settings in config and
// then hands conns to bc.doCloseLoopUnary or bc.doCloseLoopStreaming according
// to config.RpcType.
// It returns a codes.InvalidArgument error for an unrecognized payload config,
// load params or RPC type, and codes.Unimplemented for Poisson load.
// NOTE(review): the success return is not visible in this excerpt.
// NOTE(review): "unknow" in the payload error string looks like a typo for
// "unknown".
162 func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
163 // Read payload size and type from config.
165 payloadReqSize, payloadRespSize int
// With a nil PayloadConfig the switch below is skipped, so the sizes are
// not assigned here.
168 if config.PayloadConfig != nil {
169 switch c := config.PayloadConfig.Payload.(type) {
170 case *testpb.PayloadConfig_BytebufParams:
171 payloadReqSize = int(c.BytebufParams.ReqSize)
172 payloadRespSize = int(c.BytebufParams.RespSize)
173 payloadType = "bytebuf"
174 case *testpb.PayloadConfig_SimpleParams:
175 payloadReqSize = int(c.SimpleParams.ReqSize)
176 payloadRespSize = int(c.SimpleParams.RespSize)
177 payloadType = "protobuf"
179 return grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
183 // TODO add open loop distribution.
184 switch config.LoadParams.Load.(type) {
// Closed loop is the only load type that proceeds; Poisson is rejected as
// unimplemented.
185 case *testpb.LoadParams_ClosedLoop:
186 case *testpb.LoadParams_Poisson:
187 return grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
189 return grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
// Number of concurrent RPC goroutines to run on each connection.
192 rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
194 switch config.RpcType {
195 case testpb.RpcType_UNARY:
196 bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
198 case testpb.RpcType_STREAMING:
// The streaming path also needs payloadType to choose its round-trip helper.
199 bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
202 return grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
// startBenchmarkClient logs config, applies the core limit, creates the
// connections, builds a benchmarkClient and starts its RPCs via performRPCs.
// NOTE(review): the handling of the createConns error and the final return
// statements are not visible in this excerpt — confirm against the full body.
208 func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
209 printClientConfig(config)
211 // Set running environment like how many cores to use.
212 setupClientEnv(config)
214 conns, closeConns, err := createConns(config)
// Snapshot resource usage now so getStats can later report CPU time used
// since client start.
// NOTE(review): the error returned by syscall.Getrusage is discarded.
219 rusage := new(syscall.Rusage)
220 syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
222 rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
223 bc := &benchmarkClient{
224 histogramOptions: stats.HistogramOptions{
// Bucket count is log(MaxPossible)/log(1+Resolution), truncated to int,
// plus one.
225 NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
226 GrowthFactor: config.HistogramParams.Resolution,
227 BaseBucketSize: (1 + config.HistogramParams.Resolution),
// One histogram slot per RPC goroutine across all connections.
230 lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)),
232 stop: make(chan bool),
233 lastResetTime: time.Now(),
234 closeConns: closeConns,
235 rusageLastReset: rusage,
238 if err = performRPCs(config, conns, bc); err != nil {
239 // Close all connections if performRPCs failed.
// doCloseLoopUnary sets up closed-loop unary RPCs. For each connection it
// creates one BenchmarkService client and, per the comments below,
// rpcCountPerConn goroutines; each gets its own histogram slot in
// bc.lockingHistograms. The elapsed time of each benchmark.DoUnaryCall is
// recorded as int64(time.Duration), i.e. in nanoseconds.
// NOTE(review): the goroutine launch, the error path after DoUnaryCall and
// the use of the done channel are not visible in this excerpt.
247 func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
248 for ic, conn := range conns {
249 client := testpb.NewBenchmarkServiceClient(conn)
250 // For each connection, create rpcCountPerConn goroutines to do rpc.
251 for j := 0; j < rpcCountPerConn; j++ {
252 // Create histogram for each goroutine.
// idx flattens (connection, goroutine) into a unique slot in
// bc.lockingHistograms.
253 idx := ic*rpcCountPerConn + j
254 bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
255 // Start goroutine on the created mutex and histogram.
257 // TODO: do warm up if necessary.
258 // Now relying on worker client to reserve time to do warm up.
259 // The worker client needs to wait for some time after client is created,
260 // before starting benchmark.
261 done := make(chan bool)
265 if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
// Record the latency of the call that just completed.
272 elapse := time.Since(start)
273 bc.lockingHistograms[idx].add(int64(elapse))
// doCloseLoopStreaming sets up closed-loop streaming RPCs. It selects the
// round-trip helper from payloadType ("bytebuf" uses
// benchmark.DoByteBufStreamingRoundTrip; otherwise, presumably in an else
// branch not visible here, benchmark.DoStreamingRoundTrip). For every
// connection and each of rpcCountPerConn slots it opens a StreamingCall
// stream, creates a histogram, and records the elapsed time of each round
// trip as int64(time.Duration), i.e. in nanoseconds.
// A failure to open a stream is reported with grpclog.Fatalf.
// NOTE(review): the goroutine launch and the error path after doRPC are not
// visible in this excerpt.
290 func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
291 var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int) error
292 if payloadType == "bytebuf" {
293 doRPC = benchmark.DoByteBufStreamingRoundTrip
295 doRPC = benchmark.DoStreamingRoundTrip
297 for ic, conn := range conns {
298 // For each connection, create rpcCountPerConn goroutines to do rpc.
299 for j := 0; j < rpcCountPerConn; j++ {
// Unlike doCloseLoopUnary, a new service client and stream are created for
// every goroutine slot, not once per connection.
300 c := testpb.NewBenchmarkServiceClient(conn)
301 stream, err := c.StreamingCall(context.Background())
303 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
305 // Create histogram for each goroutine.
// idx flattens (connection, goroutine) into a unique slot in
// bc.lockingHistograms.
306 idx := ic*rpcCountPerConn + j
307 bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
308 // Start goroutine on the created mutex and histogram.
310 // TODO: do warm up if necessary.
311 // Now relying on worker client to reserve time to do warm up.
312 // The worker client needs to wait for some time after client is created,
313 // before starting benchmark.
316 if err := doRPC(stream, reqSize, respSize); err != nil {
// Record the latency of the round trip that just completed.
319 elapse := time.Since(start)
320 bc.lockingHistograms[idx].add(int64(elapse))
332 // getStats returns the stats for benchmark client.
333 // It resets lastResetTime and all histograms if argument reset is true.
//
// The result carries the merged latency histogram (bucket counts, min, max,
// sum, sum of squares, count) and the wall, user and system time elapsed
// since bc.lastResetTime / bc.rusageLastReset.
// NOTE(review): the if/else on reset that separates the two paths below is
// not visible in this excerpt.
334 func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
335 var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
336 mergedHistogram := stats.NewHistogram(bc.histogramOptions)
337 latestRusage := new(syscall.Rusage)
340 // Merging histogram may take some time.
341 // Put all histograms aside and merge later.
342 toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms))
343 for i := range bc.lockingHistograms {
// swap installs a fresh histogram and hands back the old one.
344 toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
347 for i := 0; i < len(toMerge); i++ {
348 mergedHistogram.Merge(toMerge[i])
351 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
// NOTE(review): the error returned by syscall.Getrusage is discarded.
352 syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
353 uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
// Start a new measurement window from this point.
355 bc.rusageLastReset = latestRusage
356 bc.lastResetTime = time.Now()
358 // Merge only, not reset.
359 for i := range bc.lockingHistograms {
360 bc.lockingHistograms[i].mergeInto(mergedHistogram)
363 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
// NOTE(review): the error returned by syscall.Getrusage is discarded.
364 syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
365 uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
// Copy the merged bucket counts into a []uint32 for the response.
// NOTE(review): uint32(v.Count) truncates if a bucket count exceeds the
// uint32 range; the type of v.Count is not visible here — confirm.
368 b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets))
369 for i, v := range mergedHistogram.Buckets {
370 b[i] = uint32(v.Count)
372 return &testpb.ClientStats{
373 Latencies: &testpb.HistogramData{
375 MinSeen: float64(mergedHistogram.Min),
376 MaxSeen: float64(mergedHistogram.Max),
377 Sum: float64(mergedHistogram.Sum),
378 SumOfSquares: float64(mergedHistogram.SumOfSquares),
379 Count: float64(mergedHistogram.Count),
// Times are float64 seconds: wall time from time.Since(...).Seconds(), CPU
// times as returned by cpuTimeDiff.
381 TimeElapsed: wallTimeElapsed,
382 TimeUser: uTimeElapsed,
383 TimeSystem: sTimeElapsed,
387 func (bc *benchmarkClient) shutdown() {