OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / client / main.go
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package main
20
21 import (
22         "flag"
23         "math"
24         "net"
25         "net/http"
26         _ "net/http/pprof"
27         "sync"
28         "time"
29
30         "golang.org/x/net/context"
31         "google.golang.org/grpc"
32         "google.golang.org/grpc/benchmark"
33         testpb "google.golang.org/grpc/benchmark/grpc_testing"
34         "google.golang.org/grpc/benchmark/stats"
35         "google.golang.org/grpc/grpclog"
36 )
37
// Command-line flags that configure the benchmark client. Each variable is a
// pointer filled in by flag.Parse, which main calls first.
var (
        // server is the address passed to benchmark.NewClientConn.
        server            = flag.String("server", "", "The server address")
        // maxConcurrentRPCs is the number of worker goroutines started by each
        // close-loop driver; the work channel is buffered to four times this.
        maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
        // duration is the run time in seconds. The default, math.MaxInt32,
        // is roughly 68 years, i.e. the client runs until it is killed.
        duration          = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
        // trace is copied into grpc.EnableTracing by main.
        trace             = flag.Bool("trace", true, "Whether tracing is on")
        // rpcType selects the driver in main: 0 runs closeLoopUnary and 1 runs
        // closeLoopStream.
        rpcType           = flag.Int("rpc_type", 0,
                `Configure different client rpc type. Valid options are:
                   0 : unary call;
                   1 : streaming call.`)
)
48
49 func unaryCaller(client testpb.BenchmarkServiceClient) {
50         benchmark.DoUnaryCall(client, 1, 1)
51 }
52
53 func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
54         benchmark.DoStreamingRoundTrip(stream, 1, 1)
55 }
56
57 func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) {
58         s = stats.NewStats(256)
59         conn = benchmark.NewClientConn(*server)
60         tc = testpb.NewBenchmarkServiceClient(conn)
61         return s, conn, tc
62 }
63
64 func closeLoopUnary() {
65         s, conn, tc := buildConnection()
66
67         for i := 0; i < 100; i++ {
68                 unaryCaller(tc)
69         }
70         ch := make(chan int, *maxConcurrentRPCs*4)
71         var (
72                 mu sync.Mutex
73                 wg sync.WaitGroup
74         )
75         wg.Add(*maxConcurrentRPCs)
76
77         for i := 0; i < *maxConcurrentRPCs; i++ {
78                 go func() {
79                         for range ch {
80                                 start := time.Now()
81                                 unaryCaller(tc)
82                                 elapse := time.Since(start)
83                                 mu.Lock()
84                                 s.Add(elapse)
85                                 mu.Unlock()
86                         }
87                         wg.Done()
88                 }()
89         }
90         // Stop the client when time is up.
91         done := make(chan struct{})
92         go func() {
93                 <-time.After(time.Duration(*duration) * time.Second)
94                 close(done)
95         }()
96         ok := true
97         for ok {
98                 select {
99                 case ch <- 0:
100                 case <-done:
101                         ok = false
102                 }
103         }
104         close(ch)
105         wg.Wait()
106         conn.Close()
107         grpclog.Println(s.String())
108
109 }
110
111 func closeLoopStream() {
112         s, conn, tc := buildConnection()
113         ch := make(chan int, *maxConcurrentRPCs*4)
114         var (
115                 mu sync.Mutex
116                 wg sync.WaitGroup
117         )
118         wg.Add(*maxConcurrentRPCs)
119         // Distribute RPCs over maxConcurrentCalls workers.
120         for i := 0; i < *maxConcurrentRPCs; i++ {
121                 go func() {
122                         stream, err := tc.StreamingCall(context.Background())
123                         if err != nil {
124                                 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
125                         }
126                         // Do some warm up.
127                         for i := 0; i < 100; i++ {
128                                 streamCaller(stream)
129                         }
130                         for range ch {
131                                 start := time.Now()
132                                 streamCaller(stream)
133                                 elapse := time.Since(start)
134                                 mu.Lock()
135                                 s.Add(elapse)
136                                 mu.Unlock()
137                         }
138                         wg.Done()
139                 }()
140         }
141         // Stop the client when time is up.
142         done := make(chan struct{})
143         go func() {
144                 <-time.After(time.Duration(*duration) * time.Second)
145                 close(done)
146         }()
147         ok := true
148         for ok {
149                 select {
150                 case ch <- 0:
151                 case <-done:
152                         ok = false
153                 }
154         }
155         close(ch)
156         wg.Wait()
157         conn.Close()
158         grpclog.Println(s.String())
159 }
160
161 func main() {
162         flag.Parse()
163         grpc.EnableTracing = *trace
164         go func() {
165                 lis, err := net.Listen("tcp", ":0")
166                 if err != nil {
167                         grpclog.Fatalf("Failed to listen: %v", err)
168                 }
169                 grpclog.Println("Client profiling address: ", lis.Addr().String())
170                 if err := http.Serve(lis, nil); err != nil {
171                         grpclog.Fatalf("Failed to serve: %v", err)
172                 }
173         }()
174         switch *rpcType {
175         case 0:
176                 closeLoopUnary()
177         case 1:
178                 closeLoopStream()
179         }
180 }