3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
32 "golang.org/x/net/context"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
35 "google.golang.org/grpc/test/leakcheck"
36 "google.golang.org/grpc/transport"
// Shared fixtures for the Invoke tests in this file.
// expectedRequest is the request TestInvoke sends; the stream handler answers
// a request that differs from it with a codes.Internal status.
40 expectedRequest = "ping"
// expectedResponse is the message the stream handler encodes as its reply and
// the value TestInvoke requires in the reply.
41 expectedResponse = "pong"
// weirdError is the status message returned for the "weird error" request. It
// contains format verbs, and TestInvokeErrorSpecialChars requires it to come
// back unchanged.
42 weirdError = "format verbs: %v%s"
// sizeLargeErr is the length of the status message built with
// strings.Repeat("A", sizeLargeErr) and checked by TestInvokeLargeErr.
43 sizeLargeErr = 1024 * 1024
// testCodec is the codec used by these tests: the client installs it with
// WithCodec and the stream handler passes it to encode. Its Marshal and
// Unmarshal methods treat every message as a *string.
47 type testCodec struct {
// Marshal returns the bytes of the string that v points to.
// v must hold a *string; any other dynamic type makes the type assertion panic.
// The returned error is always nil.
50 func (testCodec) Marshal(v interface{}) ([]byte, error) {
51 return []byte(*(v.(*string))), nil
// Unmarshal converts data to a string and stores it in the string that v
// points to.
// v must hold a *string; any other dynamic type makes the type assertion panic.
54 func (testCodec) Unmarshal(data []byte, v interface{}) error {
55 *(v.(*string)) = string(data)
// String returns a string describing this codec (presumably its name — TODO
// confirm against the method body).
59 func (testCodec) String() string {
// testStreamHandler carries the state handleStream needs to answer a stream.
63 type testStreamHandler struct {
// t is the server transport that handleStream writes messages and statuses to.
65 t transport.ServerTransport
// handleStream receives a message through p.recvMsg, decodes it with
// testCodec into a string v, and answers on h.t. The visible outcomes are:
//   - v == "weird error": status codes.Internal with message weirdError;
//   - two further branches: codes.Internal with an empty message, and
//     codes.Internal with h.port as the message;
//   - v != expectedRequest: codes.Internal with a message of sizeLargeErr
//     "A" characters;
//   - expectedResponse is encoded and written, followed by status codes.OK.
//
// An unexpected payload format and decode/encode failures are reported with
// t.Errorf.
// NOTE(review): presumably every error branch returns before the final
// response is written — confirm against the full body.
68 func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
// math.MaxInt32 is presumably the receive-size limit, i.e. effectively
// unlimited — confirm against recvMsg.
71 pf, req, err := p.recvMsg(math.MaxInt32)
// Only the compressionNone payload format is accepted.
78 if pf != compressionNone {
79 t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
// Decode the raw request bytes into the string v.
84 if err := codec.Unmarshal(req, &v); err != nil {
85 t.Errorf("Failed to unmarshal the received message: %v", err)
// Reply with a message containing format verbs (see weirdError).
88 if v == "weird error" {
89 h.t.WriteStatus(s, status.New(codes.Internal, weirdError))
// Reply with an empty status message.
94 h.t.WriteStatus(s, status.New(codes.Internal, ""))
// Reply with h.port as the status message.
98 h.t.WriteStatus(s, status.New(codes.Internal, h.port))
// Any request other than expectedRequest gets a very large status message.
102 if v != expectedRequest {
103 h.t.WriteStatus(s, status.New(codes.Internal, strings.Repeat("A", sizeLargeErr)))
107 // send a response back to end the stream.
108 hdr, data, err := encode(testCodec{}, &expectedResponse, nil, nil, nil)
110 t.Errorf("Failed to encode the response: %v", err)
// Write the encoded response, then finish the stream with an OK status.
113 h.t.Write(s, hdr, data, &transport.Options{})
114 h.t.WriteStatus(s, status.New(codes.OK, ""))
// startedErr is created with capacity 1 in newTestServer, written by start and
// read by wait.
121 startedErr chan error // sent nil or an error after server starts
// conns is allocated in start and ranged over in stop; presumably it tracks
// the server transports created for accepted connections — TODO confirm.
123 conns map[transport.ServerTransport]bool
// newTestServer returns a server whose startedErr channel has capacity 1, so
// one start-up result can be sent on it without a receiver being ready.
// No other field is initialized here.
126 func newTestServer() *server {
127 return &server{startedErr: make(chan error, 1)}
130 // start starts server. Other goroutines should block on s.startedErr for further operations.
// It listens on TCP on localhost, records the listener address in s.addr,
// accepts a connection from the listener and creates an "http2" server
// transport for it with MaxStreams set to maxStreams. Streams on that
// transport are passed to testStreamHandler.handleStream. Listen and
// address-parse failures are sent on s.startedErr as errors.
131 func (s *server) start(t *testing.T, port int, maxStreams uint32) {
// With port "0", net.Listen chooses a port automatically.
134 s.lis, err = net.Listen("tcp", "localhost:0")
// Listen on the caller-supplied port.
136 s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
139 s.startedErr <- fmt.Errorf("failed to listen: %v", err)
// Record the address actually bound and split out its port component p
// (presumably stored as s.port, which setUp reads — TODO confirm).
142 s.addr = s.lis.Addr().String()
143 _, p, err := net.SplitHostPort(s.addr)
145 s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
149 s.conns = make(map[transport.ServerTransport]bool)
152 conn, err := s.lis.Accept()
156 config := &transport.ServerConfig{
157 MaxStreams: maxStreams,
159 st, err := transport.NewServerTransport("http2", conn, config)
171 h := &testStreamHandler{
// HandleStreams runs on its own goroutine, and each stream it delivers is
// handled on a further goroutine.
175 go st.HandleStreams(func(s *transport.Stream) {
176 go h.handleStream(t, s)
177 }, func(ctx context.Context, method string) context.Context {
// wait blocks until a value arrives on s.startedErr or until timeout elapses.
// In the timeout case it fails the test with t.Fatalf.
183 func (s *server) wait(t *testing.T, timeout time.Duration) {
// Start-up result sent by start.
185 case err := <-s.startedErr:
189 case <-time.After(timeout):
190 t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
// stop iterates over the transports in s.conns; presumably it closes each of
// them and the listener to shut the server down — TODO confirm against the
// full body.
194 func (s *server) stop() {
197 for c := range s.conns {
// setUp creates a test server, runs its start method on a separate goroutine
// with the given port and maxStreams, and waits up to two seconds for the
// start-up result. It then dials "localhost:"+server.port with WithBlock,
// WithInsecure and WithCodec(testCodec{}); a dial error fails the test with
// t.Fatalf. The results are the server and the client connection.
204 func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
205 server := newTestServer()
206 go server.start(t, port, maxStreams)
207 server.wait(t, 2*time.Second)
208 addr := "localhost:" + server.port
209 cc, err := Dial(addr, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
211 t.Fatalf("Failed to create ClientConn: %v", err)
// TestInvoke sends expectedRequest through Invoke to "/foo/bar" and requires
// a nil error and a reply equal to expectedResponse.
216 func TestInvoke(t *testing.T) {
217 defer leakcheck.Check(t)
// Port 0 and a MaxStreams value of math.MaxUint32 are passed to setUp.
218 server, cc := setUp(t, 0, math.MaxUint32)
220 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
// NOTE(review): the message prints only err, so a reply mismatch with a nil
// error is reported as "= <nil>, want <nil>"; consider including reply.
221 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
// TestInvokeLargeErr requires that the error returned by Invoke is accepted by
// status.FromError, has code codes.Internal, and has a description whose
// length is exactly sizeLargeErr bytes.
227 func TestInvokeLargeErr(t *testing.T) {
228 defer leakcheck.Check(t)
229 server, cc := setUp(t, 0, math.MaxUint32)
232 err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
// The error must be one that status.FromError recognizes.
233 if _, ok := status.FromError(err); !ok {
234 t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
// Both the status code and the description length must match.
236 if Code(err) != codes.Internal || len(ErrorDesc(err)) != sizeLargeErr {
237 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want an error of code %d and desc size %d", err, codes.Internal, sizeLargeErr)
243 // TestInvokeErrorSpecialChars checks that error messages don't get mangled.
// The error from Invoke must be accepted by status.FromError, and its
// description must equal weirdError, a string that contains format verbs.
244 func TestInvokeErrorSpecialChars(t *testing.T) {
245 defer leakcheck.Check(t)
246 server, cc := setUp(t, 0, math.MaxUint32)
249 err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
250 if _, ok := status.FromError(err); !ok {
251 t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
// Compare the received description with weirdError exactly.
253 if got, want := ErrorDesc(err), weirdError; got != want {
254 t.Fatalf("grpc.Invoke(_, _, _, _, _) error = %q, want %q", got, want)
260 // TestInvokeCancel checks that an Invoke with a canceled context is not sent.
// It makes 100 Invoke calls, each with its own cancelable context, and its
// failure message reports how many of them were counted in canceled.
// NOTE(review): presumably cancel() is called before each Invoke and the test
// fails when canceled is non-zero — confirm against the full body.
261 func TestInvokeCancel(t *testing.T) {
262 defer leakcheck.Check(t)
263 server, cc := setUp(t, 0, math.MaxUint32)
266 for i := 0; i < 100; i++ {
267 ctx, cancel := context.WithCancel(context.Background())
// Invoke's error is not checked here; the test looks at the canceled counter.
269 Invoke(ctx, "/foo/bar", &req, &reply, cc)
272 t.Fatalf("received %d of 100 canceled requests", canceled)
278 // TestInvokeCancelClosedNonFailFast checks that a canceled non-failfast RPC
279 // on a closed client will terminate.
280 func TestInvokeCancelClosedNonFailFast(t *testing.T) {
281 defer leakcheck.Check(t)
282 server, cc := setUp(t, 0, math.MaxUint32)
286 ctx, cancel := context.WithCancel(context.Background())
// The call is made with FailFast(false); a nil error is treated as a failure.
288 if err := Invoke(ctx, "/foo/bar", &req, &reply, cc, FailFast(false)); err == nil {
289 t.Fatalf("canceled invoke on closed connection should fail")