OSDN Git Service

Hulk did something
[bytom/vapor.git] / net / websocket / wsclient.go
diff --git a/net/websocket/wsclient.go b/net/websocket/wsclient.go
new file mode 100644 (file)
index 0000000..ea8261b
--- /dev/null
@@ -0,0 +1,372 @@
+package websocket
+
+import (
+       "container/list"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "sync"
+       "time"
+
+       "github.com/gorilla/websocket"
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/errors"
+)
+
+// websocketSendBufferSize is the number of elements the send channel
+// can queue before blocking.  Note that this only applies to requests
+// handled directly in the websocket client input handler or the async
+// handler since notifications have their own queuing mechanism
+// independent of the send channel buffer.
+const (
+       logModule               = "websocket"
+       websocketSendBufferSize = 50
+)
+
+var (
+       // ErrWSParse means a request parsing error
+       ErrWSParse = errors.New("Websocket request parsing error")
+       // ErrWSInternal means service handling errors
+       ErrWSInternal = errors.New("Websocket Internal error")
+       // ErrWSClientQuit means the websocket client is disconnected
+       ErrWSClientQuit = errors.New("Websocket client quit")
+
+       // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
+       timeZeroVal time.Time
+)
+
+type semaphore chan struct{}
+
+func makeSemaphore(n int) semaphore {
+       return make(chan struct{}, n)
+}
+
+func (s semaphore) acquire() { s <- struct{}{} }
+func (s semaphore) release() { <-s }
+
+// wsTopicHandler describes a callback function used to handle a specific topic.
+type wsTopicHandler func(*WSClient)
+
+// wsHandlers maps websocket topic strings to appropriate websocket handler
+// functions.  This is set by init because help references wsHandlers and thus
+// causes a dependency loop.
+var wsHandlers = map[string]wsTopicHandler{
+       "notify_raw_blocks":            handleNotifyBlocks,
+       "notify_new_transactions":      handleNotifyNewTransactions,
+       "stop_notify_raw_blocks":       handleStopNotifyBlocks,
+       "stop_notify_new_transactions": handleStopNotifyNewTransactions,
+}
+
+// responseMessage houses a message to send to a connected websocket client as
+// well as a channel to reply on when the message is sent.
+type responseMessage struct {
+       msg      []byte
+       doneChan chan bool
+}
+
+// WSClient provides an abstraction for handling a websocket client.  The
+// overall data flow is split into 3 main goroutines, a possible 4th goroutine
+// for long-running operations (only started if request is made), and a
+// websocket manager which is used to allow things such as broadcasting
+// requested notifications to all connected websocket clients.   Inbound
+// messages are read via the inHandler goroutine and generally dispatched to
+// their own handler.  However, certain potentially long-running operations such
+// as rescans, are sent to the asyncHander goroutine and are limited to one at a
+// time.  There are two outbound message types - one for responding to client
+// requests and another for async notifications.  Responses to client requests
+// use SendMessage which employs a buffered channel thereby limiting the number
+// of outstanding requests that can be made.  Notifications are sent via
+// QueueNotification which implements a queue via notificationQueueHandler to
+// ensure sending notifications from other subsystems can't block.  Ultimately,
+// all messages are sent via the outHandler.
+type WSClient struct {
+       sync.Mutex
+       conn *websocket.Conn
+       // disconnected indicated whether or not the websocket client is disconnected.
+       disconnected bool
+       // addr is the remote address of the client.
+       addr              string
+       serviceRequestSem semaphore
+       ntfnChan          chan []byte
+       sendChan          chan responseMessage
+       quit              chan struct{}
+       wg                sync.WaitGroup
+       notificationMgr   *WSNotificationManager
+}
+
+// NewWebsocketClient means to create a new object to the connected websocket client
+func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) {
+       // Limit max number of websocket clients.
+       if notificationMgr.IsMaxConnect() {
+               return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr)
+       }
+
+       // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers.
+       conn, err := websocket.Upgrade(w, r, nil, 0, 0)
+       if err != nil {
+               return nil, err
+       }
+
+       conn.SetReadDeadline(timeZeroVal)
+
+       client := &WSClient{
+               conn:              conn,
+               addr:              r.RemoteAddr,
+               serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs),
+               ntfnChan:          make(chan []byte, 1), // nonblocking sync
+               sendChan:          make(chan responseMessage, websocketSendBufferSize),
+               quit:              make(chan struct{}),
+               notificationMgr:   notificationMgr,
+       }
+       return client, nil
+}
+
+// inHandler handles all incoming messages for the websocket connection.
+func (c *WSClient) inHandler() {
+out:
+       for {
+               // Break out of the loop once the quit channel has been closed.
+               // Use a non-blocking select here so we fall through otherwise.
+               select {
+               case <-c.quit:
+                       break out
+               default:
+               }
+
+               _, msg, err := c.conn.ReadMessage()
+               if err != nil {
+                       if err != io.EOF {
+                               log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error")
+                       }
+                       break out
+               }
+
+               var request WSRequest
+               if err = json.Unmarshal(msg, &request); err != nil {
+                       respError := errors.Wrap(err, ErrWSParse)
+                       resp := NewWSResponse(NTRequestStatus.String(), nil, respError)
+                       reply, err := json.Marshal(resp)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply")
+                               continue
+                       }
+
+                       c.SendMessage(reply, nil)
+                       continue
+               }
+
+               c.serviceRequestSem.acquire()
+               go func() {
+                       c.serviceRequest(request.Topic)
+                       c.serviceRequestSem.release()
+               }()
+       }
+
+       // Ensure the connection is closed.
+       c.Disconnect()
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done")
+}
+
+func (c *WSClient) serviceRequest(topic string) {
+       var respErr error
+
+       if wsHandler, ok := wsHandlers[topic]; ok {
+               wsHandler(c)
+       } else {
+               err := fmt.Errorf("There is not this topic: %s", topic)
+               respErr = errors.Wrap(err, ErrWSInternal)
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic")
+       }
+
+       resp := NewWSResponse(NTRequestStatus.String(), nil, respErr)
+       reply, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply")
+               return
+       }
+
+       c.SendMessage(reply, nil)
+}
+
+// notificationQueueHandler handles the queuing of outgoing notifications for  the websocket client.
+func (c *WSClient) notificationQueueHandler() {
+       ntfnSentChan := make(chan bool, 1) // nonblocking sync
+
+       // pendingNtfns is used as a queue for notifications that are ready to
+       // be sent once there are no outstanding notifications currently being
+       // sent.
+       pendingNtfns := list.New()
+       waiting := false
+out:
+       for {
+               select {
+               // This channel is notified when a message is being queued to
+               // be sent across the network socket.  It will either send the
+               // message immediately if a send is not already in progress, or
+               // queue the message to be sent once the other pending messages
+               // are sent.
+               case msg := <-c.ntfnChan:
+                       if !waiting {
+                               c.SendMessage(msg, ntfnSentChan)
+                       } else {
+                               pendingNtfns.PushBack(msg)
+                       }
+                       waiting = true
+               // This channel is notified when a notification has been sent across the network socket.
+               case <-ntfnSentChan:
+                       // No longer waiting if there are no more messages in the pending messages queue.
+                       next := pendingNtfns.Front()
+                       if next == nil {
+                               waiting = false
+                               continue
+                       }
+
+                       // Notify the outHandler about the next item to asynchronously send.
+                       msg := pendingNtfns.Remove(next).([]byte)
+                       c.SendMessage(msg, ntfnSentChan)
+               case <-c.quit:
+                       break out
+               }
+       }
+
+       // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+       for {
+               select {
+               case <-c.ntfnChan:
+               case <-ntfnSentChan:
+               default:
+                       break cleanup
+               }
+       }
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done")
+}
+
+// outHandler handles all outgoing messages for the websocket connection.
+func (c *WSClient) outHandler() {
+out:
+       for {
+               // Send any messages ready for send until the quit channel is closed.
+               select {
+               case r := <-c.sendChan:
+                       if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client")
+                               c.Disconnect()
+                               break out
+                       }
+                       if r.doneChan != nil {
+                               r.doneChan <- true
+                       }
+               case <-c.quit:
+                       break out
+               }
+       }
+
+       // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+       for {
+               select {
+               case r := <-c.sendChan:
+                       if r.doneChan != nil {
+                               r.doneChan <- false
+                       }
+               default:
+                       break cleanup
+               }
+       }
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done")
+}
+
+// SendMessage sends the passed json to the websocket client.  It is backed
+// by a buffered channel, so it will not block until the send channel is full.
+// Note however that QueueNotification must be used for sending async
+// notifications instead of the this function.  This approach allows a limit to
+// the number of outstanding requests a client can make without preventing or
+// blocking on async notifications.
+func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
+       // Don't send the message if disconnected.
+       if c.Disconnected() {
+               if doneChan != nil {
+                       doneChan <- false
+               }
+               return
+       }
+
+       c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan}
+}
+
+// QueueNotification queues the passed notification to be sent to the websocket client.
+func (c *WSClient) QueueNotification(marshalledJSON []byte) error {
+       // Don't queue the message if disconnected.
+       if c.Disconnected() {
+               return ErrWSClientQuit
+       }
+
+       c.ntfnChan <- marshalledJSON
+       return nil
+}
+
+// Disconnected returns whether or not the websocket client is disconnected.
+func (c *WSClient) Disconnected() bool {
+       c.Lock()
+       defer c.Unlock()
+
+       return c.disconnected
+}
+
+// Disconnect disconnects the websocket client.
+func (c *WSClient) Disconnect() {
+       c.Lock()
+       defer c.Unlock()
+
+       // Nothing to do if already disconnected.
+       if c.disconnected {
+               return
+       }
+
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client")
+
+       close(c.quit)
+       c.conn.Close()
+       c.disconnected = true
+}
+
+// Start begins processing input and output messages.
+func (c *WSClient) Start() {
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client")
+
+       c.wg.Add(3)
+       go c.inHandler()
+       go c.notificationQueueHandler()
+       go c.outHandler()
+}
+
+// WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
+func (c *WSClient) WaitForShutdown() {
+       c.wg.Wait()
+}
+
+// handleNotifyBlocks implements the notifyblocks topic extension for websocket connections.
+func handleNotifyBlocks(wsc *WSClient) {
+       wsc.notificationMgr.RegisterBlockUpdates(wsc)
+}
+
+// handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections.
+func handleStopNotifyBlocks(wsc *WSClient) {
+       wsc.notificationMgr.UnregisterBlockUpdates(wsc)
+}
+
+// handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections.
+func handleNotifyNewTransactions(wsc *WSClient) {
+       wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc)
+}
+
+// handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections.
+func handleStopNotifyNewTransactions(wsc *WSClient) {
+       wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc)
+}