OSDN Git Service

add status fail for websocket nofify new transaction (#1582)
[bytom/bytom.git] / net / websocket / wsnotificationmaneger.go
1 package websocket
2
3 import (
4         "encoding/json"
5         "fmt"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13 )
14
15 // Notification types
16 type notificationBlockConnected types.Block
17 type notificationBlockDisconnected types.Block
18 type notificationTxDescAcceptedByMempool protocol.TxDesc
19
20 // Notification control requests
21 type notificationRegisterClient WSClient
22 type notificationUnregisterClient WSClient
23 type notificationRegisterBlocks WSClient
24 type notificationUnregisterBlocks WSClient
25 type notificationRegisterNewMempoolTxs WSClient
26 type notificationUnregisterNewMempoolTxs WSClient
27
28 // NotificationType represents the type of a notification message.
29 type NotificationType int
30
31 // Constants for the type of a notification message.
32 const (
33         // NTBlockConnected indicates the associated block was connected to the main chain.
34         NTRawBlockConnected NotificationType = iota
35         // NTBlockDisconnected indicates the associated block was disconnected  from the main chain.
36         NTRawBlockDisconnected
37         NTNewTransaction
38         NTRequestStatus
39 )
40
41 // notificationTypeStrings is a map of notification types back to their constant
42 // names for pretty printing.
43 var notificationTypeStrings = map[NotificationType]string{
44         NTRawBlockConnected:    "raw_blocks_connected",
45         NTRawBlockDisconnected: "raw_blocks_disconnected",
46         NTNewTransaction:       "new_transaction",
47         NTRequestStatus:        "request_status",
48 }
49
50 // String returns the NotificationType in human-readable form.
51 func (n NotificationType) String() string {
52         if s, ok := notificationTypeStrings[n]; ok {
53                 return s
54         }
55         return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
56 }
57
58 type statusInfo struct {
59         BestHeight uint64
60         BestHash   bc.Hash
61 }
62
63 // WSNotificationManager is a connection and notification manager used for
64 // websockets.  It allows websocket clients to register for notifications they
65 // are interested in.  When an event happens elsewhere in the code such as
66 // transactions being added to the memory pool or block connects/disconnects,
67 // the notification manager is provided with the relevant details needed to
68 // figure out which websocket clients need to be notified based on what they
69 // have registered for and notifies them accordingly.  It is also used to keep
70 // track of all connected websocket clients.
71 type WSNotificationManager struct {
72         // queueNotification queues a notification for handling.
73         queueNotification chan interface{}
74
75         // notificationMsgs feeds notificationHandler with notifications
76         // and client (un)registeration requests from a queue as well as
77         // registeration and unregisteration requests from clients.
78         notificationMsgs chan interface{}
79
80         // Access channel for current number of connected clients.
81         numClients chan int
82
83         // Shutdown handling
84         wg                   sync.WaitGroup
85         quit                 chan struct{}
86         MaxNumWebsockets     int
87         maxNumConcurrentReqs int
88         status               statusInfo
89         chain                *protocol.Chain
90 }
91
92 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
93 func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
94         // init status
95         var status statusInfo
96         header := chain.BestBlockHeader()
97         status.BestHeight = header.Height
98         status.BestHash = header.Hash()
99
100         return &WSNotificationManager{
101                 queueNotification:    make(chan interface{}),
102                 notificationMsgs:     make(chan interface{}),
103                 numClients:           make(chan int),
104                 quit:                 make(chan struct{}),
105                 MaxNumWebsockets:     maxNumWebsockets,
106                 maxNumConcurrentReqs: maxNumConcurrentReqs,
107                 status:               status,
108                 chain:                chain,
109         }
110 }
111
112 // queueHandler manages a queue of empty interfaces, reading from in and
113 // sending the oldest unsent to out.  This handler stops when either of the
114 // in or quit channels are closed, and closes out before returning, without
115 // waiting to send any variables still remaining in the queue.
116 func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
117         var (
118                 q       []interface{}
119                 next    interface{}
120                 dequeue chan<- interface{}
121         )
122
123         skipQueue := out
124
125 out:
126         for {
127                 select {
128                 case n, ok := <-in:
129                         if !ok {
130                                 // Sender closed input channel.
131                                 break out
132                         }
133
134                         // Either send to out immediately if skipQueue is
135                         // non-nil (queue is empty) and reader is ready,
136                         // or append to the queue and send later.
137                         select {
138                         case skipQueue <- n:
139
140                         default:
141                                 q = append(q, n)
142                                 dequeue = out
143                                 skipQueue = nil
144                                 next = q[0]
145                         }
146
147                 case dequeue <- next:
148                         copy(q, q[1:])
149                         q[len(q)-1] = nil // avoid leak
150                         q = q[:len(q)-1]
151                         if len(q) == 0 {
152                                 dequeue = nil
153                                 skipQueue = out
154                         } else {
155                                 next = q[0]
156                         }
157
158                 case <-quit:
159                         break out
160                 }
161         }
162         close(out)
163 }
164
165 func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
166         switch typ {
167         case NTRawBlockConnected:
168                 block, ok := data.(*types.Block)
169                 if !ok {
170                         log.WithField("module", logModule).Error("Chain connected notification is not a block")
171                         break
172                 }
173                 m.status.BestHeight = block.Height
174                 m.status.BestHash = block.Hash()
175                 // Notify registered websocket clients of incoming block.
176                 m.NotifyBlockConnected(block)
177
178         case NTRawBlockDisconnected:
179                 block, ok := data.(*types.Block)
180                 if !ok {
181                         log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
182                         break
183                 }
184                 m.status.BestHeight = block.Height - 1
185                 m.status.BestHash = block.PreviousBlockHash
186                 // Notify registered websocket clients.
187                 m.NotifyBlockDisconnected(block)
188         }
189 }
190
191 // queueHandler maintains a queue of notifications and notification handler
192 // control messages.
193 func (m *WSNotificationManager) queueHandler() {
194         queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
195         m.wg.Done()
196 }
197
198 // NotifyBlockConnected passes a block newly-connected to the best chain
199 // to the notification manager for block and transaction notification
200 // processing.
201 func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
202         select {
203         case m.queueNotification <- (*notificationBlockConnected)(block):
204         case <-m.quit:
205         }
206 }
207
208 // NotifyBlockDisconnected passes a block disconnected from the best chain
209 // to the notification manager for block notification processing.
210 func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
211         select {
212         case m.queueNotification <- (*notificationBlockDisconnected)(block):
213         case <-m.quit:
214         }
215 }
216
217 // NotifyMempoolTx passes a transaction desc accepted by mempool to the
218 // notification manager for transaction notification processing.  If
219 // isNew is true, the tx is is a new transaction, rather than one
220 // added to the mempool during a reorg.
221 func (m *WSNotificationManager) NotifyMempoolTx(txDesc *protocol.TxDesc) {
222         select {
223         case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(txDesc):
224         case <-m.quit:
225         }
226 }
227
228 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
229 func (m *WSNotificationManager) notificationHandler() {
230         // clients is a map of all currently connected websocket clients.
231         clients := make(map[chan struct{}]*WSClient)
232         blockNotifications := make(map[chan struct{}]*WSClient)
233         txNotifications := make(map[chan struct{}]*WSClient)
234
235 out:
236         for {
237                 select {
238                 case n, ok := <-m.notificationMsgs:
239                         if !ok {
240                                 break out
241                         }
242                         switch n := n.(type) {
243                         case *notificationBlockConnected:
244                                 block := (*types.Block)(n)
245                                 if len(blockNotifications) != 0 {
246                                         m.notifyBlockConnected(blockNotifications, block)
247                                 }
248
249                         case *notificationBlockDisconnected:
250                                 block := (*types.Block)(n)
251                                 if len(blockNotifications) != 0 {
252                                         m.notifyBlockDisconnected(blockNotifications, block)
253                                 }
254
255                         case *notificationTxDescAcceptedByMempool:
256                                 txDesc := (*protocol.TxDesc)(n)
257                                 if len(txNotifications) != 0 {
258                                         m.notifyForNewTx(txNotifications, txDesc)
259                                 }
260
261                         case *notificationRegisterBlocks:
262                                 wsc := (*WSClient)(n)
263                                 blockNotifications[wsc.quit] = wsc
264
265                         case *notificationUnregisterBlocks:
266                                 wsc := (*WSClient)(n)
267                                 delete(blockNotifications, wsc.quit)
268
269                         case *notificationRegisterNewMempoolTxs:
270                                 wsc := (*WSClient)(n)
271                                 txNotifications[wsc.quit] = wsc
272
273                         case *notificationUnregisterNewMempoolTxs:
274                                 wsc := (*WSClient)(n)
275                                 delete(txNotifications, wsc.quit)
276
277                         case *notificationRegisterClient:
278                                 wsc := (*WSClient)(n)
279                                 clients[wsc.quit] = wsc
280
281                         case *notificationUnregisterClient:
282                                 wsc := (*WSClient)(n)
283                                 delete(blockNotifications, wsc.quit)
284                                 delete(txNotifications, wsc.quit)
285                                 delete(clients, wsc.quit)
286
287                         default:
288                                 log.Warnf("Unhandled notification type")
289                         }
290
291                 case m.numClients <- len(clients):
292
293                 case <-m.quit:
294                         break out
295                 }
296         }
297
298         for _, c := range clients {
299                 c.Disconnect()
300         }
301         m.wg.Done()
302 }
303
304 // NumClients returns the number of clients actively being served.
305 func (m *WSNotificationManager) NumClients() (n int) {
306         select {
307         case n = <-m.numClients:
308         case <-m.quit:
309         }
310         return
311 }
312
313 // IsMaxConnect returns whether the maximum connection is exceeded
314 func (m *WSNotificationManager) IsMaxConnect() bool {
315         return m.NumClients() >= m.MaxNumWebsockets
316 }
317
318 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
319 func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
320         m.queueNotification <- (*notificationRegisterBlocks)(wsc)
321 }
322
323 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
324 func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
325         m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
326 }
327
328 // notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
329 func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
330         resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
331         marshalledJSON, err := json.Marshal(resp)
332         if err != nil {
333                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
334                 return
335         }
336
337         for _, wsc := range clients {
338                 wsc.QueueNotification(marshalledJSON)
339         }
340 }
341
342 // notifyBlockDisconnected notifies websocket clients that have registered for block updates
343 // when a block is disconnected from the main chain (due to a reorganize).
344 func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
345         resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
346         marshalledJSON, err := json.Marshal(resp)
347         if err != nil {
348                 log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
349                 return
350         }
351
352         for _, wsc := range clients {
353                 wsc.QueueNotification(marshalledJSON)
354         }
355 }
356
357 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
358 // client when new transactions are added to the memory pool.
359 func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
360         m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
361 }
362
363 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
364 // client when new transaction are added to the memory pool.
365 func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
366         m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
367 }
368
369 // notifyForNewTx notifies websocket clients that have registered for updates
370 // when a new transaction is added to the memory pool.
371 func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
372         resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
373         marshalledJSON, err := json.Marshal(resp)
374         if err != nil {
375                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
376                 return
377         }
378
379         for _, wsc := range clients {
380                 wsc.QueueNotification(marshalledJSON)
381         }
382 }
383
384 // AddClient adds the passed websocket client to the notification manager.
385 func (m *WSNotificationManager) AddClient(wsc *WSClient) {
386         m.queueNotification <- (*notificationRegisterClient)(wsc)
387 }
388
389 // RemoveClient removes the passed websocket client and all notifications registered for it.
390 func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
391         select {
392         case m.queueNotification <- (*notificationUnregisterClient)(wsc):
393         case <-m.quit:
394         }
395 }
396
397 func (m *WSNotificationManager) blockNotify() {
398 out:
399         for {
400                 select {
401                 case <-m.quit:
402                         break out
403
404                 default:
405                 }
406                 for !m.chain.InMainChain(m.status.BestHash) {
407                         block, err := m.chain.GetBlockByHash(&m.status.BestHash)
408                         if err != nil {
409                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
410                                 return
411                         }
412
413                         m.sendNotification(NTRawBlockDisconnected, block)
414                 }
415
416                 block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
417                 if block == nil {
418                         m.blockWaiter()
419                         continue
420                 }
421
422                 if m.status.BestHash != block.PreviousBlockHash {
423                         log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
424                         continue
425                 }
426
427                 m.sendNotification(NTRawBlockConnected, block)
428         }
429         m.wg.Done()
430 }
431
432 func (m *WSNotificationManager) blockWaiter() {
433         select {
434         case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
435         case <-m.quit:
436         }
437 }
438
439 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
440 func (m *WSNotificationManager) Start() {
441         m.wg.Add(3)
442         go m.blockNotify()
443         go m.queueHandler()
444         go m.notificationHandler()
445 }
446
447 // WaitForShutdown blocks until all notification manager goroutines have finished.
448 func (m *WSNotificationManager) WaitForShutdown() {
449         m.wg.Wait()
450 }
451
452 // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
453 func (m *WSNotificationManager) Shutdown() {
454         close(m.quit)
455 }