10 "github.com/hashicorp/yamux"
13 // RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
//
// NOTE(review): the methods on this type also use c.control and c.broker;
// those field declarations and the struct's closing brace are not visible
// in this excerpt.
14 type RPCClient struct {
// plugins maps a plugin type name to its Plugin; Dispense looks the
// requested name up in this map.
17 plugins map[string]Plugin
19 // These are the streams used for the various stdout/err overrides
// SyncStreams hands them to copyStream and Close closes them.
20 stdout, stderr net.Conn
23 // newRPCClient creates a new RPCClient. The Client argument is expected
24 // to be successfully started already with a lock held.
//
// It dials c.address, optionally wraps the connection in TLS, builds the
// RPCClient via NewRPCClient using c.config.Plugins, and then starts stream
// syncing.
//
// NOTE(review): the error checks after Dial, NewRPCClient and SyncStreams,
// the arguments passed to SyncStreams, and the final return are not visible
// in this excerpt.
25 func newRPCClient(c *Client) (*RPCClient, error) {
26 // Connect to the client
27 conn, err := net.Dial(c.address.Network(), c.address.String())
// Keep-alive is only applied when the dialed connection is a *net.TCPConn;
// other connection types skip this step.
31 if tcpConn, ok := conn.(*net.TCPConn); ok {
32 // Make sure to set keep alive so that the connection doesn't die
// The error returned by SetKeepAlive is not checked on this line.
33 tcpConn.SetKeepAlive(true)
// When a TLS config is supplied, conn is replaced by a TLS client wrapping
// the dialed connection, so everything below talks over TLS.
36 if c.config.TLSConfig != nil {
37 conn = tls.Client(conn, c.config.TLSConfig)
40 // Create the actual RPC client
41 result, err := NewRPCClient(conn, c.config.Plugins)
47 // Begin the stream syncing so that the plugin's stdout and stderr are forwarded
// (SyncStreams takes only stdout and stderr writers.)
48 err = result.SyncStreams(
59 // NewRPCClient creates a client from an already-open connection-like value.
60 // Dial is typically used instead.
//
// NOTE(review): no Dial function is visible in this excerpt; newRPCClient is
// the visible caller of this function — confirm what "Dial" refers to.
//
// The connection is multiplexed with yamux. Three streams are opened on it,
// in this order: the control stream, then stdout, then stderr.
//
// NOTE(review): the error checks after each call and most of the returned
// struct literal are not visible in this excerpt.
61 func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
62 // Create the yamux client so we can multiplex
// nil is passed as the yamux config; presumably this selects yamux's
// default configuration — confirm against the yamux version in use.
63 mux, err := yamux.Client(conn, nil)
69 // Connect to the control stream.
70 control, err := mux.Open()
76 // Connect stdout, stderr streams
// Index 0 and index 1 are opened in order; presumably the server side
// accepts its streams in the same order — TODO confirm.
77 stdstream := make([]net.Conn, 2)
78 for i, _ := range stdstream {
79 stdstream[i], err = mux.Open()
86 // Create the broker and start it up
87 broker := newMuxBroker(mux)
90 // Build the client using our broker and control channel.
// The control stream is wrapped in a net/rpc client.
93 control: rpc.NewClient(control),
100 // SyncStreams should be called to enable syncing of stdout,
101 // stderr with the plugin.
//
// stdout and stderr are the writers handed to copyStream together with the
// client's c.stdout and c.stderr streams respectively.
103 // This will return immediately and the syncing will continue to happen
104 // in the background. You do not need to launch this in a goroutine itself.
106 // This should never be called multiple times.
//
// NOTE(review): the return statement is not visible in this excerpt, and
// copyStream is defined elsewhere; presumably each goroutine ends when its
// stream is closed (Close closes both) — confirm.
107 func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
// One goroutine per stream; nothing here waits for them.
108 go copyStream("stdout", stdout, c.stdout)
109 go copyStream("stderr", stderr, c.stderr)
113 // Close closes the connection. The client is no longer usable after this is called.
//
// It first issues a "Control.Quit" RPC on the control channel, then closes
// the control client, the stdout and stderr streams, and the broker, in
// that order.
//
// NOTE(review): the declaration of empty, the bodies of the four error
// checks, and the final return are not visible in this excerpt.
115 func (c *RPCClient) Close() error {
116 // Call the control channel and ask it to gracefully exit. If this
117 // errors, then we save it so that we always return an error but we
118 // want to try to close the other channels anyways.
120 returnErr := c.control.Call("Control.Quit", true, &empty)
122 // Close the other streams we have
123 if err := c.control.Close(); err != nil {
126 if err := c.stdout.Close(); err != nil {
129 if err := c.stderr.Close(); err != nil {
132 if err := c.broker.Close(); err != nil {
136 // Return back the error we got from Control.Quit. This is very important
137 // since we MUST return non-nil error if this fails so that Client.Kill
138 // will properly try a process.Kill.
// Dispense returns a client-side value for the plugin type called name.
//
// The name is looked up in c.plugins; the visible error return reports
// "unknown plugin type" with the requested name. A "Dispenser.Dispense" RPC
// is then made on the control channel with name, filling in id. The broker
// is dialed with that id, and the resulting connection is wrapped in a
// net/rpc client and passed, together with the broker, to the plugin's
// Client method, whose results are returned as-is.
//
// NOTE(review): the check on ok, the declaration of id, and the error
// returns after the RPC call and the broker dial are not visible in this
// excerpt.
142 func (c *RPCClient) Dispense(name string) (interface{}, error) {
143 p, ok := c.plugins[name]
145 return nil, fmt.Errorf("unknown plugin type: %s", name)
149 if err := c.control.Call(
150 "Dispenser.Dispense", name, &id); err != nil {
// id comes from the Dispenser.Dispense reply above.
154 conn, err := c.broker.Dial(id)
159 return p.Client(c.broker, rpc.NewClient(conn))
162 // Ping pings the connection to ensure it is still alive.
//
// It makes a single "Control.Ping" RPC on the control channel.
164 // The error from the RPC call is returned exactly if you want to inspect
165 // it for further error analysis. Any error returned from here would indicate
166 // that the connection to the plugin is not healthy.
//
// NOTE(review): the declaration of empty (the reply value) is not visible
// in this excerpt.
167 func (c *RPCClient) Ping() error {
169 return c.control.Call("Control.Ping", true, &empty)