12 "github.com/hashicorp/yamux"
15 // MuxBroker is responsible for brokering multiplexed connections by unique ID.
17 // It is used by plugins to multiplex multiple RPC connections and data
18 // streams on top of a single connection between the plugin process and the
21 // This allows a plugin to request a channel with a specific ID to connect to
22 // or accept a connection from, and the broker handles the details of
23 // holding these channels open while they're being negotiated.
25 // The Plugin interface has access to these for both Server and Client.
26 // The broker can be used by either (optionally) to reserve and connect to
27 // new multiplexed streams. This is useful for complex args and return values,
28 // or anything else you might need a data stream for.
29 type MuxBroker struct {
31 session *yamux.Session
32 streams map[uint32]*muxBrokerPending
37 type muxBrokerPending struct {
42 func newMuxBroker(s *yamux.Session) *MuxBroker {
45 streams: make(map[uint32]*muxBrokerPending),
49 // Accept accepts a connection by ID.
51 // This should not be called multiple times with the same ID at one time.
52 func (m *MuxBroker) Accept(id uint32) (net.Conn, error) {
58 case <-time.After(5 * time.Second):
63 return nil, fmt.Errorf("timeout waiting for accept")
67 if err := binary.Write(c, binary.LittleEndian, id); err != nil {
75 // AcceptAndServe is used to accept a specific stream ID and immediately
76 // serve an RPC server on that stream ID. This is used to easily serve
79 // The served interface is always registered to the "Plugin" name.
80 func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) {
81 conn, err := m.Accept(id)
83 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
87 serve(conn, "Plugin", v)
90 // Close closes the connection and all sub-connections.
91 func (m *MuxBroker) Close() error {
92 return m.session.Close()
95 // Dial opens a connection by ID.
96 func (m *MuxBroker) Dial(id uint32) (net.Conn, error) {
98 stream, err := m.session.OpenStream()
103 // Write the stream ID onto the wire.
104 if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
109 // Read the ack that we connected. Then we're off!
111 if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
117 return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
123 // NextId returns a unique ID to use next.
125 // It is possible for very long-running plugin hosts to wrap this value,
126 // though it would require a very large number of RPC calls. In practice
127 // we've never seen it happen.
128 func (m *MuxBroker) NextId() uint32 {
129 return atomic.AddUint32(&m.nextId, 1)
132 // Run starts the brokering and should be executed in a goroutine, since it
133 // blocks forever, or until the session closes.
135 // Users of MuxBroker never need to call this. It is called internally by
136 // the plugin host/client.
137 func (m *MuxBroker) Run() {
139 stream, err := m.session.AcceptStream()
141 // Once we receive an error, just exit
145 // Read the stream ID from the stream
147 if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
152 // Initialize the waiter
159 // Wait for a timeout
160 go m.timeoutWait(id, p)
164 func (m *MuxBroker) getStream(id uint32) *muxBrokerPending {
168 p, ok := m.streams[id]
173 m.streams[id] = &muxBrokerPending{
174 ch: make(chan net.Conn, 1),
175 doneCh: make(chan struct{}),
180 func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
181 // Wait for the stream to either be picked up and connected, or
186 case <-time.After(5 * time.Second):
193 // Delete the stream so no one else can grab it
194 delete(m.streams, id)
196 // If we timed out, then check if we have a channel in the buffer,
197 // and if so, close it.