OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / benchmark / worker / main.go
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package main
20
21 import (
22         "flag"
23         "fmt"
24         "io"
25         "net"
26         "net/http"
27         _ "net/http/pprof"
28         "runtime"
29         "strconv"
30         "time"
31
32         "golang.org/x/net/context"
33         "google.golang.org/grpc"
34         testpb "google.golang.org/grpc/benchmark/grpc_testing"
35         "google.golang.org/grpc/codes"
36         "google.golang.org/grpc/grpclog"
37 )
38
// Command-line flags for the benchmark worker.
var (
	// driverPort is the TCP port the worker listens on for the driver.
	driverPort = flag.Int("driver_port", 10000, "port for communication with driver")

	// serverPort is the port given to a benchmark server when one is started.
	serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")

	// pprofPort is the localhost port for the pprof HTTP server; the default
	// of -1 leaves that server disabled.
	pprofPort = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset")

	// blockProfRate is the value handed to runtime.SetBlockProfileRate.
	blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile")
)
45
// byteBufCodec is a pass-through codec whose messages are raw byte slices.
// Both directions hand the slice over as-is, without encoding or copying.
type byteBufCodec struct{}

// Marshal returns the byte slice that v points to.
//
// v must be a *[]byte; any other type produces an error. The returned slice
// shares its backing array with *v.
func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
	b, ok := v.(*[]byte)
	if !ok {
		return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
	}
	return *b, nil
}

// Unmarshal stores data into the byte slice that v points to.
//
// v must be a *[]byte; any other type produces an error. data is assigned
// directly, so *v aliases data afterwards rather than holding a copy.
func (byteBufCodec) Unmarshal(data []byte, v interface{}) error {
	b, ok := v.(*[]byte)
	if !ok {
		// Report the operation that actually failed (unmarshal, not marshal).
		return fmt.Errorf("failed to unmarshal: %v is not type of *[]byte", v)
	}
	*b = data
	return nil
}

// String returns the name of the codec.
func (byteBufCodec) String() string {
	return "bytebuffer"
}
69
// workerServer provides the WorkerService RPC handlers. A benchmarkServer or
// benchmarkClient is created on demand when the corresponding handler is asked
// to set one up.
type workerServer struct {
	// stop is the send-only channel on which a shutdown request is signalled.
	stop chan<- bool
	// serverPort is the port passed along when a benchmark server is started.
	serverPort int
}
76
77 func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
78         var bs *benchmarkServer
79         defer func() {
80                 // Close benchmark server when stream ends.
81                 grpclog.Printf("closing benchmark server")
82                 if bs != nil {
83                         bs.closeFunc()
84                 }
85         }()
86         for {
87                 in, err := stream.Recv()
88                 if err == io.EOF {
89                         return nil
90                 }
91                 if err != nil {
92                         return err
93                 }
94
95                 var out *testpb.ServerStatus
96                 switch argtype := in.Argtype.(type) {
97                 case *testpb.ServerArgs_Setup:
98                         grpclog.Printf("server setup received:")
99                         if bs != nil {
100                                 grpclog.Printf("server setup received when server already exists, closing the existing server")
101                                 bs.closeFunc()
102                         }
103                         bs, err = startBenchmarkServer(argtype.Setup, s.serverPort)
104                         if err != nil {
105                                 return err
106                         }
107                         out = &testpb.ServerStatus{
108                                 Stats: bs.getStats(false),
109                                 Port:  int32(bs.port),
110                                 Cores: int32(bs.cores),
111                         }
112
113                 case *testpb.ServerArgs_Mark:
114                         grpclog.Printf("server mark received:")
115                         grpclog.Printf(" - %v", argtype)
116                         if bs == nil {
117                                 return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
118                         }
119                         out = &testpb.ServerStatus{
120                                 Stats: bs.getStats(argtype.Mark.Reset_),
121                                 Port:  int32(bs.port),
122                                 Cores: int32(bs.cores),
123                         }
124                 }
125
126                 if err := stream.Send(out); err != nil {
127                         return err
128                 }
129         }
130 }
131
132 func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
133         var bc *benchmarkClient
134         defer func() {
135                 // Shut down benchmark client when stream ends.
136                 grpclog.Printf("shuting down benchmark client")
137                 if bc != nil {
138                         bc.shutdown()
139                 }
140         }()
141         for {
142                 in, err := stream.Recv()
143                 if err == io.EOF {
144                         return nil
145                 }
146                 if err != nil {
147                         return err
148                 }
149
150                 var out *testpb.ClientStatus
151                 switch t := in.Argtype.(type) {
152                 case *testpb.ClientArgs_Setup:
153                         grpclog.Printf("client setup received:")
154                         if bc != nil {
155                                 grpclog.Printf("client setup received when client already exists, shuting down the existing client")
156                                 bc.shutdown()
157                         }
158                         bc, err = startBenchmarkClient(t.Setup)
159                         if err != nil {
160                                 return err
161                         }
162                         out = &testpb.ClientStatus{
163                                 Stats: bc.getStats(false),
164                         }
165
166                 case *testpb.ClientArgs_Mark:
167                         grpclog.Printf("client mark received:")
168                         grpclog.Printf(" - %v", t)
169                         if bc == nil {
170                                 return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
171                         }
172                         out = &testpb.ClientStatus{
173                                 Stats: bc.getStats(t.Mark.Reset_),
174                         }
175                 }
176
177                 if err := stream.Send(out); err != nil {
178                         return err
179                 }
180         }
181 }
182
183 func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
184         grpclog.Printf("core count: %v", runtime.NumCPU())
185         return &testpb.CoreResponse{Cores: int32(runtime.NumCPU())}, nil
186 }
187
188 func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
189         grpclog.Printf("quiting worker")
190         s.stop <- true
191         return &testpb.Void{}, nil
192 }
193
194 func main() {
195         grpc.EnableTracing = false
196
197         flag.Parse()
198         lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort))
199         if err != nil {
200                 grpclog.Fatalf("failed to listen: %v", err)
201         }
202         grpclog.Printf("worker listening at port %v", *driverPort)
203
204         s := grpc.NewServer()
205         stop := make(chan bool)
206         testpb.RegisterWorkerServiceServer(s, &workerServer{
207                 stop:       stop,
208                 serverPort: *serverPort,
209         })
210
211         go func() {
212                 <-stop
213                 // Wait for 1 second before stopping the server to make sure the return value of QuitWorker is sent to client.
214                 // TODO revise this once server graceful stop is supported in gRPC.
215                 time.Sleep(time.Second)
216                 s.Stop()
217         }()
218
219         runtime.SetBlockProfileRate(*blockProfRate)
220
221         if *pprofPort >= 0 {
222                 go func() {
223                         grpclog.Println("Starting pprof server on port " + strconv.Itoa(*pprofPort))
224                         grpclog.Println(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil))
225                 }()
226         }
227
228         s.Serve(lis)
229 }