12 "github.com/gorilla/websocket"
13 log "github.com/sirupsen/logrus"
15 "github.com/vapor/errors"
18 // websocketSendBufferSize is the number of elements the send channel
19 // can queue before blocking. Note that this only applies to requests
20 // handled directly in the websocket client input handler or the async
21 // handler since notifications have their own queuing mechanism
22 // independent of the send channel buffer.
24 logModule = "websocket"
25 websocketSendBufferSize = 50
29 // ErrWSParse means a request parsing error
30 ErrWSParse = errors.New("Websocket request parsing error")
31 // ErrWSInternal means service handling errors
32 ErrWSInternal = errors.New("Websocket Internal error")
33 // ErrWSClientQuit means the websocket client is disconnected
34 ErrWSClientQuit = errors.New("Websocket client quit")
36 // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
40 type semaphore chan struct{}
42 func makeSemaphore(n int) semaphore {
43 return make(chan struct{}, n)
46 func (s semaphore) acquire() { s <- struct{}{} }
47 func (s semaphore) release() { <-s }
49 // wsTopicHandler describes a callback function used to handle a specific topic.
50 type wsTopicHandler func(*WSClient)
52 // wsHandlers maps websocket topic strings to appropriate websocket handler
53 // functions. This is set by init because help references wsHandlers and thus
54 // causes a dependency loop.
55 var wsHandlers = map[string]wsTopicHandler{
56 "notify_raw_blocks": handleNotifyBlocks,
57 "notify_new_transactions": handleNotifyNewTransactions,
58 "stop_notify_raw_blocks": handleStopNotifyBlocks,
59 "stop_notify_new_transactions": handleStopNotifyNewTransactions,
62 // responseMessage houses a message to send to a connected websocket client as
63 // well as a channel to reply on when the message is sent.
64 type responseMessage struct {
69 // WSClient provides an abstraction for handling a websocket client. The
70 // overall data flow is split into 3 main goroutines, a possible 4th goroutine
71 // for long-running operations (only started if request is made), and a
72 // websocket manager which is used to allow things such as broadcasting
73 // requested notifications to all connected websocket clients. Inbound
74 // messages are read via the inHandler goroutine and generally dispatched to
75 // their own handler. However, certain potentially long-running operations such
76 // as rescans, are sent to the asyncHander goroutine and are limited to one at a
77 // time. There are two outbound message types - one for responding to client
78 // requests and another for async notifications. Responses to client requests
79 // use SendMessage which employs a buffered channel thereby limiting the number
80 // of outstanding requests that can be made. Notifications are sent via
81 // QueueNotification which implements a queue via notificationQueueHandler to
82 // ensure sending notifications from other subsystems can't block. Ultimately,
83 // all messages are sent via the outHandler.
84 type WSClient struct {
87 // disconnected indicated whether or not the websocket client is disconnected.
89 // addr is the remote address of the client.
91 serviceRequestSem semaphore
93 sendChan chan responseMessage
96 notificationMgr *WSNotificationManager
99 // NewWebsocketClient means to create a new object to the connected websocket client
100 func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) {
101 // Limit max number of websocket clients.
102 if notificationMgr.IsMaxConnect() {
103 return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr)
106 // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers.
107 conn, err := websocket.Upgrade(w, r, nil, 0, 0)
112 conn.SetReadDeadline(timeZeroVal)
117 serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs),
118 ntfnChan: make(chan []byte, 1), // nonblocking sync
119 sendChan: make(chan responseMessage, websocketSendBufferSize),
120 quit: make(chan struct{}),
121 notificationMgr: notificationMgr,
126 // inHandler handles all incoming messages for the websocket connection.
127 func (c *WSClient) inHandler() {
130 // Break out of the loop once the quit channel has been closed.
131 // Use a non-blocking select here so we fall through otherwise.
138 _, msg, err := c.conn.ReadMessage()
141 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error")
146 var request WSRequest
147 if err = json.Unmarshal(msg, &request); err != nil {
148 respError := errors.Wrap(err, ErrWSParse)
149 resp := NewWSResponse(NTRequestStatus.String(), nil, respError)
150 reply, err := json.Marshal(resp)
152 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply")
156 c.SendMessage(reply, nil)
160 c.serviceRequestSem.acquire()
162 c.serviceRequest(request.Topic)
163 c.serviceRequestSem.release()
167 // Ensure the connection is closed.
170 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done")
173 func (c *WSClient) serviceRequest(topic string) {
176 if wsHandler, ok := wsHandlers[topic]; ok {
179 err := fmt.Errorf("There is not this topic: %s", topic)
180 respErr = errors.Wrap(err, ErrWSInternal)
181 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic")
184 resp := NewWSResponse(NTRequestStatus.String(), nil, respErr)
185 reply, err := json.Marshal(resp)
187 log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply")
191 c.SendMessage(reply, nil)
194 // notificationQueueHandler handles the queuing of outgoing notifications for the websocket client.
195 func (c *WSClient) notificationQueueHandler() {
196 ntfnSentChan := make(chan bool, 1) // nonblocking sync
198 // pendingNtfns is used as a queue for notifications that are ready to
199 // be sent once there are no outstanding notifications currently being
201 pendingNtfns := list.New()
206 // This channel is notified when a message is being queued to
207 // be sent across the network socket. It will either send the
208 // message immediately if a send is not already in progress, or
209 // queue the message to be sent once the other pending messages
211 case msg := <-c.ntfnChan:
213 c.SendMessage(msg, ntfnSentChan)
215 pendingNtfns.PushBack(msg)
218 // This channel is notified when a notification has been sent across the network socket.
220 // No longer waiting if there are no more messages in the pending messages queue.
221 next := pendingNtfns.Front()
227 // Notify the outHandler about the next item to asynchronously send.
228 msg := pendingNtfns.Remove(next).([]byte)
229 c.SendMessage(msg, ntfnSentChan)
235 // Drain any wait channels before exiting so nothing is left waiting around to send.
246 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done")
249 // outHandler handles all outgoing messages for the websocket connection.
250 func (c *WSClient) outHandler() {
253 // Send any messages ready for send until the quit channel is closed.
255 case r := <-c.sendChan:
256 if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil {
257 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client")
261 if r.doneChan != nil {
269 // Drain any wait channels before exiting so nothing is left waiting around to send.
273 case r := <-c.sendChan:
274 if r.doneChan != nil {
282 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done")
285 // SendMessage sends the passed json to the websocket client. It is backed
286 // by a buffered channel, so it will not block until the send channel is full.
287 // Note however that QueueNotification must be used for sending async
288 // notifications instead of the this function. This approach allows a limit to
289 // the number of outstanding requests a client can make without preventing or
290 // blocking on async notifications.
291 func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
292 // Don't send the message if disconnected.
293 if c.Disconnected() {
300 c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan}
303 // QueueNotification queues the passed notification to be sent to the websocket client.
304 func (c *WSClient) QueueNotification(marshalledJSON []byte) error {
305 // Don't queue the message if disconnected.
306 if c.Disconnected() {
307 return ErrWSClientQuit
310 c.ntfnChan <- marshalledJSON
314 // Disconnected returns whether or not the websocket client is disconnected.
315 func (c *WSClient) Disconnected() bool {
319 return c.disconnected
322 // Disconnect disconnects the websocket client.
323 func (c *WSClient) Disconnect() {
327 // Nothing to do if already disconnected.
332 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client")
336 c.disconnected = true
339 // Start begins processing input and output messages.
340 func (c *WSClient) Start() {
341 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client")
345 go c.notificationQueueHandler()
349 // WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
350 func (c *WSClient) WaitForShutdown() {
354 // handleNotifyBlocks implements the notifyblocks topic extension for websocket connections.
355 func handleNotifyBlocks(wsc *WSClient) {
356 wsc.notificationMgr.RegisterBlockUpdates(wsc)
359 // handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections.
360 func handleStopNotifyBlocks(wsc *WSClient) {
361 wsc.notificationMgr.UnregisterBlockUpdates(wsc)
364 // handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections.
365 func handleNotifyNewTransactions(wsc *WSClient) {
366 wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc)
369 // handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections.
370 func handleStopNotifyNewTransactions(wsc *WSClient) {
371 wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc)