14 "github.com/oklog/run"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/credentials"
19 // streamer interface is used in the broker to send/receive connection
21 type streamer interface {
23 Recv() (*ConnInfo, error)
27 // sendErr is used to pass errors back during a send.
33 // gRPCBrokerServer is used by the plugin to start a stream and to send
34 // connection information to/from the plugin. Implements GRPCBrokerServer and
35 // streamer interfaces.
36 type gRPCBrokerServer struct {
37 // send is used to send connection info to the gRPC stream.
40 // recv is used to receive connection info from the gRPC stream.
43 // quit closes down the stream.
46 // o is used to ensure we close the quit channel only once.
50 func newGRPCBrokerServer() *gRPCBrokerServer {
51 return &gRPCBrokerServer{
52 send: make(chan *sendErr),
53 recv: make(chan *ConnInfo),
54 quit: make(chan struct{}),
58 // StartStream implements the GRPCBrokerServer interface and will block until
59 // the quit channel is closed or the context reports Done. The stream will pass
60 // connection information to/from the client.
61 func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
62 doneCh := stream.Context().Done()
65 // Process send stream
74 err := stream.Send(se.i)
80 // Process receive stream
82 i, err := stream.Recv()
98 // Send is used by the GRPCBroker to pass connection information into the stream
100 func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
101 ch := make(chan error)
106 return errors.New("broker closed")
107 case s.send <- &sendErr{
116 // Recv is used by the GRPCBroker to pass connection information that has been
117 // sent from the client from the stream to the broker.
118 func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
121 return nil, errors.New("broker closed")
127 // Close closes the quit channel, shutting down the stream.
128 func (s *gRPCBrokerServer) Close() {
134 // gRPCBrokerClientImpl is used by the client to start a stream and to send
135 // connection information to/from the client. Implements GRPCBrokerClient and
136 // streamer interfaces.
137 type gRPCBrokerClientImpl struct {
138 // client is the underlying GRPC client used to make calls to the server.
139 client GRPCBrokerClient
141 // send is used to send connection info to the gRPC stream.
144 // recv is used to receive connection info from the gRPC stream.
147 // quit closes down the stream.
150 // o is used to ensure we close the quit channel only once.
154 func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
155 return &gRPCBrokerClientImpl{
156 client: NewGRPCBrokerClient(conn),
157 send: make(chan *sendErr),
158 recv: make(chan *ConnInfo),
159 quit: make(chan struct{}),
163 // StartStream implements the GRPCBrokerClient interface and will block until
164 // the quit channel is closed or the context reports Done. The stream will pass
165 // connection information to/from the plugin.
166 func (s *gRPCBrokerClientImpl) StartStream() error {
167 ctx, cancelFunc := context.WithCancel(context.Background())
171 stream, err := s.client.StartStream(ctx)
175 doneCh := stream.Context().Done()
185 err := stream.Send(se.i)
192 i, err := stream.Recv()
208 // Send is used by the GRPCBroker to pass connection information into the stream
210 func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
211 ch := make(chan error)
216 return errors.New("broker closed")
217 case s.send <- &sendErr{
226 // Recv is used by the GRPCBroker to pass connection information that has been
227 // sent from the plugin to the broker.
228 func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
231 return nil, errors.New("broker closed")
237 // Close closes the quit channel, shutting down the stream.
238 func (s *gRPCBrokerClientImpl) Close() {
244 // GRPCBroker is responsible for brokering connections by unique ID.
246 // It is used by plugins to create multiple gRPC connections and data
247 // streams between the plugin process and the host process.
249 // This allows a plugin to request a channel with a specific ID to connect to
250 // or accept a connection from, and the broker handles the details of
251 // holding these channels open while they're being negotiated.
253 // The Plugin interface has access to these for both Server and Client.
254 // The broker can be used by either (optionally) to reserve and connect to
255 // new streams. This is useful for complex args and return values,
256 // or anything else you might need a data stream for.
257 type GRPCBroker struct {
260 streams map[uint32]*gRPCBrokerPending
268 type gRPCBrokerPending struct {
273 func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
276 streams: make(map[uint32]*gRPCBrokerPending),
278 doneCh: make(chan struct{}),
282 // Accept accepts a connection by ID.
284 // This should not be called multiple times with the same ID at one time.
285 func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
286 listener, err := serverListener()
291 err = b.streamer.Send(&ConnInfo{
293 Network: listener.Addr().Network(),
294 Address: listener.Addr().String(),
303 // AcceptAndServe is used to accept a specific stream ID and immediately
304 // serve a gRPC server on that stream ID. This is used to easily serve
305 // complex arguments. Each AcceptAndServe call opens a new listener socket and
306 // sends the connection info down the stream to the dialer. Since a new
307 // connection is opened every call, these calls should be used sparingly.
308 // Multiple gRPC server implementations can be registered to a single
309 // AcceptAndServe call.
310 func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
311 listener, err := b.Accept(id)
313 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
316 defer listener.Close()
318 var opts []grpc.ServerOption
320 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
325 // Here we use a run group to close this goroutine if the server is shutdown
326 // or the broker is shutdown.
329 // Serve on the listener, if shutting down call GracefulStop.
331 return server.Serve(listener)
333 server.GracefulStop()
337 // Block on the closeCh or the doneCh. If we are shutting down close the
339 closeCh := make(chan struct{})
351 // Block until we are done
355 // Close closes the stream and all servers.
356 func (b *GRPCBroker) Close() error {
364 // Dial opens a connection by ID.
365 func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
373 case <-time.After(5 * time.Second):
374 return nil, fmt.Errorf("timeout waiting for connection info")
380 addr, err = net.ResolveTCPAddr("tcp", c.Address)
382 addr, err = net.ResolveUnixAddr("unix", c.Address)
384 err = fmt.Errorf("Unknown address type: %s", c.Address)
390 return dialGRPCConn(b.tls, netAddrDialer(addr))
393 // NextId returns a unique ID to use next.
395 // It is possible for very long-running plugin hosts to wrap this value,
396 // though it would require a very large number of calls. In practice
397 // we've never seen it happen.
398 func (m *GRPCBroker) NextId() uint32 {
399 return atomic.AddUint32(&m.nextId, 1)
402 // Run starts the brokering and should be executed in a goroutine, since it
403 // blocks forever, or until the session closes.
405 // Users of GRPCBroker never need to call this. It is called internally by
406 // the plugin host/client.
407 func (m *GRPCBroker) Run() {
409 stream, err := m.streamer.Recv()
411 // Once we receive an error, just exit
415 // Initialize the waiter
416 p := m.getStream(stream.ServiceId)
422 go m.timeoutWait(stream.ServiceId, p)
426 func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
430 p, ok := m.streams[id]
435 m.streams[id] = &gRPCBrokerPending{
436 ch: make(chan *ConnInfo, 1),
437 doneCh: make(chan struct{}),
442 func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
443 // Wait for the stream to either be picked up and connected, or
447 case <-time.After(5 * time.Second):
453 // Delete the stream so no one else can grab it
454 delete(m.streams, id)