8 log "github.com/sirupsen/logrus"
10 "github.com/vapor/event"
11 "github.com/vapor/protocol"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
17 type notificationBlockConnected types.Block
18 type notificationBlockDisconnected types.Block
19 type notificationTxDescAcceptedByMempool protocol.TxDesc
21 // Notification control requests
22 type notificationRegisterClient WSClient
23 type notificationUnregisterClient WSClient
24 type notificationRegisterBlocks WSClient
25 type notificationUnregisterBlocks WSClient
26 type notificationRegisterNewMempoolTxs WSClient
27 type notificationUnregisterNewMempoolTxs WSClient
29 // NotificationType represents the type of a notification message.
30 type NotificationType int
32 // Constants for the type of a notification message.
34 // NTBlockConnected indicates the associated block was connected to the main chain.
35 NTRawBlockConnected NotificationType = iota
36 // NTBlockDisconnected indicates the associated block was disconnected from the main chain.
37 NTRawBlockDisconnected
42 // notificationTypeStrings is a map of notification types back to their constant
43 // names for pretty printing.
44 var notificationTypeStrings = map[NotificationType]string{
45 NTRawBlockConnected: "raw_blocks_connected",
46 NTRawBlockDisconnected: "raw_blocks_disconnected",
47 NTNewTransaction: "new_transaction",
48 NTRequestStatus: "request_status",
51 // String returns the NotificationType in human-readable form.
52 func (n NotificationType) String() string {
53 if s, ok := notificationTypeStrings[n]; ok {
56 return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
59 type statusInfo struct {
64 // WSNotificationManager is a connection and notification manager used for
65 // websockets. It allows websocket clients to register for notifications they
66 // are interested in. When an event happens elsewhere in the code such as
67 // transactions being added to the memory pool or block connects/disconnects,
68 // the notification manager is provided with the relevant details needed to
69 // figure out which websocket clients need to be notified based on what they
70 // have registered for and notifies them accordingly. It is also used to keep
71 // track of all connected websocket clients.
72 type WSNotificationManager struct {
73 // queueNotification queues a notification for handling.
74 queueNotification chan interface{}
76 // notificationMsgs feeds notificationHandler with notifications
77 // and client (un)registeration requests from a queue as well as
78 // registeration and unregisteration requests from clients.
79 notificationMsgs chan interface{}
81 // Access channel for current number of connected clients.
88 maxNumConcurrentReqs int
91 eventDispatcher *event.Dispatcher
92 txMsgSub *event.Subscription
95 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
96 func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
99 header := chain.BestBlockHeader()
100 status.BestHeight = header.Height
101 status.BestHash = header.Hash()
103 return &WSNotificationManager{
104 queueNotification: make(chan interface{}),
105 notificationMsgs: make(chan interface{}),
106 numClients: make(chan int),
107 quit: make(chan struct{}),
108 MaxNumWebsockets: maxNumWebsockets,
109 maxNumConcurrentReqs: maxNumConcurrentReqs,
112 eventDispatcher: dispatcher,
116 // queueHandler manages a queue of empty interfaces, reading from in and
117 // sending the oldest unsent to out. This handler stops when either of the
118 // in or quit channels are closed, and closes out before returning, without
119 // waiting to send any variables still remaining in the queue.
120 func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
124 dequeue chan<- interface{}
134 // Sender closed input channel.
138 // Either send to out immediately if skipQueue is
139 // non-nil (queue is empty) and reader is ready,
140 // or append to the queue and send later.
151 case dequeue <- next:
153 q[len(q)-1] = nil // avoid leak
169 func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
171 case NTRawBlockConnected:
172 block, ok := data.(*types.Block)
174 log.WithField("module", logModule).Error("Chain connected notification is not a block")
177 m.status.BestHeight = block.Height
178 m.status.BestHash = block.Hash()
179 // Notify registered websocket clients of incoming block.
180 m.NotifyBlockConnected(block)
182 case NTRawBlockDisconnected:
183 block, ok := data.(*types.Block)
185 log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
188 m.status.BestHeight = block.Height - 1
189 m.status.BestHash = block.PreviousBlockHash
190 // Notify registered websocket clients.
191 m.NotifyBlockDisconnected(block)
195 // queueHandler maintains a queue of notifications and notification handler
197 func (m *WSNotificationManager) queueHandler() {
198 queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
202 // NotifyBlockConnected passes a block newly-connected to the best chain
203 // to the notification manager for block and transaction notification
205 func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
207 case m.queueNotification <- (*notificationBlockConnected)(block):
212 // NotifyBlockDisconnected passes a block disconnected from the best chain
213 // to the notification manager for block notification processing.
214 func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
216 case m.queueNotification <- (*notificationBlockDisconnected)(block):
221 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
222 // notification manager for transaction notification processing.
223 func (m *WSNotificationManager) memPoolTxQueryLoop() {
227 case obj, ok := <-m.txMsgSub.Chan():
229 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
233 ev, ok := obj.Data.(protocol.TxMsgEvent)
235 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
239 if ev.TxMsg.MsgType == protocol.MsgNewTx {
241 case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
253 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
254 func (m *WSNotificationManager) notificationHandler() {
255 // clients is a map of all currently connected websocket clients.
256 clients := make(map[chan struct{}]*WSClient)
257 blockNotifications := make(map[chan struct{}]*WSClient)
258 txNotifications := make(map[chan struct{}]*WSClient)
263 case n, ok := <-m.notificationMsgs:
267 switch n := n.(type) {
268 case *notificationBlockConnected:
269 block := (*types.Block)(n)
270 if len(blockNotifications) != 0 {
271 m.notifyBlockConnected(blockNotifications, block)
274 case *notificationBlockDisconnected:
275 block := (*types.Block)(n)
276 if len(blockNotifications) != 0 {
277 m.notifyBlockDisconnected(blockNotifications, block)
280 case *notificationTxDescAcceptedByMempool:
281 txDesc := (*protocol.TxDesc)(n)
282 if len(txNotifications) != 0 {
283 m.notifyForNewTx(txNotifications, txDesc)
286 case *notificationRegisterBlocks:
287 wsc := (*WSClient)(n)
288 blockNotifications[wsc.quit] = wsc
290 case *notificationUnregisterBlocks:
291 wsc := (*WSClient)(n)
292 delete(blockNotifications, wsc.quit)
294 case *notificationRegisterNewMempoolTxs:
295 wsc := (*WSClient)(n)
296 txNotifications[wsc.quit] = wsc
298 case *notificationUnregisterNewMempoolTxs:
299 wsc := (*WSClient)(n)
300 delete(txNotifications, wsc.quit)
302 case *notificationRegisterClient:
303 wsc := (*WSClient)(n)
304 clients[wsc.quit] = wsc
306 case *notificationUnregisterClient:
307 wsc := (*WSClient)(n)
308 delete(blockNotifications, wsc.quit)
309 delete(txNotifications, wsc.quit)
310 delete(clients, wsc.quit)
313 log.Warnf("Unhandled notification type")
316 case m.numClients <- len(clients):
323 for _, c := range clients {
329 // NumClients returns the number of clients actively being served.
330 func (m *WSNotificationManager) NumClients() (n int) {
332 case n = <-m.numClients:
338 // IsMaxConnect returns whether the maximum connection is exceeded
339 func (m *WSNotificationManager) IsMaxConnect() bool {
340 return m.NumClients() >= m.MaxNumWebsockets
343 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
344 func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
345 m.queueNotification <- (*notificationRegisterBlocks)(wsc)
348 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
349 func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
350 m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
353 // notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
354 func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
355 resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
356 marshalledJSON, err := json.Marshal(resp)
358 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
362 for _, wsc := range clients {
363 wsc.QueueNotification(marshalledJSON)
367 // notifyBlockDisconnected notifies websocket clients that have registered for block updates
368 // when a block is disconnected from the main chain (due to a reorganize).
369 func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
370 resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
371 marshalledJSON, err := json.Marshal(resp)
373 log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
377 for _, wsc := range clients {
378 wsc.QueueNotification(marshalledJSON)
382 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
383 // client when new transactions are added to the memory pool.
384 func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
385 m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
388 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
389 // client when new transaction are added to the memory pool.
390 func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
391 m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
394 // notifyForNewTx notifies websocket clients that have registered for updates
395 // when a new transaction is added to the memory pool.
396 func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
397 resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
398 marshalledJSON, err := json.Marshal(resp)
400 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
404 for _, wsc := range clients {
405 wsc.QueueNotification(marshalledJSON)
409 // AddClient adds the passed websocket client to the notification manager.
410 func (m *WSNotificationManager) AddClient(wsc *WSClient) {
411 m.queueNotification <- (*notificationRegisterClient)(wsc)
414 // RemoveClient removes the passed websocket client and all notifications registered for it.
415 func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
417 case m.queueNotification <- (*notificationUnregisterClient)(wsc):
422 func (m *WSNotificationManager) blockNotify() {
431 for !m.chain.InMainChain(m.status.BestHash) {
432 block, err := m.chain.GetBlockByHash(&m.status.BestHash)
434 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
438 m.sendNotification(NTRawBlockDisconnected, block)
441 block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
447 if m.status.BestHash != block.PreviousBlockHash {
448 log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
452 m.sendNotification(NTRawBlockConnected, block)
457 func (m *WSNotificationManager) blockWaiter() {
459 case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
464 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
465 func (m *WSNotificationManager) Start() error {
467 m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
475 go m.notificationHandler()
476 go m.memPoolTxQueryLoop()
480 // WaitForShutdown blocks until all notification manager goroutines have finished.
481 func (m *WSNotificationManager) WaitForShutdown() {
485 // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
486 func (m *WSNotificationManager) Shutdown() {