OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / worker / benchmark_client.go
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package main
20
21 import (
22         "flag"
23         "math"
24         "runtime"
25         "sync"
26         "syscall"
27         "time"
28
29         "golang.org/x/net/context"
30         "google.golang.org/grpc"
31         "google.golang.org/grpc/benchmark"
32         testpb "google.golang.org/grpc/benchmark/grpc_testing"
33         "google.golang.org/grpc/benchmark/stats"
34         "google.golang.org/grpc/codes"
35         "google.golang.org/grpc/credentials"
36         "google.golang.org/grpc/grpclog"
37         "google.golang.org/grpc/testdata"
38 )
39
40 var caFile = flag.String("ca_file", "", "The file containing the CA root cert file")
41
42 type lockingHistogram struct {
43         mu        sync.Mutex
44         histogram *stats.Histogram
45 }
46
47 func (h *lockingHistogram) add(value int64) {
48         h.mu.Lock()
49         defer h.mu.Unlock()
50         h.histogram.Add(value)
51 }
52
53 // swap sets h.histogram to new, and returns its old value.
54 func (h *lockingHistogram) swap(new *stats.Histogram) *stats.Histogram {
55         h.mu.Lock()
56         defer h.mu.Unlock()
57         old := h.histogram
58         h.histogram = new
59         return old
60 }
61
62 func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
63         h.mu.Lock()
64         defer h.mu.Unlock()
65         merged.Merge(h.histogram)
66 }
67
68 type benchmarkClient struct {
69         closeConns        func()
70         stop              chan bool
71         lastResetTime     time.Time
72         histogramOptions  stats.HistogramOptions
73         lockingHistograms []lockingHistogram
74         rusageLastReset   *syscall.Rusage
75 }
76
77 func printClientConfig(config *testpb.ClientConfig) {
78         // Some config options are ignored:
79         // - client type:
80         //     will always create sync client
81         // - async client threads.
82         // - core list
83         grpclog.Printf(" * client type: %v (ignored, always creates sync client)", config.ClientType)
84         grpclog.Printf(" * async client threads: %v (ignored)", config.AsyncClientThreads)
85         // TODO: use cores specified by CoreList when setting list of cores is supported in go.
86         grpclog.Printf(" * core list: %v (ignored)", config.CoreList)
87
88         grpclog.Printf(" - security params: %v", config.SecurityParams)
89         grpclog.Printf(" - core limit: %v", config.CoreLimit)
90         grpclog.Printf(" - payload config: %v", config.PayloadConfig)
91         grpclog.Printf(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
92         grpclog.Printf(" - channel number: %v", config.ClientChannels)
93         grpclog.Printf(" - load params: %v", config.LoadParams)
94         grpclog.Printf(" - rpc type: %v", config.RpcType)
95         grpclog.Printf(" - histogram params: %v", config.HistogramParams)
96         grpclog.Printf(" - server targets: %v", config.ServerTargets)
97 }
98
99 func setupClientEnv(config *testpb.ClientConfig) {
100         // Use all cpu cores available on machine by default.
101         // TODO: Revisit this for the optimal default setup.
102         if config.CoreLimit > 0 {
103                 runtime.GOMAXPROCS(int(config.CoreLimit))
104         } else {
105                 runtime.GOMAXPROCS(runtime.NumCPU())
106         }
107 }
108
109 // createConns creates connections according to given config.
110 // It returns the connections and corresponding function to close them.
111 // It returns non-nil error if there is anything wrong.
112 func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
113         var opts []grpc.DialOption
114
115         // Sanity check for client type.
116         switch config.ClientType {
117         case testpb.ClientType_SYNC_CLIENT:
118         case testpb.ClientType_ASYNC_CLIENT:
119         default:
120                 return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow client type: %v", config.ClientType)
121         }
122
123         // Check and set security options.
124         if config.SecurityParams != nil {
125                 if *caFile == "" {
126                         *caFile = testdata.Path("ca.pem")
127                 }
128                 creds, err := credentials.NewClientTLSFromFile(*caFile, config.SecurityParams.ServerHostOverride)
129                 if err != nil {
130                         return nil, nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
131                 }
132                 opts = append(opts, grpc.WithTransportCredentials(creds))
133         } else {
134                 opts = append(opts, grpc.WithInsecure())
135         }
136
137         // Use byteBufCodec if it is required.
138         if config.PayloadConfig != nil {
139                 switch config.PayloadConfig.Payload.(type) {
140                 case *testpb.PayloadConfig_BytebufParams:
141                         opts = append(opts, grpc.WithCodec(byteBufCodec{}))
142                 case *testpb.PayloadConfig_SimpleParams:
143                 default:
144                         return nil, nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
145                 }
146         }
147
148         // Create connections.
149         connCount := int(config.ClientChannels)
150         conns := make([]*grpc.ClientConn, connCount, connCount)
151         for connIndex := 0; connIndex < connCount; connIndex++ {
152                 conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
153         }
154
155         return conns, func() {
156                 for _, conn := range conns {
157                         conn.Close()
158                 }
159         }, nil
160 }
161
162 func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
163         // Read payload size and type from config.
164         var (
165                 payloadReqSize, payloadRespSize int
166                 payloadType                     string
167         )
168         if config.PayloadConfig != nil {
169                 switch c := config.PayloadConfig.Payload.(type) {
170                 case *testpb.PayloadConfig_BytebufParams:
171                         payloadReqSize = int(c.BytebufParams.ReqSize)
172                         payloadRespSize = int(c.BytebufParams.RespSize)
173                         payloadType = "bytebuf"
174                 case *testpb.PayloadConfig_SimpleParams:
175                         payloadReqSize = int(c.SimpleParams.ReqSize)
176                         payloadRespSize = int(c.SimpleParams.RespSize)
177                         payloadType = "protobuf"
178                 default:
179                         return grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", config.PayloadConfig)
180                 }
181         }
182
183         // TODO add open loop distribution.
184         switch config.LoadParams.Load.(type) {
185         case *testpb.LoadParams_ClosedLoop:
186         case *testpb.LoadParams_Poisson:
187                 return grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
188         default:
189                 return grpc.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
190         }
191
192         rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
193
194         switch config.RpcType {
195         case testpb.RpcType_UNARY:
196                 bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
197                 // TODO open loop.
198         case testpb.RpcType_STREAMING:
199                 bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
200                 // TODO open loop.
201         default:
202                 return grpc.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
203         }
204
205         return nil
206 }
207
208 func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
209         printClientConfig(config)
210
211         // Set running environment like how many cores to use.
212         setupClientEnv(config)
213
214         conns, closeConns, err := createConns(config)
215         if err != nil {
216                 return nil, err
217         }
218
219         rusage := new(syscall.Rusage)
220         syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
221
222         rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
223         bc := &benchmarkClient{
224                 histogramOptions: stats.HistogramOptions{
225                         NumBuckets:     int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
226                         GrowthFactor:   config.HistogramParams.Resolution,
227                         BaseBucketSize: (1 + config.HistogramParams.Resolution),
228                         MinValue:       0,
229                 },
230                 lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)),
231
232                 stop:            make(chan bool),
233                 lastResetTime:   time.Now(),
234                 closeConns:      closeConns,
235                 rusageLastReset: rusage,
236         }
237
238         if err = performRPCs(config, conns, bc); err != nil {
239                 // Close all connections if performRPCs failed.
240                 closeConns()
241                 return nil, err
242         }
243
244         return bc, nil
245 }
246
247 func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
248         for ic, conn := range conns {
249                 client := testpb.NewBenchmarkServiceClient(conn)
250                 // For each connection, create rpcCountPerConn goroutines to do rpc.
251                 for j := 0; j < rpcCountPerConn; j++ {
252                         // Create histogram for each goroutine.
253                         idx := ic*rpcCountPerConn + j
254                         bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
255                         // Start goroutine on the created mutex and histogram.
256                         go func(idx int) {
257                                 // TODO: do warm up if necessary.
258                                 // Now relying on worker client to reserve time to do warm up.
259                                 // The worker client needs to wait for some time after client is created,
260                                 // before starting benchmark.
261                                 done := make(chan bool)
262                                 for {
263                                         go func() {
264                                                 start := time.Now()
265                                                 if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
266                                                         select {
267                                                         case <-bc.stop:
268                                                         case done <- false:
269                                                         }
270                                                         return
271                                                 }
272                                                 elapse := time.Since(start)
273                                                 bc.lockingHistograms[idx].add(int64(elapse))
274                                                 select {
275                                                 case <-bc.stop:
276                                                 case done <- true:
277                                                 }
278                                         }()
279                                         select {
280                                         case <-bc.stop:
281                                                 return
282                                         case <-done:
283                                         }
284                                 }
285                         }(idx)
286                 }
287         }
288 }
289
290 func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
291         var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int) error
292         if payloadType == "bytebuf" {
293                 doRPC = benchmark.DoByteBufStreamingRoundTrip
294         } else {
295                 doRPC = benchmark.DoStreamingRoundTrip
296         }
297         for ic, conn := range conns {
298                 // For each connection, create rpcCountPerConn goroutines to do rpc.
299                 for j := 0; j < rpcCountPerConn; j++ {
300                         c := testpb.NewBenchmarkServiceClient(conn)
301                         stream, err := c.StreamingCall(context.Background())
302                         if err != nil {
303                                 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
304                         }
305                         // Create histogram for each goroutine.
306                         idx := ic*rpcCountPerConn + j
307                         bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
308                         // Start goroutine on the created mutex and histogram.
309                         go func(idx int) {
310                                 // TODO: do warm up if necessary.
311                                 // Now relying on worker client to reserve time to do warm up.
312                                 // The worker client needs to wait for some time after client is created,
313                                 // before starting benchmark.
314                                 for {
315                                         start := time.Now()
316                                         if err := doRPC(stream, reqSize, respSize); err != nil {
317                                                 return
318                                         }
319                                         elapse := time.Since(start)
320                                         bc.lockingHistograms[idx].add(int64(elapse))
321                                         select {
322                                         case <-bc.stop:
323                                                 return
324                                         default:
325                                         }
326                                 }
327                         }(idx)
328                 }
329         }
330 }
331
332 // getStats returns the stats for benchmark client.
333 // It resets lastResetTime and all histograms if argument reset is true.
334 func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
335         var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
336         mergedHistogram := stats.NewHistogram(bc.histogramOptions)
337         latestRusage := new(syscall.Rusage)
338
339         if reset {
340                 // Merging histogram may take some time.
341                 // Put all histograms aside and merge later.
342                 toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms))
343                 for i := range bc.lockingHistograms {
344                         toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
345                 }
346
347                 for i := 0; i < len(toMerge); i++ {
348                         mergedHistogram.Merge(toMerge[i])
349                 }
350
351                 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
352                 syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
353                 uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
354
355                 bc.rusageLastReset = latestRusage
356                 bc.lastResetTime = time.Now()
357         } else {
358                 // Merge only, not reset.
359                 for i := range bc.lockingHistograms {
360                         bc.lockingHistograms[i].mergeInto(mergedHistogram)
361                 }
362
363                 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
364                 syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
365                 uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
366         }
367
368         b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets))
369         for i, v := range mergedHistogram.Buckets {
370                 b[i] = uint32(v.Count)
371         }
372         return &testpb.ClientStats{
373                 Latencies: &testpb.HistogramData{
374                         Bucket:       b,
375                         MinSeen:      float64(mergedHistogram.Min),
376                         MaxSeen:      float64(mergedHistogram.Max),
377                         Sum:          float64(mergedHistogram.Sum),
378                         SumOfSquares: float64(mergedHistogram.SumOfSquares),
379                         Count:        float64(mergedHistogram.Count),
380                 },
381                 TimeElapsed: wallTimeElapsed,
382                 TimeUser:    uTimeElapsed,
383                 TimeSystem:  sTimeElapsed,
384         }
385 }
386
387 func (bc *benchmarkClient) shutdown() {
388         close(bc.stop)
389         bc.closeConns()
390 }