1 // Copyright 2011 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
18 // channelMaxPacket contains the maximum number of bytes that will be
19 // sent in a single packet. As per RFC 4253, section 6.1, 32k is also
// the payload size that every implementation must be able to process.
21 channelMaxPacket = 1 << 15
// channelWindowSize is the initial receive window given to every new
// channel (see newChannel): room for 64 maximum-size packets.
22 // We follow OpenSSH here.
23 channelWindowSize = 64 * channelMaxPacket
26 // NewChannel represents an incoming request to a channel. It must either be
27 // accepted for use by calling Accept, or rejected by calling Reject.
// Only one of the two may be called, and only once: the channel
// implementation in this file answers a repeated call with errDecidedAlready.
28 type NewChannel interface {
29 // Accept accepts the channel creation request. It returns the Channel
30 // and a Go channel containing SSH requests. The Go channel must be
31 // serviced otherwise the Channel will hang.
32 Accept() (Channel, <-chan *Request, error)
34 // Reject rejects the channel creation request. After calling
35 // this, no other methods on the Channel may be called.
36 Reject(reason RejectionReason, message string) error
38 // ChannelType returns the type of the channel, as supplied by the
// client.
42 // ExtraData returns the arbitrary payload for this channel, as supplied
43 // by the client. This data is specific to the channel type.
47 // A Channel is an ordered, reliable, flow-controlled, duplex stream
48 // that is multiplexed over an SSH connection.
49 type Channel interface {
50 // Read reads up to len(data) bytes from the channel.
51 Read(data []byte) (int, error)
53 // Write writes len(data) bytes to the channel.
54 Write(data []byte) (int, error)
56 // Close signals end of channel use. No data may be sent after this
// call.
60 // CloseWrite signals the end of sending in-band
61 // data. Requests may still be sent, and the other side may
// still send data.
65 // SendRequest sends a channel request. If wantReply is true,
66 // it will wait for a reply and return the result as a
67 // boolean, otherwise the return value will be false. Channel
68 // requests are out-of-band messages so they may be sent even
69 // if the data stream is closed or blocked by flow control.
70 // If the channel is closed before a reply is returned, io.EOF
// is returned.
72 SendRequest(name string, wantReply bool, payload []byte) (bool, error)
74 // Stderr returns an io.ReadWriter that writes to this channel
75 // with the extended data type set to stderr. Stderr may
76 // safely be read and written from a different goroutine than
77 // Read and Write respectively.
// In the channel implementation in this file, stderr is the extended
// stream with code 1.
78 Stderr() io.ReadWriter
81 // Request is a request sent outside of the normal stream of
82 // data. Requests can either be specific to an SSH channel, or they
// can apply to the connection as a whole; Reply handles both kinds.
93 // Reply sends a response to a request. It must be called for all requests
94 // where WantReply is true and is a no-op otherwise. The payload argument is
95 // ignored for replies to channel-specific requests.
//
// ok is the verdict reported for the request. The returned error is the
// one reported by whichever ackRequest call is used.
96 func (r *Request) Reply(ok bool, payload []byte) error {
// Reply routed through the mux: ok and payload are both forwarded.
102 return r.mux.ackRequest(ok, payload)
// Reply routed through the channel: only ok is forwarded.
105 return r.ch.ackRequest(ok)
108 // RejectionReason is an enumeration used when rejecting channel creation
109 // requests. See RFC 4254, section 5.1.
// It is the reason argument of NewChannel.Reject, and its String method
// renders it as human-readable text.
110 type RejectionReason uint32
// Prohibited is the first visible rejection reason.
// NOTE(review): iota + 1 gives it the value 1 only if it is the first entry
// of its const block, which is not visible here — confirm. String presumably
// renders it as "administratively prohibited"; the matching case line is not
// visible either.
113 Prohibited RejectionReason = iota + 1
119 // String converts the rejection reason to human readable form.
// Values without a dedicated case are rendered as "unknown reason N",
// where N is the numeric value of r.
120 func (r RejectionReason) String() string {
123 return "administratively prohibited"
124 case ConnectionFailed:
125 return "connect failed"
126 case UnknownChannelType:
127 return "unknown channel type"
128 case ResourceShortage:
129 return "resource shortage"
// Fallback: report the raw number.
131 return fmt.Sprintf("unknown reason %d", int(r))
// min is called by WriteExtended as min(c.maxRemotePayload, len(data)) to
// cap the size of one outgoing chunk. The body is not visible here;
// presumably it returns the smaller of a and b as a uint32 — TODO confirm.
134 func min(a uint32, b int) uint32 {
// channelDirection tells which side opened a channel: channelInbound for
// channels created by the peer, channelOutbound for channels created
// locally (see the direction field of channel).
141 type channelDirection uint8
// channelInbound marks a channel that was created by the peer.
144 channelInbound channelDirection = iota
148 // channel is an implementation of the Channel interface that works
149 // with the mux class.
// The same type also provides Accept and Reject, so it doubles as the
// NewChannel for channel-open requests.
150 type channel struct {
151 // R/O after creation
// localId is assigned by mux.chanList.add in newChannel; remoteId is the
// ID written into outgoing packets to address the peer's end.
// NOTE(review): handlePacket also assigns remoteId when a
// channelOpenConfirmMsg arrives, so "R/O" only holds after that point.
154 localId, remoteId uint32
156 // maxIncomingPayload and maxRemotePayload are the maximum
157 // payload sizes of normal and extended data packets for
158 // receiving and sending, respectively. The wire packet will
159 // be 9 or 13 bytes larger (excluding encryption overhead).
160 maxIncomingPayload uint32
161 maxRemotePayload uint32
165 // decided is set to true if an accept or reject message has been sent
166 // (for inbound channels) or received (for outbound channels).
169 // direction contains either channelOutbound, for channels created
170 // locally, or channelInbound, for channels created by the peer.
171 direction channelDirection
173 // Pending internal channel messages.
176 // Since requests have no ID, there can be only one request
177 // with WantReply=true outstanding. This lock is held by a
178 // goroutine that has such an outgoing request pending.
179 sentRequestMu sync.Mutex
// incomingRequests carries channel requests received from the peer
// (queued by handlePacket) to the caller of Accept.
181 incomingRequests chan *Request
190 // windowMu protects myWindow, the flow-control window.
194 // writeMu serializes calls to mux.conn.writePacket() and
195 // protects sentClose and packetPool. This mutex must be
196 // different from windowMu, as writePacket can block if there
197 // is a key exchange pending.
201 // packetPool has a buffer for each extended channel ID to
202 // save allocations during writes.
203 packetPool map[uint32][]byte
206 // writePacket sends a packet. If the packet is a channel close, it updates
207 // sentClose. This method takes the lock c.writeMu.
208 func (c *channel) writePacket(packet []byte) error {
// The first byte of a packet is its message type; remember whether this
// one is a channel close.
// NOTE(review): this assignment overwrites sentClose on every call;
// presumably a guard on lines not visible here returns early once
// sentClose is set — confirm.
214 c.sentClose = (packet[0] == msgChannelClose)
215 err := c.mux.conn.writePacket(packet)
// sendMessage sends msg to the peer through writePacket and returns its
// error. Before sending, bytes 1-4 of the encoded packet p are overwritten
// with c.remoteId so the packet is addressed to the peer's end of this
// channel.
// NOTE(review): how p is built from msg is not visible here.
220 func (c *channel) sendMessage(msg interface{}) error {
// Trace of the outgoing message.
// NOTE(review): any guard around this log call is not visible here.
222 log.Printf("send(%d): %#v", c.mux.chanList.offset, msg)
226 binary.BigEndian.PutUint32(p[1:], c.remoteId)
227 return c.writePacket(p)
230 // WriteExtended writes data to a specific extended stream. These streams are
231 // used, for example, for stderr.
// An extendedCode of 0 selects the ordinary data stream (msgChannelData);
// any other value is sent as msgChannelExtendedData carrying that code.
232 func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
236 // 1 byte message type, 4 bytes remoteId, 4 bytes data length
237 opCode := byte(msgChannelData)
238 headerLength := uint32(9)
239 if extendedCode > 0 {
241 opCode = msgChannelExtendedData
245 packet := c.packetPool[extendedCode]
246 // We don't remove the buffer from packetPool, so
247 // WriteExtended calls from different goroutines will be
248 // flagged as errors by the race detector.
// Cap the chunk at the peer's maximum payload, then reserve window for
// it; the amount returned by reserve becomes the chunk size.
252 space := min(c.maxRemotePayload, len(data))
253 if space, err = c.remoteWin.reserve(space); err != nil {
// Allocate a new buffer only when the pooled one is too small.
256 if want := headerLength + space; uint32(cap(packet)) < want {
257 packet = make([]byte, want)
259 packet = packet[:want]
265 binary.BigEndian.PutUint32(packet[1:], c.remoteId)
266 if extendedCode > 0 {
// Extended data carries the stream code right after the channel ID.
267 binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
// The payload length always occupies the last 4 bytes of the header.
269 binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
270 copy(packet[headerLength:], todo)
271 if err = c.writePacket(packet); err != nil {
// Move past the bytes covered by this packet.
276 data = data[len(todo):]
// Store the buffer back so the next write with this code can reuse it.
280 c.packetPool[extendedCode] = packet
// handleData processes an incoming msgChannelData or msgChannelExtendedData
// packet: it validates the header and the declared length, checks the
// payload against maxIncomingPayload and the receive window, and writes the
// payload to the matching pending buffer. Malformed or oversized input is
// reported as an error.
286 func (c *channel) handleData(packet []byte) error {
288 isExtendedData := packet[0] == msgChannelExtendedData
292 if len(packet) < headerLen {
293 // malformed data packet
294 return parseError(packet[0])
// For extended data the stream code starts at byte 5 of the packet.
299 extended = binary.BigEndian.Uint32(packet[5:])
// The declared payload length is the last 4 bytes of the header.
302 length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
306 if length > c.maxIncomingPayload {
307 // TODO(hanwen): should send Disconnect?
308 return errors.New("ssh: incoming packet exceeds maximum payload size")
311 data := packet[headerLen:]
// The declared length must match the bytes actually present.
312 if length != uint32(len(data)) {
313 return errors.New("ssh: wrong packet length")
// The peer may not send more than the window we still have open.
317 if c.myWindow < length {
319 // TODO(hanwen): should send Disconnect with reason?
320 return errors.New("ssh: remote side wrote too much")
326 c.extPending.write(data)
327 } else if extended > 0 {
328 // discard other extended data.
330 c.pending.write(data)
// adjustWindow adds n bytes to our receive window (myWindow) and announces
// the increase to the peer with a windowAdjustMsg. It returns the error
// from sending that message.
335 func (c *channel) adjustWindow(n uint32) error {
337 // Since myWindow is managed on our side, and can never exceed
338 // the initial window setting, we don't worry about overflow.
339 c.myWindow += uint32(n)
341 return c.sendMessage(windowAdjustMsg{
342 AdditionalBytes: uint32(n),
// ReadExtended reads into data from the buffer that belongs to the given
// extended code: extPending or pending. A code without a buffer yields an
// "extended code %d unimplemented" error. After a read, adjustWindow is
// called with the number of bytes read.
// NOTE(review): the switch that maps codes to buffers is not visible here;
// presumably 0 selects pending and 1 (stderr) selects extPending — confirm.
346 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
349 n, err = c.extPending.Read(data)
351 n, err = c.pending.Read(data)
353 return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
// Reopen the receive window by the n bytes that were just read.
357 err = c.adjustWindow(uint32(n))
358 // adjustWindow can return io.EOF if the remote
359 // peer has closed the connection, however we want to
360 // defer forwarding io.EOF to the caller of Read until
361 // the buffer has been drained.
362 if n > 0 && err == io.EOF {
// close tears the channel down locally. The visible part closes
// incomingRequests, the request stream that Accept hands to its caller;
// the remaining steps are on lines not visible here.
370 func (c *channel) close() {
374 close(c.incomingRequests)
376 // This is not necessary for a normal channel teardown, but if
377 // there was another error, it is.
384 // responseMessageReceived is called when a success or failure message is
385 // received on a channel to check that such a message is reasonable for the
// channel. It returns an error when the channel is inbound, or when a
// response was already received for it (the "duplicate response" case).
387 func (c *channel) responseMessageReceived() error {
388 if c.direction == channelInbound {
389 return errors.New("ssh: channel response message received on inbound channel")
392 return errors.New("ssh: duplicate response received for channel")
// handlePacket dispatches one incoming packet for this channel. Data packets
// go to handleData and a close is handled inline; other packets are decoded
// and handled according to their message type.
398 func (c *channel) handlePacket(packet []byte) error {
400 case msgChannelData, msgChannelExtendedData:
401 return c.handleData(packet)
402 case msgChannelClose:
// Send a close message back and drop the channel from the mux's list.
// NOTE(review): the error returned by sendMessage is ignored here.
403 c.sendMessage(channelCloseMsg{PeersId: c.remoteId})
404 c.mux.chanList.remove(c.localId)
408 // RFC 4254 is silent on how EOF affects dataExt messages but
409 // it is logical to signal EOF at the same time.
415 decoded, err := decode(packet)
420 switch msg := decoded.(type) {
421 case *channelOpenFailureMsg:
422 if err := c.responseMessageReceived(); err != nil {
425 c.mux.chanList.remove(msg.PeersId)
427 case *channelOpenConfirmMsg:
428 if err := c.responseMessageReceived(); err != nil {
// Reject packet sizes outside [minPacketLength, 1<<31].
431 if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
432 return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
// Adopt the peer's channel ID, payload limit and initial window.
434 c.remoteId = msg.MyId
435 c.maxRemotePayload = msg.MaxPacketSize
436 c.remoteWin.add(msg.MyWindow)
438 case *windowAdjustMsg:
439 if !c.remoteWin.add(msg.AdditionalBytes) {
440 return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
442 case *channelRequestMsg:
445 WantReply: msg.WantReply,
446 Payload: msg.RequestSpecificData,
// Hand the request to the consumer of incomingRequests. This send
// blocks once the channel's buffer is full.
450 c.incomingRequests <- &req
// newChannel builds a channel with the given direction and extraData, an
// initial receive window of channelWindowSize, empty pending buffers and
// request/message queues of capacity chanSize. It then registers the channel
// with m.chanList, which supplies its localId.
// NOTE(review): the use of chanType is on a line not visible here.
457 func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
459 remoteWin: window{Cond: newCond()},
460 myWindow: channelWindowSize,
461 pending: newBuffer(),
462 extPending: newBuffer(),
463 direction: direction,
464 incomingRequests: make(chan *Request, chanSize),
465 msg: make(chan interface{}, chanSize),
467 extraData: extraData,
469 packetPool: make(map[uint32][]byte),
471 ch.localId = m.chanList.add(ch)
// errUndecided is returned by Read, Write and SendRequest on a channel for
// which neither Accept nor Reject has been called yet.
475 var errUndecided = errors.New("ssh: must Accept or Reject channel")
// errDecidedAlready is returned by Accept and Reject when a decision has
// already been made for the channel.
476 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
// extChannel exposes one extended stream of a channel as an io.ReadWriter:
// its Read and Write forward to ch.ReadExtended and ch.WriteExtended with
// the stored code.
478 type extChannel struct {
// Write sends data on the extended stream identified by e.code by
// delegating to the channel's WriteExtended.
483 func (e *extChannel) Write(data []byte) (n int, err error) {
484 return e.ch.WriteExtended(data, e.code)
// Read receives data from the extended stream identified by e.code by
// delegating to the channel's ReadExtended.
487 func (e *extChannel) Read(data []byte) (n int, err error) {
488 return e.ch.ReadExtended(data, e.code)
// Accept accepts the channel-open request. It sets maxIncomingPayload to
// channelMaxPacket, sends a channelOpenConfirmMsg carrying our window and
// that limit, and on success returns the channel together with its
// incomingRequests stream. If a decision was already made it returns
// errDecidedAlready.
491 func (c *channel) Accept() (Channel, <-chan *Request, error) {
493 return nil, nil, errDecidedAlready
495 c.maxIncomingPayload = channelMaxPacket
496 confirm := channelOpenConfirmMsg{
499 MyWindow: c.myWindow,
500 MaxPacketSize: c.maxIncomingPayload,
503 if err := c.sendMessage(confirm); err != nil {
507 return c, c.incomingRequests, nil
// Reject refuses the channel-open request by sending a channelOpenFailureMsg
// addressed to the peer's channel ID, and returns the error from sending it.
// If a decision was already made it returns errDecidedAlready.
// NOTE(review): how reason and message are stored in the failure message is
// on lines not visible here.
510 func (ch *channel) Reject(reason RejectionReason, message string) error {
512 return errDecidedAlready
514 reject := channelOpenFailureMsg{
515 PeersId: ch.remoteId,
521 return ch.sendMessage(reject)
// Read reads from the ordinary data stream, i.e. ReadExtended with code 0.
// It returns errUndecided when its guard (condition not visible here) fails.
524 func (ch *channel) Read(data []byte) (int, error) {
526 return 0, errUndecided
528 return ch.ReadExtended(data, 0)
// Write writes to the ordinary data stream, i.e. WriteExtended with code 0.
// It returns errUndecided when its guard (condition not visible here) fails.
531 func (ch *channel) Write(data []byte) (int, error) {
533 return 0, errUndecided
535 return ch.WriteExtended(data, 0)
// CloseWrite signals the end of in-band data from this side by sending a
// channelEOFMsg addressed to the peer's channel ID, and returns the error
// from sending it.
538 func (ch *channel) CloseWrite() error {
543 return ch.sendMessage(channelEOFMsg{
544 PeersId: ch.remoteId})
// Close signals the end of channel use by sending a channelCloseMsg
// addressed to the peer's channel ID, and returns the error from sending it.
547 func (ch *channel) Close() error {
552 return ch.sendMessage(channelCloseMsg{
553 PeersId: ch.remoteId})
556 // Extended returns an io.ReadWriter that sends and receives data on the given
557 // SSH extended stream. Such streams are used, for example, for stderr.
// The returned value is an *extChannel bound to code and this channel.
558 func (ch *channel) Extended(code uint32) io.ReadWriter {
562 return &extChannel{code, ch}
// Stderr returns the extended stream with code 1, which this file treats as
// the stderr stream.
565 func (ch *channel) Stderr() io.ReadWriter {
566 return ch.Extended(1)
// SendRequest sends a channelRequestMsg with the given wantReply flag and
// payload to the peer's end of this channel. A channelRequestSuccessMsg or
// channelRequestFailureMsg answer is handled by type; any other answer is
// reported as an "unexpected response" error. It returns errUndecided when
// its guard (condition not visible here) fails.
569 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
571 return false, errUndecided
// Only one request awaiting a reply may be outstanding (see the comment
// on sentRequestMu), so the lock is held until this call returns.
575 ch.sentRequestMu.Lock()
576 defer ch.sentRequestMu.Unlock()
579 msg := channelRequestMsg{
580 PeersId: ch.remoteId,
582 WantReply: wantReply,
583 RequestSpecificData: payload,
586 if err := ch.sendMessage(msg); err != nil {
596 case *channelRequestFailureMsg:
598 case *channelRequestSuccessMsg:
// Any other message type is not a valid answer to a channel request.
601 return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
608 // ackRequest either sends an ack or nack to the channel request.
// Depending on ok, the message is a channelRequestSuccessMsg or a
// channelRequestFailureMsg, addressed to the peer's channel ID. The error
// from sending it is returned.
609 func (ch *channel) ackRequest(ok bool) error {
616 msg = channelRequestFailureMsg{
617 PeersId: ch.remoteId,
620 msg = channelRequestSuccessMsg{
621 PeersId: ch.remoteId,
624 return ch.sendMessage(msg)
// ChannelType implements NewChannel.ChannelType, which is documented as
// returning the type of the channel. The body is not visible here.
627 func (ch *channel) ChannelType() string {
631 func (ch *channel) ExtraData() []byte {