OSDN Git Service

add package
[bytom/vapor.git] / vendor / github.com / hashicorp / go-plugin / rpc_server.go
1 package plugin
2
3 import (
4         "errors"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "net/rpc"
10         "sync"
11
12         "github.com/hashicorp/yamux"
13 )
14
15 // RPCServer listens for network connections and then dispenses interface
16 // implementations over net/rpc.
17 //
18 // After setting the fields below, they shouldn't be read again directly
19 // from the structure which may be reading/writing them concurrently.
20 type RPCServer struct {
21         Plugins map[string]Plugin
22
23         // Stdout, Stderr are what this server will use instead of the
24         // normal stdin/out/err. This is because due to the multi-process nature
25         // of our plugin system, we can't use the normal process values so we
26         // make our own custom one we pipe across.
27         Stdout io.Reader
28         Stderr io.Reader
29
30         // DoneCh should be set to a non-nil channel that will be closed
31         // when the control requests the RPC server to end.
32         DoneCh chan<- struct{}
33
34         lock sync.Mutex
35 }
36
37 // ServerProtocol impl.
38 func (s *RPCServer) Init() error { return nil }
39
40 // ServerProtocol impl.
41 func (s *RPCServer) Config() string { return "" }
42
43 // ServerProtocol impl.
44 func (s *RPCServer) Serve(lis net.Listener) {
45         for {
46                 conn, err := lis.Accept()
47                 if err != nil {
48                         log.Printf("[ERR] plugin: plugin server: %s", err)
49                         return
50                 }
51
52                 go s.ServeConn(conn)
53         }
54 }
55
56 // ServeConn runs a single connection.
57 //
58 // ServeConn blocks, serving the connection until the client hangs up.
59 func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
60         // First create the yamux server to wrap this connection
61         mux, err := yamux.Server(conn, nil)
62         if err != nil {
63                 conn.Close()
64                 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
65                 return
66         }
67
68         // Accept the control connection
69         control, err := mux.Accept()
70         if err != nil {
71                 mux.Close()
72                 if err != io.EOF {
73                         log.Printf("[ERR] plugin: error accepting control connection: %s", err)
74                 }
75
76                 return
77         }
78
79         // Connect the stdstreams (in, out, err)
80         stdstream := make([]net.Conn, 2)
81         for i, _ := range stdstream {
82                 stdstream[i], err = mux.Accept()
83                 if err != nil {
84                         mux.Close()
85                         log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
86                         return
87                 }
88         }
89
90         // Copy std streams out to the proper place
91         go copyStream("stdout", stdstream[0], s.Stdout)
92         go copyStream("stderr", stdstream[1], s.Stderr)
93
94         // Create the broker and start it up
95         broker := newMuxBroker(mux)
96         go broker.Run()
97
98         // Use the control connection to build the dispenser and serve the
99         // connection.
100         server := rpc.NewServer()
101         server.RegisterName("Control", &controlServer{
102                 server: s,
103         })
104         server.RegisterName("Dispenser", &dispenseServer{
105                 broker:  broker,
106                 plugins: s.Plugins,
107         })
108         server.ServeConn(control)
109 }
110
111 // done is called internally by the control server to trigger the
112 // doneCh to close which is listened to by the main process to cleanly
113 // exit.
114 func (s *RPCServer) done() {
115         s.lock.Lock()
116         defer s.lock.Unlock()
117
118         if s.DoneCh != nil {
119                 close(s.DoneCh)
120                 s.DoneCh = nil
121         }
122 }
123
124 // dispenseServer dispenses variousinterface implementations for Terraform.
125 type controlServer struct {
126         server *RPCServer
127 }
128
129 // Ping can be called to verify the connection (and likely the binary)
130 // is still alive to a plugin.
131 func (c *controlServer) Ping(
132         null bool, response *struct{}) error {
133         *response = struct{}{}
134         return nil
135 }
136
137 func (c *controlServer) Quit(
138         null bool, response *struct{}) error {
139         // End the server
140         c.server.done()
141
142         // Always return true
143         *response = struct{}{}
144
145         return nil
146 }
147
148 // dispenseServer dispenses variousinterface implementations for Terraform.
149 type dispenseServer struct {
150         broker  *MuxBroker
151         plugins map[string]Plugin
152 }
153
154 func (d *dispenseServer) Dispense(
155         name string, response *uint32) error {
156         // Find the function to create this implementation
157         p, ok := d.plugins[name]
158         if !ok {
159                 return fmt.Errorf("unknown plugin type: %s", name)
160         }
161
162         // Create the implementation first so we know if there is an error.
163         impl, err := p.Server(d.broker)
164         if err != nil {
165                 // We turn the error into an errors error so that it works across RPC
166                 return errors.New(err.Error())
167         }
168
169         // Reserve an ID for our implementation
170         id := d.broker.NextId()
171         *response = id
172
173         // Run the rest in a goroutine since it can only happen once this RPC
174         // call returns. We wait for a connection for the plugin implementation
175         // and serve it.
176         go func() {
177                 conn, err := d.broker.Accept(id)
178                 if err != nil {
179                         log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
180                         return
181                 }
182
183                 serve(conn, "Plugin", impl)
184         }()
185
186         return nil
187 }
188
189 func serve(conn io.ReadWriteCloser, name string, v interface{}) {
190         server := rpc.NewServer()
191         if err := server.RegisterName(name, v); err != nil {
192                 log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
193                 return
194         }
195
196         server.ServeConn(conn)
197 }