OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / interop / test_utils.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
20
21 package interop
22
23 import (
24         "fmt"
25         "io"
26         "io/ioutil"
27         "strings"
28         "time"
29
30         "github.com/golang/protobuf/proto"
31         "golang.org/x/net/context"
32         "golang.org/x/oauth2"
33         "golang.org/x/oauth2/google"
34         "google.golang.org/grpc"
35         "google.golang.org/grpc/codes"
36         "google.golang.org/grpc/grpclog"
37         testpb "google.golang.org/grpc/interop/grpc_testing"
38         "google.golang.org/grpc/metadata"
39 )
40
41 var (
42         reqSizes            = []int{27182, 8, 1828, 45904}
43         respSizes           = []int{31415, 9, 2653, 58979}
44         largeReqSize        = 271828
45         largeRespSize       = 314159
46         initialMetadataKey  = "x-grpc-test-echo-initial"
47         trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
48 )
49
50 // ClientNewPayload returns a payload of the given type and size.
51 func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
52         if size < 0 {
53                 grpclog.Fatalf("Requested a response with invalid length %d", size)
54         }
55         body := make([]byte, size)
56         switch t {
57         case testpb.PayloadType_COMPRESSABLE:
58         case testpb.PayloadType_UNCOMPRESSABLE:
59                 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
60         default:
61                 grpclog.Fatalf("Unsupported payload type: %d", t)
62         }
63         return &testpb.Payload{
64                 Type: t,
65                 Body: body,
66         }
67 }
68
69 // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
70 func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
71         reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
72         if err != nil {
73                 grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
74         }
75         if !proto.Equal(&testpb.Empty{}, reply) {
76                 grpclog.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
77         }
78 }
79
80 // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
81 func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
82         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
83         req := &testpb.SimpleRequest{
84                 ResponseType: testpb.PayloadType_COMPRESSABLE,
85                 ResponseSize: int32(largeRespSize),
86                 Payload:      pl,
87         }
88         reply, err := tc.UnaryCall(context.Background(), req, args...)
89         if err != nil {
90                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
91         }
92         t := reply.GetPayload().GetType()
93         s := len(reply.GetPayload().GetBody())
94         if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
95                 grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
96         }
97 }
98
99 // DoClientStreaming performs a client streaming RPC.
100 func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
101         stream, err := tc.StreamingInputCall(context.Background(), args...)
102         if err != nil {
103                 grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
104         }
105         var sum int
106         for _, s := range reqSizes {
107                 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
108                 req := &testpb.StreamingInputCallRequest{
109                         Payload: pl,
110                 }
111                 if err := stream.Send(req); err != nil {
112                         grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
113                 }
114                 sum += s
115         }
116         reply, err := stream.CloseAndRecv()
117         if err != nil {
118                 grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
119         }
120         if reply.GetAggregatedPayloadSize() != int32(sum) {
121                 grpclog.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
122         }
123 }
124
125 // DoServerStreaming performs a server streaming RPC.
126 func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
127         respParam := make([]*testpb.ResponseParameters, len(respSizes))
128         for i, s := range respSizes {
129                 respParam[i] = &testpb.ResponseParameters{
130                         Size: int32(s),
131                 }
132         }
133         req := &testpb.StreamingOutputCallRequest{
134                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
135                 ResponseParameters: respParam,
136         }
137         stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
138         if err != nil {
139                 grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
140         }
141         var rpcStatus error
142         var respCnt int
143         var index int
144         for {
145                 reply, err := stream.Recv()
146                 if err != nil {
147                         rpcStatus = err
148                         break
149                 }
150                 t := reply.GetPayload().GetType()
151                 if t != testpb.PayloadType_COMPRESSABLE {
152                         grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
153                 }
154                 size := len(reply.GetPayload().GetBody())
155                 if size != int(respSizes[index]) {
156                         grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
157                 }
158                 index++
159                 respCnt++
160         }
161         if rpcStatus != io.EOF {
162                 grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
163         }
164         if respCnt != len(respSizes) {
165                 grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
166         }
167 }
168
169 // DoPingPong performs ping-pong style bi-directional streaming RPC.
170 func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
171         stream, err := tc.FullDuplexCall(context.Background(), args...)
172         if err != nil {
173                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
174         }
175         var index int
176         for index < len(reqSizes) {
177                 respParam := []*testpb.ResponseParameters{
178                         {
179                                 Size: int32(respSizes[index]),
180                         },
181                 }
182                 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
183                 req := &testpb.StreamingOutputCallRequest{
184                         ResponseType:       testpb.PayloadType_COMPRESSABLE,
185                         ResponseParameters: respParam,
186                         Payload:            pl,
187                 }
188                 if err := stream.Send(req); err != nil {
189                         grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
190                 }
191                 reply, err := stream.Recv()
192                 if err != nil {
193                         grpclog.Fatalf("%v.Recv() = %v", stream, err)
194                 }
195                 t := reply.GetPayload().GetType()
196                 if t != testpb.PayloadType_COMPRESSABLE {
197                         grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
198                 }
199                 size := len(reply.GetPayload().GetBody())
200                 if size != int(respSizes[index]) {
201                         grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
202                 }
203                 index++
204         }
205         if err := stream.CloseSend(); err != nil {
206                 grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
207         }
208         if _, err := stream.Recv(); err != io.EOF {
209                 grpclog.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
210         }
211 }
212
213 // DoEmptyStream sets up a bi-directional streaming with zero message.
214 func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
215         stream, err := tc.FullDuplexCall(context.Background(), args...)
216         if err != nil {
217                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
218         }
219         if err := stream.CloseSend(); err != nil {
220                 grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
221         }
222         if _, err := stream.Recv(); err != io.EOF {
223                 grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
224         }
225 }
226
227 // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
228 func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
229         ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
230         defer cancel()
231         stream, err := tc.FullDuplexCall(ctx, args...)
232         if err != nil {
233                 if grpc.Code(err) == codes.DeadlineExceeded {
234                         return
235                 }
236                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
237         }
238         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
239         req := &testpb.StreamingOutputCallRequest{
240                 ResponseType: testpb.PayloadType_COMPRESSABLE,
241                 Payload:      pl,
242         }
243         if err := stream.Send(req); err != nil {
244                 if grpc.Code(err) != codes.DeadlineExceeded {
245                         grpclog.Fatalf("%v.Send(_) = %v", stream, err)
246                 }
247         }
248         if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded {
249                 grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
250         }
251 }
252
253 // DoComputeEngineCreds performs a unary RPC with compute engine auth.
254 func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
255         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
256         req := &testpb.SimpleRequest{
257                 ResponseType:   testpb.PayloadType_COMPRESSABLE,
258                 ResponseSize:   int32(largeRespSize),
259                 Payload:        pl,
260                 FillUsername:   true,
261                 FillOauthScope: true,
262         }
263         reply, err := tc.UnaryCall(context.Background(), req)
264         if err != nil {
265                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
266         }
267         user := reply.GetUsername()
268         scope := reply.GetOauthScope()
269         if user != serviceAccount {
270                 grpclog.Fatalf("Got user name %q, want %q.", user, serviceAccount)
271         }
272         if !strings.Contains(oauthScope, scope) {
273                 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
274         }
275 }
276
277 func getServiceAccountJSONKey(keyFile string) []byte {
278         jsonKey, err := ioutil.ReadFile(keyFile)
279         if err != nil {
280                 grpclog.Fatalf("Failed to read the service account key file: %v", err)
281         }
282         return jsonKey
283 }
284
285 // DoServiceAccountCreds performs a unary RPC with service account auth.
286 func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
287         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
288         req := &testpb.SimpleRequest{
289                 ResponseType:   testpb.PayloadType_COMPRESSABLE,
290                 ResponseSize:   int32(largeRespSize),
291                 Payload:        pl,
292                 FillUsername:   true,
293                 FillOauthScope: true,
294         }
295         reply, err := tc.UnaryCall(context.Background(), req)
296         if err != nil {
297                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
298         }
299         jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
300         user := reply.GetUsername()
301         scope := reply.GetOauthScope()
302         if !strings.Contains(string(jsonKey), user) {
303                 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
304         }
305         if !strings.Contains(oauthScope, scope) {
306                 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
307         }
308 }
309
310 // DoJWTTokenCreds performs a unary RPC with JWT token auth.
311 func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
312         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
313         req := &testpb.SimpleRequest{
314                 ResponseType: testpb.PayloadType_COMPRESSABLE,
315                 ResponseSize: int32(largeRespSize),
316                 Payload:      pl,
317                 FillUsername: true,
318         }
319         reply, err := tc.UnaryCall(context.Background(), req)
320         if err != nil {
321                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
322         }
323         jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
324         user := reply.GetUsername()
325         if !strings.Contains(string(jsonKey), user) {
326                 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
327         }
328 }
329
330 // GetToken obtains an OAUTH token from the input.
331 func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
332         jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
333         config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
334         if err != nil {
335                 grpclog.Fatalf("Failed to get the config: %v", err)
336         }
337         token, err := config.TokenSource(context.Background()).Token()
338         if err != nil {
339                 grpclog.Fatalf("Failed to get the token: %v", err)
340         }
341         return token
342 }
343
344 // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
345 func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
346         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
347         req := &testpb.SimpleRequest{
348                 ResponseType:   testpb.PayloadType_COMPRESSABLE,
349                 ResponseSize:   int32(largeRespSize),
350                 Payload:        pl,
351                 FillUsername:   true,
352                 FillOauthScope: true,
353         }
354         reply, err := tc.UnaryCall(context.Background(), req)
355         if err != nil {
356                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
357         }
358         jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
359         user := reply.GetUsername()
360         scope := reply.GetOauthScope()
361         if !strings.Contains(string(jsonKey), user) {
362                 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
363         }
364         if !strings.Contains(oauthScope, scope) {
365                 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
366         }
367 }
368
369 // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
370 func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
371         jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
372         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
373         req := &testpb.SimpleRequest{
374                 ResponseType:   testpb.PayloadType_COMPRESSABLE,
375                 ResponseSize:   int32(largeRespSize),
376                 Payload:        pl,
377                 FillUsername:   true,
378                 FillOauthScope: true,
379         }
380         token := GetToken(serviceAccountKeyFile, oauthScope)
381         kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
382         ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
383         reply, err := tc.UnaryCall(ctx, req)
384         if err != nil {
385                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
386         }
387         user := reply.GetUsername()
388         scope := reply.GetOauthScope()
389         if !strings.Contains(string(jsonKey), user) {
390                 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
391         }
392         if !strings.Contains(oauthScope, scope) {
393                 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
394         }
395 }
396
397 var testMetadata = metadata.MD{
398         "key1": []string{"value1"},
399         "key2": []string{"value2"},
400 }
401
402 // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
403 func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
404         ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
405         stream, err := tc.StreamingInputCall(ctx, args...)
406         if err != nil {
407                 grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
408         }
409         cancel()
410         _, err = stream.CloseAndRecv()
411         if grpc.Code(err) != codes.Canceled {
412                 grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
413         }
414 }
415
416 // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
417 func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
418         ctx, cancel := context.WithCancel(context.Background())
419         stream, err := tc.FullDuplexCall(ctx, args...)
420         if err != nil {
421                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
422         }
423         respParam := []*testpb.ResponseParameters{
424                 {
425                         Size: 31415,
426                 },
427         }
428         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
429         req := &testpb.StreamingOutputCallRequest{
430                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
431                 ResponseParameters: respParam,
432                 Payload:            pl,
433         }
434         if err := stream.Send(req); err != nil {
435                 grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
436         }
437         if _, err := stream.Recv(); err != nil {
438                 grpclog.Fatalf("%v.Recv() = %v", stream, err)
439         }
440         cancel()
441         if _, err := stream.Recv(); grpc.Code(err) != codes.Canceled {
442                 grpclog.Fatalf("%v compleled with error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
443         }
444 }
445
446 var (
447         initialMetadataValue  = "test_initial_metadata_value"
448         trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
449         customMetadata        = metadata.Pairs(
450                 initialMetadataKey, initialMetadataValue,
451                 trailingMetadataKey, trailingMetadataValue,
452         )
453 )
454
455 func validateMetadata(header, trailer metadata.MD) {
456         if len(header[initialMetadataKey]) != 1 {
457                 grpclog.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
458         }
459         if header[initialMetadataKey][0] != initialMetadataValue {
460                 grpclog.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
461         }
462         if len(trailer[trailingMetadataKey]) != 1 {
463                 grpclog.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
464         }
465         if trailer[trailingMetadataKey][0] != trailingMetadataValue {
466                 grpclog.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
467         }
468 }
469
470 // DoCustomMetadata checks that metadata is echoed back to the client.
471 func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
472         // Testing with UnaryCall.
473         pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
474         req := &testpb.SimpleRequest{
475                 ResponseType: testpb.PayloadType_COMPRESSABLE,
476                 ResponseSize: int32(1),
477                 Payload:      pl,
478         }
479         ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
480         var header, trailer metadata.MD
481         args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
482         reply, err := tc.UnaryCall(
483                 ctx,
484                 req,
485                 args...,
486         )
487         if err != nil {
488                 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
489         }
490         t := reply.GetPayload().GetType()
491         s := len(reply.GetPayload().GetBody())
492         if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
493                 grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
494         }
495         validateMetadata(header, trailer)
496
497         // Testing with FullDuplex.
498         stream, err := tc.FullDuplexCall(ctx, args...)
499         if err != nil {
500                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
501         }
502         respParam := []*testpb.ResponseParameters{
503                 {
504                         Size: 1,
505                 },
506         }
507         streamReq := &testpb.StreamingOutputCallRequest{
508                 ResponseType:       testpb.PayloadType_COMPRESSABLE,
509                 ResponseParameters: respParam,
510                 Payload:            pl,
511         }
512         if err := stream.Send(streamReq); err != nil {
513                 grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
514         }
515         streamHeader, err := stream.Header()
516         if err != nil {
517                 grpclog.Fatalf("%v.Header() = %v", stream, err)
518         }
519         if _, err := stream.Recv(); err != nil {
520                 grpclog.Fatalf("%v.Recv() = %v", stream, err)
521         }
522         if err := stream.CloseSend(); err != nil {
523                 grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
524         }
525         if _, err := stream.Recv(); err != io.EOF {
526                 grpclog.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
527         }
528         streamTrailer := stream.Trailer()
529         validateMetadata(streamHeader, streamTrailer)
530 }
531
532 // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
533 func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
534         var code int32 = 2
535         msg := "test status message"
536         expectedErr := grpc.Errorf(codes.Code(code), msg)
537         respStatus := &testpb.EchoStatus{
538                 Code:    code,
539                 Message: msg,
540         }
541         // Test UnaryCall.
542         req := &testpb.SimpleRequest{
543                 ResponseStatus: respStatus,
544         }
545         if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
546                 grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
547         }
548         // Test FullDuplexCall.
549         stream, err := tc.FullDuplexCall(context.Background(), args...)
550         if err != nil {
551                 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
552         }
553         streamReq := &testpb.StreamingOutputCallRequest{
554                 ResponseStatus: respStatus,
555         }
556         if err := stream.Send(streamReq); err != nil {
557                 grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
558         }
559         if err := stream.CloseSend(); err != nil {
560                 grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
561         }
562         if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
563                 grpclog.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
564         }
565 }
566
567 // DoUnimplementedService attempts to call a method from an unimplemented service.
568 func DoUnimplementedService(tc testpb.UnimplementedServiceClient) {
569         _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
570         if grpc.Code(err) != codes.Unimplemented {
571                 grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Unimplemented)
572         }
573 }
574
575 // DoUnimplementedMethod attempts to call an unimplemented method.
576 func DoUnimplementedMethod(cc *grpc.ClientConn) {
577         var req, reply proto.Message
578         if err := grpc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply, cc); err == nil || grpc.Code(err) != codes.Unimplemented {
579                 grpclog.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
580         }
581 }
582
583 type testServer struct {
584 }
585
586 // NewTestServer creates a test server for test service.
587 func NewTestServer() testpb.TestServiceServer {
588         return &testServer{}
589 }
590
591 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
592         return new(testpb.Empty), nil
593 }
594
595 func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
596         if size < 0 {
597                 return nil, fmt.Errorf("requested a response with invalid length %d", size)
598         }
599         body := make([]byte, size)
600         switch t {
601         case testpb.PayloadType_COMPRESSABLE:
602         case testpb.PayloadType_UNCOMPRESSABLE:
603                 return nil, fmt.Errorf("payloadType UNCOMPRESSABLE is not supported")
604         default:
605                 return nil, fmt.Errorf("unsupported payload type: %d", t)
606         }
607         return &testpb.Payload{
608                 Type: t,
609                 Body: body,
610         }, nil
611 }
612
613 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
614         status := in.GetResponseStatus()
615         if md, ok := metadata.FromIncomingContext(ctx); ok {
616                 if initialMetadata, ok := md[initialMetadataKey]; ok {
617                         header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
618                         grpc.SendHeader(ctx, header)
619                 }
620                 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
621                         trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
622                         grpc.SetTrailer(ctx, trailer)
623                 }
624         }
625         if status != nil && status.Code != 0 {
626                 return nil, grpc.Errorf(codes.Code(status.Code), status.Message)
627         }
628         pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
629         if err != nil {
630                 return nil, err
631         }
632         return &testpb.SimpleResponse{
633                 Payload: pl,
634         }, nil
635 }
636
637 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
638         cs := args.GetResponseParameters()
639         for _, c := range cs {
640                 if us := c.GetIntervalUs(); us > 0 {
641                         time.Sleep(time.Duration(us) * time.Microsecond)
642                 }
643                 pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
644                 if err != nil {
645                         return err
646                 }
647                 if err := stream.Send(&testpb.StreamingOutputCallResponse{
648                         Payload: pl,
649                 }); err != nil {
650                         return err
651                 }
652         }
653         return nil
654 }
655
656 func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
657         var sum int
658         for {
659                 in, err := stream.Recv()
660                 if err == io.EOF {
661                         return stream.SendAndClose(&testpb.StreamingInputCallResponse{
662                                 AggregatedPayloadSize: int32(sum),
663                         })
664                 }
665                 if err != nil {
666                         return err
667                 }
668                 p := in.GetPayload().GetBody()
669                 sum += len(p)
670         }
671 }
672
673 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
674         if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
675                 if initialMetadata, ok := md[initialMetadataKey]; ok {
676                         header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
677                         stream.SendHeader(header)
678                 }
679                 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
680                         trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
681                         stream.SetTrailer(trailer)
682                 }
683         }
684         for {
685                 in, err := stream.Recv()
686                 if err == io.EOF {
687                         // read done.
688                         return nil
689                 }
690                 if err != nil {
691                         return err
692                 }
693                 status := in.GetResponseStatus()
694                 if status != nil && status.Code != 0 {
695                         return grpc.Errorf(codes.Code(status.Code), status.Message)
696                 }
697                 cs := in.GetResponseParameters()
698                 for _, c := range cs {
699                         if us := c.GetIntervalUs(); us > 0 {
700                                 time.Sleep(time.Duration(us) * time.Microsecond)
701                         }
702                         pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
703                         if err != nil {
704                                 return err
705                         }
706                         if err := stream.Send(&testpb.StreamingOutputCallResponse{
707                                 Payload: pl,
708                         }); err != nil {
709                                 return err
710                         }
711                 }
712         }
713 }
714
715 func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
716         var msgBuf []*testpb.StreamingOutputCallRequest
717         for {
718                 in, err := stream.Recv()
719                 if err == io.EOF {
720                         // read done.
721                         break
722                 }
723                 if err != nil {
724                         return err
725                 }
726                 msgBuf = append(msgBuf, in)
727         }
728         for _, m := range msgBuf {
729                 cs := m.GetResponseParameters()
730                 for _, c := range cs {
731                         if us := c.GetIntervalUs(); us > 0 {
732                                 time.Sleep(time.Duration(us) * time.Microsecond)
733                         }
734                         pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
735                         if err != nil {
736                                 return err
737                         }
738                         if err := stream.Send(&testpb.StreamingOutputCallResponse{
739                                 Payload: pl,
740                         }); err != nil {
741                                 return err
742                         }
743                 }
744         }
745         return nil
746 }