12 "github.com/hashicorp/yamux"
15 // RPCServer listens for network connections and then dispenses interface
16 // implementations over net/rpc.
18 // After setting the fields below, they shouldn't be read again directly
19 // from the structure which may be reading/writing them concurrently.
20 type RPCServer struct {
21 Plugins map[string]Plugin
23 // Stdout, Stderr are what this server will use instead of the
24 // normal stdin/out/err. This is because due to the multi-process nature
25 // of our plugin system, we can't use the normal process values so we
26 // make our own custom one we pipe across.
30 // DoneCh should be set to a non-nil channel that will be closed
31 // when the control requests the RPC server to end.
32 DoneCh chan<- struct{}
37 // ServerProtocol impl.
38 func (s *RPCServer) Init() error { return nil }
40 // ServerProtocol impl.
41 func (s *RPCServer) Config() string { return "" }
43 // ServerProtocol impl.
44 func (s *RPCServer) Serve(lis net.Listener) {
46 conn, err := lis.Accept()
48 log.Printf("[ERR] plugin: plugin server: %s", err)
56 // ServeConn runs a single connection.
58 // ServeConn blocks, serving the connection until the client hangs up.
59 func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
60 // First create the yamux server to wrap this connection
61 mux, err := yamux.Server(conn, nil)
64 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
68 // Accept the control connection
69 control, err := mux.Accept()
73 log.Printf("[ERR] plugin: error accepting control connection: %s", err)
79 // Connect the stdstreams (in, out, err)
80 stdstream := make([]net.Conn, 2)
81 for i, _ := range stdstream {
82 stdstream[i], err = mux.Accept()
85 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
90 // Copy std streams out to the proper place
91 go copyStream("stdout", stdstream[0], s.Stdout)
92 go copyStream("stderr", stdstream[1], s.Stderr)
94 // Create the broker and start it up
95 broker := newMuxBroker(mux)
98 // Use the control connection to build the dispenser and serve the
100 server := rpc.NewServer()
101 server.RegisterName("Control", &controlServer{
104 server.RegisterName("Dispenser", &dispenseServer{
108 server.ServeConn(control)
111 // done is called internally by the control server to trigger the
112 // doneCh to close which is listened to by the main process to cleanly
114 func (s *RPCServer) done() {
116 defer s.lock.Unlock()
124 // dispenseServer dispenses variousinterface implementations for Terraform.
125 type controlServer struct {
129 // Ping can be called to verify the connection (and likely the binary)
130 // is still alive to a plugin.
131 func (c *controlServer) Ping(
132 null bool, response *struct{}) error {
133 *response = struct{}{}
137 func (c *controlServer) Quit(
138 null bool, response *struct{}) error {
142 // Always return true
143 *response = struct{}{}
148 // dispenseServer dispenses variousinterface implementations for Terraform.
149 type dispenseServer struct {
151 plugins map[string]Plugin
154 func (d *dispenseServer) Dispense(
155 name string, response *uint32) error {
156 // Find the function to create this implementation
157 p, ok := d.plugins[name]
159 return fmt.Errorf("unknown plugin type: %s", name)
162 // Create the implementation first so we know if there is an error.
163 impl, err := p.Server(d.broker)
165 // We turn the error into an errors error so that it works across RPC
166 return errors.New(err.Error())
169 // Reserve an ID for our implementation
170 id := d.broker.NextId()
173 // Run the rest in a goroutine since it can only happen once this RPC
174 // call returns. We wait for a connection for the plugin implementation
177 conn, err := d.broker.Accept(id)
179 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
183 serve(conn, "Plugin", impl)
189 func serve(conn io.ReadWriteCloser, name string, v interface{}) {
190 server := rpc.NewServer()
191 if err := server.RegisterName(name, v); err != nil {
192 log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
196 server.ServeConn(conn)