3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
33 "github.com/golang/protobuf/proto"
34 dpb "github.com/golang/protobuf/ptypes/duration"
35 "golang.org/x/net/context"
36 epb "google.golang.org/genproto/googleapis/rpc/errdetails"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/grpc/status"
// TestHandlerTransport_NewServerHandlerTransport feeds a table of requests to
// NewServerHandlerTransport and compares the returned error text against each
// case's wantErr. Cases that supply a check callback also have the constructed
// *serverHandlerTransport inspected.
42 func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
43 type testCase struct {
// modrw maps one http.ResponseWriter to another; the cases below use it to
// return a writer with one optional method hidden.
47 modrw func(http.ResponseWriter) http.ResponseWriter
// check receives the constructed transport together with its own test case; a
// non-nil error is reported with t.Errorf by the loop at the end of the test.
48 check func(*serverHandlerTransport, *testCase) error
57 wantErr: "gRPC requires HTTP/2",
64 Header: http.Header{},
67 wantErr: "invalid gRPC request method",
70 name: "bad content type",
75 "Content-Type": {"application/foo"},
77 RequestURI: "/service/foo.bar",
79 wantErr: "invalid gRPC request content-type",
87 "Content-Type": {"application/grpc"},
89 RequestURI: "/service/foo.bar",
// Embedding only the narrowed interface in an anonymous struct yields a value
// whose method set no longer includes Flush, which is what the wantErr of
// this case is about.
91 modrw: func(w http.ResponseWriter) http.ResponseWriter {
92 // Return w without its Flush method
93 type onlyCloseNotifier interface {
97 return struct{ onlyCloseNotifier }{w.(onlyCloseNotifier)}
99 wantErr: "gRPC requires a ResponseWriter supporting http.Flusher",
102 name: "not closenotifier",
107 "Content-Type": {"application/grpc"},
109 RequestURI: "/service/foo.bar",
// Same technique as the previous case, this time hiding CloseNotify.
111 modrw: func(w http.ResponseWriter) http.ResponseWriter {
112 // Return w without its CloseNotify method
113 type onlyFlusher interface {
117 return struct{ onlyFlusher }{w.(onlyFlusher)}
119 wantErr: "gRPC requires a ResponseWriter supporting http.CloseNotifier",
127 "Content-Type": {"application/grpc"},
130 Path: "/service/foo.bar",
132 RequestURI: "/service/foo.bar",
// Judging by the two error messages, a successfully built transport must hold
// the very *http.Request it was given (compared by pointer) and a non-nil
// writer.
134 check: func(t *serverHandlerTransport, tt *testCase) error {
136 return fmt.Errorf("t.req = %p; want %p", t.req, tt.req)
139 return errors.New("t.rw = nil; want non-nil")
145 name: "with timeout",
150 "Content-Type": []string{"application/grpc"},
151 "Grpc-Timeout": {"200m"},
154 Path: "/service/foo.bar",
156 RequestURI: "/service/foo.bar",
158 check: func(t *serverHandlerTransport, tt *testCase) error {
160 return errors.New("timeout not set")
// The "200m" Grpc-Timeout header above is expected to surface as 200ms.
162 if want := 200 * time.Millisecond; t.timeout != want {
163 return fmt.Errorf("timeout = %v; want %v", t.timeout, want)
169 name: "with bad timeout",
174 "Content-Type": []string{"application/grpc"},
175 "Grpc-Timeout": {"tomorrow"},
178 Path: "/service/foo.bar",
180 RequestURI: "/service/foo.bar",
// The comparison in the loop below is an exact string match, so this literal
// has to track the constructor's error text character for character.
182 wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
185 name: "with metadata",
190 "Content-Type": []string{"application/grpc"},
191 "meta-foo": {"foo-val"},
192 "meta-bar": {"bar-val1", "bar-val2"},
193 "user-agent": {"x/y a/b"},
196 Path: "/service/foo.bar",
198 RequestURI: "/service/foo.bar",
200 check: func(ht *serverHandlerTransport, tt *testCase) error {
202 "meta-bar": {"bar-val1", "bar-val2"},
203 "user-agent": {"x/y a/b"},
204 "meta-foo": {"foo-val"},
// NOTE(review): "metdata" in the message below looks like a typo for
// "metadata"; it only affects the failure text.
207 if !reflect.DeepEqual(ht.headerMD, want) {
208 return fmt.Errorf("metdata = %#v; want %#v", ht.headerMD, want)
215 for _, tt := range tests {
216 rw := newTestHandlerResponseWriter()
220 got, gotErr := NewServerHandlerTransport(rw, tt.req)
// A case fails when an error was expected but none came back (or the other
// way round), or when the error text differs from wantErr.
221 if (gotErr != nil) != (tt.wantErr != "") || (gotErr != nil && gotErr.Error() != tt.wantErr) {
222 t.Errorf("%s: error = %v; want %q", tt.name, gotErr, tt.wantErr)
// check gets the concrete transport plus a pointer to the current case so it
// can compare against the case's own fields (for example tt.req).
229 if err := tt.check(got.(*serverHandlerTransport), &tt); err != nil {
230 t.Errorf("%s: %v", tt.name, err)
// testHandlerResponseWriter is the http.ResponseWriter used by the handler
// transport tests. It embeds an *httptest.ResponseRecorder and layers the
// CloseNotify and Flush methods on top of it.
type testHandlerResponseWriter struct {
	*httptest.ResponseRecorder
	closeNotify chan bool
}

// CloseNotify exposes the writer's closeNotify channel as receive-only.
func (w testHandlerResponseWriter) CloseNotify() <-chan bool {
	var notify <-chan bool = w.closeNotify
	return notify
}

// Flush does nothing. Being declared on the outer type, it is selected in
// place of the Flush method promoted from the embedded recorder.
func (w testHandlerResponseWriter) Flush() {
}

// newTestHandlerResponseWriter returns a testHandlerResponseWriter backed by a
// fresh recorder and a closeNotify channel with a buffer of one. The result is
// typed as http.ResponseWriter; callers that need the concrete type assert it.
func newTestHandlerResponseWriter() http.ResponseWriter {
	var w testHandlerResponseWriter
	w.ResponseRecorder = httptest.NewRecorder()
	w.closeNotify = make(chan bool, 1)
	return w
}
// handleStreamTest bundles the fixtures shared by the HandleStreams tests.
// NOTE(review): only rw and ht are visible in this view; the tests also use a
// bodyw field, so further fields are presumably declared on lines not shown.
251 type handleStreamTest struct {
// rw is the recording writer; tests read its HeaderMap once the stream is done.
255 rw testHandlerResponseWriter
// ht is the handler transport under test.
256 ht *serverHandlerTransport
// newHandleStreamTest builds a handleStreamTest around a request for
// "/service/foo.bar" with Content-Type "application/grpc", constructing the
// transport through NewServerHandlerTransport.
259 func newHandleStreamTest(t *testing.T) *handleStreamTest {
// NOTE(review): bodyr is presumably installed as the request body on a line
// not visible here, leaving bodyw for the tests to close — confirm.
260 bodyr, bodyw := io.Pipe()
261 req := &http.Request{
265 "Content-Type": {"application/grpc"},
268 Path: "/service/foo.bar",
270 RequestURI: "/service/foo.bar",
// newTestHandlerResponseWriter returns the http.ResponseWriter interface;
// assert back to the concrete type that the rw field of handleStreamTest uses.
273 rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
274 ht, err := NewServerHandlerTransport(rw, req)
278 return &handleStreamTest{
// The constructor's result is narrowed to the concrete transport so tests can
// call its methods directly.
281 ht: ht.(*serverHandlerTransport),
// TestHandlerTransport_HandleStreams checks the stream's method name and,
// after the handler writes an OK status, compares the recorder's HeaderMap
// with the expected header and trailer entries.
286 func TestHandlerTransport_HandleStreams(t *testing.T) {
287 st := newHandleStreamTest(t)
288 handleStream := func(s *Stream) {
289 if want := "/service/foo.bar"; s.method != want {
290 t.Errorf("stream method = %q; want %q", s.method, want)
292 st.bodyw.Close() // no body
293 st.ht.WriteStatus(s, status.New(codes.OK, ""))
// The first callback hands each stream to handleStream on a new goroutine; the
// second returns the context it was given unchanged.
// NOTE(review): these are presumably the arguments of st.ht.HandleStreams; the
// call line itself is not visible here.
296 func(s *Stream) { go handleStream(s) },
297 func(ctx context.Context, method string) context.Context { return ctx },
// Headers and trailers are checked together against the recorder's single
// HeaderMap (see the failure message below).
299 wantHeader := http.Header{
301 "Content-Type": {"application/grpc"},
302 "Trailer": {"Grpc-Status", "Grpc-Message", "Grpc-Status-Details-Bin"},
303 "Grpc-Status": {"0"},
305 if !reflect.DeepEqual(st.rw.HeaderMap, wantHeader) {
306 t.Errorf("Header+Trailer Map: %#v; want %#v", st.rw.HeaderMap, wantHeader)
310 // Tests that codes.Unimplemented will close the body, per comment in handler_server.go.
311 func TestHandlerTransport_HandleStreams_Unimplemented(t *testing.T) {
312 handleStreamCloseBodyTest(t, codes.Unimplemented, "thingy is unimplemented")
315 // Tests that codes.InvalidArgument will close the body, per comment in handler_server.go.
316 func TestHandlerTransport_HandleStreams_InvalidArgument(t *testing.T) {
317 handleStreamCloseBodyTest(t, codes.InvalidArgument, "bad arg")
// handleStreamCloseBodyTest is the shared body of the Unimplemented and
// InvalidArgument tests: the stream handler immediately writes a status built
// from statusCode and msg, and the recorder's HeaderMap is then compared with
// the expected Grpc-Status and Grpc-Message values.
320 func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string) {
321 st := newHandleStreamTest(t)
323 handleStream := func(s *Stream) {
324 st.ht.WriteStatus(s, status.New(statusCode, msg))
// The first callback runs handleStream on its own goroutine; the second
// returns the context unchanged.
327 func(s *Stream) { go handleStream(s) },
328 func(ctx context.Context, method string) context.Context { return ctx },
330 wantHeader := http.Header{
332 "Content-Type": {"application/grpc"},
333 "Trailer": {"Grpc-Status", "Grpc-Message", "Grpc-Status-Details-Bin"},
// Grpc-Status is expected as the numeric code rendered in decimal text.
334 "Grpc-Status": {fmt.Sprint(uint32(statusCode))},
// The expected message is produced with encodeGrpcMessage rather than being
// spelled out literally.
335 "Grpc-Message": {encodeGrpcMessage(msg)},
338 if !reflect.DeepEqual(st.rw.HeaderMap, wantHeader) {
339 t.Errorf("Header+Trailer mismatch.\n got: %#v\nwant: %#v", st.rw.HeaderMap, wantHeader)
// TestHandlerTransport_HandleStreams_Timeout sends a request carrying a
// "Grpc-Timeout: 200m" header. Judging by the failure messages, it expects the
// stream's context to end with context.DeadlineExceeded; the handler then
// writes a DeadlineExceeded status and the resulting headers are compared.
343 func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
344 bodyr, bodyw := io.Pipe()
345 req := &http.Request{
349 "Content-Type": {"application/grpc"},
350 "Grpc-Timeout": {"200m"},
353 Path: "/service/foo.bar",
355 RequestURI: "/service/foo.bar",
// Assert back to the concrete writer type so rw.HeaderMap can be read below.
358 rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
359 ht, err := NewServerHandlerTransport(rw, req)
363 runStream := func(s *Stream) {
// Upper bound on the wait: after 5 seconds the test records a failure.
367 case <-time.After(5 * time.Second):
368 t.Errorf("timeout waiting for ctx.Done")
372 if err != context.DeadlineExceeded {
373 t.Errorf("ctx.Err = %v; want %v", err, context.DeadlineExceeded)
376 ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
// The first callback runs runStream on its own goroutine; the second returns
// the context unchanged.
379 func(s *Stream) { go runStream(s) },
380 func(ctx context.Context, method string) context.Context { return ctx },
382 wantHeader := http.Header{
384 "Content-Type": {"application/grpc"},
385 "Trailer": {"Grpc-Status", "Grpc-Message", "Grpc-Status-Details-Bin"},
// "4" is the Grpc-Status value this test expects for the
// codes.DeadlineExceeded status written above.
386 "Grpc-Status": {"4"},
387 "Grpc-Message": {encodeGrpcMessage("too slow")},
389 if !reflect.DeepEqual(rw.HeaderMap, wantHeader) {
390 t.Errorf("Header+Trailer Map mismatch.\n got: %#v\nwant: %#v", rw.HeaderMap, wantHeader)
// TestHandlerTransport_HandleStreams_MultiWriteStatus calls WriteStatus with
// an OK status on the same stream from a loop of five iterations.
// NOTE(review): the sync.WaitGroup suggests the five calls are meant to run
// concurrently, but the goroutine launch is on lines not visible here —
// confirm.
394 func TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
395 st := newHandleStreamTest(t)
396 handleStream := func(s *Stream) {
397 if want := "/service/foo.bar"; s.method != want {
398 t.Errorf("stream method = %q; want %q", s.method, want)
400 st.bodyw.Close() // no body
402 var wg sync.WaitGroup
404 for i := 0; i < 5; i++ {
407 st.ht.WriteStatus(s, status.New(codes.OK, ""))
// The first callback runs handleStream on its own goroutine; the second
// returns the context unchanged.
413 func(s *Stream) { go handleStream(s) },
414 func(ctx context.Context, method string) context.Context { return ctx },
// TestHandlerTransport_HandleStreams_ErrDetails attaches detail messages to a
// ResourceExhausted status, writes it through the handler transport, and
// expects the marshaled status proto, passed through encodeBinHeader, in the
// Grpc-Status-Details-Bin entry of the recorder's HeaderMap.
418 func TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
419 errDetails := []proto.Message{
421 RetryDelay: &dpb.Duration{Seconds: 60},
424 ResourceType: "foo bar",
425 ResourceName: "service.foo.bar",
430 statusCode := codes.ResourceExhausted
431 msg := "you are being throttled"
432 st, err := status.New(statusCode, msg).WithDetails(errDetails...)
// Marshal the status proto up front; these bytes form the expected
// Grpc-Status-Details-Bin value below.
437 stBytes, err := proto.Marshal(st.Proto())
442 hst := newHandleStreamTest(t)
443 handleStream := func(s *Stream) {
444 hst.ht.WriteStatus(s, st)
446 hst.ht.HandleStreams(
// The first callback runs handleStream on its own goroutine; the second
// returns the context unchanged.
447 func(s *Stream) { go handleStream(s) },
448 func(ctx context.Context, method string) context.Context { return ctx },
450 wantHeader := http.Header{
452 "Content-Type": {"application/grpc"},
453 "Trailer": {"Grpc-Status", "Grpc-Message", "Grpc-Status-Details-Bin"},
// Grpc-Status is expected as the numeric code rendered in decimal text.
454 "Grpc-Status": {fmt.Sprint(uint32(statusCode))},
455 "Grpc-Message": {encodeGrpcMessage(msg)},
456 "Grpc-Status-Details-Bin": {encodeBinHeader(stBytes)},
459 if !reflect.DeepEqual(hst.rw.HeaderMap, wantHeader) {
460 t.Errorf("Header+Trailer mismatch.\n got: %#v\nwant: %#v", hst.rw.HeaderMap, wantHeader)