3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 Package main provides benchmark with setting flags.
22 An example to run some benchmarks with profiling enabled:
24 go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
25 -compression=on -maxConcurrentCalls=1 -trace=off \
26 -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
27 -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
As a suggestion, when creating a branch, you can run this benchmark and save the result
to a file with "-resultFile=basePerf"; later, when you are in the middle of the work or
have finished it, you can run the benchmark again and compare the result with the base.
Assume there are two result files named "basePerf" and "curPerf", created by adding
-resultFile=basePerf and -resultFile=curPerf.
35 To format the curPerf, run:
36 go run benchmark/benchresult/main.go curPerf
37 To observe how the performance changes based on a base result, run:
38 go run benchmark/benchresult/main.go basePerf curPerf
62 "golang.org/x/net/context"
63 "google.golang.org/grpc"
64 bm "google.golang.org/grpc/benchmark"
65 testpb "google.golang.org/grpc/benchmark/grpc_testing"
66 "google.golang.org/grpc/benchmark/latency"
67 "google.golang.org/grpc/benchmark/stats"
68 "google.golang.org/grpc/grpclog"
77 var allCompressionModes = []string{modeOn, modeOff, modeBoth}
78 var allTraceModes = []string{modeOn, modeOff, modeBoth}
81 workloadsUnary = "unary"
82 workloadsStreaming = "streaming"
86 var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsAll}
89 runMode = []bool{true, true} // {runUnary, runStream}
	// When the latency is set to 0 (no delay), the result is still slower than a real run
	// without latency simulation, because the simulation layer performs extra operations.
92 ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
93 kbps = []int{0, 10240} // if non-positive, infinite
94 mtu = []int{0} // if non-positive, infinite
95 maxConcurrentCalls = []int{1, 8, 64, 512}
96 reqSizeBytes = []int{1, 1024, 1024 * 1024}
97 respSizeBytes = []int{1, 1024, 1024 * 1024}
99 benchtime time.Duration
100 memProfile, cpuProfile string
102 enableCompressor []bool
104 benchmarkResultFile string
105 networks = map[string]latency.Network{
106 "Local": latency.Local,
109 "Longhaul": latency.Longhaul,
113 func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
114 caller, close := makeFuncUnary(benchFeatures)
116 runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
119 func streamBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
120 caller, close := makeFuncStream(benchFeatures)
122 runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
125 func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
126 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
127 opts := []grpc.DialOption{}
128 sopts := []grpc.ServerOption{}
129 if benchFeatures.EnableCompressor {
130 sopts = append(sopts,
131 grpc.RPCCompressor(nopCompressor{}),
132 grpc.RPCDecompressor(nopDecompressor{}),
135 grpc.WithCompressor(nopCompressor{}),
136 grpc.WithDecompressor(nopDecompressor{}),
139 sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
140 opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
141 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
143 opts = append(opts, grpc.WithInsecure())
145 target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
146 conn := bm.NewClientConn(target, opts...)
147 tc := testpb.NewBenchmarkServiceClient(conn)
149 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
156 func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
157 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
158 opts := []grpc.DialOption{}
159 sopts := []grpc.ServerOption{}
160 if benchFeatures.EnableCompressor {
161 sopts = append(sopts,
162 grpc.RPCCompressor(grpc.NewGZIPCompressor()),
163 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
166 grpc.WithCompressor(grpc.NewGZIPCompressor()),
167 grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
170 sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
171 opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
172 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
174 opts = append(opts, grpc.WithInsecure())
176 target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
177 conn := bm.NewClientConn(target, opts...)
178 tc := testpb.NewBenchmarkServiceClient(conn)
179 streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
180 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
181 stream, err := tc.StreamingCall(context.Background())
183 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
187 return func(pos int) {
188 streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
195 func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
196 if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
197 grpclog.Fatalf("DoUnaryCall failed: %v", err)
201 func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
202 if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
203 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
207 func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
208 // Warm up connection.
209 for i := 0; i < 10; i++ {
218 wg.Add(benchFeatures.MaxConcurrentCalls)
219 bmEnd := time.Now().Add(benchtime)
221 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
230 elapse := time.Since(start)
231 atomic.AddInt32(&count, 1)
243 // Initiate main function to get settings of features.
246 workloads, traceMode, compressorMode, readLatency string
247 readKbps, readMtu, readMaxConcurrentCalls intSliceType
248 readReqSizeBytes, readRespSizeBytes intSliceType
250 flag.StringVar(&workloads, "workloads", workloadsAll,
251 fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")))
252 flag.StringVar(&traceMode, "trace", modeOff,
253 fmt.Sprintf("Trace mode - One of: %v", strings.Join(allTraceModes, ", ")))
254 flag.StringVar(&readLatency, "latency", "", "Simulated one-way network latency - may be a comma-separated list")
255 flag.DurationVar(&benchtime, "benchtime", time.Second, "Configures the amount of time to run each benchmark")
256 flag.Var(&readKbps, "kbps", "Simulated network throughput (in kbps) - may be a comma-separated list")
257 flag.Var(&readMtu, "mtu", "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
258 flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "Number of concurrent RPCs during benchmarks")
259 flag.Var(&readReqSizeBytes, "reqSizeBytes", "Request size in bytes - may be a comma-separated list")
260 flag.Var(&readRespSizeBytes, "respSizeBytes", "Response size in bytes - may be a comma-separated list")
261 flag.StringVar(&memProfile, "memProfile", "", "Enables memory profiling output to the filename provided.")
262 flag.IntVar(&memProfileRate, "memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
263 "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
264 "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
265 flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided")
266 flag.StringVar(&compressorMode, "compression", modeOff,
267 fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", ")))
268 flag.StringVar(&benchmarkResultFile, "resultFile", "", "Save the benchmark result into a binary file")
269 flag.StringVar(&networkMode, "networkMode", "", "Network mode includes LAN, WAN, Local and Longhaul")
271 if flag.NArg() != 0 {
272 log.Fatal("Error: unparsed arguments: ", flag.Args())
278 case workloadsStreaming:
285 log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
286 workloads, strings.Join(allWorkloads, ", "))
288 enableCompressor = setMode(compressorMode)
289 enableTrace = setMode(traceMode)
290 // Time input formats as (time + unit).
291 readTimeFromInput(<c, readLatency)
292 readIntFromIntSlice(&kbps, readKbps)
293 readIntFromIntSlice(&mtu, readMtu)
294 readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls)
295 readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes)
296 readIntFromIntSlice(&respSizeBytes, readRespSizeBytes)
	// Overwrite latency, kbps and mtu if the network mode is set.
298 if network, ok := networks[networkMode]; ok {
299 ltc = []time.Duration{network.Latency}
300 kbps = []int{network.Kbps}
301 mtu = []int{network.MTU}
305 func setMode(name string) []bool {
312 return []bool{false, true}
314 log.Fatalf("Unknown %s setting: %v (want one of: %v)",
315 name, name, strings.Join(allCompressionModes, ", "))
// intSliceType is a flag.Value that collects a comma-separated list of
// integers (for example "1,1024,1048576") into a slice.
type intSliceType []int

// String formats the collected values the way fmt prints an []int, e.g.
// "[1 1024]". The flag package may call String on a zero-valued receiver,
// including a nil pointer, so nil is rendered as an empty list instead of
// being dereferenced.
func (s *intSliceType) String() string {
	if s == nil {
		return "[]"
	}
	return fmt.Sprintf("%v", *s)
}

// Set parses value as a comma-separated list of integers and stores it in the
// receiver. It returns an error if the receiver already holds values or if
// any element is not a valid integer. On error the receiver is left
// unchanged, so a bad element never leaves a partially filled list behind.
func (s *intSliceType) Set(value string) error {
	if len(*s) > 0 {
		return errors.New("interval flag already set")
	}
	fields := strings.Split(value, ",")
	parsed := make(intSliceType, 0, len(fields))
	for _, field := range fields {
		n, err := strconv.Atoi(field)
		if err != nil {
			return err
		}
		parsed = append(parsed, n)
	}
	// Commit only after every element parsed successfully.
	*s = parsed
	return nil
}
340 func readIntFromIntSlice(values *[]int, replace intSliceType) {
	// If the flag was not set, return and keep the default settings.
342 if len(replace) == 0 {
// readTimeFromInput overrides *values with the durations listed in replace, a
// comma-separated list in time.ParseDuration syntax (e.g. "0s,40ms").
// An empty replace leaves *values untouched so the defaults stay in effect.
// An entry that fails to parse is reported through log.Fatal.
func readTimeFromInput(values *[]time.Duration, replace string) {
	if replace == "" {
		return
	}
	fields := strings.Split(replace, ",")
	parsed := make([]time.Duration, 0, len(fields))
	for _, field := range fields {
		d, err := time.ParseDuration(field)
		if err != nil {
			log.Fatal(err.Error())
		}
		parsed = append(parsed, d)
	}
	*values = parsed
}
363 featuresPos := make([]int, 8)
	// 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize 7:enableCompressor
365 featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu),
366 len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(enableCompressor)}
367 initalPos := make([]int, len(featuresPos))
368 s := stats.NewStats(10)
370 var memStats runtime.MemStats
371 var results testing.BenchmarkResult
372 var startAllocs, startBytes uint64
373 var startTime time.Time
375 var startTimer = func() {
376 runtime.ReadMemStats(&memStats)
377 startAllocs = memStats.Mallocs
378 startBytes = memStats.TotalAlloc
379 startTime = time.Now()
381 var stopTimer = func(count int32) {
382 runtime.ReadMemStats(&memStats)
383 results = testing.BenchmarkResult{N: int(count), T: time.Now().Sub(startTime),
384 Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes}
386 sharedPos := make([]bool, len(featuresPos))
387 for i := 0; i < len(featuresPos); i++ {
388 if featuresNum[i] <= 1 {
394 resultSlice := []stats.BenchResults{}
395 for !reflect.DeepEqual(featuresPos, initalPos) || start {
397 benchFeature := stats.Features{
398 NetworkMode: networkMode,
399 EnableTrace: enableTrace[featuresPos[0]],
400 Latency: ltc[featuresPos[1]],
401 Kbps: kbps[featuresPos[2]],
402 Mtu: mtu[featuresPos[3]],
403 MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]],
404 ReqSizeBytes: reqSizeBytes[featuresPos[5]],
405 RespSizeBytes: respSizeBytes[featuresPos[6]],
406 EnableCompressor: enableCompressor[featuresPos[7]],
409 grpc.EnableTracing = enableTrace[featuresPos[0]]
411 unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
412 s.SetBenchmarkResult("Unary", benchFeature, results.N,
413 results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
414 fmt.Println(s.BenchString())
415 fmt.Println(s.String())
416 resultSlice = append(resultSlice, s.GetBenchmarkResults())
420 streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
421 s.SetBenchmarkResult("Stream", benchFeature, results.N,
422 results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
423 fmt.Println(s.BenchString())
424 fmt.Println(s.String())
425 resultSlice = append(resultSlice, s.GetBenchmarkResults())
428 bm.AddOne(featuresPos, featuresNum)
434 if memProfile != "" {
435 runtime.MemProfileRate = memProfileRate
437 if cpuProfile != "" {
438 f, err := os.Create(cpuProfile)
440 fmt.Fprintf(os.Stderr, "testing: %s\n", err)
443 if err := pprof.StartCPUProfile(f); err != nil {
444 fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
451 func after(data []stats.BenchResults) {
452 if cpuProfile != "" {
453 pprof.StopCPUProfile() // flushes profile to disk
455 if memProfile != "" {
456 f, err := os.Create(memProfile)
458 fmt.Fprintf(os.Stderr, "testing: %s\n", err)
461 runtime.GC() // materialize all statistics
462 if err = pprof.WriteHeapProfile(f); err != nil {
463 fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", memProfile, err)
468 if benchmarkResultFile != "" {
469 f, err := os.Create(benchmarkResultFile)
471 log.Fatalf("testing: can't write benchmark result %s: %s\n", benchmarkResultFile, err)
473 dataEncoder := gob.NewEncoder(f)
474 dataEncoder.Encode(data)
// nopCompressor is a pass-through compressor: it writes the payload to the
// destination unchanged.
type nopCompressor struct{}

// Do writes p to w as-is. It returns the writer's error if the write fails,
// or a descriptive error if fewer than len(p) bytes were written.
func (nopCompressor) Do(w io.Writer, p []byte) error {
	written, err := w.Write(p)
	switch {
	case err != nil:
		return err
	case written != len(p):
		return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", written, len(p))
	}
	return nil
}

// Type returns the name of this compressor, "nop".
func (nopCompressor) Type() string {
	return "nop"
}
// nopDecompressor is a pass-through decompressor: it returns the bytes it
// reads without transforming them.
type nopDecompressor struct{}

// Do reads r until EOF and returns everything that was read.
func (nopDecompressor) Do(r io.Reader) ([]byte, error) {
	return ioutil.ReadAll(r)
}

// Type returns the name of this decompressor, "nop".
func (nopDecompressor) Type() string {
	const name = "nop"
	return name
}