OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / benchmain / main.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /*
20 Package main provides benchmark with setting flags.
21
22 An example to run some benchmarks with profiling enabled:
23
24 go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
25   -compression=on -maxConcurrentCalls=1 -trace=off \
26   -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
27   -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
28
29 As a suggestion, when creating a branch, you can run this benchmark and save the result
30 file "-resultFile=basePerf", and later when you at the middle of the work or finish the
31 work, you can get the benchmark result and compare it with the base anytime.
32
33 Assume there are two result files names as "basePerf" and "curPerf" created by adding
34 -resultFile=basePerf and -resultFile=curPerf.
35         To format the curPerf, run:
36         go run benchmark/benchresult/main.go curPerf
37         To observe how the performance changes based on a base result, run:
38         go run benchmark/benchresult/main.go basePerf curPerf
39 */
40 package main
41
42 import (
43         "encoding/gob"
44         "errors"
45         "flag"
46         "fmt"
47         "io"
48         "io/ioutil"
49         "log"
50         "net"
51         "os"
52         "reflect"
53         "runtime"
54         "runtime/pprof"
55         "strconv"
56         "strings"
57         "sync"
58         "sync/atomic"
59         "testing"
60         "time"
61
62         "golang.org/x/net/context"
63         "google.golang.org/grpc"
64         bm "google.golang.org/grpc/benchmark"
65         testpb "google.golang.org/grpc/benchmark/grpc_testing"
66         "google.golang.org/grpc/benchmark/latency"
67         "google.golang.org/grpc/benchmark/stats"
68         "google.golang.org/grpc/grpclog"
69 )
70
71 const (
72         modeOn   = "on"
73         modeOff  = "off"
74         modeBoth = "both"
75 )
76
77 var allCompressionModes = []string{modeOn, modeOff, modeBoth}
78 var allTraceModes = []string{modeOn, modeOff, modeBoth}
79
80 const (
81         workloadsUnary     = "unary"
82         workloadsStreaming = "streaming"
83         workloadsAll       = "all"
84 )
85
86 var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsAll}
87
88 var (
89         runMode = []bool{true, true} // {runUnary, runStream}
90         // When set the latency to 0 (no delay), the result is slower than the real result with no delay
91         // because latency simulation section has extra operations
92         ltc                    = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
93         kbps                   = []int{0, 10240}                           // if non-positive, infinite
94         mtu                    = []int{0}                                  // if non-positive, infinite
95         maxConcurrentCalls     = []int{1, 8, 64, 512}
96         reqSizeBytes           = []int{1, 1024, 1024 * 1024}
97         respSizeBytes          = []int{1, 1024, 1024 * 1024}
98         enableTrace            []bool
99         benchtime              time.Duration
100         memProfile, cpuProfile string
101         memProfileRate         int
102         enableCompressor       []bool
103         networkMode            string
104         benchmarkResultFile    string
105         networks               = map[string]latency.Network{
106                 "Local":    latency.Local,
107                 "LAN":      latency.LAN,
108                 "WAN":      latency.WAN,
109                 "Longhaul": latency.Longhaul,
110         }
111 )
112
113 func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
114         caller, close := makeFuncUnary(benchFeatures)
115         defer close()
116         runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
117 }
118
119 func streamBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
120         caller, close := makeFuncStream(benchFeatures)
121         defer close()
122         runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
123 }
124
125 func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
126         nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
127         opts := []grpc.DialOption{}
128         sopts := []grpc.ServerOption{}
129         if benchFeatures.EnableCompressor {
130                 sopts = append(sopts,
131                         grpc.RPCCompressor(nopCompressor{}),
132                         grpc.RPCDecompressor(nopDecompressor{}),
133                 )
134                 opts = append(opts,
135                         grpc.WithCompressor(nopCompressor{}),
136                         grpc.WithDecompressor(nopDecompressor{}),
137                 )
138         }
139         sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
140         opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
141                 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
142         }))
143         opts = append(opts, grpc.WithInsecure())
144
145         target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
146         conn := bm.NewClientConn(target, opts...)
147         tc := testpb.NewBenchmarkServiceClient(conn)
148         return func(int) {
149                         unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
150                 }, func() {
151                         conn.Close()
152                         stopper()
153                 }
154 }
155
156 func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
157         nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
158         opts := []grpc.DialOption{}
159         sopts := []grpc.ServerOption{}
160         if benchFeatures.EnableCompressor {
161                 sopts = append(sopts,
162                         grpc.RPCCompressor(grpc.NewGZIPCompressor()),
163                         grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
164                 )
165                 opts = append(opts,
166                         grpc.WithCompressor(grpc.NewGZIPCompressor()),
167                         grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
168                 )
169         }
170         sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
171         opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
172                 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
173         }))
174         opts = append(opts, grpc.WithInsecure())
175
176         target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...)
177         conn := bm.NewClientConn(target, opts...)
178         tc := testpb.NewBenchmarkServiceClient(conn)
179         streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
180         for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
181                 stream, err := tc.StreamingCall(context.Background())
182                 if err != nil {
183                         grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
184                 }
185                 streams[i] = stream
186         }
187         return func(pos int) {
188                         streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
189                 }, func() {
190                         conn.Close()
191                         stopper()
192                 }
193 }
194
195 func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
196         if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
197                 grpclog.Fatalf("DoUnaryCall failed: %v", err)
198         }
199 }
200
201 func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
202         if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
203                 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
204         }
205 }
206
207 func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
208         // Warm up connection.
209         for i := 0; i < 10; i++ {
210                 caller(0)
211         }
212         // Run benchmark.
213         startTimer()
214         var (
215                 mu sync.Mutex
216                 wg sync.WaitGroup
217         )
218         wg.Add(benchFeatures.MaxConcurrentCalls)
219         bmEnd := time.Now().Add(benchtime)
220         var count int32
221         for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
222                 go func(pos int) {
223                         for {
224                                 t := time.Now()
225                                 if t.After(bmEnd) {
226                                         break
227                                 }
228                                 start := time.Now()
229                                 caller(pos)
230                                 elapse := time.Since(start)
231                                 atomic.AddInt32(&count, 1)
232                                 mu.Lock()
233                                 s.Add(elapse)
234                                 mu.Unlock()
235                         }
236                         wg.Done()
237                 }(i)
238         }
239         wg.Wait()
240         stopTimer(count)
241 }
242
243 // Initiate main function to get settings of features.
244 func init() {
245         var (
246                 workloads, traceMode, compressorMode, readLatency string
247                 readKbps, readMtu, readMaxConcurrentCalls         intSliceType
248                 readReqSizeBytes, readRespSizeBytes               intSliceType
249         )
250         flag.StringVar(&workloads, "workloads", workloadsAll,
251                 fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")))
252         flag.StringVar(&traceMode, "trace", modeOff,
253                 fmt.Sprintf("Trace mode - One of: %v", strings.Join(allTraceModes, ", ")))
254         flag.StringVar(&readLatency, "latency", "", "Simulated one-way network latency - may be a comma-separated list")
255         flag.DurationVar(&benchtime, "benchtime", time.Second, "Configures the amount of time to run each benchmark")
256         flag.Var(&readKbps, "kbps", "Simulated network throughput (in kbps) - may be a comma-separated list")
257         flag.Var(&readMtu, "mtu", "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
258         flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "Number of concurrent RPCs during benchmarks")
259         flag.Var(&readReqSizeBytes, "reqSizeBytes", "Request size in bytes - may be a comma-separated list")
260         flag.Var(&readRespSizeBytes, "respSizeBytes", "Response size in bytes - may be a comma-separated list")
261         flag.StringVar(&memProfile, "memProfile", "", "Enables memory profiling output to the filename provided.")
262         flag.IntVar(&memProfileRate, "memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
263                 "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
264                 "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
265         flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided")
266         flag.StringVar(&compressorMode, "compression", modeOff,
267                 fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", ")))
268         flag.StringVar(&benchmarkResultFile, "resultFile", "", "Save the benchmark result into a binary file")
269         flag.StringVar(&networkMode, "networkMode", "", "Network mode includes LAN, WAN, Local and Longhaul")
270         flag.Parse()
271         if flag.NArg() != 0 {
272                 log.Fatal("Error: unparsed arguments: ", flag.Args())
273         }
274         switch workloads {
275         case workloadsUnary:
276                 runMode[0] = true
277                 runMode[1] = false
278         case workloadsStreaming:
279                 runMode[0] = false
280                 runMode[1] = true
281         case workloadsAll:
282                 runMode[0] = true
283                 runMode[1] = true
284         default:
285                 log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
286                         workloads, strings.Join(allWorkloads, ", "))
287         }
288         enableCompressor = setMode(compressorMode)
289         enableTrace = setMode(traceMode)
290         // Time input formats as (time + unit).
291         readTimeFromInput(&ltc, readLatency)
292         readIntFromIntSlice(&kbps, readKbps)
293         readIntFromIntSlice(&mtu, readMtu)
294         readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls)
295         readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes)
296         readIntFromIntSlice(&respSizeBytes, readRespSizeBytes)
297         // Re-write latency, kpbs and mtu if network mode is set.
298         if network, ok := networks[networkMode]; ok {
299                 ltc = []time.Duration{network.Latency}
300                 kbps = []int{network.Kbps}
301                 mtu = []int{network.MTU}
302         }
303 }
304
305 func setMode(name string) []bool {
306         switch name {
307         case modeOn:
308                 return []bool{true}
309         case modeOff:
310                 return []bool{false}
311         case modeBoth:
312                 return []bool{false, true}
313         default:
314                 log.Fatalf("Unknown %s setting: %v (want one of: %v)",
315                         name, name, strings.Join(allCompressionModes, ", "))
316                 return []bool{}
317         }
318 }
319
320 type intSliceType []int
321
322 func (intSlice *intSliceType) String() string {
323         return fmt.Sprintf("%v", *intSlice)
324 }
325
326 func (intSlice *intSliceType) Set(value string) error {
327         if len(*intSlice) > 0 {
328                 return errors.New("interval flag already set")
329         }
330         for _, num := range strings.Split(value, ",") {
331                 next, err := strconv.Atoi(num)
332                 if err != nil {
333                         return err
334                 }
335                 *intSlice = append(*intSlice, next)
336         }
337         return nil
338 }
339
340 func readIntFromIntSlice(values *[]int, replace intSliceType) {
341         // If not set replace in the flag, just return to run the default settings.
342         if len(replace) == 0 {
343                 return
344         }
345         *values = replace
346 }
347
348 func readTimeFromInput(values *[]time.Duration, replace string) {
349         if strings.Compare(replace, "") != 0 {
350                 *values = []time.Duration{}
351                 for _, ltc := range strings.Split(replace, ",") {
352                         duration, err := time.ParseDuration(ltc)
353                         if err != nil {
354                                 log.Fatal(err.Error())
355                         }
356                         *values = append(*values, duration)
357                 }
358         }
359 }
360
361 func main() {
362         before()
363         featuresPos := make([]int, 8)
364         // 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize
365         featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu),
366                 len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(enableCompressor)}
367         initalPos := make([]int, len(featuresPos))
368         s := stats.NewStats(10)
369         s.SortLatency()
370         var memStats runtime.MemStats
371         var results testing.BenchmarkResult
372         var startAllocs, startBytes uint64
373         var startTime time.Time
374         start := true
375         var startTimer = func() {
376                 runtime.ReadMemStats(&memStats)
377                 startAllocs = memStats.Mallocs
378                 startBytes = memStats.TotalAlloc
379                 startTime = time.Now()
380         }
381         var stopTimer = func(count int32) {
382                 runtime.ReadMemStats(&memStats)
383                 results = testing.BenchmarkResult{N: int(count), T: time.Now().Sub(startTime),
384                         Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes}
385         }
386         sharedPos := make([]bool, len(featuresPos))
387         for i := 0; i < len(featuresPos); i++ {
388                 if featuresNum[i] <= 1 {
389                         sharedPos[i] = true
390                 }
391         }
392
393         // Run benchmarks
394         resultSlice := []stats.BenchResults{}
395         for !reflect.DeepEqual(featuresPos, initalPos) || start {
396                 start = false
397                 benchFeature := stats.Features{
398                         NetworkMode:        networkMode,
399                         EnableTrace:        enableTrace[featuresPos[0]],
400                         Latency:            ltc[featuresPos[1]],
401                         Kbps:               kbps[featuresPos[2]],
402                         Mtu:                mtu[featuresPos[3]],
403                         MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]],
404                         ReqSizeBytes:       reqSizeBytes[featuresPos[5]],
405                         RespSizeBytes:      respSizeBytes[featuresPos[6]],
406                         EnableCompressor:   enableCompressor[featuresPos[7]],
407                 }
408
409                 grpc.EnableTracing = enableTrace[featuresPos[0]]
410                 if runMode[0] {
411                         unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
412                         s.SetBenchmarkResult("Unary", benchFeature, results.N,
413                                 results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
414                         fmt.Println(s.BenchString())
415                         fmt.Println(s.String())
416                         resultSlice = append(resultSlice, s.GetBenchmarkResults())
417                         s.Clear()
418                 }
419                 if runMode[1] {
420                         streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
421                         s.SetBenchmarkResult("Stream", benchFeature, results.N,
422                                 results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
423                         fmt.Println(s.BenchString())
424                         fmt.Println(s.String())
425                         resultSlice = append(resultSlice, s.GetBenchmarkResults())
426                         s.Clear()
427                 }
428                 bm.AddOne(featuresPos, featuresNum)
429         }
430         after(resultSlice)
431 }
432
433 func before() {
434         if memProfile != "" {
435                 runtime.MemProfileRate = memProfileRate
436         }
437         if cpuProfile != "" {
438                 f, err := os.Create(cpuProfile)
439                 if err != nil {
440                         fmt.Fprintf(os.Stderr, "testing: %s\n", err)
441                         return
442                 }
443                 if err := pprof.StartCPUProfile(f); err != nil {
444                         fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
445                         f.Close()
446                         return
447                 }
448         }
449 }
450
451 func after(data []stats.BenchResults) {
452         if cpuProfile != "" {
453                 pprof.StopCPUProfile() // flushes profile to disk
454         }
455         if memProfile != "" {
456                 f, err := os.Create(memProfile)
457                 if err != nil {
458                         fmt.Fprintf(os.Stderr, "testing: %s\n", err)
459                         os.Exit(2)
460                 }
461                 runtime.GC() // materialize all statistics
462                 if err = pprof.WriteHeapProfile(f); err != nil {
463                         fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", memProfile, err)
464                         os.Exit(2)
465                 }
466                 f.Close()
467         }
468         if benchmarkResultFile != "" {
469                 f, err := os.Create(benchmarkResultFile)
470                 if err != nil {
471                         log.Fatalf("testing: can't write benchmark result %s: %s\n", benchmarkResultFile, err)
472                 }
473                 dataEncoder := gob.NewEncoder(f)
474                 dataEncoder.Encode(data)
475                 f.Close()
476         }
477 }
478
479 // nopCompressor is a compressor that just copies data.
480 type nopCompressor struct{}
481
482 func (nopCompressor) Do(w io.Writer, p []byte) error {
483         n, err := w.Write(p)
484         if err != nil {
485                 return err
486         }
487         if n != len(p) {
488                 return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
489         }
490         return nil
491 }
492
493 func (nopCompressor) Type() string { return "nop" }
494
495 // nopDecompressor is a decompressor that just copies data.
496 type nopDecompressor struct{}
497
498 func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
499 func (nopDecompressor) Type() string                   { return "nop" }