OSDN Git Service

add package
[bytom/vapor.git] / vendor / github.com / hashicorp / go-plugin / rpc_client.go
1 package plugin
2
3 import (
4         "crypto/tls"
5         "fmt"
6         "io"
7         "net"
8         "net/rpc"
9
10         "github.com/hashicorp/yamux"
11 )
12
13 // RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
14 type RPCClient struct {
15         broker  *MuxBroker
16         control *rpc.Client
17         plugins map[string]Plugin
18
19         // These are the streams used for the various stdout/err overrides
20         stdout, stderr net.Conn
21 }
22
23 // newRPCClient creates a new RPCClient. The Client argument is expected
24 // to be successfully started already with a lock held.
25 func newRPCClient(c *Client) (*RPCClient, error) {
26         // Connect to the client
27         conn, err := net.Dial(c.address.Network(), c.address.String())
28         if err != nil {
29                 return nil, err
30         }
31         if tcpConn, ok := conn.(*net.TCPConn); ok {
32                 // Make sure to set keep alive so that the connection doesn't die
33                 tcpConn.SetKeepAlive(true)
34         }
35
36         if c.config.TLSConfig != nil {
37                 conn = tls.Client(conn, c.config.TLSConfig)
38         }
39
40         // Create the actual RPC client
41         result, err := NewRPCClient(conn, c.config.Plugins)
42         if err != nil {
43                 conn.Close()
44                 return nil, err
45         }
46
47         // Begin the stream syncing so that stdin, out, err work properly
48         err = result.SyncStreams(
49                 c.config.SyncStdout,
50                 c.config.SyncStderr)
51         if err != nil {
52                 result.Close()
53                 return nil, err
54         }
55
56         return result, nil
57 }
58
59 // NewRPCClient creates a client from an already-open connection-like value.
60 // Dial is typically used instead.
61 func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
62         // Create the yamux client so we can multiplex
63         mux, err := yamux.Client(conn, nil)
64         if err != nil {
65                 conn.Close()
66                 return nil, err
67         }
68
69         // Connect to the control stream.
70         control, err := mux.Open()
71         if err != nil {
72                 mux.Close()
73                 return nil, err
74         }
75
76         // Connect stdout, stderr streams
77         stdstream := make([]net.Conn, 2)
78         for i, _ := range stdstream {
79                 stdstream[i], err = mux.Open()
80                 if err != nil {
81                         mux.Close()
82                         return nil, err
83                 }
84         }
85
86         // Create the broker and start it up
87         broker := newMuxBroker(mux)
88         go broker.Run()
89
90         // Build the client using our broker and control channel.
91         return &RPCClient{
92                 broker:  broker,
93                 control: rpc.NewClient(control),
94                 plugins: plugins,
95                 stdout:  stdstream[0],
96                 stderr:  stdstream[1],
97         }, nil
98 }
99
100 // SyncStreams should be called to enable syncing of stdout,
101 // stderr with the plugin.
102 //
103 // This will return immediately and the syncing will continue to happen
104 // in the background. You do not need to launch this in a goroutine itself.
105 //
106 // This should never be called multiple times.
107 func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
108         go copyStream("stdout", stdout, c.stdout)
109         go copyStream("stderr", stderr, c.stderr)
110         return nil
111 }
112
113 // Close closes the connection. The client is no longer usable after this
114 // is called.
115 func (c *RPCClient) Close() error {
116         // Call the control channel and ask it to gracefully exit. If this
117         // errors, then we save it so that we always return an error but we
118         // want to try to close the other channels anyways.
119         var empty struct{}
120         returnErr := c.control.Call("Control.Quit", true, &empty)
121
122         // Close the other streams we have
123         if err := c.control.Close(); err != nil {
124                 return err
125         }
126         if err := c.stdout.Close(); err != nil {
127                 return err
128         }
129         if err := c.stderr.Close(); err != nil {
130                 return err
131         }
132         if err := c.broker.Close(); err != nil {
133                 return err
134         }
135
136         // Return back the error we got from Control.Quit. This is very important
137         // since we MUST return non-nil error if this fails so that Client.Kill
138         // will properly try a process.Kill.
139         return returnErr
140 }
141
142 func (c *RPCClient) Dispense(name string) (interface{}, error) {
143         p, ok := c.plugins[name]
144         if !ok {
145                 return nil, fmt.Errorf("unknown plugin type: %s", name)
146         }
147
148         var id uint32
149         if err := c.control.Call(
150                 "Dispenser.Dispense", name, &id); err != nil {
151                 return nil, err
152         }
153
154         conn, err := c.broker.Dial(id)
155         if err != nil {
156                 return nil, err
157         }
158
159         return p.Client(c.broker, rpc.NewClient(conn))
160 }
161
162 // Ping pings the connection to ensure it is still alive.
163 //
164 // The error from the RPC call is returned exactly if you want to inspect
165 // it for further error analysis. Any error returned from here would indicate
166 // that the connection to the plugin is not healthy.
167 func (c *RPCClient) Ping() error {
168         var empty struct{}
169         return c.control.Call("Control.Ping", true, &empty)
170 }