8 log "github.com/sirupsen/logrus"
10 "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
16 type notificationBlockConnected types.Block
17 type notificationBlockDisconnected types.Block
18 type notificationTxDescAcceptedByMempool protocol.TxDesc
20 // Notification control requests
21 type notificationRegisterClient WSClient
22 type notificationUnregisterClient WSClient
23 type notificationRegisterBlocks WSClient
24 type notificationUnregisterBlocks WSClient
25 type notificationRegisterNewMempoolTxs WSClient
26 type notificationUnregisterNewMempoolTxs WSClient
28 // NotificationType represents the type of a notification message.
29 type NotificationType int
31 // Constants for the type of a notification message.
33 // NTBlockConnected indicates the associated block was connected to the main chain.
34 NTRawBlockConnected NotificationType = iota
35 // NTBlockDisconnected indicates the associated block was disconnected from the main chain.
36 NTRawBlockDisconnected
41 // notificationTypeStrings is a map of notification types back to their constant
42 // names for pretty printing.
43 var notificationTypeStrings = map[NotificationType]string{
44 NTRawBlockConnected: "raw_blocks_connected",
45 NTRawBlockDisconnected: "raw_blocks_disconnected",
46 NTNewTransaction: "new_transaction",
47 NTRequestStatus: "request_status",
50 // String returns the NotificationType in human-readable form.
51 func (n NotificationType) String() string {
52 if s, ok := notificationTypeStrings[n]; ok {
55 return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
58 type statusInfo struct {
63 // WSNotificationManager is a connection and notification manager used for
64 // websockets. It allows websocket clients to register for notifications they
65 // are interested in. When an event happens elsewhere in the code such as
66 // transactions being added to the memory pool or block connects/disconnects,
67 // the notification manager is provided with the relevant details needed to
68 // figure out which websocket clients need to be notified based on what they
69 // have registered for and notifies them accordingly. It is also used to keep
70 // track of all connected websocket clients.
71 type WSNotificationManager struct {
72 // queueNotification queues a notification for handling.
73 queueNotification chan interface{}
75 // notificationMsgs feeds notificationHandler with notifications
76 // and client (un)registeration requests from a queue as well as
77 // registeration and unregisteration requests from clients.
78 notificationMsgs chan interface{}
80 // Access channel for current number of connected clients.
87 maxNumConcurrentReqs int
92 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
93 func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
96 header := chain.BestBlockHeader()
97 status.BestHeight = header.Height
98 status.BestHash = header.Hash()
100 return &WSNotificationManager{
101 queueNotification: make(chan interface{}),
102 notificationMsgs: make(chan interface{}),
103 numClients: make(chan int),
104 quit: make(chan struct{}),
105 MaxNumWebsockets: maxNumWebsockets,
106 maxNumConcurrentReqs: maxNumConcurrentReqs,
112 // queueHandler manages a queue of empty interfaces, reading from in and
113 // sending the oldest unsent to out. This handler stops when either of the
114 // in or quit channels are closed, and closes out before returning, without
115 // waiting to send any variables still remaining in the queue.
116 func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
120 dequeue chan<- interface{}
130 // Sender closed input channel.
134 // Either send to out immediately if skipQueue is
135 // non-nil (queue is empty) and reader is ready,
136 // or append to the queue and send later.
147 case dequeue <- next:
149 q[len(q)-1] = nil // avoid leak
165 func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
167 case NTRawBlockConnected:
168 block, ok := data.(*types.Block)
170 log.WithField("module", logModule).Error("Chain connected notification is not a block")
173 m.status.BestHeight = block.Height
174 m.status.BestHash = block.Hash()
175 // Notify registered websocket clients of incoming block.
176 m.NotifyBlockConnected(block)
178 case NTRawBlockDisconnected:
179 block, ok := data.(*types.Block)
181 log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
184 m.status.BestHeight = block.Height - 1
185 m.status.BestHash = block.PreviousBlockHash
186 // Notify registered websocket clients.
187 m.NotifyBlockDisconnected(block)
191 // queueHandler maintains a queue of notifications and notification handler
193 func (m *WSNotificationManager) queueHandler() {
194 queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
198 // NotifyBlockConnected passes a block newly-connected to the best chain
199 // to the notification manager for block and transaction notification
201 func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
203 case m.queueNotification <- (*notificationBlockConnected)(block):
208 // NotifyBlockDisconnected passes a block disconnected from the best chain
209 // to the notification manager for block notification processing.
210 func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
212 case m.queueNotification <- (*notificationBlockDisconnected)(block):
217 // NotifyMempoolTx passes a transaction desc accepted by mempool to the
218 // notification manager for transaction notification processing. If
219 // isNew is true, the tx is is a new transaction, rather than one
220 // added to the mempool during a reorg.
221 func (m *WSNotificationManager) NotifyMempoolTx(txDesc *protocol.TxDesc) {
223 case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(txDesc):
228 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
229 func (m *WSNotificationManager) notificationHandler() {
230 // clients is a map of all currently connected websocket clients.
231 clients := make(map[chan struct{}]*WSClient)
232 blockNotifications := make(map[chan struct{}]*WSClient)
233 txNotifications := make(map[chan struct{}]*WSClient)
238 case n, ok := <-m.notificationMsgs:
242 switch n := n.(type) {
243 case *notificationBlockConnected:
244 block := (*types.Block)(n)
245 if len(blockNotifications) != 0 {
246 m.notifyBlockConnected(blockNotifications, block)
249 case *notificationBlockDisconnected:
250 block := (*types.Block)(n)
251 if len(blockNotifications) != 0 {
252 m.notifyBlockDisconnected(blockNotifications, block)
255 case *notificationTxDescAcceptedByMempool:
256 txDesc := (*protocol.TxDesc)(n)
257 if len(txNotifications) != 0 {
258 m.notifyForNewTx(txNotifications, txDesc)
261 case *notificationRegisterBlocks:
262 wsc := (*WSClient)(n)
263 blockNotifications[wsc.quit] = wsc
265 case *notificationUnregisterBlocks:
266 wsc := (*WSClient)(n)
267 delete(blockNotifications, wsc.quit)
269 case *notificationRegisterNewMempoolTxs:
270 wsc := (*WSClient)(n)
271 txNotifications[wsc.quit] = wsc
273 case *notificationUnregisterNewMempoolTxs:
274 wsc := (*WSClient)(n)
275 delete(txNotifications, wsc.quit)
277 case *notificationRegisterClient:
278 wsc := (*WSClient)(n)
279 clients[wsc.quit] = wsc
281 case *notificationUnregisterClient:
282 wsc := (*WSClient)(n)
283 delete(blockNotifications, wsc.quit)
284 delete(txNotifications, wsc.quit)
285 delete(clients, wsc.quit)
288 log.Warnf("Unhandled notification type")
291 case m.numClients <- len(clients):
298 for _, c := range clients {
304 // NumClients returns the number of clients actively being served.
305 func (m *WSNotificationManager) NumClients() (n int) {
307 case n = <-m.numClients:
313 // IsMaxConnect returns whether the maximum connection is exceeded
314 func (m *WSNotificationManager) IsMaxConnect() bool {
315 return m.NumClients() >= m.MaxNumWebsockets
318 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
319 func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
320 m.queueNotification <- (*notificationRegisterBlocks)(wsc)
323 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
324 func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
325 m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
328 // notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
329 func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
330 resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
331 marshalledJSON, err := json.Marshal(resp)
333 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
337 for _, wsc := range clients {
338 wsc.QueueNotification(marshalledJSON)
342 // notifyBlockDisconnected notifies websocket clients that have registered for block updates
343 // when a block is disconnected from the main chain (due to a reorganize).
344 func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
345 resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
346 marshalledJSON, err := json.Marshal(resp)
348 log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
352 for _, wsc := range clients {
353 wsc.QueueNotification(marshalledJSON)
357 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
358 // client when new transactions are added to the memory pool.
359 func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
360 m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
363 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
364 // client when new transaction are added to the memory pool.
365 func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
366 m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
369 // notifyForNewTx notifies websocket clients that have registered for updates
370 // when a new transaction is added to the memory pool.
371 func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
372 resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
373 marshalledJSON, err := json.Marshal(resp)
375 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
379 for _, wsc := range clients {
380 wsc.QueueNotification(marshalledJSON)
384 // AddClient adds the passed websocket client to the notification manager.
385 func (m *WSNotificationManager) AddClient(wsc *WSClient) {
386 m.queueNotification <- (*notificationRegisterClient)(wsc)
389 // RemoveClient removes the passed websocket client and all notifications registered for it.
390 func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
392 case m.queueNotification <- (*notificationUnregisterClient)(wsc):
397 func (m *WSNotificationManager) blockNotify() {
406 for !m.chain.InMainChain(m.status.BestHash) {
407 block, err := m.chain.GetBlockByHash(&m.status.BestHash)
409 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
413 m.sendNotification(NTRawBlockDisconnected, block)
416 block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
422 if m.status.BestHash != block.PreviousBlockHash {
423 log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
427 m.sendNotification(NTRawBlockConnected, block)
432 func (m *WSNotificationManager) blockWaiter() {
434 case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
439 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
440 func (m *WSNotificationManager) Start() {
444 go m.notificationHandler()
447 // WaitForShutdown blocks until all notification manager goroutines have finished.
448 func (m *WSNotificationManager) WaitForShutdown() {
452 // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
453 func (m *WSNotificationManager) Shutdown() {