--- /dev/null
+package websocket
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/event"
+ "github.com/vapor/protocol"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+// Notification types
+type notificationBlockConnected types.Block
+type notificationBlockDisconnected types.Block
+type notificationTxDescAcceptedByMempool protocol.TxDesc
+
+// Notification control requests
+type notificationRegisterClient WSClient
+type notificationUnregisterClient WSClient
+type notificationRegisterBlocks WSClient
+type notificationUnregisterBlocks WSClient
+type notificationRegisterNewMempoolTxs WSClient
+type notificationUnregisterNewMempoolTxs WSClient
+
+// NotificationType represents the type of a notification message.
+type NotificationType int
+
+// Constants for the type of a notification message.
+const (
+ // NTBlockConnected indicates the associated block was connected to the main chain.
+ NTRawBlockConnected NotificationType = iota
+ // NTBlockDisconnected indicates the associated block was disconnected from the main chain.
+ NTRawBlockDisconnected
+ NTNewTransaction
+ NTRequestStatus
+)
+
+// notificationTypeStrings is a map of notification types back to their constant
+// names for pretty printing.
+var notificationTypeStrings = map[NotificationType]string{
+ NTRawBlockConnected: "raw_blocks_connected",
+ NTRawBlockDisconnected: "raw_blocks_disconnected",
+ NTNewTransaction: "new_transaction",
+ NTRequestStatus: "request_status",
+}
+
+// String returns the NotificationType in human-readable form.
+func (n NotificationType) String() string {
+ if s, ok := notificationTypeStrings[n]; ok {
+ return s
+ }
+ return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
+}
+
+type statusInfo struct {
+ BestHeight uint64
+ BestHash bc.Hash
+}
+
+// WSNotificationManager is a connection and notification manager used for
+// websockets. It allows websocket clients to register for notifications they
+// are interested in. When an event happens elsewhere in the code such as
+// transactions being added to the memory pool or block connects/disconnects,
+// the notification manager is provided with the relevant details needed to
+// figure out which websocket clients need to be notified based on what they
+// have registered for and notifies them accordingly. It is also used to keep
+// track of all connected websocket clients.
+type WSNotificationManager struct {
+ // queueNotification queues a notification for handling.
+ queueNotification chan interface{}
+
+ // notificationMsgs feeds notificationHandler with notifications
+ // and client (un)registeration requests from a queue as well as
+ // registeration and unregisteration requests from clients.
+ notificationMsgs chan interface{}
+
+ // Access channel for current number of connected clients.
+ numClients chan int
+
+ // Shutdown handling
+ wg sync.WaitGroup
+ quit chan struct{}
+ MaxNumWebsockets int
+ maxNumConcurrentReqs int
+ status statusInfo
+ chain *protocol.Chain
+ eventDispatcher *event.Dispatcher
+ txMsgSub *event.Subscription
+}
+
+// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
+ // init status
+ var status statusInfo
+ header := chain.BestBlockHeader()
+ status.BestHeight = header.Height
+ status.BestHash = header.Hash()
+
+ return &WSNotificationManager{
+ queueNotification: make(chan interface{}),
+ notificationMsgs: make(chan interface{}),
+ numClients: make(chan int),
+ quit: make(chan struct{}),
+ MaxNumWebsockets: maxNumWebsockets,
+ maxNumConcurrentReqs: maxNumConcurrentReqs,
+ status: status,
+ chain: chain,
+ eventDispatcher: dispatcher,
+ }
+}
+
+// queueHandler manages a queue of empty interfaces, reading from in and
+// sending the oldest unsent to out. This handler stops when either of the
+// in or quit channels are closed, and closes out before returning, without
+// waiting to send any variables still remaining in the queue.
+func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
+ var (
+ q []interface{}
+ next interface{}
+ dequeue chan<- interface{}
+ )
+
+ skipQueue := out
+
+out:
+ for {
+ select {
+ case n, ok := <-in:
+ if !ok {
+ // Sender closed input channel.
+ break out
+ }
+
+ // Either send to out immediately if skipQueue is
+ // non-nil (queue is empty) and reader is ready,
+ // or append to the queue and send later.
+ select {
+ case skipQueue <- n:
+
+ default:
+ q = append(q, n)
+ dequeue = out
+ skipQueue = nil
+ next = q[0]
+ }
+
+ case dequeue <- next:
+ copy(q, q[1:])
+ q[len(q)-1] = nil // avoid leak
+ q = q[:len(q)-1]
+ if len(q) == 0 {
+ dequeue = nil
+ skipQueue = out
+ } else {
+ next = q[0]
+ }
+
+ case <-quit:
+ break out
+ }
+ }
+ close(out)
+}
+
+func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
+ switch typ {
+ case NTRawBlockConnected:
+ block, ok := data.(*types.Block)
+ if !ok {
+ log.WithField("module", logModule).Error("Chain connected notification is not a block")
+ break
+ }
+ m.status.BestHeight = block.Height
+ m.status.BestHash = block.Hash()
+ // Notify registered websocket clients of incoming block.
+ m.NotifyBlockConnected(block)
+
+ case NTRawBlockDisconnected:
+ block, ok := data.(*types.Block)
+ if !ok {
+ log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
+ break
+ }
+ m.status.BestHeight = block.Height - 1
+ m.status.BestHash = block.PreviousBlockHash
+ // Notify registered websocket clients.
+ m.NotifyBlockDisconnected(block)
+ }
+}
+
+// queueHandler maintains a queue of notifications and notification handler
+// control messages.
+func (m *WSNotificationManager) queueHandler() {
+ queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
+ m.wg.Done()
+}
+
+// NotifyBlockConnected passes a block newly-connected to the best chain
+// to the notification manager for block and transaction notification
+// processing.
+func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
+ select {
+ case m.queueNotification <- (*notificationBlockConnected)(block):
+ case <-m.quit:
+ }
+}
+
+// NotifyBlockDisconnected passes a block disconnected from the best chain
+// to the notification manager for block notification processing.
+func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
+ select {
+ case m.queueNotification <- (*notificationBlockDisconnected)(block):
+ case <-m.quit:
+ }
+}
+
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+out:
+ for {
+ select {
+ case obj, ok := <-m.txMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+ break out
+ }
+
+ ev, ok := obj.Data.(protocol.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ if ev.TxMsg.MsgType == protocol.MsgNewTx {
+ select {
+ case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+ default:
+ }
+ }
+ case <-m.quit:
+ break out
+ }
+ }
+
+ m.wg.Done()
+}
+
+// notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
+func (m *WSNotificationManager) notificationHandler() {
+ // clients is a map of all currently connected websocket clients.
+ clients := make(map[chan struct{}]*WSClient)
+ blockNotifications := make(map[chan struct{}]*WSClient)
+ txNotifications := make(map[chan struct{}]*WSClient)
+
+out:
+ for {
+ select {
+ case n, ok := <-m.notificationMsgs:
+ if !ok {
+ break out
+ }
+ switch n := n.(type) {
+ case *notificationBlockConnected:
+ block := (*types.Block)(n)
+ if len(blockNotifications) != 0 {
+ m.notifyBlockConnected(blockNotifications, block)
+ }
+
+ case *notificationBlockDisconnected:
+ block := (*types.Block)(n)
+ if len(blockNotifications) != 0 {
+ m.notifyBlockDisconnected(blockNotifications, block)
+ }
+
+ case *notificationTxDescAcceptedByMempool:
+ txDesc := (*protocol.TxDesc)(n)
+ if len(txNotifications) != 0 {
+ m.notifyForNewTx(txNotifications, txDesc)
+ }
+
+ case *notificationRegisterBlocks:
+ wsc := (*WSClient)(n)
+ blockNotifications[wsc.quit] = wsc
+
+ case *notificationUnregisterBlocks:
+ wsc := (*WSClient)(n)
+ delete(blockNotifications, wsc.quit)
+
+ case *notificationRegisterNewMempoolTxs:
+ wsc := (*WSClient)(n)
+ txNotifications[wsc.quit] = wsc
+
+ case *notificationUnregisterNewMempoolTxs:
+ wsc := (*WSClient)(n)
+ delete(txNotifications, wsc.quit)
+
+ case *notificationRegisterClient:
+ wsc := (*WSClient)(n)
+ clients[wsc.quit] = wsc
+
+ case *notificationUnregisterClient:
+ wsc := (*WSClient)(n)
+ delete(blockNotifications, wsc.quit)
+ delete(txNotifications, wsc.quit)
+ delete(clients, wsc.quit)
+
+ default:
+ log.Warnf("Unhandled notification type")
+ }
+
+ case m.numClients <- len(clients):
+
+ case <-m.quit:
+ break out
+ }
+ }
+
+ for _, c := range clients {
+ c.Disconnect()
+ }
+ m.wg.Done()
+}
+
+// NumClients returns the number of clients actively being served.
+func (m *WSNotificationManager) NumClients() (n int) {
+ select {
+ case n = <-m.numClients:
+ case <-m.quit:
+ }
+ return
+}
+
+// IsMaxConnect returns whether the maximum connection is exceeded
+func (m *WSNotificationManager) IsMaxConnect() bool {
+ return m.NumClients() >= m.MaxNumWebsockets
+}
+
+// RegisterBlockUpdates requests block update notifications to the passed websocket client.
+func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterBlocks)(wsc)
+}
+
+// UnregisterBlockUpdates removes block update notifications for the passed websocket client.
+func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
+}
+
+// notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
+func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+ resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// notifyBlockDisconnected notifies websocket clients that have registered for block updates
+// when a block is disconnected from the main chain (due to a reorganize).
+func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+ resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
+// client when new transactions are added to the memory pool.
+func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
+}
+
+// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
+// client when new transaction are added to the memory pool.
+func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
+}
+
+// notifyForNewTx notifies websocket clients that have registered for updates
+// when a new transaction is added to the memory pool.
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
+ resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// AddClient adds the passed websocket client to the notification manager.
+func (m *WSNotificationManager) AddClient(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterClient)(wsc)
+}
+
+// RemoveClient removes the passed websocket client and all notifications registered for it.
+func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
+ select {
+ case m.queueNotification <- (*notificationUnregisterClient)(wsc):
+ case <-m.quit:
+ }
+}
+
+func (m *WSNotificationManager) blockNotify() {
+out:
+ for {
+ select {
+ case <-m.quit:
+ break out
+
+ default:
+ }
+ for !m.chain.InMainChain(m.status.BestHash) {
+ block, err := m.chain.GetBlockByHash(&m.status.BestHash)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
+ return
+ }
+
+ m.sendNotification(NTRawBlockDisconnected, block)
+ }
+
+ block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
+ if block == nil {
+ m.blockWaiter()
+ continue
+ }
+
+ if m.status.BestHash != block.PreviousBlockHash {
+ log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
+ continue
+ }
+
+ m.sendNotification(NTRawBlockConnected, block)
+ }
+ m.wg.Done()
+}
+
+func (m *WSNotificationManager) blockWaiter() {
+ select {
+ case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
+ case <-m.quit:
+ }
+}
+
+// Start starts the goroutines required for the manager to queue and process websocket client notifications.
+func (m *WSNotificationManager) Start() error {
+ var err error
+ m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+ if err != nil {
+ return err
+ }
+
+ m.wg.Add(4)
+ go m.blockNotify()
+ go m.queueHandler()
+ go m.notificationHandler()
+ go m.memPoolTxQueryLoop()
+ return nil
+}
+
+// WaitForShutdown blocks until all notification manager goroutines have finished.
+func (m *WSNotificationManager) WaitForShutdown() {
+ m.wg.Wait()
+}
+
+// Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
+func (m *WSNotificationManager) Shutdown() {
+ close(m.quit)
+}