OSDN Git Service

new repo
[bytom/vapor.git] / net / websocket / wsnotificationmaneger.go
1 package websocket
2
3 import (
4         "encoding/json"
5         "fmt"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/protocol"
11         "github.com/vapor/protocol/bc"
12         "github.com/vapor/protocol/bc/types"
13 )
14
15 // Notification types
16 type notificationBlockConnected types.Block
17 type notificationBlockDisconnected types.Block
18 type notificationTxAcceptedByMempool types.Tx
19
20 // Notification control requests
21 type notificationRegisterClient WSClient
22 type notificationUnregisterClient WSClient
23 type notificationRegisterBlocks WSClient
24 type notificationUnregisterBlocks WSClient
25 type notificationRegisterNewMempoolTxs WSClient
26 type notificationUnregisterNewMempoolTxs WSClient
27
28 // NotificationType represents the type of a notification message.
29 type NotificationType int
30
31 // Constants for the type of a notification message.
32 const (
33         // NTBlockConnected indicates the associated block was connected to the main chain.
34         NTRawBlockConnected NotificationType = iota
35         // NTBlockDisconnected indicates the associated block was disconnected  from the main chain.
36         NTRawBlockDisconnected
37         NTNewTransaction
38         NTRequestStatus
39 )
40
41 // notificationTypeStrings is a map of notification types back to their constant
42 // names for pretty printing.
43 var notificationTypeStrings = map[NotificationType]string{
44         NTRawBlockConnected:    "raw_blocks_connected",
45         NTRawBlockDisconnected: "raw_blocks_disconnected",
46         NTNewTransaction:       "new_transaction",
47         NTRequestStatus:        "request_status",
48 }
49
50 // String returns the NotificationType in human-readable form.
51 func (n NotificationType) String() string {
52         if s, ok := notificationTypeStrings[n]; ok {
53                 return s
54         }
55         return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
56 }
57
58 type statusInfo struct {
59         BestHeight uint64
60         BestHash   bc.Hash
61 }
62
63 // WSNotificationManager is a connection and notification manager used for
64 // websockets.  It allows websocket clients to register for notifications they
65 // are interested in.  When an event happens elsewhere in the code such as
66 // transactions being added to the memory pool or block connects/disconnects,
67 // the notification manager is provided with the relevant details needed to
68 // figure out which websocket clients need to be notified based on what they
69 // have registered for and notifies them accordingly.  It is also used to keep
70 // track of all connected websocket clients.
71 type WSNotificationManager struct {
72         // queueNotification queues a notification for handling.
73         queueNotification chan interface{}
74
75         // notificationMsgs feeds notificationHandler with notifications
76         // and client (un)registeration requests from a queue as well as
77         // registeration and unregisteration requests from clients.
78         notificationMsgs chan interface{}
79
80         // Access channel for current number of connected clients.
81         numClients chan int
82
83         // Shutdown handling
84         wg                   sync.WaitGroup
85         quit                 chan struct{}
86         MaxNumWebsockets     int
87         maxNumConcurrentReqs int
88         status               statusInfo
89         chain                *protocol.Chain
90 }
91
92 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
93 func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
94         // init status
95         var status statusInfo
96         header := chain.BestBlockHeader()
97         status.BestHeight = header.Height
98         status.BestHash = header.Hash()
99
100         return &WSNotificationManager{
101                 queueNotification:    make(chan interface{}),
102                 notificationMsgs:     make(chan interface{}),
103                 numClients:           make(chan int),
104                 quit:                 make(chan struct{}),
105                 MaxNumWebsockets:     maxNumWebsockets,
106                 maxNumConcurrentReqs: maxNumConcurrentReqs,
107                 status:               status,
108                 chain:                chain,
109         }
110 }
111
112 // queueHandler manages a queue of empty interfaces, reading from in and
113 // sending the oldest unsent to out.  This handler stops when either of the
114 // in or quit channels are closed, and closes out before returning, without
115 // waiting to send any variables still remaining in the queue.
116 func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
117         var (
118                 q       []interface{}
119                 next    interface{}
120                 dequeue chan<- interface{}
121         )
122
123         skipQueue := out
124
125 out:
126         for {
127                 select {
128                 case n, ok := <-in:
129                         if !ok {
130                                 // Sender closed input channel.
131                                 break out
132                         }
133
134                         // Either send to out immediately if skipQueue is
135                         // non-nil (queue is empty) and reader is ready,
136                         // or append to the queue and send later.
137                         select {
138                         case skipQueue <- n:
139
140                         default:
141                                 q = append(q, n)
142                                 dequeue = out
143                                 skipQueue = nil
144                                 next = q[0]
145                         }
146
147                 case dequeue <- next:
148                         copy(q, q[1:])
149                         q[len(q)-1] = nil // avoid leak
150                         q = q[:len(q)-1]
151                         if len(q) == 0 {
152                                 dequeue = nil
153                                 skipQueue = out
154                         } else {
155                                 next = q[0]
156                         }
157
158                 case <-quit:
159                         break out
160                 }
161         }
162         close(out)
163 }
164
165 func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
166         switch typ {
167         case NTRawBlockConnected:
168                 block, ok := data.(*types.Block)
169                 if !ok {
170                         log.WithField("module", logModule).Error("Chain connected notification is not a block")
171                         break
172                 }
173
174                 // Notify registered websocket clients of incoming block.
175                 m.NotifyBlockConnected(block)
176
177         case NTRawBlockDisconnected:
178                 block, ok := data.(*types.Block)
179                 if !ok {
180                         log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
181                         break
182                 }
183
184                 // Notify registered websocket clients.
185                 m.NotifyBlockDisconnected(block)
186         }
187 }
188
189 // queueHandler maintains a queue of notifications and notification handler
190 // control messages.
191 func (m *WSNotificationManager) queueHandler() {
192         queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
193         m.wg.Done()
194 }
195
196 // NotifyBlockConnected passes a block newly-connected to the best chain
197 // to the notification manager for block and transaction notification
198 // processing.
199 func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
200         select {
201         case m.queueNotification <- (*notificationBlockConnected)(block):
202         case <-m.quit:
203         }
204 }
205
206 // NotifyBlockDisconnected passes a block disconnected from the best chain
207 // to the notification manager for block notification processing.
208 func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
209         select {
210         case m.queueNotification <- (*notificationBlockDisconnected)(block):
211         case <-m.quit:
212         }
213 }
214
215 // NotifyMempoolTx passes a transaction accepted by mempool to the
216 // notification manager for transaction notification processing.  If
217 // isNew is true, the tx is is a new transaction, rather than one
218 // added to the mempool during a reorg.
219 func (m *WSNotificationManager) NotifyMempoolTx(tx *types.Tx) {
220         select {
221         case m.queueNotification <- (*notificationTxAcceptedByMempool)(tx):
222         case <-m.quit:
223         }
224 }
225
226 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
227 func (m *WSNotificationManager) notificationHandler() {
228         // clients is a map of all currently connected websocket clients.
229         clients := make(map[chan struct{}]*WSClient)
230         blockNotifications := make(map[chan struct{}]*WSClient)
231         txNotifications := make(map[chan struct{}]*WSClient)
232
233 out:
234         for {
235                 select {
236                 case n, ok := <-m.notificationMsgs:
237                         if !ok {
238                                 break out
239                         }
240                         switch n := n.(type) {
241                         case *notificationBlockConnected:
242                                 block := (*types.Block)(n)
243                                 if len(blockNotifications) != 0 {
244                                         m.notifyBlockConnected(blockNotifications, block)
245                                 }
246
247                         case *notificationBlockDisconnected:
248                                 block := (*types.Block)(n)
249                                 if len(blockNotifications) != 0 {
250                                         m.notifyBlockDisconnected(blockNotifications, block)
251                                 }
252
253                         case *notificationTxAcceptedByMempool:
254                                 tx := (*types.Tx)(n)
255                                 if len(txNotifications) != 0 {
256                                         m.notifyForNewTx(txNotifications, tx)
257                                 }
258
259                         case *notificationRegisterBlocks:
260                                 wsc := (*WSClient)(n)
261                                 blockNotifications[wsc.quit] = wsc
262
263                         case *notificationUnregisterBlocks:
264                                 wsc := (*WSClient)(n)
265                                 delete(blockNotifications, wsc.quit)
266
267                         case *notificationRegisterNewMempoolTxs:
268                                 wsc := (*WSClient)(n)
269                                 txNotifications[wsc.quit] = wsc
270
271                         case *notificationUnregisterNewMempoolTxs:
272                                 wsc := (*WSClient)(n)
273                                 delete(txNotifications, wsc.quit)
274
275                         case *notificationRegisterClient:
276                                 wsc := (*WSClient)(n)
277                                 clients[wsc.quit] = wsc
278
279                         case *notificationUnregisterClient:
280                                 wsc := (*WSClient)(n)
281                                 delete(blockNotifications, wsc.quit)
282                                 delete(txNotifications, wsc.quit)
283                                 delete(clients, wsc.quit)
284
285                         default:
286                                 log.Warnf("Unhandled notification type")
287                         }
288
289                 case m.numClients <- len(clients):
290
291                 case <-m.quit:
292                         break out
293                 }
294         }
295
296         for _, c := range clients {
297                 c.Disconnect()
298         }
299         m.wg.Done()
300 }
301
302 // NumClients returns the number of clients actively being served.
303 func (m *WSNotificationManager) NumClients() (n int) {
304         select {
305         case n = <-m.numClients:
306         case <-m.quit:
307         }
308         return
309 }
310
311 // IsMaxConnect returns whether the maximum connection is exceeded
312 func (m *WSNotificationManager) IsMaxConnect() bool {
313         return m.NumClients() >= m.MaxNumWebsockets
314 }
315
316 // RegisterBlockUpdates requests block update notifications to the passed websocket client.
317 func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
318         m.queueNotification <- (*notificationRegisterBlocks)(wsc)
319 }
320
321 // UnregisterBlockUpdates removes block update notifications for the passed websocket client.
322 func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
323         m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
324 }
325
326 // notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
327 func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
328         resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
329         marshalledJSON, err := json.Marshal(resp)
330         if err != nil {
331                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
332                 return
333         }
334
335         for _, wsc := range clients {
336                 wsc.QueueNotification(marshalledJSON)
337         }
338 }
339
340 // notifyBlockDisconnected notifies websocket clients that have registered for block updates
341 // when a block is disconnected from the main chain (due to a reorganize).
342 func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
343         resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
344         marshalledJSON, err := json.Marshal(resp)
345         if err != nil {
346                 log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
347                 return
348         }
349
350         for _, wsc := range clients {
351                 wsc.QueueNotification(marshalledJSON)
352         }
353 }
354
355 // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
356 // client when new transactions are added to the memory pool.
357 func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
358         m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
359 }
360
361 // UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
362 // client when new transaction are added to the memory pool.
363 func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
364         m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
365 }
366
367 // notifyForNewTx notifies websocket clients that have registered for updates
368 // when a new transaction is added to the memory pool.
369 func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, tx *types.Tx) {
370         resp := NewWSResponse(NTNewTransaction.String(), tx, nil)
371         marshalledJSON, err := json.Marshal(resp)
372         if err != nil {
373                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
374                 return
375         }
376
377         for _, wsc := range clients {
378                 wsc.QueueNotification(marshalledJSON)
379         }
380 }
381
382 // AddClient adds the passed websocket client to the notification manager.
383 func (m *WSNotificationManager) AddClient(wsc *WSClient) {
384         m.queueNotification <- (*notificationRegisterClient)(wsc)
385 }
386
387 // RemoveClient removes the passed websocket client and all notifications registered for it.
388 func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
389         select {
390         case m.queueNotification <- (*notificationUnregisterClient)(wsc):
391         case <-m.quit:
392         }
393 }
394
395 func (m *WSNotificationManager) blockNotify() {
396 out:
397         for {
398                 select {
399                 case <-m.quit:
400                         break out
401
402                 default:
403                 }
404                 for !m.chain.InMainChain(m.status.BestHash) {
405                         block, err := m.chain.GetBlockByHash(&m.status.BestHash)
406                         if err != nil {
407                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
408                                 return
409                         }
410                         m.updateStatus(block)
411                         m.sendNotification(NTRawBlockDisconnected, block)
412                 }
413
414                 block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
415                 if block == nil {
416                         m.blockWaiter()
417                         continue
418                 }
419
420                 if m.status.BestHash != block.PreviousBlockHash {
421                         log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
422                         continue
423                 }
424
425                 m.updateStatus(block)
426                 m.sendNotification(NTRawBlockConnected, block)
427         }
428         m.wg.Done()
429 }
430
431 func (m *WSNotificationManager) blockWaiter() {
432         select {
433         case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
434         case <-m.quit:
435         }
436 }
437
438 func (m *WSNotificationManager) updateStatus(block *types.Block) {
439         m.status.BestHeight = block.Height
440         m.status.BestHash = block.Hash()
441 }
442
443 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
444 func (m *WSNotificationManager) Start() {
445         m.wg.Add(3)
446         go m.blockNotify()
447         go m.queueHandler()
448         go m.notificationHandler()
449 }
450
451 // WaitForShutdown blocks until all notification manager goroutines have finished.
452 func (m *WSNotificationManager) WaitForShutdown() {
453         m.wg.Wait()
454 }
455
456 // Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
457 func (m *WSNotificationManager) Shutdown() {
458         close(m.quit)
459 }