OSDN Git Service

Hulk did something
[bytom/vapor.git] / net / websocket / wsnotificationmaneger.go
diff --git a/net/websocket/wsnotificationmaneger.go b/net/websocket/wsnotificationmaneger.go
new file mode 100644 (file)
index 0000000..f92e6ef
--- /dev/null
@@ -0,0 +1,488 @@
+package websocket
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/event"
+       "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+// Notification types
+type notificationBlockConnected types.Block
+type notificationBlockDisconnected types.Block
+type notificationTxDescAcceptedByMempool protocol.TxDesc
+
+// Notification control requests
+type notificationRegisterClient WSClient
+type notificationUnregisterClient WSClient
+type notificationRegisterBlocks WSClient
+type notificationUnregisterBlocks WSClient
+type notificationRegisterNewMempoolTxs WSClient
+type notificationUnregisterNewMempoolTxs WSClient
+
+// NotificationType represents the type of a notification message.
+type NotificationType int
+
+// Constants for the type of a notification message.
+const (
+       // NTBlockConnected indicates the associated block was connected to the main chain.
+       NTRawBlockConnected NotificationType = iota
+       // NTBlockDisconnected indicates the associated block was disconnected  from the main chain.
+       NTRawBlockDisconnected
+       NTNewTransaction
+       NTRequestStatus
+)
+
+// notificationTypeStrings is a map of notification types back to their constant
+// names for pretty printing.
+var notificationTypeStrings = map[NotificationType]string{
+       NTRawBlockConnected:    "raw_blocks_connected",
+       NTRawBlockDisconnected: "raw_blocks_disconnected",
+       NTNewTransaction:       "new_transaction",
+       NTRequestStatus:        "request_status",
+}
+
+// String returns the NotificationType in human-readable form.
+func (n NotificationType) String() string {
+       if s, ok := notificationTypeStrings[n]; ok {
+               return s
+       }
+       return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
+}
+
+type statusInfo struct {
+       BestHeight uint64
+       BestHash   bc.Hash
+}
+
+// WSNotificationManager is a connection and notification manager used for
+// websockets.  It allows websocket clients to register for notifications they
+// are interested in.  When an event happens elsewhere in the code such as
+// transactions being added to the memory pool or block connects/disconnects,
+// the notification manager is provided with the relevant details needed to
+// figure out which websocket clients need to be notified based on what they
+// have registered for and notifies them accordingly.  It is also used to keep
+// track of all connected websocket clients.
+type WSNotificationManager struct {
+       // queueNotification queues a notification for handling.
+       queueNotification chan interface{}
+
+       // notificationMsgs feeds notificationHandler with notifications
+       // and client (un)registeration requests from a queue as well as
+       // registeration and unregisteration requests from clients.
+       notificationMsgs chan interface{}
+
+       // Access channel for current number of connected clients.
+       numClients chan int
+
+       // Shutdown handling
+       wg                   sync.WaitGroup
+       quit                 chan struct{}
+       MaxNumWebsockets     int
+       maxNumConcurrentReqs int
+       status               statusInfo
+       chain                *protocol.Chain
+       eventDispatcher      *event.Dispatcher
+       txMsgSub             *event.Subscription
+}
+
+// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
+       // init status
+       var status statusInfo
+       header := chain.BestBlockHeader()
+       status.BestHeight = header.Height
+       status.BestHash = header.Hash()
+
+       return &WSNotificationManager{
+               queueNotification:    make(chan interface{}),
+               notificationMsgs:     make(chan interface{}),
+               numClients:           make(chan int),
+               quit:                 make(chan struct{}),
+               MaxNumWebsockets:     maxNumWebsockets,
+               maxNumConcurrentReqs: maxNumConcurrentReqs,
+               status:               status,
+               chain:                chain,
+               eventDispatcher:      dispatcher,
+       }
+}
+
+// queueHandler manages a queue of empty interfaces, reading from in and
+// sending the oldest unsent to out.  This handler stops when either of the
+// in or quit channels are closed, and closes out before returning, without
+// waiting to send any variables still remaining in the queue.
+func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
+       var (
+               q       []interface{}
+               next    interface{}
+               dequeue chan<- interface{}
+       )
+
+       skipQueue := out
+
+out:
+       for {
+               select {
+               case n, ok := <-in:
+                       if !ok {
+                               // Sender closed input channel.
+                               break out
+                       }
+
+                       // Either send to out immediately if skipQueue is
+                       // non-nil (queue is empty) and reader is ready,
+                       // or append to the queue and send later.
+                       select {
+                       case skipQueue <- n:
+
+                       default:
+                               q = append(q, n)
+                               dequeue = out
+                               skipQueue = nil
+                               next = q[0]
+                       }
+
+               case dequeue <- next:
+                       copy(q, q[1:])
+                       q[len(q)-1] = nil // avoid leak
+                       q = q[:len(q)-1]
+                       if len(q) == 0 {
+                               dequeue = nil
+                               skipQueue = out
+                       } else {
+                               next = q[0]
+                       }
+
+               case <-quit:
+                       break out
+               }
+       }
+       close(out)
+}
+
+func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
+       switch typ {
+       case NTRawBlockConnected:
+               block, ok := data.(*types.Block)
+               if !ok {
+                       log.WithField("module", logModule).Error("Chain connected notification is not a block")
+                       break
+               }
+               m.status.BestHeight = block.Height
+               m.status.BestHash = block.Hash()
+               // Notify registered websocket clients of incoming block.
+               m.NotifyBlockConnected(block)
+
+       case NTRawBlockDisconnected:
+               block, ok := data.(*types.Block)
+               if !ok {
+                       log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
+                       break
+               }
+               m.status.BestHeight = block.Height - 1
+               m.status.BestHash = block.PreviousBlockHash
+               // Notify registered websocket clients.
+               m.NotifyBlockDisconnected(block)
+       }
+}
+
+// queueHandler maintains a queue of notifications and notification handler
+// control messages.
+func (m *WSNotificationManager) queueHandler() {
+       queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
+       m.wg.Done()
+}
+
+// NotifyBlockConnected passes a block newly-connected to the best chain
+// to the notification manager for block and transaction notification
+// processing.
+func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
+       select {
+       case m.queueNotification <- (*notificationBlockConnected)(block):
+       case <-m.quit:
+       }
+}
+
+// NotifyBlockDisconnected passes a block disconnected from the best chain
+// to the notification manager for block notification processing.
+func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
+       select {
+       case m.queueNotification <- (*notificationBlockDisconnected)(block):
+       case <-m.quit:
+       }
+}
+
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+out:
+       for {
+               select {
+               case obj, ok := <-m.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+                               break out
+                       }
+
+                       ev, ok := obj.Data.(protocol.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if ev.TxMsg.MsgType == protocol.MsgNewTx {
+                               select {
+                               case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+                               default:
+                               }
+                       }
+               case <-m.quit:
+                       break out
+               }
+       }
+
+       m.wg.Done()
+}
+
+// notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
+func (m *WSNotificationManager) notificationHandler() {
+       // clients is a map of all currently connected websocket clients.
+       clients := make(map[chan struct{}]*WSClient)
+       blockNotifications := make(map[chan struct{}]*WSClient)
+       txNotifications := make(map[chan struct{}]*WSClient)
+
+out:
+       for {
+               select {
+               case n, ok := <-m.notificationMsgs:
+                       if !ok {
+                               break out
+                       }
+                       switch n := n.(type) {
+                       case *notificationBlockConnected:
+                               block := (*types.Block)(n)
+                               if len(blockNotifications) != 0 {
+                                       m.notifyBlockConnected(blockNotifications, block)
+                               }
+
+                       case *notificationBlockDisconnected:
+                               block := (*types.Block)(n)
+                               if len(blockNotifications) != 0 {
+                                       m.notifyBlockDisconnected(blockNotifications, block)
+                               }
+
+                       case *notificationTxDescAcceptedByMempool:
+                               txDesc := (*protocol.TxDesc)(n)
+                               if len(txNotifications) != 0 {
+                                       m.notifyForNewTx(txNotifications, txDesc)
+                               }
+
+                       case *notificationRegisterBlocks:
+                               wsc := (*WSClient)(n)
+                               blockNotifications[wsc.quit] = wsc
+
+                       case *notificationUnregisterBlocks:
+                               wsc := (*WSClient)(n)
+                               delete(blockNotifications, wsc.quit)
+
+                       case *notificationRegisterNewMempoolTxs:
+                               wsc := (*WSClient)(n)
+                               txNotifications[wsc.quit] = wsc
+
+                       case *notificationUnregisterNewMempoolTxs:
+                               wsc := (*WSClient)(n)
+                               delete(txNotifications, wsc.quit)
+
+                       case *notificationRegisterClient:
+                               wsc := (*WSClient)(n)
+                               clients[wsc.quit] = wsc
+
+                       case *notificationUnregisterClient:
+                               wsc := (*WSClient)(n)
+                               delete(blockNotifications, wsc.quit)
+                               delete(txNotifications, wsc.quit)
+                               delete(clients, wsc.quit)
+
+                       default:
+                               log.Warnf("Unhandled notification type")
+                       }
+
+               case m.numClients <- len(clients):
+
+               case <-m.quit:
+                       break out
+               }
+       }
+
+       for _, c := range clients {
+               c.Disconnect()
+       }
+       m.wg.Done()
+}
+
+// NumClients returns the number of clients actively being served.
+func (m *WSNotificationManager) NumClients() (n int) {
+       select {
+       case n = <-m.numClients:
+       case <-m.quit:
+       }
+       return
+}
+
+// IsMaxConnect returns whether the maximum connection is exceeded
+func (m *WSNotificationManager) IsMaxConnect() bool {
+       return m.NumClients() >= m.MaxNumWebsockets
+}
+
+// RegisterBlockUpdates requests block update notifications to the passed websocket client.
+func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterBlocks)(wsc)
+}
+
+// UnregisterBlockUpdates removes block update notifications for the passed websocket client.
+func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
+}
+
+// notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
+func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+       resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// notifyBlockDisconnected notifies websocket clients that have registered for block updates
+// when a block is disconnected from the main chain (due to a reorganize).
+func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+       resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
+// client when new transactions are added to the memory pool.
+func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
+}
+
+// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
+// client when new transaction are added to the memory pool.
+func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
+}
+
+// notifyForNewTx notifies websocket clients that have registered for updates
+// when a new transaction is added to the memory pool.
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
+       resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// AddClient adds the passed websocket client to the notification manager.
+func (m *WSNotificationManager) AddClient(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterClient)(wsc)
+}
+
+// RemoveClient removes the passed websocket client and all notifications registered for it.
+func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
+       select {
+       case m.queueNotification <- (*notificationUnregisterClient)(wsc):
+       case <-m.quit:
+       }
+}
+
+func (m *WSNotificationManager) blockNotify() {
+out:
+       for {
+               select {
+               case <-m.quit:
+                       break out
+
+               default:
+               }
+               for !m.chain.InMainChain(m.status.BestHash) {
+                       block, err := m.chain.GetBlockByHash(&m.status.BestHash)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
+                               return
+                       }
+
+                       m.sendNotification(NTRawBlockDisconnected, block)
+               }
+
+               block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
+               if block == nil {
+                       m.blockWaiter()
+                       continue
+               }
+
+               if m.status.BestHash != block.PreviousBlockHash {
+                       log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
+                       continue
+               }
+
+               m.sendNotification(NTRawBlockConnected, block)
+       }
+       m.wg.Done()
+}
+
+func (m *WSNotificationManager) blockWaiter() {
+       select {
+       case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
+       case <-m.quit:
+       }
+}
+
+// Start starts the goroutines required for the manager to queue and process websocket client notifications.
+func (m *WSNotificationManager) Start() error {
+       var err error
+       m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+       if err != nil {
+               return err
+       }
+
+       m.wg.Add(4)
+       go m.blockNotify()
+       go m.queueHandler()
+       go m.notificationHandler()
+       go m.memPoolTxQueryLoop()
+       return nil
+}
+
+// WaitForShutdown blocks until all notification manager goroutines have finished.
+func (m *WSNotificationManager) WaitForShutdown() {
+       m.wg.Wait()
+}
+
+// Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
+func (m *WSNotificationManager) Shutdown() {
+       close(m.quit)
+}