3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 "github.com/golang/protobuf/proto"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31 perfpb "google.golang.org/grpc/test/codec_perf"
32 "google.golang.org/grpc/transport"
// fullReader wraps an io.Reader so that a single Read call either fills the
// whole destination slice or reports an error, regardless of how the wrapped
// reader chunks its data.
type fullReader struct {
	reader io.Reader
}

// Read fills p completely from the wrapped reader by delegating to
// io.ReadFull. It returns the number of bytes copied into p. Per io.ReadFull,
// the error is io.EOF when no bytes could be read and io.ErrUnexpectedEOF when
// the source ended after only part of p was filled.
func (f fullReader) Read(p []byte) (int, error) {
	return io.ReadFull(f.reader, p)
}
43 var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface
// TestSimpleParsing is a table-driven test of parser.recvMsg on single byte
// streams. For every row it builds a fresh fullReader over the row's input
// bytes, wraps it in a fresh parser, calls recvMsg once, and fails if the
// returned payload type, message bytes, or error differ from the row.
// NOTE(review): the table's field declarations are not visible in this
// extract; the rows are consumed below as test.p, test.err, test.b, test.pt.
45 func TestSimpleParsing(t *testing.T) {
// 2^24 bytes of 'x', used as the payload of the last row.
46 bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
47 for _, test := range []struct {
// Each row appears to be: input bytes, expected error, expected message,
// expected payload type. The inputs suggest a 5-byte header (one flag byte,
// then a 4-byte big-endian length) followed by the payload — confirm against
// the parser implementation.
55 {nil, io.EOF, nil, compressionNone},
56 {[]byte{0, 0, 0, 0, 0}, nil, nil, compressionNone},
57 {[]byte{0, 0, 0, 0, 1, 'a'}, nil, []byte{'a'}, compressionNone},
58 {[]byte{1, 0}, io.ErrUnexpectedEOF, nil, compressionNone},
59 {[]byte{0, 0, 0, 0, 10, 'a'}, io.ErrUnexpectedEOF, nil, compressionNone},
60 // Check that messages with length >= 2^24 are parsed.
61 {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
// fullReader makes every Read fill its buffer or fail, so short inputs surface
// as io.EOF / io.ErrUnexpectedEOF rather than partial reads.
63 buf := fullReader{bytes.NewReader(test.p)}
// The local variable shadows the parser type name for the rest of the body.
64 parser := &parser{r: buf}
// NOTE(review): math.MaxInt32 is presumably a message-size limit — confirm.
65 pt, b, err := parser.recvMsg(math.MaxInt32)
// Errors are compared by identity; bytes.Equal treats nil and empty as equal.
66 if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
67 t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
// TestMultipleParsing reads several framed messages back-to-back from one
// parser: it checks that three consecutive recvMsg calls return "a", "bc" and
// "d" with compressionNone and a nil error, then issues one more recvMsg call
// whose failure message names io.EOF as the wanted error.
72 func TestMultipleParsing(t *testing.T) {
73 // Set up a byte stream consisting of 3 messages with their headers.
74 p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
75 b := fullReader{bytes.NewReader(p)}
// A single parser is shared by all reads below; the variable shadows the
// parser type name.
76 parser := &parser{r: b}
// Expected results, in stream order.
// NOTE(review): the struct's field declarations are not visible in this
// extract; the rows are consumed below as want.pt and want.data.
78 wantRecvs := []struct {
82 {compressionNone, []byte("a")},
83 {compressionNone, []byte("bc")},
84 {compressionNone, []byte("d")},
86 for i, want := range wantRecvs {
87 pt, data, err := parser.recvMsg(math.MaxInt32)
88 if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) {
89 t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>",
90 i, p, pt, data, err, want.pt, want.data)
// One extra read after every expected message has been consumed.
94 pt, data, err := parser.recvMsg(math.MaxInt32)
// NOTE(review): the condition guarding this failure (original line 95) is
// missing from this extract; the message implies it checks err against io.EOF.
96 t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v",
97 len(wantRecvs), p, pt, data, err, io.EOF)
// TestEncode is a table-driven test of encode: for each row it encodes
// test.msg with protoCodec and fails if the returned header, data, or error
// differ from the row's expectations.
// NOTE(review): the table's field declarations are not visible in this
// extract; the rows are consumed below as test.msg, test.cp, test.hdr,
// test.data and test.err.
101 func TestEncode(t *testing.T) {
102 for _, test := range []struct {
// The only visible row expects a 5-byte all-zero header and empty data for a
// nil message.
111 {nil, nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil},
// test.cp is not passed to encode here (the last three arguments are nil); it
// is only used in the failure message below.
113 hdr, data, err := encode(protoCodec{}, test.msg, nil, nil, nil)
114 if err != test.err || !bytes.Equal(hdr, test.hdr) || !bytes.Equal(data, test.data) {
115 t.Fatalf("encode(_, _, %v, _) = %v, %v, %v\nwant %v, %v, %v", test.cp, hdr, data, err, test.hdr, test.data, test.err)
// TestCompress round-trips data through a compressor/decompressor pair: it
// compresses test.data into a buffer, requires the compressed output to be
// strictly smaller than the input, then decompresses the buffer and requires
// the result to equal the original data.
// NOTE(review): the table's field declarations are not visible in this
// extract; the rows are consumed below as test.data, test.cp, test.dc and
// test.err.
120 func TestCompress(t *testing.T) {
121 for _, test := range []struct {
// 1024 zero bytes through the gzip compressor and decompressor.
129 {make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil},
131 b := new(bytes.Buffer)
132 if err := test.cp.Do(b, test.data); err != test.err {
133 t.Fatalf("Compressor.Do(_, %v) = %v, want %v", test.data, err, test.err)
// Output that is not strictly smaller than the input counts as a failure.
135 if b.Len() >= len(test.data) {
136 t.Fatalf("The compressor fails to compress data.")
// Decompress from the same buffer and compare with the original input.
138 if p, err := test.dc.Do(b); err != nil || !bytes.Equal(test.data, p) {
139 t.Fatalf("Decompressor.Do(%v) = %v, %v, want %v, <nil>", b, p, err, test.data)
// TestToRPCErr checks toRPCErr on transport-level errors: for each row the
// result must be recognised by status.FromError and must be deeply equal to
// the expected status error.
// NOTE(review): the table's field declarations are not visible in this
// extract; the rows are consumed below as test.errIn and test.errOut.
144 func TestToRPCErr(t *testing.T) {
145 for _, test := range []struct {
// A StreamError with codes.Unknown maps to a status error with the same code
// and description; ErrConnClosing maps to codes.Unavailable carrying its Desc.
151 {transport.StreamError{Code: codes.Unknown, Desc: ""}, status.Error(codes.Unknown, "")},
152 {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)},
154 err := toRPCErr(test.errIn)
// First require that the result is a status error at all.
155 if _, ok := status.FromError(err); !ok {
156 t.Fatalf("toRPCErr{%v} returned type %T, want %T", test.errIn, err, status.Error(codes.Unknown, ""))
// Then require structural equality with the expected error.
158 if !reflect.DeepEqual(err, test.errOut) {
159 t.Fatalf("toRPCErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
164 // bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes: it builds a perfpb.Buffer whose Body is mSize bytes long and calls
// encode on it b.N times with protoCodec.
166 func bmEncode(b *testing.B, mSize int) {
167 msg := &perfpb.Buffer{Body: make([]byte, mSize)}
// Encode once up front to learn the encoded size (header + data); the error
// from this call is discarded.
168 encodeHdr, encodeData, _ := encode(protoCodec{}, msg, nil, nil, nil)
169 encodedSz := int64(len(encodeHdr) + len(encodeData))
// NOTE(review): original lines 170-171 are missing from this extract
// (presumably benchmark timer/alloc setup) — confirm.
172 for i := 0; i < b.N; i++ {
// All three results of encode are discarded inside the loop.
173 encode(protoCodec{}, msg, nil, nil, nil)
// Passes the encoded size to b.SetBytes, which testing.B uses as the number
// of bytes processed per iteration.
175 b.SetBytes(encodedSz)
// BenchmarkEncode1B is one of the sized encode benchmark entry points.
// NOTE(review): its body is not visible in this extract; presumably it calls
// bmEncode with a 1-byte message size — confirm.
178 func BenchmarkEncode1B(b *testing.B) {
// BenchmarkEncode1KiB is one of the sized encode benchmark entry points.
// NOTE(review): its body is not visible in this extract; presumably it calls
// bmEncode with a 1 KiB message size — confirm.
182 func BenchmarkEncode1KiB(b *testing.B) {
// BenchmarkEncode8KiB is one of the sized encode benchmark entry points.
// NOTE(review): its body is not visible in this extract; presumably it calls
// bmEncode with an 8 KiB message size — confirm.
186 func BenchmarkEncode8KiB(b *testing.B) {
// BenchmarkEncode64KiB is one of the sized encode benchmark entry points.
// NOTE(review): its body is not visible in this extract; presumably it calls
// bmEncode with a 64 KiB message size — confirm.
190 func BenchmarkEncode64KiB(b *testing.B) {
194 func BenchmarkEncode512KiB(b *testing.B) {
195 bmEncode(b, 512*1024)
198 func BenchmarkEncode1MiB(b *testing.B) {
199 bmEncode(b, 1024*1024)
202 // bmCompressor benchmarks compressor cp on a payload containing mSize bytes.
// The payload built here is a plain zero-filled byte slice.
// NOTE(review): the loop body and original lines 207-208 are missing from
// this extract; presumably each iteration calls cp.Do(cBuf, payload) — confirm.
204 func bmCompressor(b *testing.B, mSize int, cp Compressor) {
205 payload := make([]byte, mSize)
// NOTE(review): bytes.NewBuffer uses its argument as the buffer's initial
// contents, so cBuf starts out holding mSize zero bytes rather than being
// empty. If the intent was only to pre-size the buffer,
// make([]byte, 0, mSize) would do that — confirm.
206 cBuf := bytes.NewBuffer(make([]byte, mSize))
209 for i := 0; i < b.N; i++ {
215 func BenchmarkGZIPCompressor1B(b *testing.B) {
216 bmCompressor(b, 1, NewGZIPCompressor())
219 func BenchmarkGZIPCompressor1KiB(b *testing.B) {
220 bmCompressor(b, 1024, NewGZIPCompressor())
223 func BenchmarkGZIPCompressor8KiB(b *testing.B) {
224 bmCompressor(b, 8*1024, NewGZIPCompressor())
227 func BenchmarkGZIPCompressor64KiB(b *testing.B) {
228 bmCompressor(b, 64*1024, NewGZIPCompressor())
231 func BenchmarkGZIPCompressor512KiB(b *testing.B) {
232 bmCompressor(b, 512*1024, NewGZIPCompressor())
235 func BenchmarkGZIPCompressor1MiB(b *testing.B) {
236 bmCompressor(b, 1024*1024, NewGZIPCompressor())