24 hclog "github.com/hashicorp/go-hclog"
27 // If this is 1, then we've called CleanupClients. This can be used
28 // by plugin RPC implementations to change error behavior since you
29 // can expect network connection errors at this point. This should be
30 // read by using sync/atomic.
33 // This is a slice of the "managed" clients which are cleaned up when
35 var managedClients = make([]*Client, 0, 5)
36 var managedClientsLock sync.Mutex
40 // ErrProcessNotFound is returned when a client is instantiated to
41 // reattach to an existing process and it isn't found.
42 ErrProcessNotFound = errors.New("Reattachment process not found")
44 // ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
45 // the one provided in the SecureConfig.
46 ErrChecksumsDoNotMatch = errors.New("checksums did not match")
48 // ErrSecureConfigNoChecksum is returned when an empty checksum is provided to the
50 ErrSecureConfigNoChecksum = errors.New("no checksum provided")
52 // ErrSecureConfigNoHash is returned when a nil Hash object is provided to the
54 ErrSecureConfigNoHash = errors.New("no hash implementation provided")
56 // ErrSecureConfigAndReattach is returned when both Reattach and
57 // SecureConfig are set.
58 ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
61 // Client handles the lifecycle of a plugin application. It launches
62 // plugins, connects to them, dispenses interface implementations, and handles
63 // killing the process.
65 // Plugin hosts should use one Client for each plugin executable. To
66 // dispense a plugin type, use the `Client.Client` function, and then
67 // call `Dispense`. This awkward API is mostly historical but is used to split
68 // the client that deals with subprocess management and the client that
69 // does RPC management.
71 // See NewClient and ClientConfig for using a Client.
75 doneLogging chan struct{}
82 doneCtx context.Context
85 // ClientConfig is the configuration used to initialize a new
86 // plugin client. After being used to initialize a plugin client,
87 // that configuration must not be modified again.
88 type ClientConfig struct {
89 // HandshakeConfig is the configuration that must match servers.
92 // Plugins are the plugins that can be consumed.
93 Plugins map[string]Plugin
95 // One of the following must be set, but not both.
97 // Cmd is the unstarted subprocess for starting the plugin. If this is
98 // set, then the Client starts the plugin process on its own and connects
101 // Reattach is configuration for reattaching to an existing plugin process
102 // that is already running. This isn't common.
104 Reattach *ReattachConfig
106 // SecureConfig is configuration for verifying the integrity of the
107 // executable. It can not be used with Reattach.
108 SecureConfig *SecureConfig
110 // TLSConfig is used to enable TLS on the RPC client.
111 TLSConfig *tls.Config
113 // Managed represents if the client should be managed by the
114 // plugin package or not. If true, then by calling CleanupClients,
115 // it will automatically be cleaned up. Otherwise, the client
116 // user is fully responsible for making sure to Kill all plugin
117 // clients. By default the client is _not_ managed.
120 // The minimum and maximum port to use for communicating with
121 // the subprocess. If not set, this defaults to 10,000 and 25,000
123 MinPort, MaxPort uint
125 // StartTimeout is the timeout to wait for the plugin to say it
126 // has started successfully.
127 StartTimeout time.Duration
129 // If non-nil, then the stderr of the client will be written to here
130 // (as well as the log). This is the original os.Stderr of the subprocess.
131 // This isn't the output of synced stderr.
134 // SyncStdout, SyncStderr can be set to override the
135 // respective os.Std* values in the plugin. Care should be taken to
136 // avoid races here. If these are nil, NewClient defaults them to
137 // ioutil.Discard, so the corresponding synced output is thrown away.
139 // If the default values (nil) are used, then this package will not
140 // sync any of these streams.
144 // AllowedProtocols is a list of allowed protocols. If this isn't set,
145 // then only netrpc is allowed. This is so that older go-plugin systems
146 // can show friendly errors if they see a plugin with an unknown
149 // By setting this, you can cause an error immediately on plugin start
150 // if an unsupported protocol is used with a good error message.
152 // If this isn't set at all (nil value), then only net/rpc is accepted.
153 // This is done for legacy reasons. You must explicitly opt-in to
155 AllowedProtocols []Protocol
157 // Logger is the logger that the client will use. If none is provided,
158 // it will default to hclog's default logger.
162 // ReattachConfig is used to configure a client to reattach to an
163 // already-running plugin process. You can retrieve this information by
164 // calling ReattachConfig on Client.
165 type ReattachConfig struct {
171 // SecureConfig is used to configure a client to verify the integrity of an
172 // executable before running. It does this by verifying the checksum is
173 // expected. Hash is used to specify the hashing method to use when checksumming
174 // the file. The configuration is verified by the client by calling the
175 // SecureConfig.Check() function.
177 // The host process should ensure the checksum was provided by a trusted and
178 // authoritative source. The binary should be installed in such a way that it
179 // can not be modified by an unauthorized user between the time of this check
180 // and the time of execution.
181 type SecureConfig struct {
186 // Check takes the filepath to an executable and returns true if the checksum of
187 // the file matches the checksum provided in the SecureConfig.
188 func (s *SecureConfig) Check(filePath string) (bool, error) {
189 if len(s.Checksum) == 0 {
190 return false, ErrSecureConfigNoChecksum
194 return false, ErrSecureConfigNoHash
197 file, err := os.Open(filePath)
203 _, err = io.Copy(s.Hash, file)
208 sum := s.Hash.Sum(nil)
210 return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
213 // CleanupClients makes sure all the managed subprocesses are killed and properly
214 // logged. This should be called before the parent process running the
217 // This must only be called _once_.
218 func CleanupClients() {
219 // Set the killed to true so that we don't get unexpected panics
220 atomic.StoreUint32(&Killed, 1)
222 // Kill all the managed clients in parallel and use a WaitGroup
223 // to wait for them all to finish up.
224 var wg sync.WaitGroup
225 managedClientsLock.Lock()
226 for _, client := range managedClients {
229 go func(client *Client) {
234 managedClientsLock.Unlock()
239 // NewClient creates a new plugin client which manages the lifecycle of an external
240 // plugin and gets the address for the RPC connection.
242 // The client must be cleaned up at some point by calling Kill(). If
243 // the client is a managed client (ClientConfig.Managed is true) you
244 // can just call CleanupClients at the end of your program and they will
245 // be properly cleaned.
246 func NewClient(config *ClientConfig) (c *Client) {
247 if config.MinPort == 0 && config.MaxPort == 0 {
248 config.MinPort = 10000
249 config.MaxPort = 25000
252 if config.StartTimeout == 0 {
253 config.StartTimeout = 1 * time.Minute
256 if config.Stderr == nil {
257 config.Stderr = ioutil.Discard
260 if config.SyncStdout == nil {
261 config.SyncStdout = ioutil.Discard
263 if config.SyncStderr == nil {
264 config.SyncStderr = ioutil.Discard
267 if config.AllowedProtocols == nil {
268 config.AllowedProtocols = []Protocol{ProtocolNetRPC}
271 if config.Logger == nil {
272 config.Logger = hclog.New(&hclog.LoggerOptions{
273 Output: hclog.DefaultOutput,
281 logger: config.Logger,
284 managedClientsLock.Lock()
285 managedClients = append(managedClients, c)
286 managedClientsLock.Unlock()
292 // Client returns the protocol client for this connection.
294 // Subsequent calls to this will return the same client.
295 func (c *Client) Client() (ClientProtocol, error) {
310 c.client, err = newRPCClient(c)
313 c.client, err = newGRPCClient(c.doneCtx, c)
316 return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
327 // Exited tells whether or not the underlying process has exited.
328 func (c *Client) Exited() bool {
334 // Kill ends the executing subprocess (if it is running) and performs any cleanup
335 // tasks necessary such as capturing any remaining logs and so on.
337 // This method blocks until the process successfully exits.
339 // This method can safely be called multiple times.
340 func (c *Client) Kill() {
341 // Grab a lock to read some private fields.
345 doneCh := c.doneLogging
348 // If there is no process, we never started anything. Nothing to kill.
353 // We need to check for address here. It is possible that the plugin
354 // started (process != nil) but has no address (addr == nil) if the
355 // plugin failed at startup. If we do have an address, we need to close
356 // the plugin net connections.
359 // Close the client to cleanly exit the process.
360 client, err := c.Client()
364 // If there is no error, then we attempt to wait for a graceful
365 // exit. If there was an error, we assume that graceful cleanup
366 // won't happen and just force kill.
367 graceful = err == nil
369 // If there was an error just log it. We're going to force
370 // kill in a moment anyways.
371 c.logger.Warn("error closing client during Kill", "err", err)
376 // If we're attempting a graceful exit, then we wait for a short period
377 // of time to allow that to happen. To wait for this we just wait on the
378 // doneCh which would be closed if the process exits.
383 case <-time.After(250 * time.Millisecond):
387 // If graceful exiting failed, just kill it
390 // Wait for the client to finish logging so we have a complete log
394 // Start starts the underlying subprocess, communicating with it to negotiate
395 // a port for RPC connections, and returning the address to connect via RPC.
397 // This method is safe to call multiple times. Subsequent calls have no effect.
398 // Once a client has been started once, it cannot be started again, even if
400 func (c *Client) Start() (addr net.Addr, err error) {
404 if c.address != nil {
405 return c.address, nil
408 // Exactly one of Cmd or Reattach must be set; otherwise it is an error. We wrap
409 // this in a {} for scoping reasons, hoping that the escape
410 // analysis will pop the stack here.
412 cmdSet := c.config.Cmd != nil
413 attachSet := c.config.Reattach != nil
414 secureSet := c.config.SecureConfig != nil
415 if cmdSet == attachSet {
416 return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
419 if secureSet && attachSet {
420 return nil, ErrSecureConfigAndReattach
424 // Create the logging channel for when we kill
425 c.doneLogging = make(chan struct{})
426 // Create a context for when we kill
427 var ctxCancel context.CancelFunc
428 c.doneCtx, ctxCancel = context.WithCancel(context.Background())
430 if c.config.Reattach != nil {
431 // Verify the process still exists. If not, then it is an error
432 p, err := os.FindProcess(c.config.Reattach.Pid)
437 // Attempt to connect to the addr since on Unix systems FindProcess
438 // doesn't actually return an error if it can't find the process.
439 conn, err := net.Dial(
440 c.config.Reattach.Addr.Network(),
441 c.config.Reattach.Addr.String())
444 return nil, ErrProcessNotFound
448 // Goroutine to mark exit status
450 // Wait for the process to die
453 // Log so we can see it
454 c.logger.Debug("reattached plugin process exited")
461 // Close the logging channel since that doesn't work on reattach
464 // Cancel the context
468 // Set the address and process
469 c.address = c.config.Reattach.Addr
471 c.protocol = c.config.Reattach.Protocol
472 if c.protocol == "" {
473 // Default the protocol to net/rpc for backwards compatibility
474 c.protocol = ProtocolNetRPC
477 return c.address, nil
481 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
482 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
483 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
486 stdout_r, stdout_w := io.Pipe()
487 stderr_r, stderr_w := io.Pipe()
490 cmd.Env = append(cmd.Env, os.Environ()...)
491 cmd.Env = append(cmd.Env, env...)
493 cmd.Stderr = stderr_w
494 cmd.Stdout = stdout_w
496 if c.config.SecureConfig != nil {
497 if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
498 return nil, fmt.Errorf("error verifying checksum: %s", err)
500 return nil, ErrChecksumsDoNotMatch
504 c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
511 c.process = cmd.Process
513 // Make sure the command is properly cleaned up if there is an error
517 if err != nil || r != nil {
526 // Start goroutine to wait for process to exit
527 exitCh := make(chan struct{})
529 // Make sure we close the write end of our stderr/stdout so
530 // that the readers send EOF properly.
531 defer stderr_w.Close()
532 defer stdout_w.Close()
534 // Wait for the command to end.
537 // Log and make sure to flush the logs right away
538 c.logger.Debug("plugin process exited", "path", cmd.Path)
541 // Mark that we exited
544 // Cancel the context, marking that we exited
547 // Set that we exited, which takes a lock
553 // Start goroutine that logs the stderr
554 go c.logStderr(stderr_r)
556 // Start a goroutine that is going to be reading the lines
558 linesCh := make(chan []byte)
562 buf := bufio.NewReader(stdout_r)
564 line, err := buf.ReadBytes('\n')
575 // Make sure after we exit we read the lines from stdout forever
576 // so they don't block since it is an io.Pipe
579 for _ = range linesCh {
584 // Some channels for the next step
585 timeout := time.After(c.config.StartTimeout)
587 // Start looking for the address
588 c.logger.Debug("waiting for RPC address", "path", cmd.Path)
591 err = errors.New("timeout while waiting for plugin to start")
593 err = errors.New("plugin exited before we could connect")
594 case lineBytes := <-linesCh:
595 // Trim the line and split by "|" in order to get the parts of
597 line := strings.TrimSpace(string(lineBytes))
598 parts := strings.SplitN(line, "|", 6)
601 "Unrecognized remote plugin message: %s\n\n"+
602 "This usually means that the plugin is either invalid or simply\n"+
603 "needs to be recompiled to support the latest protocol.", line)
607 // Check the core protocol. Wrapped in a {} for scoping.
609 var coreProtocol int64
610 coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
612 err = fmt.Errorf("Error parsing core protocol version: %s", err)
616 if int(coreProtocol) != CoreProtocolVersion {
617 err = fmt.Errorf("Incompatible core API version with plugin. "+
618 "Plugin version: %s, Core version: %d\n\n"+
619 "To fix this, the plugin usually only needs to be recompiled.\n"+
620 "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
625 // Parse the protocol version
627 protocol, err = strconv.ParseInt(parts[1], 10, 0)
629 err = fmt.Errorf("Error parsing protocol version: %s", err)
633 // Test the API version
634 if uint(protocol) != c.config.ProtocolVersion {
635 err = fmt.Errorf("Incompatible API version with plugin. "+
636 "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion)
642 addr, err = net.ResolveTCPAddr("tcp", parts[3])
644 addr, err = net.ResolveUnixAddr("unix", parts[3])
646 err = fmt.Errorf("Unknown address type: %s", parts[3])
649 // If we have a server type, then record that. We default to net/rpc
650 // for backwards compatibility.
651 c.protocol = ProtocolNetRPC
653 c.protocol = Protocol(parts[4])
657 for _, p := range c.config.AllowedProtocols {
664 err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
665 c.protocol, c.config.AllowedProtocols)
675 // ReattachConfig returns the information that must be provided to NewClient
676 // to reattach to the plugin process that this client started. This is
677 // useful for plugins that detach from their parent process.
679 // If this returns nil then the process hasn't been started yet. Please
680 // call Start or Client before calling this.
681 func (c *Client) ReattachConfig() *ReattachConfig {
685 if c.address == nil {
689 if c.config.Cmd != nil && c.config.Cmd.Process == nil {
693 // If we connected via reattach, just return the information as-is
694 if c.config.Reattach != nil {
695 return c.config.Reattach
698 return &ReattachConfig{
699 Protocol: c.protocol,
701 Pid: c.config.Cmd.Process.Pid,
705 // Protocol returns the protocol of the server on the remote end. This will
706 // start the plugin process if it isn't already started. Errors from
707 // starting the plugin are suppressed and ProtocolInvalid is returned. It
708 // is recommended you call Start explicitly before calling Protocol to ensure
710 func (c *Client) Protocol() Protocol {
713 return ProtocolInvalid
719 func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
720 return func(_ string, _ time.Duration) (net.Conn, error) {
721 // Connect to the client
722 conn, err := net.Dial(addr.Network(), addr.String())
726 if tcpConn, ok := conn.(*net.TCPConn); ok {
727 // Make sure to set keep alive so that the connection doesn't die
728 tcpConn.SetKeepAlive(true)
735 // dialer is compatible with grpc.WithDialer and creates the connection
737 func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
738 conn, err := netAddrDialer(c.address)("", timeout)
743 // If we have a TLS config we wrap our connection. We only do this
744 // for net/rpc since gRPC uses its own mechanism for TLS.
745 if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
746 conn = tls.Client(conn, c.config.TLSConfig)
752 func (c *Client) logStderr(r io.Reader) {
753 bufR := bufio.NewReader(r)
754 l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
757 line, err := bufR.ReadString('\n')
759 c.config.Stderr.Write([]byte(line))
760 line = strings.TrimRightFunc(line, unicode.IsSpace)
762 entry, err := parseJSON(line)
763 // If output is not JSON format, print directly to Debug
767 out := flattenKVPairs(entry.KVPairs)
769 out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
770 switch hclog.LevelFromString(entry.Level) {
772 l.Trace(entry.Message, out...)
774 l.Debug(entry.Message, out...)
776 l.Info(entry.Message, out...)
778 l.Warn(entry.Message, out...)
780 l.Error(entry.Message, out...)
790 // Flag that we've completed logging for others