OSDN Git Service

new repo
[bytom/vapor.git] / vendor / google.golang.org / grpc / transport / transport.go
1 /*
2  *
3  * Copyright 2014 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 // Package transport defines and implements message oriented communication
20 // channel to complete various transactions (e.g., an RPC).
21 package transport // import "google.golang.org/grpc/transport"
22
23 import (
24         "fmt"
25         "io"
26         "net"
27         "sync"
28         "time"
29
30         "golang.org/x/net/context"
31         "golang.org/x/net/http2"
32         "google.golang.org/grpc/codes"
33         "google.golang.org/grpc/credentials"
34         "google.golang.org/grpc/keepalive"
35         "google.golang.org/grpc/metadata"
36         "google.golang.org/grpc/stats"
37         "google.golang.org/grpc/status"
38         "google.golang.org/grpc/tap"
39 )
40
41 // recvMsg represents the received msg from the transport. All transport
42 // protocol specific info has been removed.
43 type recvMsg struct {
44         data []byte
45         // nil: received some data
46         // io.EOF: stream is completed. data is nil.
47         // other non-nil error: transport failure. data is nil.
48         err error
49 }
50
51 // recvBuffer is an unbounded channel of recvMsg structs.
52 // Note recvBuffer differs from controlBuffer only in that recvBuffer
53 // holds a channel of only recvMsg structs instead of objects implementing "item" interface.
54 // recvBuffer is written to much more often than
55 // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
56 type recvBuffer struct {
57         c       chan recvMsg
58         mu      sync.Mutex
59         backlog []recvMsg
60 }
61
62 func newRecvBuffer() *recvBuffer {
63         b := &recvBuffer{
64                 c: make(chan recvMsg, 1),
65         }
66         return b
67 }
68
69 func (b *recvBuffer) put(r recvMsg) {
70         b.mu.Lock()
71         if len(b.backlog) == 0 {
72                 select {
73                 case b.c <- r:
74                         b.mu.Unlock()
75                         return
76                 default:
77                 }
78         }
79         b.backlog = append(b.backlog, r)
80         b.mu.Unlock()
81 }
82
83 func (b *recvBuffer) load() {
84         b.mu.Lock()
85         if len(b.backlog) > 0 {
86                 select {
87                 case b.c <- b.backlog[0]:
88                         b.backlog[0] = recvMsg{}
89                         b.backlog = b.backlog[1:]
90                 default:
91                 }
92         }
93         b.mu.Unlock()
94 }
95
96 // get returns the channel that receives a recvMsg in the buffer.
97 //
98 // Upon receipt of a recvMsg, the caller should call load to send another
99 // recvMsg onto the channel if there is any.
100 func (b *recvBuffer) get() <-chan recvMsg {
101         return b.c
102 }
103
104 // recvBufferReader implements io.Reader interface to read the data from
105 // recvBuffer.
106 type recvBufferReader struct {
107         ctx    context.Context
108         goAway chan struct{}
109         recv   *recvBuffer
110         last   []byte // Stores the remaining data in the previous calls.
111         err    error
112 }
113
114 // Read reads the next len(p) bytes from last. If last is drained, it tries to
115 // read additional data from recv. It blocks if there no additional data available
116 // in recv. If Read returns any non-nil error, it will continue to return that error.
117 func (r *recvBufferReader) Read(p []byte) (n int, err error) {
118         if r.err != nil {
119                 return 0, r.err
120         }
121         n, r.err = r.read(p)
122         return n, r.err
123 }
124
125 func (r *recvBufferReader) read(p []byte) (n int, err error) {
126         if r.last != nil && len(r.last) > 0 {
127                 // Read remaining data left in last call.
128                 copied := copy(p, r.last)
129                 r.last = r.last[copied:]
130                 return copied, nil
131         }
132         select {
133         case <-r.ctx.Done():
134                 return 0, ContextErr(r.ctx.Err())
135         case <-r.goAway:
136                 return 0, ErrStreamDrain
137         case m := <-r.recv.get():
138                 r.recv.load()
139                 if m.err != nil {
140                         return 0, m.err
141                 }
142                 copied := copy(p, m.data)
143                 r.last = m.data[copied:]
144                 return copied, nil
145         }
146 }
147
148 // All items in an out of a controlBuffer should be the same type.
149 type item interface {
150         item()
151 }
152
153 // controlBuffer is an unbounded channel of item.
154 type controlBuffer struct {
155         c       chan item
156         mu      sync.Mutex
157         backlog []item
158 }
159
160 func newControlBuffer() *controlBuffer {
161         b := &controlBuffer{
162                 c: make(chan item, 1),
163         }
164         return b
165 }
166
167 func (b *controlBuffer) put(r item) {
168         b.mu.Lock()
169         if len(b.backlog) == 0 {
170                 select {
171                 case b.c <- r:
172                         b.mu.Unlock()
173                         return
174                 default:
175                 }
176         }
177         b.backlog = append(b.backlog, r)
178         b.mu.Unlock()
179 }
180
181 func (b *controlBuffer) load() {
182         b.mu.Lock()
183         if len(b.backlog) > 0 {
184                 select {
185                 case b.c <- b.backlog[0]:
186                         b.backlog[0] = nil
187                         b.backlog = b.backlog[1:]
188                 default:
189                 }
190         }
191         b.mu.Unlock()
192 }
193
194 // get returns the channel that receives an item in the buffer.
195 //
196 // Upon receipt of an item, the caller should call load to send another
197 // item onto the channel if there is any.
198 func (b *controlBuffer) get() <-chan item {
199         return b.c
200 }
201
202 type streamState uint8
203
204 const (
205         streamActive    streamState = iota
206         streamWriteDone             // EndStream sent
207         streamReadDone              // EndStream received
208         streamDone                  // the entire stream is finished.
209 )
210
211 // Stream represents an RPC in the transport layer.
212 type Stream struct {
213         id uint32
214         // nil for client side Stream.
215         st ServerTransport
216         // ctx is the associated context of the stream.
217         ctx context.Context
218         // cancel is always nil for client side Stream.
219         cancel context.CancelFunc
220         // done is closed when the final status arrives.
221         done chan struct{}
222         // goAway is closed when the server sent GoAways signal before this stream was initiated.
223         goAway chan struct{}
224         // method records the associated RPC method of the stream.
225         method       string
226         recvCompress string
227         sendCompress string
228         buf          *recvBuffer
229         trReader     io.Reader
230         fc           *inFlow
231         recvQuota    uint32
232
233         // TODO: Remote this unused variable.
234         // The accumulated inbound quota pending for window update.
235         updateQuota uint32
236
237         // Callback to state application's intentions to read data. This
238         // is used to adjust flow control, if need be.
239         requestRead func(int)
240
241         sendQuotaPool  *quotaPool
242         localSendQuota *quotaPool
243         // Close headerChan to indicate the end of reception of header metadata.
244         headerChan chan struct{}
245         // header caches the received header metadata.
246         header metadata.MD
247         // The key-value map of trailer metadata.
248         trailer metadata.MD
249
250         mu sync.RWMutex // guard the following
251         // headerOK becomes true from the first header is about to send.
252         headerOk bool
253         state    streamState
254         // true iff headerChan is closed. Used to avoid closing headerChan
255         // multiple times.
256         headerDone bool
257         // the status error received from the server.
258         status *status.Status
259         // rstStream indicates whether a RST_STREAM frame needs to be sent
260         // to the server to signify that this stream is closing.
261         rstStream bool
262         // rstError is the error that needs to be sent along with the RST_STREAM frame.
263         rstError http2.ErrCode
264         // bytesSent and bytesReceived indicates whether any bytes have been sent or
265         // received on this stream.
266         bytesSent     bool
267         bytesReceived bool
268 }
269
270 // RecvCompress returns the compression algorithm applied to the inbound
271 // message. It is empty string if there is no compression applied.
272 func (s *Stream) RecvCompress() string {
273         return s.recvCompress
274 }
275
276 // SetSendCompress sets the compression algorithm to the stream.
277 func (s *Stream) SetSendCompress(str string) {
278         s.sendCompress = str
279 }
280
281 // Done returns a chanel which is closed when it receives the final status
282 // from the server.
283 func (s *Stream) Done() <-chan struct{} {
284         return s.done
285 }
286
287 // GoAway returns a channel which is closed when the server sent GoAways signal
288 // before this stream was initiated.
289 func (s *Stream) GoAway() <-chan struct{} {
290         return s.goAway
291 }
292
293 // Header acquires the key-value pairs of header metadata once it
294 // is available. It blocks until i) the metadata is ready or ii) there is no
295 // header metadata or iii) the stream is canceled/expired.
296 func (s *Stream) Header() (metadata.MD, error) {
297         var err error
298         select {
299         case <-s.ctx.Done():
300                 err = ContextErr(s.ctx.Err())
301         case <-s.goAway:
302                 err = ErrStreamDrain
303         case <-s.headerChan:
304                 return s.header.Copy(), nil
305         }
306         // Even if the stream is closed, header is returned if available.
307         select {
308         case <-s.headerChan:
309                 return s.header.Copy(), nil
310         default:
311         }
312         return nil, err
313 }
314
315 // Trailer returns the cached trailer metedata. Note that if it is not called
316 // after the entire stream is done, it could return an empty MD. Client
317 // side only.
318 func (s *Stream) Trailer() metadata.MD {
319         s.mu.RLock()
320         c := s.trailer.Copy()
321         s.mu.RUnlock()
322         return c
323 }
324
325 // ServerTransport returns the underlying ServerTransport for the stream.
326 // The client side stream always returns nil.
327 func (s *Stream) ServerTransport() ServerTransport {
328         return s.st
329 }
330
331 // Context returns the context of the stream.
332 func (s *Stream) Context() context.Context {
333         return s.ctx
334 }
335
336 // Method returns the method for the stream.
337 func (s *Stream) Method() string {
338         return s.method
339 }
340
341 // Status returns the status received from the server.
342 func (s *Stream) Status() *status.Status {
343         return s.status
344 }
345
346 // SetHeader sets the header metadata. This can be called multiple times.
347 // Server side only.
348 func (s *Stream) SetHeader(md metadata.MD) error {
349         s.mu.Lock()
350         if s.headerOk || s.state == streamDone {
351                 s.mu.Unlock()
352                 return ErrIllegalHeaderWrite
353         }
354         if md.Len() == 0 {
355                 s.mu.Unlock()
356                 return nil
357         }
358         s.header = metadata.Join(s.header, md)
359         s.mu.Unlock()
360         return nil
361 }
362
363 // SetTrailer sets the trailer metadata which will be sent with the RPC status
364 // by the server. This can be called multiple times. Server side only.
365 func (s *Stream) SetTrailer(md metadata.MD) error {
366         if md.Len() == 0 {
367                 return nil
368         }
369         s.mu.Lock()
370         s.trailer = metadata.Join(s.trailer, md)
371         s.mu.Unlock()
372         return nil
373 }
374
375 func (s *Stream) write(m recvMsg) {
376         s.buf.put(m)
377 }
378
379 // Read reads all p bytes from the wire for this stream.
380 func (s *Stream) Read(p []byte) (n int, err error) {
381         // Don't request a read if there was an error earlier
382         if er := s.trReader.(*transportReader).er; er != nil {
383                 return 0, er
384         }
385         s.requestRead(len(p))
386         return io.ReadFull(s.trReader, p)
387 }
388
389 // tranportReader reads all the data available for this Stream from the transport and
390 // passes them into the decoder, which converts them into a gRPC message stream.
391 // The error is io.EOF when the stream is done or another non-nil error if
392 // the stream broke.
393 type transportReader struct {
394         reader io.Reader
395         // The handler to control the window update procedure for both this
396         // particular stream and the associated transport.
397         windowHandler func(int)
398         er            error
399 }
400
401 func (t *transportReader) Read(p []byte) (n int, err error) {
402         n, err = t.reader.Read(p)
403         if err != nil {
404                 t.er = err
405                 return
406         }
407         t.windowHandler(n)
408         return
409 }
410
411 // finish sets the stream's state and status, and closes the done channel.
412 // s.mu must be held by the caller.  st must always be non-nil.
413 func (s *Stream) finish(st *status.Status) {
414         s.status = st
415         s.state = streamDone
416         close(s.done)
417 }
418
419 // BytesSent indicates whether any bytes have been sent on this stream.
420 func (s *Stream) BytesSent() bool {
421         s.mu.Lock()
422         bs := s.bytesSent
423         s.mu.Unlock()
424         return bs
425 }
426
427 // BytesReceived indicates whether any bytes have been received on this stream.
428 func (s *Stream) BytesReceived() bool {
429         s.mu.Lock()
430         br := s.bytesReceived
431         s.mu.Unlock()
432         return br
433 }
434
435 // GoString is implemented by Stream so context.String() won't
436 // race when printing %#v.
437 func (s *Stream) GoString() string {
438         return fmt.Sprintf("<stream: %p, %v>", s, s.method)
439 }
440
441 // The key to save transport.Stream in the context.
442 type streamKey struct{}
443
444 // newContextWithStream creates a new context from ctx and attaches stream
445 // to it.
446 func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
447         return context.WithValue(ctx, streamKey{}, stream)
448 }
449
450 // StreamFromContext returns the stream saved in ctx.
451 func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
452         s, ok = ctx.Value(streamKey{}).(*Stream)
453         return
454 }
455
456 // state of transport
457 type transportState int
458
459 const (
460         reachable transportState = iota
461         closing
462         draining
463 )
464
465 // ServerConfig consists of all the configurations to establish a server transport.
466 type ServerConfig struct {
467         MaxStreams            uint32
468         AuthInfo              credentials.AuthInfo
469         InTapHandle           tap.ServerInHandle
470         StatsHandler          stats.Handler
471         KeepaliveParams       keepalive.ServerParameters
472         KeepalivePolicy       keepalive.EnforcementPolicy
473         InitialWindowSize     int32
474         InitialConnWindowSize int32
475         WriteBufferSize       int
476         ReadBufferSize        int
477 }
478
479 // NewServerTransport creates a ServerTransport with conn or non-nil error
480 // if it fails.
481 func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
482         return newHTTP2Server(conn, config)
483 }
484
485 // ConnectOptions covers all relevant options for communicating with the server.
486 type ConnectOptions struct {
487         // UserAgent is the application user agent.
488         UserAgent string
489         // Authority is the :authority pseudo-header to use. This field has no effect if
490         // TransportCredentials is set.
491         Authority string
492         // Dialer specifies how to dial a network address.
493         Dialer func(context.Context, string) (net.Conn, error)
494         // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
495         FailOnNonTempDialError bool
496         // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
497         PerRPCCredentials []credentials.PerRPCCredentials
498         // TransportCredentials stores the Authenticator required to setup a client connection.
499         TransportCredentials credentials.TransportCredentials
500         // KeepaliveParams stores the keepalive parameters.
501         KeepaliveParams keepalive.ClientParameters
502         // StatsHandler stores the handler for stats.
503         StatsHandler stats.Handler
504         // InitialWindowSize sets the initial window size for a stream.
505         InitialWindowSize int32
506         // InitialConnWindowSize sets the initial window size for a connection.
507         InitialConnWindowSize int32
508         // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
509         WriteBufferSize int
510         // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
511         ReadBufferSize int
512 }
513
514 // TargetInfo contains the information of the target such as network address and metadata.
515 type TargetInfo struct {
516         Addr      string
517         Metadata  interface{}
518         Authority string
519 }
520
521 // NewClientTransport establishes the transport with the required ConnectOptions
522 // and returns it to the caller.
523 func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
524         return newHTTP2Client(ctx, target, opts, timeout)
525 }
526
527 // Options provides additional hints and information for message
528 // transmission.
529 type Options struct {
530         // Last indicates whether this write is the last piece for
531         // this stream.
532         Last bool
533
534         // Delay is a hint to the transport implementation for whether
535         // the data could be buffered for a batching write. The
536         // transport implementation may ignore the hint.
537         Delay bool
538 }
539
540 // CallHdr carries the information of a particular RPC.
541 type CallHdr struct {
542         // Host specifies the peer's host.
543         Host string
544
545         // Method specifies the operation to perform.
546         Method string
547
548         // RecvCompress specifies the compression algorithm applied on
549         // inbound messages.
550         RecvCompress string
551
552         // SendCompress specifies the compression algorithm applied on
553         // outbound message.
554         SendCompress string
555
556         // Creds specifies credentials.PerRPCCredentials for a call.
557         Creds credentials.PerRPCCredentials
558
559         // Flush indicates whether a new stream command should be sent
560         // to the peer without waiting for the first data. This is
561         // only a hint.
562         // If it's true, the transport may modify the flush decision
563         // for performance purposes.
564         // If it's false, new stream will never be flushed.
565         Flush bool
566 }
567
568 // ClientTransport is the common interface for all gRPC client-side transport
569 // implementations.
570 type ClientTransport interface {
571         // Close tears down this transport. Once it returns, the transport
572         // should not be accessed any more. The caller must make sure this
573         // is called only once.
574         Close() error
575
576         // GracefulClose starts to tear down the transport. It stops accepting
577         // new RPCs and wait the completion of the pending RPCs.
578         GracefulClose() error
579
580         // Write sends the data for the given stream. A nil stream indicates
581         // the write is to be performed on the transport as a whole.
582         Write(s *Stream, hdr []byte, data []byte, opts *Options) error
583
584         // NewStream creates a Stream for an RPC.
585         NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
586
587         // CloseStream clears the footprint of a stream when the stream is
588         // not needed any more. The err indicates the error incurred when
589         // CloseStream is called. Must be called when a stream is finished
590         // unless the associated transport is closing.
591         CloseStream(stream *Stream, err error)
592
593         // Error returns a channel that is closed when some I/O error
594         // happens. Typically the caller should have a goroutine to monitor
595         // this in order to take action (e.g., close the current transport
596         // and create a new one) in error case. It should not return nil
597         // once the transport is initiated.
598         Error() <-chan struct{}
599
600         // GoAway returns a channel that is closed when ClientTransport
601         // receives the draining signal from the server (e.g., GOAWAY frame in
602         // HTTP/2).
603         GoAway() <-chan struct{}
604
605         // GetGoAwayReason returns the reason why GoAway frame was received.
606         GetGoAwayReason() GoAwayReason
607 }
608
609 // ServerTransport is the common interface for all gRPC server-side transport
610 // implementations.
611 //
612 // Methods may be called concurrently from multiple goroutines, but
613 // Write methods for a given Stream will be called serially.
614 type ServerTransport interface {
615         // HandleStreams receives incoming streams using the given handler.
616         HandleStreams(func(*Stream), func(context.Context, string) context.Context)
617
618         // WriteHeader sends the header metadata for the given stream.
619         // WriteHeader may not be called on all streams.
620         WriteHeader(s *Stream, md metadata.MD) error
621
622         // Write sends the data for the given stream.
623         // Write may not be called on all streams.
624         Write(s *Stream, hdr []byte, data []byte, opts *Options) error
625
626         // WriteStatus sends the status of a stream to the client.  WriteStatus is
627         // the final call made on a stream and always occurs.
628         WriteStatus(s *Stream, st *status.Status) error
629
630         // Close tears down the transport. Once it is called, the transport
631         // should not be accessed any more. All the pending streams and their
632         // handlers will be terminated asynchronously.
633         Close() error
634
635         // RemoteAddr returns the remote network address.
636         RemoteAddr() net.Addr
637
638         // Drain notifies the client this ServerTransport stops accepting new RPCs.
639         Drain()
640 }
641
642 // streamErrorf creates an StreamError with the specified error code and description.
643 func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
644         return StreamError{
645                 Code: c,
646                 Desc: fmt.Sprintf(format, a...),
647         }
648 }
649
650 // connectionErrorf creates an ConnectionError with the specified error description.
651 func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
652         return ConnectionError{
653                 Desc: fmt.Sprintf(format, a...),
654                 temp: temp,
655                 err:  e,
656         }
657 }
658
659 // ConnectionError is an error that results in the termination of the
660 // entire connection and the retry of all the active streams.
661 type ConnectionError struct {
662         Desc string
663         temp bool
664         err  error
665 }
666
667 func (e ConnectionError) Error() string {
668         return fmt.Sprintf("connection error: desc = %q", e.Desc)
669 }
670
671 // Temporary indicates if this connection error is temporary or fatal.
672 func (e ConnectionError) Temporary() bool {
673         return e.temp
674 }
675
676 // Origin returns the original error of this connection error.
677 func (e ConnectionError) Origin() error {
678         // Never return nil error here.
679         // If the original error is nil, return itself.
680         if e.err == nil {
681                 return e
682         }
683         return e.err
684 }
685
686 var (
687         // ErrConnClosing indicates that the transport is closing.
688         ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
689         // ErrStreamDrain indicates that the stream is rejected by the server because
690         // the server stops accepting new RPCs.
691         ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
692 )
693
694 // TODO: See if we can replace StreamError with status package errors.
695
696 // StreamError is an error that only affects one stream within a connection.
697 type StreamError struct {
698         Code codes.Code
699         Desc string
700 }
701
702 func (e StreamError) Error() string {
703         return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
704 }
705
706 // wait blocks until it can receive from one of the provided contexts or
707 // channels.  ctx is the context of the RPC, tctx is the context of the
708 // transport, done is a channel closed to indicate the end of the RPC, goAway
709 // is a channel closed to indicate a GOAWAY was received, and proceed is a
710 // quota channel, whose received value is returned from this function if none
711 // of the other signals occur first.
712 func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
713         select {
714         case <-ctx.Done():
715                 return 0, ContextErr(ctx.Err())
716         case <-done:
717                 return 0, io.EOF
718         case <-goAway:
719                 return 0, ErrStreamDrain
720         case <-tctx.Done():
721                 return 0, ErrConnClosing
722         case i := <-proceed:
723                 return i, nil
724         }
725 }
726
727 // GoAwayReason contains the reason for the GoAway frame received.
728 type GoAwayReason uint8
729
730 const (
731         // Invalid indicates that no GoAway frame is received.
732         Invalid GoAwayReason = 0
733         // NoReason is the default value when GoAway frame is received.
734         NoReason GoAwayReason = 1
735         // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
736         // was received and that the debug data said "too_many_pings".
737         TooManyPings GoAwayReason = 2
738 )
739
740 // loopyWriter is run in a separate go routine. It is the single code path that will
741 // write data on wire.
742 func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
743         for {
744                 select {
745                 case i := <-cbuf.get():
746                         cbuf.load()
747                         if err := handler(i); err != nil {
748                                 return
749                         }
750                 case <-ctx.Done():
751                         return
752                 }
753         hasData:
754                 for {
755                         select {
756                         case i := <-cbuf.get():
757                                 cbuf.load()
758                                 if err := handler(i); err != nil {
759                                         return
760                                 }
761                         case <-ctx.Done():
762                                 return
763                         default:
764                                 if err := handler(&flushIO{}); err != nil {
765                                         return
766                                 }
767                                 break hasData
768                         }
769                 }
770         }
771 }