OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / vendor / github.com / hashicorp / go-plugin / grpc_broker.go
diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_broker.go b/vendor/github.com/hashicorp/go-plugin/grpc_broker.go
deleted file mode 100644 (file)
index 49fd21c..0000000
+++ /dev/null
@@ -1,455 +0,0 @@
-package plugin
-
-import (
-       "context"
-       "crypto/tls"
-       "errors"
-       "fmt"
-       "log"
-       "net"
-       "sync"
-       "sync/atomic"
-       "time"
-
-       "github.com/oklog/run"
-       "google.golang.org/grpc"
-       "google.golang.org/grpc/credentials"
-)
-
-// streamer interface is used in the broker to send/receive connection
-// information.
-type streamer interface {
-       Send(*ConnInfo) error
-       Recv() (*ConnInfo, error)
-       Close()
-}
-
-// sendErr is used to pass errors back during a send.
-type sendErr struct {
-       i  *ConnInfo
-       ch chan error
-}
-
-// gRPCBrokerServer is used by the plugin to start a stream and to send
-// connection information to/from the plugin. Implements GRPCBrokerServer and
-// streamer interfaces.
-type gRPCBrokerServer struct {
-       // send is used to send connection info to the gRPC stream.
-       send chan *sendErr
-
-       // recv is used to receive connection info from the gRPC stream.
-       recv chan *ConnInfo
-
-       // quit closes down the stream.
-       quit chan struct{}
-
-       // o is used to ensure we close the quit channel only once.
-       o sync.Once
-}
-
-func newGRPCBrokerServer() *gRPCBrokerServer {
-       return &gRPCBrokerServer{
-               send: make(chan *sendErr),
-               recv: make(chan *ConnInfo),
-               quit: make(chan struct{}),
-       }
-}
-
-// StartStream implements the GRPCBrokerServer interface and will block until
-// the quit channel is closed or the context reports Done. The stream will pass
-// connection information to/from the client.
-func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
-       doneCh := stream.Context().Done()
-       defer s.Close()
-
-       // Proccess send stream
-       go func() {
-               for {
-                       select {
-                       case <-doneCh:
-                               return
-                       case <-s.quit:
-                               return
-                       case se := <-s.send:
-                               err := stream.Send(se.i)
-                               se.ch <- err
-                       }
-               }
-       }()
-
-       // Process receive stream
-       for {
-               i, err := stream.Recv()
-               if err != nil {
-                       return err
-               }
-               select {
-               case <-doneCh:
-                       return nil
-               case <-s.quit:
-                       return nil
-               case s.recv <- i:
-               }
-       }
-
-       return nil
-}
-
-// Send is used by the GRPCBroker to pass connection information into the stream
-// to the client.
-func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
-       ch := make(chan error)
-       defer close(ch)
-
-       select {
-       case <-s.quit:
-               return errors.New("broker closed")
-       case s.send <- &sendErr{
-               i:  i,
-               ch: ch,
-       }:
-       }
-
-       return <-ch
-}
-
-// Recv is used by the GRPCBroker to pass connection information that has been
-// sent from the client from the stream to the broker.
-func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
-       select {
-       case <-s.quit:
-               return nil, errors.New("broker closed")
-       case i := <-s.recv:
-               return i, nil
-       }
-}
-
-// Close closes the quit channel, shutting down the stream.
-func (s *gRPCBrokerServer) Close() {
-       s.o.Do(func() {
-               close(s.quit)
-       })
-}
-
-// gRPCBrokerClientImpl is used by the client to start a stream and to send
-// connection information to/from the client. Implements GRPCBrokerClient and
-// streamer interfaces.
-type gRPCBrokerClientImpl struct {
-       // client is the underlying GRPC client used to make calls to the server.
-       client GRPCBrokerClient
-
-       // send is used to send connection info to the gRPC stream.
-       send chan *sendErr
-
-       // recv is used to receive connection info from the gRPC stream.
-       recv chan *ConnInfo
-
-       // quit closes down the stream.
-       quit chan struct{}
-
-       // o is used to ensure we close the quit channel only once.
-       o sync.Once
-}
-
-func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
-       return &gRPCBrokerClientImpl{
-               client: NewGRPCBrokerClient(conn),
-               send:   make(chan *sendErr),
-               recv:   make(chan *ConnInfo),
-               quit:   make(chan struct{}),
-       }
-}
-
-// StartStream implements the GRPCBrokerClient interface and will block until
-// the quit channel is closed or the context reports Done. The stream will pass
-// connection information to/from the plugin.
-func (s *gRPCBrokerClientImpl) StartStream() error {
-       ctx, cancelFunc := context.WithCancel(context.Background())
-       defer cancelFunc()
-       defer s.Close()
-
-       stream, err := s.client.StartStream(ctx)
-       if err != nil {
-               return err
-       }
-       doneCh := stream.Context().Done()
-
-       go func() {
-               for {
-                       select {
-                       case <-doneCh:
-                               return
-                       case <-s.quit:
-                               return
-                       case se := <-s.send:
-                               err := stream.Send(se.i)
-                               se.ch <- err
-                       }
-               }
-       }()
-
-       for {
-               i, err := stream.Recv()
-               if err != nil {
-                       return err
-               }
-               select {
-               case <-doneCh:
-                       return nil
-               case <-s.quit:
-                       return nil
-               case s.recv <- i:
-               }
-       }
-
-       return nil
-}
-
-// Send is used by the GRPCBroker to pass connection information into the stream
-// to the plugin.
-func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
-       ch := make(chan error)
-       defer close(ch)
-
-       select {
-       case <-s.quit:
-               return errors.New("broker closed")
-       case s.send <- &sendErr{
-               i:  i,
-               ch: ch,
-       }:
-       }
-
-       return <-ch
-}
-
-// Recv is used by the GRPCBroker to pass connection information that has been
-// sent from the plugin to the broker.
-func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
-       select {
-       case <-s.quit:
-               return nil, errors.New("broker closed")
-       case i := <-s.recv:
-               return i, nil
-       }
-}
-
-// Close closes the quit channel, shutting down the stream.
-func (s *gRPCBrokerClientImpl) Close() {
-       s.o.Do(func() {
-               close(s.quit)
-       })
-}
-
-// GRPCBroker is responsible for brokering connections by unique ID.
-//
-// It is used by plugins to create multiple gRPC connections and data
-// streams between the plugin process and the host process.
-//
-// This allows a plugin to request a channel with a specific ID to connect to
-// or accept a connection from, and the broker handles the details of
-// holding these channels open while they're being negotiated.
-//
-// The Plugin interface has access to these for both Server and Client.
-// The broker can be used by either (optionally) to reserve and connect to
-// new streams. This is useful for complex args and return values,
-// or anything else you might need a data stream for.
-type GRPCBroker struct {
-       nextId   uint32
-       streamer streamer
-       streams  map[uint32]*gRPCBrokerPending
-       tls      *tls.Config
-       doneCh   chan struct{}
-       o        sync.Once
-
-       sync.Mutex
-}
-
-type gRPCBrokerPending struct {
-       ch     chan *ConnInfo
-       doneCh chan struct{}
-}
-
-func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
-       return &GRPCBroker{
-               streamer: s,
-               streams:  make(map[uint32]*gRPCBrokerPending),
-               tls:      tls,
-               doneCh:   make(chan struct{}),
-       }
-}
-
-// Accept accepts a connection by ID.
-//
-// This should not be called multiple times with the same ID at one time.
-func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
-       listener, err := serverListener()
-       if err != nil {
-               return nil, err
-       }
-
-       err = b.streamer.Send(&ConnInfo{
-               ServiceId: id,
-               Network:   listener.Addr().Network(),
-               Address:   listener.Addr().String(),
-       })
-       if err != nil {
-               return nil, err
-       }
-
-       return listener, nil
-}
-
-// AcceptAndServe is used to accept a specific stream ID and immediately
-// serve a gRPC server on that stream ID. This is used to easily serve
-// complex arguments. Each AcceptAndServe call opens a new listener socket and
-// sends the connection info down the stream to the dialer. Since a new
-// connection is opened every call, these calls should be used sparingly.
-// Multiple gRPC server implementations can be registered to a single
-// AcceptAndServe call.
-func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
-       listener, err := b.Accept(id)
-       if err != nil {
-               log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
-               return
-       }
-       defer listener.Close()
-
-       var opts []grpc.ServerOption
-       if b.tls != nil {
-               opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
-       }
-
-       server := s(opts)
-
-       // Here we use a run group to close this goroutine if the server is shutdown
-       // or the broker is shutdown.
-       var g run.Group
-       {
-               // Serve on the listener, if shutting down call GracefulStop.
-               g.Add(func() error {
-                       return server.Serve(listener)
-               }, func(err error) {
-                       server.GracefulStop()
-               })
-       }
-       {
-               // block on the closeCh or the doneCh. If we are shutting down close the
-               // closeCh.
-               closeCh := make(chan struct{})
-               g.Add(func() error {
-                       select {
-                       case <-b.doneCh:
-                       case <-closeCh:
-                       }
-                       return nil
-               }, func(err error) {
-                       close(closeCh)
-               })
-       }
-
-       // Block until we are done
-       g.Run()
-}
-
-// Close closes the stream and all servers.
-func (b *GRPCBroker) Close() error {
-       b.streamer.Close()
-       b.o.Do(func() {
-               close(b.doneCh)
-       })
-       return nil
-}
-
-// Dial opens a connection by ID.
-func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
-       var c *ConnInfo
-
-       // Open the stream
-       p := b.getStream(id)
-       select {
-       case c = <-p.ch:
-               close(p.doneCh)
-       case <-time.After(5 * time.Second):
-               return nil, fmt.Errorf("timeout waiting for connection info")
-       }
-
-       var addr net.Addr
-       switch c.Network {
-       case "tcp":
-               addr, err = net.ResolveTCPAddr("tcp", c.Address)
-       case "unix":
-               addr, err = net.ResolveUnixAddr("unix", c.Address)
-       default:
-               err = fmt.Errorf("Unknown address type: %s", c.Address)
-       }
-       if err != nil {
-               return nil, err
-       }
-
-       return dialGRPCConn(b.tls, netAddrDialer(addr))
-}
-
-// NextId returns a unique ID to use next.
-//
-// It is possible for very long-running plugin hosts to wrap this value,
-// though it would require a very large amount of calls. In practice
-// we've never seen it happen.
-func (m *GRPCBroker) NextId() uint32 {
-       return atomic.AddUint32(&m.nextId, 1)
-}
-
-// Run starts the brokering and should be executed in a goroutine, since it
-// blocks forever, or until the session closes.
-//
-// Uses of GRPCBroker never need to call this. It is called internally by
-// the plugin host/client.
-func (m *GRPCBroker) Run() {
-       for {
-               stream, err := m.streamer.Recv()
-               if err != nil {
-                       // Once we receive an error, just exit
-                       break
-               }
-
-               // Initialize the waiter
-               p := m.getStream(stream.ServiceId)
-               select {
-               case p.ch <- stream:
-               default:
-               }
-
-               go m.timeoutWait(stream.ServiceId, p)
-       }
-}
-
-func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
-       m.Lock()
-       defer m.Unlock()
-
-       p, ok := m.streams[id]
-       if ok {
-               return p
-       }
-
-       m.streams[id] = &gRPCBrokerPending{
-               ch:     make(chan *ConnInfo, 1),
-               doneCh: make(chan struct{}),
-       }
-       return m.streams[id]
-}
-
-func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
-       // Wait for the stream to either be picked up and connected, or
-       // for a timeout.
-       select {
-       case <-p.doneCh:
-       case <-time.After(5 * time.Second):
-       }
-
-       m.Lock()
-       defer m.Unlock()
-
-       // Delete the stream so no one else can grab it
-       delete(m.streams, id)
-}