3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
30 "github.com/golang/protobuf/proto"
31 "golang.org/x/net/context"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/metadata"
34 "google.golang.org/grpc/stats"
35 testpb "google.golang.org/grpc/stats/grpc_testing"
39 grpc.EnableTracing = false
42 type connCtxKey struct{}
43 type rpcCtxKey struct{}
47 testMetadata = metadata.MD{
48 "key1": []string{"value1"},
49 "key2": []string{"value2"},
52 testTrailerMetadata = metadata.MD{
53 "tkey1": []string{"trailerValue1"},
54 "tkey2": []string{"trailerValue2"},
56 // The id for which the service handler should return error.
60 type testServer struct{}
62 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
63 md, ok := metadata.FromIncomingContext(ctx)
65 if err := grpc.SendHeader(ctx, md); err != nil {
66 return nil, grpc.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
68 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
69 return nil, grpc.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
74 return nil, fmt.Errorf("got error id: %v", in.Id)
77 return &testpb.SimpleResponse{Id: in.Id}, nil
80 func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
81 md, ok := metadata.FromIncomingContext(stream.Context())
83 if err := stream.SendHeader(md); err != nil {
84 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
86 stream.SetTrailer(testTrailerMetadata)
89 in, err := stream.Recv()
99 return fmt.Errorf("got error id: %v", in.Id)
102 if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
108 func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCallServer) error {
109 md, ok := metadata.FromIncomingContext(stream.Context())
111 if err := stream.SendHeader(md); err != nil {
112 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
114 stream.SetTrailer(testTrailerMetadata)
117 in, err := stream.Recv()
120 return stream.SendAndClose(&testpb.SimpleResponse{Id: int32(0)})
126 if in.Id == errorID {
127 return fmt.Errorf("got error id: %v", in.Id)
132 func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.TestService_ServerStreamCallServer) error {
133 md, ok := metadata.FromIncomingContext(stream.Context())
135 if err := stream.SendHeader(md); err != nil {
136 return grpc.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
138 stream.SetTrailer(testTrailerMetadata)
141 if in.Id == errorID {
142 return fmt.Errorf("got error id: %v", in.Id)
145 for i := 0; i < 5; i++ {
146 if err := stream.Send(&testpb.SimpleResponse{Id: in.Id}); err != nil {
153 // test is an end-to-end test. It should be created with the newTest
154 // func, modified as needed, and then started with its startServer method.
155 // It should be cleaned up with the tearDown method.
159 clientStatsHandler stats.Handler
160 serverStatsHandler stats.Handler
162 testServer testpb.TestServiceServer // nil means none
163 // srv and srvAddr are set once startServer is called.
167 cc *grpc.ClientConn // nil until requested via clientConn
170 func (te *test) tearDown() {
178 type testConfig struct {
182 // newTest returns a new test using the provided testing.T and
183 // environment. It is returned with default values. Tests should
184 // modify it before calling its startServer and clientConn methods.
185 func newTest(t *testing.T, tc *testConfig, ch stats.Handler, sh stats.Handler) *test {
188 compress: tc.compress,
189 clientStatsHandler: ch,
190 serverStatsHandler: sh,
195 // startServer starts a gRPC server listening. Callers should defer a
196 // call to te.tearDown to clean up.
197 func (te *test) startServer(ts testpb.TestServiceServer) {
199 lis, err := net.Listen("tcp", "localhost:0")
201 te.t.Fatalf("Failed to listen: %v", err)
203 var opts []grpc.ServerOption
204 if te.compress == "gzip" {
206 grpc.RPCCompressor(grpc.NewGZIPCompressor()),
207 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
210 if te.serverStatsHandler != nil {
211 opts = append(opts, grpc.StatsHandler(te.serverStatsHandler))
213 s := grpc.NewServer(opts...)
215 if te.testServer != nil {
216 testpb.RegisterTestServiceServer(s, te.testServer)
220 te.srvAddr = lis.Addr().String()
223 func (te *test) clientConn() *grpc.ClientConn {
227 opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
228 if te.compress == "gzip" {
230 grpc.WithCompressor(grpc.NewGZIPCompressor()),
231 grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
234 if te.clientStatsHandler != nil {
235 opts = append(opts, grpc.WithStatsHandler(te.clientStatsHandler))
239 te.cc, err = grpc.Dial(te.srvAddr, opts...)
241 te.t.Fatalf("Dial(%q) = %v", te.srvAddr, err)
249 unaryRPC rpcType = iota
255 type rpcConfig struct {
256 count int // Number of requests and responses for streaming RPCs.
257 success bool // Whether the RPC should succeed or return error.
259 callType rpcType // Type of RPC.
260 noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
263 func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
265 resp *testpb.SimpleResponse
266 req *testpb.SimpleRequest
269 tc := testpb.NewTestServiceClient(te.clientConn())
271 req = &testpb.SimpleRequest{Id: errorID + 1}
273 req = &testpb.SimpleRequest{Id: errorID}
275 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
277 resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(c.failfast))
278 return req, resp, err
281 func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
283 reqs []*testpb.SimpleRequest
284 resps []*testpb.SimpleResponse
287 tc := testpb.NewTestServiceClient(te.clientConn())
288 stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
290 return reqs, resps, err
296 for i := 0; i < c.count; i++ {
297 req := &testpb.SimpleRequest{
298 Id: int32(i) + startID,
300 reqs = append(reqs, req)
301 if err = stream.Send(req); err != nil {
302 return reqs, resps, err
304 var resp *testpb.SimpleResponse
305 if resp, err = stream.Recv(); err != nil {
306 return reqs, resps, err
308 resps = append(resps, resp)
310 if err = stream.CloseSend(); err != nil && err != io.EOF {
311 return reqs, resps, err
314 if _, err = stream.Recv(); err != io.EOF {
315 return reqs, resps, err
318 // In the case of not calling the last recv, sleep to avoid
319 // returning too fast to miss the remaining stats (InTrailer and End).
320 time.Sleep(time.Second)
323 return reqs, resps, nil
326 func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
328 reqs []*testpb.SimpleRequest
329 resp *testpb.SimpleResponse
332 tc := testpb.NewTestServiceClient(te.clientConn())
333 stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
335 return reqs, resp, err
341 for i := 0; i < c.count; i++ {
342 req := &testpb.SimpleRequest{
343 Id: int32(i) + startID,
345 reqs = append(reqs, req)
346 if err = stream.Send(req); err != nil {
347 return reqs, resp, err
350 resp, err = stream.CloseAndRecv()
351 return reqs, resp, err
354 func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
356 req *testpb.SimpleRequest
357 resps []*testpb.SimpleResponse
361 tc := testpb.NewTestServiceClient(te.clientConn())
367 req = &testpb.SimpleRequest{Id: startID}
368 stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
370 return req, resps, err
373 var resp *testpb.SimpleResponse
374 resp, err := stream.Recv()
376 return req, resps, nil
377 } else if err != nil {
378 return req, resps, err
380 resps = append(resps, resp)
384 type expectedData struct {
389 requests []*testpb.SimpleRequest
391 responses []*testpb.SimpleResponse
396 type gotData struct {
399 s interface{} // This could be RPCStats or ConnStats.
415 func checkBegin(t *testing.T, d *gotData, e *expectedData) {
420 if st, ok = d.s.(*stats.Begin); !ok {
421 t.Fatalf("got %T, want Begin", d.s)
424 t.Fatalf("d.ctx = nil, want <non-nil>")
426 if st.BeginTime.IsZero() {
427 t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
430 if st.FailFast != e.failfast {
431 t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
436 func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
441 if st, ok = d.s.(*stats.InHeader); !ok {
442 t.Fatalf("got %T, want InHeader", d.s)
445 t.Fatalf("d.ctx = nil, want <non-nil>")
448 if st.FullMethod != e.method {
449 t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
451 if st.LocalAddr.String() != e.serverAddr {
452 t.Fatalf("st.LocalAddr = %v, want %v", st.LocalAddr, e.serverAddr)
454 if st.Compression != e.compression {
455 t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
458 if connInfo, ok := d.ctx.Value(connCtxKey{}).(*stats.ConnTagInfo); ok {
459 if connInfo.RemoteAddr != st.RemoteAddr {
460 t.Fatalf("connInfo.RemoteAddr = %v, want %v", connInfo.RemoteAddr, st.RemoteAddr)
462 if connInfo.LocalAddr != st.LocalAddr {
463 t.Fatalf("connInfo.LocalAddr = %v, want %v", connInfo.LocalAddr, st.LocalAddr)
466 t.Fatalf("got context %v, want one with connCtxKey", d.ctx)
468 if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
469 if rpcInfo.FullMethodName != st.FullMethod {
470 t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
473 t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
478 func checkInPayload(t *testing.T, d *gotData, e *expectedData) {
483 if st, ok = d.s.(*stats.InPayload); !ok {
484 t.Fatalf("got %T, want InPayload", d.s)
487 t.Fatalf("d.ctx = nil, want <non-nil>")
490 b, err := proto.Marshal(e.responses[e.respIdx])
492 t.Fatalf("failed to marshal message: %v", err)
494 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
495 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
498 if string(st.Data) != string(b) {
499 t.Fatalf("st.Data = %v, want %v", st.Data, b)
501 if st.Length != len(b) {
502 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
505 b, err := proto.Marshal(e.requests[e.reqIdx])
507 t.Fatalf("failed to marshal message: %v", err)
509 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
510 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
513 if string(st.Data) != string(b) {
514 t.Fatalf("st.Data = %v, want %v", st.Data, b)
516 if st.Length != len(b) {
517 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
520 // TODO check WireLength and ReceivedTime.
521 if st.RecvTime.IsZero() {
522 t.Fatalf("st.ReceivedTime = %v, want <non-zero>", st.RecvTime)
526 func checkInTrailer(t *testing.T, d *gotData, e *expectedData) {
530 if _, ok = d.s.(*stats.InTrailer); !ok {
531 t.Fatalf("got %T, want InTrailer", d.s)
534 t.Fatalf("d.ctx = nil, want <non-nil>")
538 func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
543 if st, ok = d.s.(*stats.OutHeader); !ok {
544 t.Fatalf("got %T, want OutHeader", d.s)
547 t.Fatalf("d.ctx = nil, want <non-nil>")
550 if st.FullMethod != e.method {
551 t.Fatalf("st.FullMethod = %s, want %v", st.FullMethod, e.method)
553 if st.RemoteAddr.String() != e.serverAddr {
554 t.Fatalf("st.RemoteAddr = %v, want %v", st.RemoteAddr, e.serverAddr)
556 if st.Compression != e.compression {
557 t.Fatalf("st.Compression = %v, want %v", st.Compression, e.compression)
560 if rpcInfo, ok := d.ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo); ok {
561 if rpcInfo.FullMethodName != st.FullMethod {
562 t.Fatalf("rpcInfo.FullMethod = %s, want %v", rpcInfo.FullMethodName, st.FullMethod)
565 t.Fatalf("got context %v, want one with rpcCtxKey", d.ctx)
570 func checkOutPayload(t *testing.T, d *gotData, e *expectedData) {
575 if st, ok = d.s.(*stats.OutPayload); !ok {
576 t.Fatalf("got %T, want OutPayload", d.s)
579 t.Fatalf("d.ctx = nil, want <non-nil>")
582 b, err := proto.Marshal(e.requests[e.reqIdx])
584 t.Fatalf("failed to marshal message: %v", err)
586 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.requests[e.reqIdx]) {
587 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.requests[e.reqIdx])
590 if string(st.Data) != string(b) {
591 t.Fatalf("st.Data = %v, want %v", st.Data, b)
593 if st.Length != len(b) {
594 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
597 b, err := proto.Marshal(e.responses[e.respIdx])
599 t.Fatalf("failed to marshal message: %v", err)
601 if reflect.TypeOf(st.Payload) != reflect.TypeOf(e.responses[e.respIdx]) {
602 t.Fatalf("st.Payload = %T, want %T", st.Payload, e.responses[e.respIdx])
605 if string(st.Data) != string(b) {
606 t.Fatalf("st.Data = %v, want %v", st.Data, b)
608 if st.Length != len(b) {
609 t.Fatalf("st.Lenght = %v, want %v", st.Length, len(b))
612 // TODO check WireLength and ReceivedTime.
613 if st.SentTime.IsZero() {
614 t.Fatalf("st.SentTime = %v, want <non-zero>", st.SentTime)
618 func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
623 if st, ok = d.s.(*stats.OutTrailer); !ok {
624 t.Fatalf("got %T, want OutTrailer", d.s)
627 t.Fatalf("d.ctx = nil, want <non-nil>")
630 t.Fatalf("st IsClient = true, want false")
634 func checkEnd(t *testing.T, d *gotData, e *expectedData) {
639 if st, ok = d.s.(*stats.End); !ok {
640 t.Fatalf("got %T, want End", d.s)
643 t.Fatalf("d.ctx = nil, want <non-nil>")
645 if st.EndTime.IsZero() {
646 t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
648 if grpc.Code(st.Error) != grpc.Code(e.err) || grpc.ErrorDesc(st.Error) != grpc.ErrorDesc(e.err) {
649 t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
653 func checkConnBegin(t *testing.T, d *gotData, e *expectedData) {
658 if st, ok = d.s.(*stats.ConnBegin); !ok {
659 t.Fatalf("got %T, want ConnBegin", d.s)
662 t.Fatalf("d.ctx = nil, want <non-nil>")
664 st.IsClient() // TODO remove this.
667 func checkConnEnd(t *testing.T, d *gotData, e *expectedData) {
672 if st, ok = d.s.(*stats.ConnEnd); !ok {
673 t.Fatalf("got %T, want ConnEnd", d.s)
676 t.Fatalf("d.ctx = nil, want <non-nil>")
678 st.IsClient() // TODO remove this.
681 type statshandler struct {
687 func (h *statshandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
688 return context.WithValue(ctx, connCtxKey{}, info)
691 func (h *statshandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
692 return context.WithValue(ctx, rpcCtxKey{}, info)
695 func (h *statshandler) HandleConn(ctx context.Context, s stats.ConnStats) {
698 h.gotConn = append(h.gotConn, &gotData{ctx, s.IsClient(), s})
701 func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
704 h.gotRPC = append(h.gotRPC, &gotData{ctx, s.IsClient(), s})
707 func checkConnStats(t *testing.T, got []*gotData) {
708 if len(got) <= 0 || len(got)%2 != 0 {
709 for i, g := range got {
710 t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
712 t.Fatalf("got %v stats, want even positive number", len(got))
714 // The first conn stats must be a ConnBegin.
715 checkConnBegin(t, got[0], nil)
716 // The last conn stats must be a ConnEnd.
717 checkConnEnd(t, got[len(got)-1], nil)
720 func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
721 if len(got) != len(checkFuncs) {
722 for i, g := range got {
723 t.Errorf(" - %v, %T", i, g.s)
725 t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
728 var rpcctx context.Context
729 for i := 0; i < len(got); i++ {
730 if _, ok := got[i].s.(stats.RPCStats); ok {
731 if rpcctx != nil && got[i].ctx != rpcctx {
732 t.Fatalf("got different contexts with stats %T", got[i].s)
738 for i, f := range checkFuncs {
743 func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
745 te := newTest(t, tc, nil, h)
746 te.startServer(&testServer{})
750 reqs []*testpb.SimpleRequest
751 resps []*testpb.SimpleResponse
755 req *testpb.SimpleRequest
756 resp *testpb.SimpleResponse
762 method = "/grpc.testing.TestService/UnaryCall"
763 req, resp, e = te.doUnaryCall(cc)
764 reqs = []*testpb.SimpleRequest{req}
765 resps = []*testpb.SimpleResponse{resp}
767 case clientStreamRPC:
768 method = "/grpc.testing.TestService/ClientStreamCall"
769 reqs, resp, e = te.doClientStreamCall(cc)
770 resps = []*testpb.SimpleResponse{resp}
772 case serverStreamRPC:
773 method = "/grpc.testing.TestService/ServerStreamCall"
774 req, resps, e = te.doServerStreamCall(cc)
775 reqs = []*testpb.SimpleRequest{req}
777 case fullDuplexStreamRPC:
778 method = "/grpc.testing.TestService/FullDuplexCall"
779 reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
781 if cc.success != (err == nil) {
782 t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
785 te.srv.GracefulStop() // Wait for the server to stop.
789 if len(h.gotRPC) >= len(checkFuncs) {
794 time.Sleep(10 * time.Millisecond)
799 if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
804 time.Sleep(10 * time.Millisecond)
807 expect := &expectedData{
808 serverAddr: te.srvAddr,
809 compression: tc.compress,
817 checkConnStats(t, h.gotConn)
819 checkServerStats(t, h.gotRPC, expect, checkFuncs)
822 func TestServerStatsUnaryRPC(t *testing.T) {
823 testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
834 func TestServerStatsUnaryRPCError(t *testing.T) {
835 testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, callType: unaryRPC}, []func(t *testing.T, d *gotData, e *expectedData){
845 func TestServerStatsClientStreamRPC(t *testing.T) {
847 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
852 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
855 for i := 0; i < count; i++ {
856 checkFuncs = append(checkFuncs, ioPayFuncs...)
858 checkFuncs = append(checkFuncs,
863 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: clientStreamRPC}, checkFuncs)
866 func TestServerStatsClientStreamRPCError(t *testing.T) {
868 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: clientStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
878 func TestServerStatsServerStreamRPC(t *testing.T) {
880 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
886 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
889 for i := 0; i < count; i++ {
890 checkFuncs = append(checkFuncs, ioPayFuncs...)
892 checkFuncs = append(checkFuncs,
896 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: serverStreamRPC}, checkFuncs)
899 func TestServerStatsServerStreamRPCError(t *testing.T) {
901 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: serverStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
911 func TestServerStatsFullDuplexRPC(t *testing.T) {
913 checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
918 ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
922 for i := 0; i < count; i++ {
923 checkFuncs = append(checkFuncs, ioPayFuncs...)
925 checkFuncs = append(checkFuncs,
929 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, callType: fullDuplexStreamRPC}, checkFuncs)
932 func TestServerStatsFullDuplexRPCError(t *testing.T) {
934 testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, callType: fullDuplexStreamRPC}, []func(t *testing.T, d *gotData, e *expectedData){
944 type checkFuncWithCount struct {
945 f func(t *testing.T, d *gotData, e *expectedData)
946 c int // expected count
949 func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs map[int]*checkFuncWithCount) {
951 for _, v := range checkFuncs {
954 if len(got) != expectLen {
955 for i, g := range got {
956 t.Errorf(" - %v, %T", i, g.s)
958 t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
961 var tagInfoInCtx *stats.RPCTagInfo
962 for i := 0; i < len(got); i++ {
963 if _, ok := got[i].s.(stats.RPCStats); ok {
964 tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
965 if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
966 t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
968 tagInfoInCtx = tagInfoInCtxNew
972 for _, s := range got {
975 if checkFuncs[begin].c <= 0 {
976 t.Fatalf("unexpected stats: %T", s.s)
978 checkFuncs[begin].f(t, s, expect)
979 checkFuncs[begin].c--
980 case *stats.OutHeader:
981 if checkFuncs[outHeader].c <= 0 {
982 t.Fatalf("unexpected stats: %T", s.s)
984 checkFuncs[outHeader].f(t, s, expect)
985 checkFuncs[outHeader].c--
986 case *stats.OutPayload:
987 if checkFuncs[outPayload].c <= 0 {
988 t.Fatalf("unexpected stats: %T", s.s)
990 checkFuncs[outPayload].f(t, s, expect)
991 checkFuncs[outPayload].c--
992 case *stats.InHeader:
993 if checkFuncs[inHeader].c <= 0 {
994 t.Fatalf("unexpected stats: %T", s.s)
996 checkFuncs[inHeader].f(t, s, expect)
997 checkFuncs[inHeader].c--
998 case *stats.InPayload:
999 if checkFuncs[inPayload].c <= 0 {
1000 t.Fatalf("unexpected stats: %T", s.s)
1002 checkFuncs[inPayload].f(t, s, expect)
1003 checkFuncs[inPayload].c--
1004 case *stats.InTrailer:
1005 if checkFuncs[inTrailer].c <= 0 {
1006 t.Fatalf("unexpected stats: %T", s.s)
1008 checkFuncs[inTrailer].f(t, s, expect)
1009 checkFuncs[inTrailer].c--
1011 if checkFuncs[end].c <= 0 {
1012 t.Fatalf("unexpected stats: %T", s.s)
1014 checkFuncs[end].f(t, s, expect)
1016 case *stats.ConnBegin:
1017 if checkFuncs[connbegin].c <= 0 {
1018 t.Fatalf("unexpected stats: %T", s.s)
1020 checkFuncs[connbegin].f(t, s, expect)
1021 checkFuncs[connbegin].c--
1022 case *stats.ConnEnd:
1023 if checkFuncs[connend].c <= 0 {
1024 t.Fatalf("unexpected stats: %T", s.s)
1026 checkFuncs[connend].f(t, s, expect)
1027 checkFuncs[connend].c--
1029 t.Fatalf("unexpected stats: %T", s.s)
1034 func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
1035 h := &statshandler{}
1036 te := newTest(t, tc, h, nil)
1037 te.startServer(&testServer{})
1041 reqs []*testpb.SimpleRequest
1042 resps []*testpb.SimpleResponse
1046 req *testpb.SimpleRequest
1047 resp *testpb.SimpleResponse
1050 switch cc.callType {
1052 method = "/grpc.testing.TestService/UnaryCall"
1053 req, resp, e = te.doUnaryCall(cc)
1054 reqs = []*testpb.SimpleRequest{req}
1055 resps = []*testpb.SimpleResponse{resp}
1057 case clientStreamRPC:
1058 method = "/grpc.testing.TestService/ClientStreamCall"
1059 reqs, resp, e = te.doClientStreamCall(cc)
1060 resps = []*testpb.SimpleResponse{resp}
1062 case serverStreamRPC:
1063 method = "/grpc.testing.TestService/ServerStreamCall"
1064 req, resps, e = te.doServerStreamCall(cc)
1065 reqs = []*testpb.SimpleRequest{req}
1067 case fullDuplexStreamRPC:
1068 method = "/grpc.testing.TestService/FullDuplexCall"
1069 reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
1071 if cc.success != (err == nil) {
1072 t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
1075 te.srv.GracefulStop() // Wait for the server to stop.
1078 for _, v := range checkFuncs {
1083 if len(h.gotRPC) >= lenRPCStats {
1088 time.Sleep(10 * time.Millisecond)
1093 if _, ok := h.gotConn[len(h.gotConn)-1].s.(*stats.ConnEnd); ok {
1098 time.Sleep(10 * time.Millisecond)
1101 expect := &expectedData{
1102 serverAddr: te.srvAddr,
1103 compression: tc.compress,
1107 failfast: cc.failfast,
1112 checkConnStats(t, h.gotConn)
1114 checkClientStats(t, h.gotRPC, expect, checkFuncs)
1117 func TestClientStatsUnaryRPC(t *testing.T) {
1118 testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
1119 begin: {checkBegin, 1},
1120 outHeader: {checkOutHeader, 1},
1121 outPayload: {checkOutPayload, 1},
1122 inHeader: {checkInHeader, 1},
1123 inPayload: {checkInPayload, 1},
1124 inTrailer: {checkInTrailer, 1},
1129 func TestClientStatsUnaryRPCError(t *testing.T) {
1130 testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false, callType: unaryRPC}, map[int]*checkFuncWithCount{
1131 begin: {checkBegin, 1},
1132 outHeader: {checkOutHeader, 1},
1133 outPayload: {checkOutPayload, 1},
1134 inHeader: {checkInHeader, 1},
1135 inTrailer: {checkInTrailer, 1},
1140 func TestClientStatsClientStreamRPC(t *testing.T) {
1142 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
1143 begin: {checkBegin, 1},
1144 outHeader: {checkOutHeader, 1},
1145 inHeader: {checkInHeader, 1},
1146 outPayload: {checkOutPayload, count},
1147 inTrailer: {checkInTrailer, 1},
1148 inPayload: {checkInPayload, 1},
1153 func TestClientStatsClientStreamRPCError(t *testing.T) {
1155 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: clientStreamRPC}, map[int]*checkFuncWithCount{
1156 begin: {checkBegin, 1},
1157 outHeader: {checkOutHeader, 1},
1158 inHeader: {checkInHeader, 1},
1159 outPayload: {checkOutPayload, 1},
1160 inTrailer: {checkInTrailer, 1},
1165 func TestClientStatsServerStreamRPC(t *testing.T) {
1167 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
1168 begin: {checkBegin, 1},
1169 outHeader: {checkOutHeader, 1},
1170 outPayload: {checkOutPayload, 1},
1171 inHeader: {checkInHeader, 1},
1172 inPayload: {checkInPayload, count},
1173 inTrailer: {checkInTrailer, 1},
1178 func TestClientStatsServerStreamRPCError(t *testing.T) {
1180 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: serverStreamRPC}, map[int]*checkFuncWithCount{
1181 begin: {checkBegin, 1},
1182 outHeader: {checkOutHeader, 1},
1183 outPayload: {checkOutPayload, 1},
1184 inHeader: {checkInHeader, 1},
1185 inTrailer: {checkInTrailer, 1},
1190 func TestClientStatsFullDuplexRPC(t *testing.T) {
1192 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
1193 begin: {checkBegin, 1},
1194 outHeader: {checkOutHeader, 1},
1195 outPayload: {checkOutPayload, count},
1196 inHeader: {checkInHeader, 1},
1197 inPayload: {checkInPayload, count},
1198 inTrailer: {checkInTrailer, 1},
1203 func TestClientStatsFullDuplexRPCError(t *testing.T) {
1205 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, callType: fullDuplexStreamRPC}, map[int]*checkFuncWithCount{
1206 begin: {checkBegin, 1},
1207 outHeader: {checkOutHeader, 1},
1208 outPayload: {checkOutPayload, 1},
1209 inHeader: {checkInHeader, 1},
1210 inTrailer: {checkInTrailer, 1},
1215 // If the user doesn't call the last recv() on clientStream.
1216 func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
1218 testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
1219 begin: {checkBegin, 1},
1220 outHeader: {checkOutHeader, 1},
1221 outPayload: {checkOutPayload, count},
1222 inHeader: {checkInHeader, 1},
1223 inPayload: {checkInPayload, count},
1224 inTrailer: {checkInTrailer, 1},
1229 func TestTags(t *testing.T) {
1230 b := []byte{5, 2, 4, 3, 1}
1231 ctx := stats.SetTags(context.Background(), b)
1232 if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
1233 t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
1235 if tg := stats.Tags(ctx); tg != nil {
1236 t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
1239 ctx = stats.SetIncomingTags(context.Background(), b)
1240 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
1241 t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
1243 if tg := stats.OutgoingTags(ctx); tg != nil {
1244 t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
1248 func TestTrace(t *testing.T) {
1249 b := []byte{5, 2, 4, 3, 1}
1250 ctx := stats.SetTrace(context.Background(), b)
1251 if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
1252 t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
1254 if tr := stats.Trace(ctx); tr != nil {
1255 t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
1258 ctx = stats.SetIncomingTrace(context.Background(), b)
1259 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
1260 t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
1262 if tr := stats.OutgoingTrace(ctx); tr != nil {
1263 t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)