11 "google.golang.org/grpc"
12 "google.golang.org/grpc/credentials"
13 "google.golang.org/grpc/health"
14 "google.golang.org/grpc/health/grpc_health_v1"
17 // GRPCServiceName is the name of the service that the health check should
// report on. GRPCServer.Init registers this name with the gRPC health
// server and marks it as SERVING.
19 const GRPCServiceName = "plugin"
21 // DefaultGRPCServer is the default factory for creating a gRPC server: it
22 // forwards opts unchanged to grpc.NewServer and adds no extra options.
// Its signature matches the GRPCServer.Server field, so it can be assigned
// there directly.
23 func DefaultGRPCServer(opts []grpc.ServerOption) *grpc.Server {
24 return grpc.NewServer(opts...)
27 // GRPCServer is a ServerType implementation that serves plugins over
28 // gRPC. This allows plugins to easily be written for other languages.
30 // The GRPCServer outputs a custom configuration as a base64-encoded
31 // JSON structure represented by the GRPCServerConfig config structure.
// NOTE(review): the visible part of Config only JSON-encodes the config;
// confirm where (or whether) the base64 step actually happens.
32 type GRPCServer struct {
33 // Plugins maps a plugin name to the plugin to serve. Init requires every
// value to implement GRPCPlugin and returns an error naming the key
// otherwise.
34 Plugins map[string]Plugin
36 // Server is the factory Init calls with the assembled server options to
37 // build the grpc.Server. The health service, the broker service and all
// plugins are then registered on the server it returns.
38 Server func([]grpc.ServerOption) *grpc.Server
40 // TLS should be the TLS configuration if available. If this is nil,
41 // the connection will not have transport security.
44 // DoneCh is the channel that is closed when this server has exited.
47 // Stdout/StderrLis are the readers for stdout/stderr that will be copied
48 // to the stdout/stderr connection that is output.
// config is the value that Config serializes for consumers.
52 config GRPCServerConfig
57 // ServerProtocol impl.
//
// Init builds the gRPC server through the s.Server factory and registers,
// in order: the health service (reporting GRPCServiceName as SERVING), the
// broker service, and every plugin in s.Plugins.
//
// It returns an error if a plugin does not implement GRPCPlugin or if a
// plugin's GRPCServer registration call fails.
58 func (s *GRPCServer) Init() error {
// Collect server options; TLS credentials are built from s.TLS.
// NOTE(review): presumably the append below only runs when s.TLS is
// non-nil (the field doc says nil means no transport security) — confirm.
60 var opts []grpc.ServerOption
62 opts = append(opts, grpc.Creds(credentials.NewTLS(s.TLS)))
// Create the server; the other methods on GRPCServer operate on s.server.
64 s.server = s.Server(opts)
66 // Register the health service and mark the plugin service as SERVING.
67 healthCheck := health.NewServer()
68 healthCheck.SetServingStatus(
69 GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
70 grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
72 // Register the broker service. The broker receives the same TLS
// configuration as the main server.
73 brokerServer := newGRPCBrokerServer()
74 RegisterGRPCBrokerServer(s.server, brokerServer)
75 s.broker = newGRPCBroker(brokerServer, s.TLS)
78 // Register all our plugins onto the gRPC server.
79 for k, raw := range s.Plugins {
// Only plugins that implement GRPCPlugin can be served over gRPC.
80 p, ok := raw.(GRPCPlugin)
82 return fmt.Errorf("%q is not a GRPC-compatible plugin", k)
// Let the plugin register its own services on the shared server.
85 if err := p.GRPCServer(s.broker, s.server); err != nil {
// NOTE(review): "registring" is a typo in this runtime message, and %s
// flattens err; "registering" with %w would let callers use
// errors.Is/errors.As. Left unchanged here because it is runtime text.
86 return fmt.Errorf("error registring %q: %s", k, err)
93 // Stop calls Stop on the underlying grpc.Server.
// NOTE(review): the body is not visible here; presumably it mirrors
// GracefulStop and operates on s.server, which Init assigns — confirm.
94 func (s *GRPCServer) Stop() {
98 // GracefulStop calls GracefulStop on the underlying grpc.Server.
// It operates on s.server, which is assigned in Init.
99 func (s *GRPCServer) GracefulStop() {
100 s.server.GracefulStop()
103 // Config is the GRPCServerConfig encoded as JSON then base64.
// NOTE(review): the visible code only JSON-encodes s.config into a buffer;
// no base64 step is shown — confirm against the caller or fix this comment.
104 func (s *GRPCServer) Config() string {
105 // Create a buffer that will contain our final contents.
108 // JSON-encode the config into the buffer. json.Encoder.Encode also appends
// a trailing newline to its output.
109 if err := json.NewEncoder(&buf).Encode(s.config); err != nil {
110 // We panic since this shouldn't happen under any scenario. We
111 // carefully control the structure being encoded here and it should
112 // always be successful.
// Serve starts the underlying grpc.Server accepting connections on lis in
// a background goroutine, then waits for completion.
// NOTE(review): the wait itself is not visible here; presumably it blocks
// on DoneCh — confirm.
119 func (s *GRPCServer) Serve(lis net.Listener) {
120 // Start serving in a goroutine.
// NOTE(review): the error returned by s.server.Serve is discarded, so a
// failed listener goes unreported — consider logging or surfacing it.
121 go s.server.Serve(lis)
123 // Wait until graceful completion
127 // GRPCServerConfig is the extra configuration passed along for consumers
128 // to facilitate using GRPC plugins. GRPCServer.Config serializes it as JSON.
129 type GRPCServerConfig struct {
// StdoutAddr is emitted under the JSON key "stdout_addr".
// NOTE(review): presumably the address where the plugin's stdout stream
// is served — confirm.
130 StdoutAddr string `json:"stdout_addr"`
// StderrAddr is emitted under the JSON key "stderr_addr"; presumably the
// stderr counterpart of StdoutAddr — confirm.
131 StderrAddr string `json:"stderr_addr"`