OSDN Git Service

fix the regular sync dust block problem (#400)
[bytom/vapor.git] / net / websocket / wsnotificationmaneger.go
1 package websocket
2
3 import (
4         "encoding/json"
5         "fmt"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/event"
11         "github.com/vapor/protocol"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
16 // Notification types
17 type notificationBlockConnected types.Block
18 type notificationBlockDisconnected types.Block
19 type notificationTxDescAcceptedByMempool protocol.TxDesc
20
21 // Notification control requests
22 type notificationRegisterClient WSClient
23 type notificationUnregisterClient WSClient
24 type notificationRegisterBlocks WSClient
25 type notificationUnregisterBlocks WSClient
26 type notificationRegisterNewMempoolTxs WSClient
27 type notificationUnregisterNewMempoolTxs WSClient
28
29 // NotificationType represents the type of a notification message.
30 type NotificationType int
31
32 // Constants for the type of a notification message.
33 const (
34         // NTBlockConnected indicates the associated block was connected to the main chain.
35         NTRawBlockConnected NotificationType = iota
36         // NTBlockDisconnected indicates the associated block was disconnected  from the main chain.
37         NTRawBlockDisconnected
38         NTNewTransaction
39         NTRequestStatus
40 )
41
42 // notificationTypeStrings is a map of notification types back to their constant
43 // names for pretty printing.
44 var notificationTypeStrings = map[NotificationType]string{
45         NTRawBlockConnected:    "raw_blocks_connected",
46         NTRawBlockDisconnected: "raw_blocks_disconnected",
47         NTNewTransaction:       "new_transaction",
48         NTRequestStatus:        "request_status",
49 }
50
51 // String returns the NotificationType in human-readable form.
52 func (n NotificationType) String() string {
53         if s, ok := notificationTypeStrings[n]; ok {
54                 return s
55         }
56         return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
57 }
58
59 type statusInfo struct {
60         BestHeight uint64
61         BestHash   bc.Hash
62 }
63
64 // WSNotificationManager is a connection and notification manager used for
65 // websockets.  It allows websocket clients to register for notifications they
66 // are interested in.  When an event happens elsewhere in the code such as
67 // transactions being added to the memory pool or block connects/disconnects,
68 // the notification manager is provided with the relevant details needed to
69 // figure out which websocket clients need to be notified based on what they
70 // have registered for and notifies them accordingly.  It is also used to keep
71 // track of all connected websocket clients.
72 type WSNotificationManager struct {
73         // queueNotification queues a notification for handling.
74         queueNotification chan interface{}
75
76         // notificationMsgs feeds notificationHandler with notifications
77         // and client (un)registeration requests from a queue as well as
78         // registeration and unregisteration requests from clients.
79         notificationMsgs chan interface{}
80
81         // Access channel for current number of connected clients.
82         numClients chan int
83
84         // Shutdown handling
85         wg                   sync.WaitGroup
86         quit                 chan struct{}
87         MaxNumWebsockets     int
88         maxNumConcurrentReqs int
89         status               statusInfo
90         chain                *protocol.Chain
91         eventDispatcher      *event.Dispatcher
92         txMsgSub             *event.Subscription
93 }
94
95 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
96 func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
97         // init status
98         var status statusInfo
99         header := chain.BestBlockHeader()
100         status.BestHeight = header.Height
101         status.BestHash = header.Hash()
102
103         return &WSNotificationManager{
104                 queueNotification:    make(chan interface{}),
105                 notificationMsgs:     make(chan interface{}),
106                 numClients:           make(chan int),
107                 quit:                 make(chan struct{}),
108                 MaxNumWebsockets:     maxNumWebsockets,
109                 maxNumConcurrentReqs: maxNumConcurrentReqs,
110                 status:               status,
111                 chain:                chain,
112                 eventDispatcher:      dispatcher,
113         }
114 }
115
116 // queueHandler manages a queue of empty interfaces, reading from in and
117 // sending the oldest unsent to out.  This handler stops when either of the
118 // in or quit channels are closed, and closes out before returning, without
119 // waiting to send any variables still remaining in the queue.
120 func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
121         var (
122                 q       []interface{}
123                 next    interface{}
124                 dequeue chan<- interface{}
125         )
126
127         skipQueue := out
128
129 out:
130         for {
131                 select {
132                 case n, ok := <-in:
133                         if !ok {
134                                 // Sender closed input channel.
135                                 break out
136                         }
137
138                         // Either send to out immediately if skipQueue is
139                         // non-nil (queue is empty) and reader is ready,
140                         // or append to the queue and send later.
141                         select {
142                         case skipQueue <- n:
143
144                         default:
145                                 q = append(q, n)
146                                 dequeue = out
147                                 skipQueue = nil
148                                 next = q[0]
149                         }
150
151                 case dequeue <- next:
152                         copy(q, q[1:])
153                         q[len(q)-1] = nil // avoid leak
154                         q = q[:len(q)-1]
155                         if len(q) == 0 {
156                                 dequeue = nil
157                                 skipQueue = out
158                         } else {
159                                 next = q[0]
160                         }
161
162                 case <-quit:
163                         break out
164                 }
165         }
166         close(out)
167 }
168
169 func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
170         switch typ {
171         case NTRawBlockConnected:
172                 block, ok := data.(*types.Block)
173                 if !ok {
174                         log.WithField("module", logModule).Error("Chain connected notification is not a block")
175                         break
176                 }
177                 m.status.BestHeight = block.Height
178                 m.status.BestHash = block.Hash()
179                 // Notify registered websocket clients of incoming block.
180                 m.NotifyBlockConnected(block)
181
182         case NTRawBlockDisconnected:
183                 block, ok := data.(*types.Block)
184                 if !ok {
185                         log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
186                         break
187                 }
188                 m.status.BestHeight = block.Height - 1
189                 m.status.BestHash = block.PreviousBlockHash
190                 // Notify registered websocket clients.
191                 m.NotifyBlockDisconnected(block)
192         }
193 }
194
195 // queueHandler maintains a queue of notifications and notification handler
196 // control messages.
197 func (m *WSNotificationManager) queueHandler() {
198         queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
199         m.wg.Done()
200 }
201
202 // NotifyBlockConnected passes a block newly-connected to the best chain
203 // to the notification manager for block and transaction notification
204 // processing.
205 func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
206         select {
207         case m.queueNotification <- (*notificationBlockConnected)(block):
208         case <-m.quit:
209         }
210 }
211
212 // NotifyBlockDisconnected passes a block disconnected from the best chain
213 // to the notification manager for block notification processing.
214 func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
215         select {
216         case m.queueNotification <- (*notificationBlockDisconnected)(block):
217         case <-m.quit:
218         }
219 }
220
221 // memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
222 // notification manager for transaction notification processing.
223 func (m *WSNotificationManager) memPoolTxQueryLoop() {
224 out:
225         for {
226                 select {
227                 case obj, ok := <-m.txMsgSub.Chan():
228                         if !ok {
229                                 log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
230                                 break out
231                         }
232
233                         ev, ok := obj.Data.(protocol.TxMsgEvent)
234                         if !ok {
235                                 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
236                                 continue
237                         }
238
239                         if ev.TxMsg.MsgType == protocol.MsgNewTx {
240                                 select {
241                                 case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
242                                 default:
243                                 }
244                         }
245                 case <-m.quit:
246                         break out
247                 }
248         }
249
250         m.wg.Done()
251 }
252
253 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
254 func (m *WSNotificationManager) notificationHandler() {
255         // clients is a map of all currently connected websocket clients.
256         clients := make(map[chan struct{}]*WSClient)
257         blockNotifications := make(map[chan struct{}]*WSClient)
258         txNotifications := make(map[chan struct{}]*WSClient)
259
260 out:
261         for {
262                 select {
263                 case n, ok := <-m.notificationMsgs:
264                         if !ok {
265                                 break out
266                         }
267                         switch n := n.(type) {
268                         case *notificationBlockConnected:
269                                 block := (*types.Block)(n)
270                                 if len(blockNotifications) != 0 {
271                                         m.notifyBlockConnected(blockNotifications, block)
272                                 }
273
274                         case *notificationBlockDisconnected:
275                                 block := (*types.Block)(n)
276                                 if len(blockNotifications) != 0 {
277                                         m.notifyBlockDisconnected(blockNotifications, block)
278                                 }
279
280                         case *notificationTxDescAcceptedByMempool:
281                                 txDesc := (*protocol.TxDesc)(n)
282                                 if len(txNotifications) != 0 {
283                                         m.notifyForNewTx(txNotifications, txDesc)
284                                 }
285
286                         case *notificationRegisterBlocks:
287                                 wsc := (*WSClient)(n)
288                                 blockNotifications[wsc.quit] = wsc
289
290                         case *notificationUnregisterBlocks:
291                                 wsc := (*WSClient)(n)
292                                 delete(blockNotifications, wsc.quit)
293
294                         case *notificationRegisterNewMempoolTxs:
295                                 wsc := (*WSClient)(n)
296                                 txNotifications[wsc.quit] = wsc
297
298                         case *notificationUnregisterNewMempoolTxs:
299                                 wsc := (*WSClient)(n)
300                                 delete(txNotifications, wsc.quit)
301
302                         case *notificationRegisterClient:
303                                 wsc := (*WSClient)(n)
304                                 clients[wsc.quit] = wsc
305
306                         case *notificationUnregisterClient:
307                                 wsc := (*WSClient)(n)
308                                 delete(blockNotifications, wsc.quit)
309                                 delete(txNotifications, wsc.quit)
310                                 delete(clients, wsc.quit)
311
312                         default:
313                                 log.Warnf("Unhandled notification type")
314                         }
315
316                 case m.numClients <- len(clients):
317
318                 case <-m.quit:
319                         break out
320                 }
321         }
322
323         for _, c := range clients {
324                 c.Disconnect()
325         }
326         m.wg.Done()
327 }
328
329 // NumClients returns the number of clients actively being served.
330 func (m *WSNotificationManager) NumClients() (n int) {
331         select {
332         case n = <-m.numClients:
333         case <-m.quit:
334         }
335         return
336 }
337
338 // IsMaxConnect returns whether the maximum connection is exceeded
339 func (m *WSNotificationManager) IsMaxConnect() bool {
340         return m.NumClients() >= m.MaxNumWebsockets
341 }
342
343 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
344 func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
345         m.queueNotification <- (*notificationRegisterBlocks)(wsc)
346 }
347
348 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
349 func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
350         m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
351 }
352
353 // notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
354 func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
355         resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
356         marshalledJSON, err := json.Marshal(resp)
357         if err != nil {
358                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
359                 return
360         }
361
362         for _, wsc := range clients {
363                 wsc.QueueNotification(marshalledJSON)
364         }
365 }
366
367 // notifyBlockDisconnected notifies websocket clients that have registered for block updates
368 // when a block is disconnected from the main chain (due to a reorganize).
369 func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
370         resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
371         marshalledJSON, err := json.Marshal(resp)
372         if err != nil {
373                 log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
374                 return
375         }
376
377         for _, wsc := range clients {
378                 wsc.QueueNotification(marshalledJSON)
379         }
380 }
381
382 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
383 // client when new transactions are added to the memory pool.
384 func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
385         m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
386 }
387
388 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
389 // client when new transaction are added to the memory pool.
390 func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
391         m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
392 }
393
394 // notifyForNewTx notifies websocket clients that have registered for updates
395 // when a new transaction is added to the memory pool.
396 func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
397         resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
398         marshalledJSON, err := json.Marshal(resp)
399         if err != nil {
400                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
401                 return
402         }
403
404         for _, wsc := range clients {
405                 wsc.QueueNotification(marshalledJSON)
406         }
407 }
408
409 // AddClient adds the passed websocket client to the notification manager.
410 func (m *WSNotificationManager) AddClient(wsc *WSClient) {
411         m.queueNotification <- (*notificationRegisterClient)(wsc)
412 }
413
414 // RemoveClient removes the passed websocket client and all notifications registered for it.
415 func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
416         select {
417         case m.queueNotification <- (*notificationUnregisterClient)(wsc):
418         case <-m.quit:
419         }
420 }
421
422 func (m *WSNotificationManager) blockNotify() {
423 out:
424         for {
425                 select {
426                 case <-m.quit:
427                         break out
428
429                 default:
430                 }
431                 for !m.chain.InMainChain(m.status.BestHash) {
432                         block, err := m.chain.GetBlockByHash(&m.status.BestHash)
433                         if err != nil {
434                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
435                                 return
436                         }
437
438                         m.sendNotification(NTRawBlockDisconnected, block)
439                 }
440
441                 block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
442                 if block == nil {
443                         m.blockWaiter()
444                         continue
445                 }
446
447                 if m.status.BestHash != block.PreviousBlockHash {
448                         log.WithFields(log.Fields{
449                                 "module":                 logModule,
450                                 "block_height":           block.Height,
451                                 "status_block_hash":      m.status.BestHash.String(),
452                                 "retrive_block_PrevHash": block.PreviousBlockHash.String(),
453                         }).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
454                         continue
455                 }
456
457                 m.sendNotification(NTRawBlockConnected, block)
458         }
459         m.wg.Done()
460 }
461
462 func (m *WSNotificationManager) blockWaiter() {
463         select {
464         case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
465         case <-m.quit:
466         }
467 }
468
469 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
470 func (m *WSNotificationManager) Start() error {
471         var err error
472         m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
473         if err != nil {
474                 return err
475         }
476
477         m.wg.Add(4)
478         go m.blockNotify()
479         go m.queueHandler()
480         go m.notificationHandler()
481         go m.memPoolTxQueryLoop()
482         return nil
483 }
484
485 // WaitForShutdown blocks until all notification manager goroutines have finished.
486 func (m *WSNotificationManager) WaitForShutdown() {
487         m.wg.Wait()
488 }
489
490 // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
491 func (m *WSNotificationManager) Shutdown() {
492         close(m.quit)
493 }