OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / call_test.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 package grpc
20
21 import (
22         "fmt"
23         "io"
24         "math"
25         "net"
26         "strconv"
27         "strings"
28         "sync"
29         "testing"
30         "time"
31
32         "golang.org/x/net/context"
33         "google.golang.org/grpc/codes"
34         "google.golang.org/grpc/status"
35         "google.golang.org/grpc/test/leakcheck"
36         "google.golang.org/grpc/transport"
37 )
38
39 var (
40         expectedRequest  = "ping"
41         expectedResponse = "pong"
42         weirdError       = "format verbs: %v%s"
43         sizeLargeErr     = 1024 * 1024
44         canceled         = 0
45 )
46
47 type testCodec struct {
48 }
49
50 func (testCodec) Marshal(v interface{}) ([]byte, error) {
51         return []byte(*(v.(*string))), nil
52 }
53
54 func (testCodec) Unmarshal(data []byte, v interface{}) error {
55         *(v.(*string)) = string(data)
56         return nil
57 }
58
59 func (testCodec) String() string {
60         return "test"
61 }
62
63 type testStreamHandler struct {
64         port string
65         t    transport.ServerTransport
66 }
67
68 func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
69         p := &parser{r: s}
70         for {
71                 pf, req, err := p.recvMsg(math.MaxInt32)
72                 if err == io.EOF {
73                         break
74                 }
75                 if err != nil {
76                         return
77                 }
78                 if pf != compressionNone {
79                         t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
80                         return
81                 }
82                 var v string
83                 codec := testCodec{}
84                 if err := codec.Unmarshal(req, &v); err != nil {
85                         t.Errorf("Failed to unmarshal the received message: %v", err)
86                         return
87                 }
88                 if v == "weird error" {
89                         h.t.WriteStatus(s, status.New(codes.Internal, weirdError))
90                         return
91                 }
92                 if v == "canceled" {
93                         canceled++
94                         h.t.WriteStatus(s, status.New(codes.Internal, ""))
95                         return
96                 }
97                 if v == "port" {
98                         h.t.WriteStatus(s, status.New(codes.Internal, h.port))
99                         return
100                 }
101
102                 if v != expectedRequest {
103                         h.t.WriteStatus(s, status.New(codes.Internal, strings.Repeat("A", sizeLargeErr)))
104                         return
105                 }
106         }
107         // send a response back to end the stream.
108         hdr, data, err := encode(testCodec{}, &expectedResponse, nil, nil, nil)
109         if err != nil {
110                 t.Errorf("Failed to encode the response: %v", err)
111                 return
112         }
113         h.t.Write(s, hdr, data, &transport.Options{})
114         h.t.WriteStatus(s, status.New(codes.OK, ""))
115 }
116
117 type server struct {
118         lis        net.Listener
119         port       string
120         addr       string
121         startedErr chan error // sent nil or an error after server starts
122         mu         sync.Mutex
123         conns      map[transport.ServerTransport]bool
124 }
125
126 func newTestServer() *server {
127         return &server{startedErr: make(chan error, 1)}
128 }
129
130 // start starts server. Other goroutines should block on s.startedErr for further operations.
131 func (s *server) start(t *testing.T, port int, maxStreams uint32) {
132         var err error
133         if port == 0 {
134                 s.lis, err = net.Listen("tcp", "localhost:0")
135         } else {
136                 s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
137         }
138         if err != nil {
139                 s.startedErr <- fmt.Errorf("failed to listen: %v", err)
140                 return
141         }
142         s.addr = s.lis.Addr().String()
143         _, p, err := net.SplitHostPort(s.addr)
144         if err != nil {
145                 s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
146                 return
147         }
148         s.port = p
149         s.conns = make(map[transport.ServerTransport]bool)
150         s.startedErr <- nil
151         for {
152                 conn, err := s.lis.Accept()
153                 if err != nil {
154                         return
155                 }
156                 config := &transport.ServerConfig{
157                         MaxStreams: maxStreams,
158                 }
159                 st, err := transport.NewServerTransport("http2", conn, config)
160                 if err != nil {
161                         continue
162                 }
163                 s.mu.Lock()
164                 if s.conns == nil {
165                         s.mu.Unlock()
166                         st.Close()
167                         return
168                 }
169                 s.conns[st] = true
170                 s.mu.Unlock()
171                 h := &testStreamHandler{
172                         port: s.port,
173                         t:    st,
174                 }
175                 go st.HandleStreams(func(s *transport.Stream) {
176                         go h.handleStream(t, s)
177                 }, func(ctx context.Context, method string) context.Context {
178                         return ctx
179                 })
180         }
181 }
182
183 func (s *server) wait(t *testing.T, timeout time.Duration) {
184         select {
185         case err := <-s.startedErr:
186                 if err != nil {
187                         t.Fatal(err)
188                 }
189         case <-time.After(timeout):
190                 t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
191         }
192 }
193
194 func (s *server) stop() {
195         s.lis.Close()
196         s.mu.Lock()
197         for c := range s.conns {
198                 c.Close()
199         }
200         s.conns = nil
201         s.mu.Unlock()
202 }
203
204 func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
205         server := newTestServer()
206         go server.start(t, port, maxStreams)
207         server.wait(t, 2*time.Second)
208         addr := "localhost:" + server.port
209         cc, err := Dial(addr, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
210         if err != nil {
211                 t.Fatalf("Failed to create ClientConn: %v", err)
212         }
213         return server, cc
214 }
215
216 func TestInvoke(t *testing.T) {
217         defer leakcheck.Check(t)
218         server, cc := setUp(t, 0, math.MaxUint32)
219         var reply string
220         if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
221                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
222         }
223         cc.Close()
224         server.stop()
225 }
226
227 func TestInvokeLargeErr(t *testing.T) {
228         defer leakcheck.Check(t)
229         server, cc := setUp(t, 0, math.MaxUint32)
230         var reply string
231         req := "hello"
232         err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
233         if _, ok := status.FromError(err); !ok {
234                 t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
235         }
236         if Code(err) != codes.Internal || len(ErrorDesc(err)) != sizeLargeErr {
237                 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want an error of code %d and desc size %d", err, codes.Internal, sizeLargeErr)
238         }
239         cc.Close()
240         server.stop()
241 }
242
243 // TestInvokeErrorSpecialChars checks that error messages don't get mangled.
244 func TestInvokeErrorSpecialChars(t *testing.T) {
245         defer leakcheck.Check(t)
246         server, cc := setUp(t, 0, math.MaxUint32)
247         var reply string
248         req := "weird error"
249         err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
250         if _, ok := status.FromError(err); !ok {
251                 t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
252         }
253         if got, want := ErrorDesc(err), weirdError; got != want {
254                 t.Fatalf("grpc.Invoke(_, _, _, _, _) error = %q, want %q", got, want)
255         }
256         cc.Close()
257         server.stop()
258 }
259
260 // TestInvokeCancel checks that an Invoke with a canceled context is not sent.
261 func TestInvokeCancel(t *testing.T) {
262         defer leakcheck.Check(t)
263         server, cc := setUp(t, 0, math.MaxUint32)
264         var reply string
265         req := "canceled"
266         for i := 0; i < 100; i++ {
267                 ctx, cancel := context.WithCancel(context.Background())
268                 cancel()
269                 Invoke(ctx, "/foo/bar", &req, &reply, cc)
270         }
271         if canceled != 0 {
272                 t.Fatalf("received %d of 100 canceled requests", canceled)
273         }
274         cc.Close()
275         server.stop()
276 }
277
278 // TestInvokeCancelClosedNonFail checks that a canceled non-failfast RPC
279 // on a closed client will terminate.
280 func TestInvokeCancelClosedNonFailFast(t *testing.T) {
281         defer leakcheck.Check(t)
282         server, cc := setUp(t, 0, math.MaxUint32)
283         var reply string
284         cc.Close()
285         req := "hello"
286         ctx, cancel := context.WithCancel(context.Background())
287         cancel()
288         if err := Invoke(ctx, "/foo/bar", &req, &reply, cc, FailFast(false)); err == nil {
289                 t.Fatalf("canceled invoke on closed connection should fail")
290         }
291         server.stop()
292 }