3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
32 "golang.org/x/net/context"
33 "google.golang.org/grpc"
34 testpb "google.golang.org/grpc/benchmark/grpc_testing"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/grpclog"
40 driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
41 serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")
42 pprofPort = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset")
43 blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile")
46 type byteBufCodec struct {
49 func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
52 return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
57 func (byteBufCodec) Unmarshal(data []byte, v interface{}) error {
60 return fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
66 func (byteBufCodec) String() string {
70 // workerServer implements WorkerService rpc handlers.
71 // It can create benchmarkServer or benchmarkClient on demand.
72 type workerServer struct {
77 func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
78 var bs *benchmarkServer
80 // Close benchmark server when stream ends.
81 grpclog.Printf("closing benchmark server")
87 in, err := stream.Recv()
95 var out *testpb.ServerStatus
96 switch argtype := in.Argtype.(type) {
97 case *testpb.ServerArgs_Setup:
98 grpclog.Printf("server setup received:")
100 grpclog.Printf("server setup received when server already exists, closing the existing server")
103 bs, err = startBenchmarkServer(argtype.Setup, s.serverPort)
107 out = &testpb.ServerStatus{
108 Stats: bs.getStats(false),
109 Port: int32(bs.port),
110 Cores: int32(bs.cores),
113 case *testpb.ServerArgs_Mark:
114 grpclog.Printf("server mark received:")
115 grpclog.Printf(" - %v", argtype)
117 return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
119 out = &testpb.ServerStatus{
120 Stats: bs.getStats(argtype.Mark.Reset_),
121 Port: int32(bs.port),
122 Cores: int32(bs.cores),
126 if err := stream.Send(out); err != nil {
132 func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
133 var bc *benchmarkClient
135 // Shut down benchmark client when stream ends.
136 grpclog.Printf("shuting down benchmark client")
142 in, err := stream.Recv()
150 var out *testpb.ClientStatus
151 switch t := in.Argtype.(type) {
152 case *testpb.ClientArgs_Setup:
153 grpclog.Printf("client setup received:")
155 grpclog.Printf("client setup received when client already exists, shuting down the existing client")
158 bc, err = startBenchmarkClient(t.Setup)
162 out = &testpb.ClientStatus{
163 Stats: bc.getStats(false),
166 case *testpb.ClientArgs_Mark:
167 grpclog.Printf("client mark received:")
168 grpclog.Printf(" - %v", t)
170 return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
172 out = &testpb.ClientStatus{
173 Stats: bc.getStats(t.Mark.Reset_),
177 if err := stream.Send(out); err != nil {
183 func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
184 grpclog.Printf("core count: %v", runtime.NumCPU())
185 return &testpb.CoreResponse{Cores: int32(runtime.NumCPU())}, nil
188 func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
189 grpclog.Printf("quiting worker")
191 return &testpb.Void{}, nil
195 grpc.EnableTracing = false
198 lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort))
200 grpclog.Fatalf("failed to listen: %v", err)
202 grpclog.Printf("worker listening at port %v", *driverPort)
204 s := grpc.NewServer()
205 stop := make(chan bool)
206 testpb.RegisterWorkerServiceServer(s, &workerServer{
208 serverPort: *serverPort,
213 // Wait for 1 second before stopping the server to make sure the return value of QuitWorker is sent to client.
214 // TODO revise this once server graceful stop is supported in gRPC.
215 time.Sleep(time.Second)
219 runtime.SetBlockProfileRate(*blockProfRate)
223 grpclog.Println("Starting pprof server on port " + strconv.Itoa(*pprofPort))
224 grpclog.Println(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil))