+++ /dev/null
-package plugin
-
-import (
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "net/rpc"
- "sync"
-
- "github.com/hashicorp/yamux"
-)
-
-// RPCServer listens for network connections and then dispenses interface
-// implementations over net/rpc.
-//
-// After setting the fields below, they shouldn't be read again directly
-// from the structure which may be reading/writing them concurrently.
-type RPCServer struct {
- Plugins map[string]Plugin
-
- // Stdout, Stderr are what this server will use instead of the
- // normal stdin/out/err. This is because due to the multi-process nature
- // of our plugin system, we can't use the normal process values so we
- // make our own custom one we pipe across.
- Stdout io.Reader
- Stderr io.Reader
-
- // DoneCh should be set to a non-nil channel that will be closed
- // when the control requests the RPC server to end.
- DoneCh chan<- struct{}
-
- lock sync.Mutex
-}
-
-// ServerProtocol impl.
-func (s *RPCServer) Init() error { return nil }
-
-// ServerProtocol impl.
-func (s *RPCServer) Config() string { return "" }
-
-// ServerProtocol impl.
-func (s *RPCServer) Serve(lis net.Listener) {
- for {
- conn, err := lis.Accept()
- if err != nil {
- log.Printf("[ERR] plugin: plugin server: %s", err)
- return
- }
-
- go s.ServeConn(conn)
- }
-}
-
-// ServeConn runs a single connection.
-//
-// ServeConn blocks, serving the connection until the client hangs up.
-func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
- // First create the yamux server to wrap this connection
- mux, err := yamux.Server(conn, nil)
- if err != nil {
- conn.Close()
- log.Printf("[ERR] plugin: error creating yamux server: %s", err)
- return
- }
-
- // Accept the control connection
- control, err := mux.Accept()
- if err != nil {
- mux.Close()
- if err != io.EOF {
- log.Printf("[ERR] plugin: error accepting control connection: %s", err)
- }
-
- return
- }
-
- // Connect the stdstreams (in, out, err)
- stdstream := make([]net.Conn, 2)
- for i, _ := range stdstream {
- stdstream[i], err = mux.Accept()
- if err != nil {
- mux.Close()
- log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
- return
- }
- }
-
- // Copy std streams out to the proper place
- go copyStream("stdout", stdstream[0], s.Stdout)
- go copyStream("stderr", stdstream[1], s.Stderr)
-
- // Create the broker and start it up
- broker := newMuxBroker(mux)
- go broker.Run()
-
- // Use the control connection to build the dispenser and serve the
- // connection.
- server := rpc.NewServer()
- server.RegisterName("Control", &controlServer{
- server: s,
- })
- server.RegisterName("Dispenser", &dispenseServer{
- broker: broker,
- plugins: s.Plugins,
- })
- server.ServeConn(control)
-}
-
-// done is called internally by the control server to trigger the
-// doneCh to close which is listened to by the main process to cleanly
-// exit.
-func (s *RPCServer) done() {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if s.DoneCh != nil {
- close(s.DoneCh)
- s.DoneCh = nil
- }
-}
-
-// dispenseServer dispenses variousinterface implementations for Terraform.
-type controlServer struct {
- server *RPCServer
-}
-
-// Ping can be called to verify the connection (and likely the binary)
-// is still alive to a plugin.
-func (c *controlServer) Ping(
- null bool, response *struct{}) error {
- *response = struct{}{}
- return nil
-}
-
-func (c *controlServer) Quit(
- null bool, response *struct{}) error {
- // End the server
- c.server.done()
-
- // Always return true
- *response = struct{}{}
-
- return nil
-}
-
-// dispenseServer dispenses variousinterface implementations for Terraform.
-type dispenseServer struct {
- broker *MuxBroker
- plugins map[string]Plugin
-}
-
-func (d *dispenseServer) Dispense(
- name string, response *uint32) error {
- // Find the function to create this implementation
- p, ok := d.plugins[name]
- if !ok {
- return fmt.Errorf("unknown plugin type: %s", name)
- }
-
- // Create the implementation first so we know if there is an error.
- impl, err := p.Server(d.broker)
- if err != nil {
- // We turn the error into an errors error so that it works across RPC
- return errors.New(err.Error())
- }
-
- // Reserve an ID for our implementation
- id := d.broker.NextId()
- *response = id
-
- // Run the rest in a goroutine since it can only happen once this RPC
- // call returns. We wait for a connection for the plugin implementation
- // and serve it.
- go func() {
- conn, err := d.broker.Accept(id)
- if err != nil {
- log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
- return
- }
-
- serve(conn, "Plugin", impl)
- }()
-
- return nil
-}
-
-func serve(conn io.ReadWriteCloser, name string, v interface{}) {
- server := rpc.NewServer()
- if err := server.RegisterName(name, v); err != nil {
- log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
- return
- }
-
- server.ServeConn(conn)
-}