package websocket import ( "container/list" "encoding/json" "fmt" "io" "net/http" "sync" "time" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "github.com/vapor/errors" ) // websocketSendBufferSize is the number of elements the send channel // can queue before blocking. Note that this only applies to requests // handled directly in the websocket client input handler or the async // handler since notifications have their own queuing mechanism // independent of the send channel buffer. const ( logModule = "websocket" websocketSendBufferSize = 50 ) var ( // ErrWSParse means a request parsing error ErrWSParse = errors.New("Websocket request parsing error") // ErrWSInternal means service handling errors ErrWSInternal = errors.New("Websocket Internal error") // ErrWSClientQuit means the websocket client is disconnected ErrWSClientQuit = errors.New("Websocket client quit") // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances. timeZeroVal time.Time ) type semaphore chan struct{} func makeSemaphore(n int) semaphore { return make(chan struct{}, n) } func (s semaphore) acquire() { s <- struct{}{} } func (s semaphore) release() { <-s } // wsTopicHandler describes a callback function used to handle a specific topic. type wsTopicHandler func(*WSClient) // wsHandlers maps websocket topic strings to appropriate websocket handler // functions. This is set by init because help references wsHandlers and thus // causes a dependency loop. var wsHandlers = map[string]wsTopicHandler{ "notify_raw_blocks": handleNotifyBlocks, "notify_new_transactions": handleNotifyNewTransactions, "stop_notify_raw_blocks": handleStopNotifyBlocks, "stop_notify_new_transactions": handleStopNotifyNewTransactions, } // responseMessage houses a message to send to a connected websocket client as // well as a channel to reply on when the message is sent. type responseMessage struct { msg []byte doneChan chan bool } // WSClient provides an abstraction for handling a websocket client. The // overall data flow is split into 3 main goroutines, a possible 4th goroutine // for long-running operations (only started if request is made), and a // websocket manager which is used to allow things such as broadcasting // requested notifications to all connected websocket clients. Inbound // messages are read via the inHandler goroutine and generally dispatched to // their own handler. However, certain potentially long-running operations such // as rescans, are sent to the asyncHander goroutine and are limited to one at a // time. There are two outbound message types - one for responding to client // requests and another for async notifications. Responses to client requests // use SendMessage which employs a buffered channel thereby limiting the number // of outstanding requests that can be made. Notifications are sent via // QueueNotification which implements a queue via notificationQueueHandler to // ensure sending notifications from other subsystems can't block. Ultimately, // all messages are sent via the outHandler. type WSClient struct { sync.Mutex conn *websocket.Conn // disconnected indicated whether or not the websocket client is disconnected. disconnected bool // addr is the remote address of the client. addr string serviceRequestSem semaphore ntfnChan chan []byte sendChan chan responseMessage quit chan struct{} wg sync.WaitGroup notificationMgr *WSNotificationManager } // NewWebsocketClient means to create a new object to the connected websocket client func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) { // Limit max number of websocket clients. if notificationMgr.IsMaxConnect() { return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr) } // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers. conn, err := websocket.Upgrade(w, r, nil, 0, 0) if err != nil { return nil, err } conn.SetReadDeadline(timeZeroVal) client := &WSClient{ conn: conn, addr: r.RemoteAddr, serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs), ntfnChan: make(chan []byte, 1), // nonblocking sync sendChan: make(chan responseMessage, websocketSendBufferSize), quit: make(chan struct{}), notificationMgr: notificationMgr, } return client, nil } // inHandler handles all incoming messages for the websocket connection. func (c *WSClient) inHandler() { out: for { // Break out of the loop once the quit channel has been closed. // Use a non-blocking select here so we fall through otherwise. select { case <-c.quit: break out default: } _, msg, err := c.conn.ReadMessage() if err != nil { if err != io.EOF { log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error") } break out } var request WSRequest if err = json.Unmarshal(msg, &request); err != nil { respError := errors.Wrap(err, ErrWSParse) resp := NewWSResponse(NTRequestStatus.String(), nil, respError) reply, err := json.Marshal(resp) if err != nil { log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply") continue } c.SendMessage(reply, nil) continue } c.serviceRequestSem.acquire() go func() { c.serviceRequest(request.Topic) c.serviceRequestSem.release() }() } // Ensure the connection is closed. c.Disconnect() c.wg.Done() log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done") } func (c *WSClient) serviceRequest(topic string) { var respErr error if wsHandler, ok := wsHandlers[topic]; ok { wsHandler(c) } else { err := fmt.Errorf("There is not this topic: %s", topic) respErr = errors.Wrap(err, ErrWSInternal) log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic") } resp := NewWSResponse(NTRequestStatus.String(), nil, respErr) reply, err := json.Marshal(resp) if err != nil { log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply") return } c.SendMessage(reply, nil) } // notificationQueueHandler handles the queuing of outgoing notifications for the websocket client. func (c *WSClient) notificationQueueHandler() { ntfnSentChan := make(chan bool, 1) // nonblocking sync // pendingNtfns is used as a queue for notifications that are ready to // be sent once there are no outstanding notifications currently being // sent. pendingNtfns := list.New() waiting := false out: for { select { // This channel is notified when a message is being queued to // be sent across the network socket. It will either send the // message immediately if a send is not already in progress, or // queue the message to be sent once the other pending messages // are sent. case msg := <-c.ntfnChan: if !waiting { c.SendMessage(msg, ntfnSentChan) } else { pendingNtfns.PushBack(msg) } waiting = true // This channel is notified when a notification has been sent across the network socket. case <-ntfnSentChan: // No longer waiting if there are no more messages in the pending messages queue. next := pendingNtfns.Front() if next == nil { waiting = false continue } // Notify the outHandler about the next item to asynchronously send. msg := pendingNtfns.Remove(next).([]byte) c.SendMessage(msg, ntfnSentChan) case <-c.quit: break out } } // Drain any wait channels before exiting so nothing is left waiting around to send. cleanup: for { select { case <-c.ntfnChan: case <-ntfnSentChan: default: break cleanup } } c.wg.Done() log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done") } // outHandler handles all outgoing messages for the websocket connection. func (c *WSClient) outHandler() { out: for { // Send any messages ready for send until the quit channel is closed. select { case r := <-c.sendChan: if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil { log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client") c.Disconnect() break out } if r.doneChan != nil { r.doneChan <- true } case <-c.quit: break out } } // Drain any wait channels before exiting so nothing is left waiting around to send. cleanup: for { select { case r := <-c.sendChan: if r.doneChan != nil { r.doneChan <- false } default: break cleanup } } c.wg.Done() log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done") } // SendMessage sends the passed json to the websocket client. It is backed // by a buffered channel, so it will not block until the send channel is full. // Note however that QueueNotification must be used for sending async // notifications instead of the this function. This approach allows a limit to // the number of outstanding requests a client can make without preventing or // blocking on async notifications. func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) { // Don't send the message if disconnected. if c.Disconnected() { if doneChan != nil { doneChan <- false } return } c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan} } // QueueNotification queues the passed notification to be sent to the websocket client. func (c *WSClient) QueueNotification(marshalledJSON []byte) error { // Don't queue the message if disconnected. if c.Disconnected() { return ErrWSClientQuit } c.ntfnChan <- marshalledJSON return nil } // Disconnected returns whether or not the websocket client is disconnected. func (c *WSClient) Disconnected() bool { c.Lock() defer c.Unlock() return c.disconnected } // Disconnect disconnects the websocket client. func (c *WSClient) Disconnect() { c.Lock() defer c.Unlock() // Nothing to do if already disconnected. if c.disconnected { return } log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client") close(c.quit) c.conn.Close() c.disconnected = true } // Start begins processing input and output messages. func (c *WSClient) Start() { log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client") c.wg.Add(3) go c.inHandler() go c.notificationQueueHandler() go c.outHandler() } // WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed. func (c *WSClient) WaitForShutdown() { c.wg.Wait() } // handleNotifyBlocks implements the notifyblocks topic extension for websocket connections. func handleNotifyBlocks(wsc *WSClient) { wsc.notificationMgr.RegisterBlockUpdates(wsc) } // handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections. func handleStopNotifyBlocks(wsc *WSClient) { wsc.notificationMgr.UnregisterBlockUpdates(wsc) } // handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections. func handleNotifyNewTransactions(wsc *WSClient) { wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc) } // handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections. func handleStopNotifyNewTransactions(wsc *WSClient) { wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc) }