+++ /dev/null
-package plugin
-
-import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io"
- "net"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/health"
- "google.golang.org/grpc/health/grpc_health_v1"
-)
-
-// GRPCServiceName is the name of the service that the health check should
-// return as passing.
-const GRPCServiceName = "plugin"
-
-// DefaultGRPCServer can be used with the "GRPCServer" field for Server
-// as a default factory method to create a gRPC server with no extra options.
-func DefaultGRPCServer(opts []grpc.ServerOption) *grpc.Server {
- return grpc.NewServer(opts...)
-}
-
-// GRPCServer is a ServerType implementation that serves plugins over
-// gRPC. This allows plugins to easily be written for other languages.
-//
-// The GRPCServer outputs a custom configuration as a base64-encoded
-// JSON structure represented by the GRPCServerConfig config structure.
-type GRPCServer struct {
- // Plugins are the list of plugins to serve.
- Plugins map[string]Plugin
-
- // Server is the actual server that will accept connections. This
- // will be used for plugin registration as well.
- Server func([]grpc.ServerOption) *grpc.Server
-
- // TLS should be the TLS configuration if available. If this is nil,
- // the connection will not have transport security.
- TLS *tls.Config
-
- // DoneCh is the channel that is closed when this server has exited.
- DoneCh chan struct{}
-
- // Stdout/StderrLis are the readers for stdout/stderr that will be copied
- // to the stdout/stderr connection that is output.
- Stdout io.Reader
- Stderr io.Reader
-
- config GRPCServerConfig
- server *grpc.Server
- broker *GRPCBroker
-}
-
-// ServerProtocol impl.
-func (s *GRPCServer) Init() error {
- // Create our server
- var opts []grpc.ServerOption
- if s.TLS != nil {
- opts = append(opts, grpc.Creds(credentials.NewTLS(s.TLS)))
- }
- s.server = s.Server(opts)
-
- // Register the health service
- healthCheck := health.NewServer()
- healthCheck.SetServingStatus(
- GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
- grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
-
- // Register the broker service
- brokerServer := newGRPCBrokerServer()
- RegisterGRPCBrokerServer(s.server, brokerServer)
- s.broker = newGRPCBroker(brokerServer, s.TLS)
- go s.broker.Run()
-
- // Register all our plugins onto the gRPC server.
- for k, raw := range s.Plugins {
- p, ok := raw.(GRPCPlugin)
- if !ok {
- return fmt.Errorf("%q is not a GRPC-compatible plugin", k)
- }
-
- if err := p.GRPCServer(s.broker, s.server); err != nil {
- return fmt.Errorf("error registring %q: %s", k, err)
- }
- }
-
- return nil
-}
-
-// Stop calls Stop on the underlying grpc.Server
-func (s *GRPCServer) Stop() {
- s.server.Stop()
-}
-
-// GracefulStop calls GracefulStop on the underlying grpc.Server
-func (s *GRPCServer) GracefulStop() {
- s.server.GracefulStop()
-}
-
-// Config is the GRPCServerConfig encoded as JSON then base64.
-func (s *GRPCServer) Config() string {
- // Create a buffer that will contain our final contents
- var buf bytes.Buffer
-
- // Wrap the base64 encoding with JSON encoding.
- if err := json.NewEncoder(&buf).Encode(s.config); err != nil {
- // We panic since ths shouldn't happen under any scenario. We
- // carefully control the structure being encoded here and it should
- // always be successful.
- panic(err)
- }
-
- return buf.String()
-}
-
-func (s *GRPCServer) Serve(lis net.Listener) {
- // Start serving in a goroutine
- go s.server.Serve(lis)
-
- // Wait until graceful completion
- <-s.DoneCh
-}
-
-// GRPCServerConfig is the extra configuration passed along for consumers
-// to facilitate using GRPC plugins.
-type GRPCServerConfig struct {
- StdoutAddr string `json:"stdout_addr"`
- StderrAddr string `json:"stderr_addr"`
-}