OSDN Git Service

add package
[bytom/vapor.git] / vendor / github.com / hashicorp / go-plugin / grpc_broker.go
1 package plugin
2
3 import (
4         "context"
5         "crypto/tls"
6         "errors"
7         "fmt"
8         "log"
9         "net"
10         "sync"
11         "sync/atomic"
12         "time"
13
14         "github.com/oklog/run"
15         "google.golang.org/grpc"
16         "google.golang.org/grpc/credentials"
17 )
18
19 // streamer interface is used in the broker to send/receive connection
20 // information.
21 type streamer interface {
22         Send(*ConnInfo) error
23         Recv() (*ConnInfo, error)
24         Close()
25 }
26
27 // sendErr is used to pass errors back during a send.
28 type sendErr struct {
29         i  *ConnInfo
30         ch chan error
31 }
32
33 // gRPCBrokerServer is used by the plugin to start a stream and to send
34 // connection information to/from the plugin. Implements GRPCBrokerServer and
35 // streamer interfaces.
36 type gRPCBrokerServer struct {
37         // send is used to send connection info to the gRPC stream.
38         send chan *sendErr
39
40         // recv is used to receive connection info from the gRPC stream.
41         recv chan *ConnInfo
42
43         // quit closes down the stream.
44         quit chan struct{}
45
46         // o is used to ensure we close the quit channel only once.
47         o sync.Once
48 }
49
50 func newGRPCBrokerServer() *gRPCBrokerServer {
51         return &gRPCBrokerServer{
52                 send: make(chan *sendErr),
53                 recv: make(chan *ConnInfo),
54                 quit: make(chan struct{}),
55         }
56 }
57
58 // StartStream implements the GRPCBrokerServer interface and will block until
59 // the quit channel is closed or the context reports Done. The stream will pass
60 // connection information to/from the client.
61 func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
62         doneCh := stream.Context().Done()
63         defer s.Close()
64
65         // Proccess send stream
66         go func() {
67                 for {
68                         select {
69                         case <-doneCh:
70                                 return
71                         case <-s.quit:
72                                 return
73                         case se := <-s.send:
74                                 err := stream.Send(se.i)
75                                 se.ch <- err
76                         }
77                 }
78         }()
79
80         // Process receive stream
81         for {
82                 i, err := stream.Recv()
83                 if err != nil {
84                         return err
85                 }
86                 select {
87                 case <-doneCh:
88                         return nil
89                 case <-s.quit:
90                         return nil
91                 case s.recv <- i:
92                 }
93         }
94
95         return nil
96 }
97
98 // Send is used by the GRPCBroker to pass connection information into the stream
99 // to the client.
100 func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
101         ch := make(chan error)
102         defer close(ch)
103
104         select {
105         case <-s.quit:
106                 return errors.New("broker closed")
107         case s.send <- &sendErr{
108                 i:  i,
109                 ch: ch,
110         }:
111         }
112
113         return <-ch
114 }
115
116 // Recv is used by the GRPCBroker to pass connection information that has been
117 // sent from the client from the stream to the broker.
118 func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
119         select {
120         case <-s.quit:
121                 return nil, errors.New("broker closed")
122         case i := <-s.recv:
123                 return i, nil
124         }
125 }
126
127 // Close closes the quit channel, shutting down the stream.
128 func (s *gRPCBrokerServer) Close() {
129         s.o.Do(func() {
130                 close(s.quit)
131         })
132 }
133
134 // gRPCBrokerClientImpl is used by the client to start a stream and to send
135 // connection information to/from the client. Implements GRPCBrokerClient and
136 // streamer interfaces.
137 type gRPCBrokerClientImpl struct {
138         // client is the underlying GRPC client used to make calls to the server.
139         client GRPCBrokerClient
140
141         // send is used to send connection info to the gRPC stream.
142         send chan *sendErr
143
144         // recv is used to receive connection info from the gRPC stream.
145         recv chan *ConnInfo
146
147         // quit closes down the stream.
148         quit chan struct{}
149
150         // o is used to ensure we close the quit channel only once.
151         o sync.Once
152 }
153
154 func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
155         return &gRPCBrokerClientImpl{
156                 client: NewGRPCBrokerClient(conn),
157                 send:   make(chan *sendErr),
158                 recv:   make(chan *ConnInfo),
159                 quit:   make(chan struct{}),
160         }
161 }
162
163 // StartStream implements the GRPCBrokerClient interface and will block until
164 // the quit channel is closed or the context reports Done. The stream will pass
165 // connection information to/from the plugin.
166 func (s *gRPCBrokerClientImpl) StartStream() error {
167         ctx, cancelFunc := context.WithCancel(context.Background())
168         defer cancelFunc()
169         defer s.Close()
170
171         stream, err := s.client.StartStream(ctx)
172         if err != nil {
173                 return err
174         }
175         doneCh := stream.Context().Done()
176
177         go func() {
178                 for {
179                         select {
180                         case <-doneCh:
181                                 return
182                         case <-s.quit:
183                                 return
184                         case se := <-s.send:
185                                 err := stream.Send(se.i)
186                                 se.ch <- err
187                         }
188                 }
189         }()
190
191         for {
192                 i, err := stream.Recv()
193                 if err != nil {
194                         return err
195                 }
196                 select {
197                 case <-doneCh:
198                         return nil
199                 case <-s.quit:
200                         return nil
201                 case s.recv <- i:
202                 }
203         }
204
205         return nil
206 }
207
208 // Send is used by the GRPCBroker to pass connection information into the stream
209 // to the plugin.
210 func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
211         ch := make(chan error)
212         defer close(ch)
213
214         select {
215         case <-s.quit:
216                 return errors.New("broker closed")
217         case s.send <- &sendErr{
218                 i:  i,
219                 ch: ch,
220         }:
221         }
222
223         return <-ch
224 }
225
226 // Recv is used by the GRPCBroker to pass connection information that has been
227 // sent from the plugin to the broker.
228 func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
229         select {
230         case <-s.quit:
231                 return nil, errors.New("broker closed")
232         case i := <-s.recv:
233                 return i, nil
234         }
235 }
236
237 // Close closes the quit channel, shutting down the stream.
238 func (s *gRPCBrokerClientImpl) Close() {
239         s.o.Do(func() {
240                 close(s.quit)
241         })
242 }
243
244 // GRPCBroker is responsible for brokering connections by unique ID.
245 //
246 // It is used by plugins to create multiple gRPC connections and data
247 // streams between the plugin process and the host process.
248 //
249 // This allows a plugin to request a channel with a specific ID to connect to
250 // or accept a connection from, and the broker handles the details of
251 // holding these channels open while they're being negotiated.
252 //
253 // The Plugin interface has access to these for both Server and Client.
254 // The broker can be used by either (optionally) to reserve and connect to
255 // new streams. This is useful for complex args and return values,
256 // or anything else you might need a data stream for.
257 type GRPCBroker struct {
258         nextId   uint32
259         streamer streamer
260         streams  map[uint32]*gRPCBrokerPending
261         tls      *tls.Config
262         doneCh   chan struct{}
263         o        sync.Once
264
265         sync.Mutex
266 }
267
268 type gRPCBrokerPending struct {
269         ch     chan *ConnInfo
270         doneCh chan struct{}
271 }
272
273 func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
274         return &GRPCBroker{
275                 streamer: s,
276                 streams:  make(map[uint32]*gRPCBrokerPending),
277                 tls:      tls,
278                 doneCh:   make(chan struct{}),
279         }
280 }
281
282 // Accept accepts a connection by ID.
283 //
284 // This should not be called multiple times with the same ID at one time.
285 func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
286         listener, err := serverListener()
287         if err != nil {
288                 return nil, err
289         }
290
291         err = b.streamer.Send(&ConnInfo{
292                 ServiceId: id,
293                 Network:   listener.Addr().Network(),
294                 Address:   listener.Addr().String(),
295         })
296         if err != nil {
297                 return nil, err
298         }
299
300         return listener, nil
301 }
302
303 // AcceptAndServe is used to accept a specific stream ID and immediately
304 // serve a gRPC server on that stream ID. This is used to easily serve
305 // complex arguments. Each AcceptAndServe call opens a new listener socket and
306 // sends the connection info down the stream to the dialer. Since a new
307 // connection is opened every call, these calls should be used sparingly.
308 // Multiple gRPC server implementations can be registered to a single
309 // AcceptAndServe call.
310 func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
311         listener, err := b.Accept(id)
312         if err != nil {
313                 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
314                 return
315         }
316         defer listener.Close()
317
318         var opts []grpc.ServerOption
319         if b.tls != nil {
320                 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
321         }
322
323         server := s(opts)
324
325         // Here we use a run group to close this goroutine if the server is shutdown
326         // or the broker is shutdown.
327         var g run.Group
328         {
329                 // Serve on the listener, if shutting down call GracefulStop.
330                 g.Add(func() error {
331                         return server.Serve(listener)
332                 }, func(err error) {
333                         server.GracefulStop()
334                 })
335         }
336         {
337                 // block on the closeCh or the doneCh. If we are shutting down close the
338                 // closeCh.
339                 closeCh := make(chan struct{})
340                 g.Add(func() error {
341                         select {
342                         case <-b.doneCh:
343                         case <-closeCh:
344                         }
345                         return nil
346                 }, func(err error) {
347                         close(closeCh)
348                 })
349         }
350
351         // Block until we are done
352         g.Run()
353 }
354
355 // Close closes the stream and all servers.
356 func (b *GRPCBroker) Close() error {
357         b.streamer.Close()
358         b.o.Do(func() {
359                 close(b.doneCh)
360         })
361         return nil
362 }
363
364 // Dial opens a connection by ID.
365 func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
366         var c *ConnInfo
367
368         // Open the stream
369         p := b.getStream(id)
370         select {
371         case c = <-p.ch:
372                 close(p.doneCh)
373         case <-time.After(5 * time.Second):
374                 return nil, fmt.Errorf("timeout waiting for connection info")
375         }
376
377         var addr net.Addr
378         switch c.Network {
379         case "tcp":
380                 addr, err = net.ResolveTCPAddr("tcp", c.Address)
381         case "unix":
382                 addr, err = net.ResolveUnixAddr("unix", c.Address)
383         default:
384                 err = fmt.Errorf("Unknown address type: %s", c.Address)
385         }
386         if err != nil {
387                 return nil, err
388         }
389
390         return dialGRPCConn(b.tls, netAddrDialer(addr))
391 }
392
393 // NextId returns a unique ID to use next.
394 //
395 // It is possible for very long-running plugin hosts to wrap this value,
396 // though it would require a very large amount of calls. In practice
397 // we've never seen it happen.
398 func (m *GRPCBroker) NextId() uint32 {
399         return atomic.AddUint32(&m.nextId, 1)
400 }
401
402 // Run starts the brokering and should be executed in a goroutine, since it
403 // blocks forever, or until the session closes.
404 //
405 // Uses of GRPCBroker never need to call this. It is called internally by
406 // the plugin host/client.
407 func (m *GRPCBroker) Run() {
408         for {
409                 stream, err := m.streamer.Recv()
410                 if err != nil {
411                         // Once we receive an error, just exit
412                         break
413                 }
414
415                 // Initialize the waiter
416                 p := m.getStream(stream.ServiceId)
417                 select {
418                 case p.ch <- stream:
419                 default:
420                 }
421
422                 go m.timeoutWait(stream.ServiceId, p)
423         }
424 }
425
426 func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
427         m.Lock()
428         defer m.Unlock()
429
430         p, ok := m.streams[id]
431         if ok {
432                 return p
433         }
434
435         m.streams[id] = &gRPCBrokerPending{
436                 ch:     make(chan *ConnInfo, 1),
437                 doneCh: make(chan struct{}),
438         }
439         return m.streams[id]
440 }
441
442 func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
443         // Wait for the stream to either be picked up and connected, or
444         // for a timeout.
445         select {
446         case <-p.doneCh:
447         case <-time.After(5 * time.Second):
448         }
449
450         m.Lock()
451         defer m.Unlock()
452
453         // Delete the stream so no one else can grab it
454         delete(m.streams, id)
455 }