3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 //go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
30 "github.com/golang/protobuf/proto"
31 "golang.org/x/net/context"
33 "golang.org/x/oauth2/google"
34 "google.golang.org/grpc"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/grpclog"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 "google.golang.org/grpc/metadata"
42 reqSizes = []int{27182, 8, 1828, 45904}
43 respSizes = []int{31415, 9, 2653, 58979}
45 largeRespSize = 314159
46 initialMetadataKey = "x-grpc-test-echo-initial"
47 trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
50 // ClientNewPayload returns a payload of the given type and size.
51 func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
53 grpclog.Fatalf("Requested a response with invalid length %d", size)
55 body := make([]byte, size)
57 case testpb.PayloadType_COMPRESSABLE:
58 case testpb.PayloadType_UNCOMPRESSABLE:
59 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
61 grpclog.Fatalf("Unsupported payload type: %d", t)
63 return &testpb.Payload{
69 // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
70 func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
71 reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
73 grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
75 if !proto.Equal(&testpb.Empty{}, reply) {
76 grpclog.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
80 // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
81 func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
82 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
83 req := &testpb.SimpleRequest{
84 ResponseType: testpb.PayloadType_COMPRESSABLE,
85 ResponseSize: int32(largeRespSize),
88 reply, err := tc.UnaryCall(context.Background(), req, args...)
90 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
92 t := reply.GetPayload().GetType()
93 s := len(reply.GetPayload().GetBody())
94 if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
95 grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
99 // DoClientStreaming performs a client streaming RPC.
100 func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
101 stream, err := tc.StreamingInputCall(context.Background(), args...)
103 grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
106 for _, s := range reqSizes {
107 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
108 req := &testpb.StreamingInputCallRequest{
111 if err := stream.Send(req); err != nil {
112 grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
116 reply, err := stream.CloseAndRecv()
118 grpclog.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
120 if reply.GetAggregatedPayloadSize() != int32(sum) {
121 grpclog.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
125 // DoServerStreaming performs a server streaming RPC.
126 func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
127 respParam := make([]*testpb.ResponseParameters, len(respSizes))
128 for i, s := range respSizes {
129 respParam[i] = &testpb.ResponseParameters{
133 req := &testpb.StreamingOutputCallRequest{
134 ResponseType: testpb.PayloadType_COMPRESSABLE,
135 ResponseParameters: respParam,
137 stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
139 grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
145 reply, err := stream.Recv()
150 t := reply.GetPayload().GetType()
151 if t != testpb.PayloadType_COMPRESSABLE {
152 grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
154 size := len(reply.GetPayload().GetBody())
155 if size != int(respSizes[index]) {
156 grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
161 if rpcStatus != io.EOF {
162 grpclog.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
164 if respCnt != len(respSizes) {
165 grpclog.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
169 // DoPingPong performs ping-pong style bi-directional streaming RPC.
170 func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
171 stream, err := tc.FullDuplexCall(context.Background(), args...)
173 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
176 for index < len(reqSizes) {
177 respParam := []*testpb.ResponseParameters{
179 Size: int32(respSizes[index]),
182 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
183 req := &testpb.StreamingOutputCallRequest{
184 ResponseType: testpb.PayloadType_COMPRESSABLE,
185 ResponseParameters: respParam,
188 if err := stream.Send(req); err != nil {
189 grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
191 reply, err := stream.Recv()
193 grpclog.Fatalf("%v.Recv() = %v", stream, err)
195 t := reply.GetPayload().GetType()
196 if t != testpb.PayloadType_COMPRESSABLE {
197 grpclog.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
199 size := len(reply.GetPayload().GetBody())
200 if size != int(respSizes[index]) {
201 grpclog.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
205 if err := stream.CloseSend(); err != nil {
206 grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
208 if _, err := stream.Recv(); err != io.EOF {
209 grpclog.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
213 // DoEmptyStream sets up a bi-directional streaming with zero message.
214 func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
215 stream, err := tc.FullDuplexCall(context.Background(), args...)
217 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
219 if err := stream.CloseSend(); err != nil {
220 grpclog.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
222 if _, err := stream.Recv(); err != io.EOF {
223 grpclog.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
227 // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
228 func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
229 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
231 stream, err := tc.FullDuplexCall(ctx, args...)
233 if grpc.Code(err) == codes.DeadlineExceeded {
236 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
238 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
239 req := &testpb.StreamingOutputCallRequest{
240 ResponseType: testpb.PayloadType_COMPRESSABLE,
243 if err := stream.Send(req); err != nil {
244 if grpc.Code(err) != codes.DeadlineExceeded {
245 grpclog.Fatalf("%v.Send(_) = %v", stream, err)
248 if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded {
249 grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
253 // DoComputeEngineCreds performs a unary RPC with compute engine auth.
254 func DoComputeEngineCreds(tc testpb.TestServiceClient, serviceAccount, oauthScope string) {
255 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
256 req := &testpb.SimpleRequest{
257 ResponseType: testpb.PayloadType_COMPRESSABLE,
258 ResponseSize: int32(largeRespSize),
261 FillOauthScope: true,
263 reply, err := tc.UnaryCall(context.Background(), req)
265 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
267 user := reply.GetUsername()
268 scope := reply.GetOauthScope()
269 if user != serviceAccount {
270 grpclog.Fatalf("Got user name %q, want %q.", user, serviceAccount)
272 if !strings.Contains(oauthScope, scope) {
273 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
277 func getServiceAccountJSONKey(keyFile string) []byte {
278 jsonKey, err := ioutil.ReadFile(keyFile)
280 grpclog.Fatalf("Failed to read the service account key file: %v", err)
285 // DoServiceAccountCreds performs a unary RPC with service account auth.
286 func DoServiceAccountCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
287 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
288 req := &testpb.SimpleRequest{
289 ResponseType: testpb.PayloadType_COMPRESSABLE,
290 ResponseSize: int32(largeRespSize),
293 FillOauthScope: true,
295 reply, err := tc.UnaryCall(context.Background(), req)
297 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
299 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
300 user := reply.GetUsername()
301 scope := reply.GetOauthScope()
302 if !strings.Contains(string(jsonKey), user) {
303 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
305 if !strings.Contains(oauthScope, scope) {
306 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
310 // DoJWTTokenCreds performs a unary RPC with JWT token auth.
311 func DoJWTTokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile string) {
312 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
313 req := &testpb.SimpleRequest{
314 ResponseType: testpb.PayloadType_COMPRESSABLE,
315 ResponseSize: int32(largeRespSize),
319 reply, err := tc.UnaryCall(context.Background(), req)
321 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
323 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
324 user := reply.GetUsername()
325 if !strings.Contains(string(jsonKey), user) {
326 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
330 // GetToken obtains an OAUTH token from the input.
331 func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
332 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
333 config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
335 grpclog.Fatalf("Failed to get the config: %v", err)
337 token, err := config.TokenSource(context.Background()).Token()
339 grpclog.Fatalf("Failed to get the token: %v", err)
344 // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
345 func DoOauth2TokenCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
346 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
347 req := &testpb.SimpleRequest{
348 ResponseType: testpb.PayloadType_COMPRESSABLE,
349 ResponseSize: int32(largeRespSize),
352 FillOauthScope: true,
354 reply, err := tc.UnaryCall(context.Background(), req)
356 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
358 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
359 user := reply.GetUsername()
360 scope := reply.GetOauthScope()
361 if !strings.Contains(string(jsonKey), user) {
362 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
364 if !strings.Contains(oauthScope, scope) {
365 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
369 // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
370 func DoPerRPCCreds(tc testpb.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
371 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
372 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
373 req := &testpb.SimpleRequest{
374 ResponseType: testpb.PayloadType_COMPRESSABLE,
375 ResponseSize: int32(largeRespSize),
378 FillOauthScope: true,
380 token := GetToken(serviceAccountKeyFile, oauthScope)
381 kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
382 ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
383 reply, err := tc.UnaryCall(ctx, req)
385 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
387 user := reply.GetUsername()
388 scope := reply.GetOauthScope()
389 if !strings.Contains(string(jsonKey), user) {
390 grpclog.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
392 if !strings.Contains(oauthScope, scope) {
393 grpclog.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
397 var testMetadata = metadata.MD{
398 "key1": []string{"value1"},
399 "key2": []string{"value2"},
402 // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
403 func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
404 ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
405 stream, err := tc.StreamingInputCall(ctx, args...)
407 grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
410 _, err = stream.CloseAndRecv()
411 if grpc.Code(err) != codes.Canceled {
412 grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
416 // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
417 func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
418 ctx, cancel := context.WithCancel(context.Background())
419 stream, err := tc.FullDuplexCall(ctx, args...)
421 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
423 respParam := []*testpb.ResponseParameters{
428 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
429 req := &testpb.StreamingOutputCallRequest{
430 ResponseType: testpb.PayloadType_COMPRESSABLE,
431 ResponseParameters: respParam,
434 if err := stream.Send(req); err != nil {
435 grpclog.Fatalf("%v has error %v while sending %v", stream, err, req)
437 if _, err := stream.Recv(); err != nil {
438 grpclog.Fatalf("%v.Recv() = %v", stream, err)
441 if _, err := stream.Recv(); grpc.Code(err) != codes.Canceled {
442 grpclog.Fatalf("%v compleled with error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
447 initialMetadataValue = "test_initial_metadata_value"
448 trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
449 customMetadata = metadata.Pairs(
450 initialMetadataKey, initialMetadataValue,
451 trailingMetadataKey, trailingMetadataValue,
455 func validateMetadata(header, trailer metadata.MD) {
456 if len(header[initialMetadataKey]) != 1 {
457 grpclog.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
459 if header[initialMetadataKey][0] != initialMetadataValue {
460 grpclog.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
462 if len(trailer[trailingMetadataKey]) != 1 {
463 grpclog.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
465 if trailer[trailingMetadataKey][0] != trailingMetadataValue {
466 grpclog.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
470 // DoCustomMetadata checks that metadata is echoed back to the client.
471 func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
472 // Testing with UnaryCall.
473 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
474 req := &testpb.SimpleRequest{
475 ResponseType: testpb.PayloadType_COMPRESSABLE,
476 ResponseSize: int32(1),
479 ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
480 var header, trailer metadata.MD
481 args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
482 reply, err := tc.UnaryCall(
488 grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
490 t := reply.GetPayload().GetType()
491 s := len(reply.GetPayload().GetBody())
492 if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
493 grpclog.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
495 validateMetadata(header, trailer)
497 // Testing with FullDuplex.
498 stream, err := tc.FullDuplexCall(ctx, args...)
500 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
502 respParam := []*testpb.ResponseParameters{
507 streamReq := &testpb.StreamingOutputCallRequest{
508 ResponseType: testpb.PayloadType_COMPRESSABLE,
509 ResponseParameters: respParam,
512 if err := stream.Send(streamReq); err != nil {
513 grpclog.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
515 streamHeader, err := stream.Header()
517 grpclog.Fatalf("%v.Header() = %v", stream, err)
519 if _, err := stream.Recv(); err != nil {
520 grpclog.Fatalf("%v.Recv() = %v", stream, err)
522 if err := stream.CloseSend(); err != nil {
523 grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
525 if _, err := stream.Recv(); err != io.EOF {
526 grpclog.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
528 streamTrailer := stream.Trailer()
529 validateMetadata(streamHeader, streamTrailer)
532 // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
533 func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
535 msg := "test status message"
536 expectedErr := grpc.Errorf(codes.Code(code), msg)
537 respStatus := &testpb.EchoStatus{
542 req := &testpb.SimpleRequest{
543 ResponseStatus: respStatus,
545 if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
546 grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
548 // Test FullDuplexCall.
549 stream, err := tc.FullDuplexCall(context.Background(), args...)
551 grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
553 streamReq := &testpb.StreamingOutputCallRequest{
554 ResponseStatus: respStatus,
556 if err := stream.Send(streamReq); err != nil {
557 grpclog.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
559 if err := stream.CloseSend(); err != nil {
560 grpclog.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
562 if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
563 grpclog.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
567 // DoUnimplementedService attempts to call a method from an unimplemented service.
568 func DoUnimplementedService(tc testpb.UnimplementedServiceClient) {
569 _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
570 if grpc.Code(err) != codes.Unimplemented {
571 grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Unimplemented)
575 // DoUnimplementedMethod attempts to call an unimplemented method.
576 func DoUnimplementedMethod(cc *grpc.ClientConn) {
577 var req, reply proto.Message
578 if err := grpc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply, cc); err == nil || grpc.Code(err) != codes.Unimplemented {
579 grpclog.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
583 type testServer struct {
586 // NewTestServer creates a test server for test service.
587 func NewTestServer() testpb.TestServiceServer {
591 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
592 return new(testpb.Empty), nil
595 func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
597 return nil, fmt.Errorf("requested a response with invalid length %d", size)
599 body := make([]byte, size)
601 case testpb.PayloadType_COMPRESSABLE:
602 case testpb.PayloadType_UNCOMPRESSABLE:
603 return nil, fmt.Errorf("payloadType UNCOMPRESSABLE is not supported")
605 return nil, fmt.Errorf("unsupported payload type: %d", t)
607 return &testpb.Payload{
613 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
614 status := in.GetResponseStatus()
615 if md, ok := metadata.FromIncomingContext(ctx); ok {
616 if initialMetadata, ok := md[initialMetadataKey]; ok {
617 header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
618 grpc.SendHeader(ctx, header)
620 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
621 trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
622 grpc.SetTrailer(ctx, trailer)
625 if status != nil && status.Code != 0 {
626 return nil, grpc.Errorf(codes.Code(status.Code), status.Message)
628 pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
632 return &testpb.SimpleResponse{
637 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
638 cs := args.GetResponseParameters()
639 for _, c := range cs {
640 if us := c.GetIntervalUs(); us > 0 {
641 time.Sleep(time.Duration(us) * time.Microsecond)
643 pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
647 if err := stream.Send(&testpb.StreamingOutputCallResponse{
656 func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
659 in, err := stream.Recv()
661 return stream.SendAndClose(&testpb.StreamingInputCallResponse{
662 AggregatedPayloadSize: int32(sum),
668 p := in.GetPayload().GetBody()
673 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
674 if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
675 if initialMetadata, ok := md[initialMetadataKey]; ok {
676 header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
677 stream.SendHeader(header)
679 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
680 trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
681 stream.SetTrailer(trailer)
685 in, err := stream.Recv()
693 status := in.GetResponseStatus()
694 if status != nil && status.Code != 0 {
695 return grpc.Errorf(codes.Code(status.Code), status.Message)
697 cs := in.GetResponseParameters()
698 for _, c := range cs {
699 if us := c.GetIntervalUs(); us > 0 {
700 time.Sleep(time.Duration(us) * time.Microsecond)
702 pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
706 if err := stream.Send(&testpb.StreamingOutputCallResponse{
715 func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
716 var msgBuf []*testpb.StreamingOutputCallRequest
718 in, err := stream.Recv()
726 msgBuf = append(msgBuf, in)
728 for _, m := range msgBuf {
729 cs := m.GetResponseParameters()
730 for _, c := range cs {
731 if us := c.GetIntervalUs(); us > 0 {
732 time.Sleep(time.Duration(us) * time.Microsecond)
734 pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
738 if err := stream.Send(&testpb.StreamingOutputCallResponse{