+++ /dev/null
-package plugin
-
-import (
- "crypto/tls"
- "fmt"
- "io"
- "net"
- "net/rpc"
-
- "github.com/hashicorp/yamux"
-)
-
-// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
-type RPCClient struct {
- broker *MuxBroker
- control *rpc.Client
- plugins map[string]Plugin
-
- // These are the streams used for the various stdout/err overrides
- stdout, stderr net.Conn
-}
-
-// newRPCClient creates a new RPCClient. The Client argument is expected
-// to be successfully started already with a lock held.
-func newRPCClient(c *Client) (*RPCClient, error) {
- // Connect to the client
- conn, err := net.Dial(c.address.Network(), c.address.String())
- if err != nil {
- return nil, err
- }
- if tcpConn, ok := conn.(*net.TCPConn); ok {
- // Make sure to set keep alive so that the connection doesn't die
- tcpConn.SetKeepAlive(true)
- }
-
- if c.config.TLSConfig != nil {
- conn = tls.Client(conn, c.config.TLSConfig)
- }
-
- // Create the actual RPC client
- result, err := NewRPCClient(conn, c.config.Plugins)
- if err != nil {
- conn.Close()
- return nil, err
- }
-
- // Begin the stream syncing so that stdin, out, err work properly
- err = result.SyncStreams(
- c.config.SyncStdout,
- c.config.SyncStderr)
- if err != nil {
- result.Close()
- return nil, err
- }
-
- return result, nil
-}
-
-// NewRPCClient creates a client from an already-open connection-like value.
-// Dial is typically used instead.
-func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
- // Create the yamux client so we can multiplex
- mux, err := yamux.Client(conn, nil)
- if err != nil {
- conn.Close()
- return nil, err
- }
-
- // Connect to the control stream.
- control, err := mux.Open()
- if err != nil {
- mux.Close()
- return nil, err
- }
-
- // Connect stdout, stderr streams
- stdstream := make([]net.Conn, 2)
- for i, _ := range stdstream {
- stdstream[i], err = mux.Open()
- if err != nil {
- mux.Close()
- return nil, err
- }
- }
-
- // Create the broker and start it up
- broker := newMuxBroker(mux)
- go broker.Run()
-
- // Build the client using our broker and control channel.
- return &RPCClient{
- broker: broker,
- control: rpc.NewClient(control),
- plugins: plugins,
- stdout: stdstream[0],
- stderr: stdstream[1],
- }, nil
-}
-
-// SyncStreams should be called to enable syncing of stdout,
-// stderr with the plugin.
-//
-// This will return immediately and the syncing will continue to happen
-// in the background. You do not need to launch this in a goroutine itself.
-//
-// This should never be called multiple times.
-func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
- go copyStream("stdout", stdout, c.stdout)
- go copyStream("stderr", stderr, c.stderr)
- return nil
-}
-
-// Close closes the connection. The client is no longer usable after this
-// is called.
-func (c *RPCClient) Close() error {
- // Call the control channel and ask it to gracefully exit. If this
- // errors, then we save it so that we always return an error but we
- // want to try to close the other channels anyways.
- var empty struct{}
- returnErr := c.control.Call("Control.Quit", true, &empty)
-
- // Close the other streams we have
- if err := c.control.Close(); err != nil {
- return err
- }
- if err := c.stdout.Close(); err != nil {
- return err
- }
- if err := c.stderr.Close(); err != nil {
- return err
- }
- if err := c.broker.Close(); err != nil {
- return err
- }
-
- // Return back the error we got from Control.Quit. This is very important
- // since we MUST return non-nil error if this fails so that Client.Kill
- // will properly try a process.Kill.
- return returnErr
-}
-
-func (c *RPCClient) Dispense(name string) (interface{}, error) {
- p, ok := c.plugins[name]
- if !ok {
- return nil, fmt.Errorf("unknown plugin type: %s", name)
- }
-
- var id uint32
- if err := c.control.Call(
- "Dispenser.Dispense", name, &id); err != nil {
- return nil, err
- }
-
- conn, err := c.broker.Dial(id)
- if err != nil {
- return nil, err
- }
-
- return p.Client(c.broker, rpc.NewClient(conn))
-}
-
-// Ping pings the connection to ensure it is still alive.
-//
-// The error from the RPC call is returned exactly if you want to inspect
-// it for further error analysis. Any error returned from here would indicate
-// that the connection to the plugin is not healthy.
-func (c *RPCClient) Ping() error {
- var empty struct{}
- return c.control.Call("Control.Ping", true, &empty)
-}