OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / net / websocket / wsnotificationmaneger.go
index b39854a..f92e6ef 100644 (file)
@@ -7,6 +7,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
+       "github.com/vapor/event"
        "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -15,7 +16,7 @@ import (
 // Notification types
 type notificationBlockConnected types.Block
 type notificationBlockDisconnected types.Block
-type notificationTxAcceptedByMempool types.Tx
+type notificationTxDescAcceptedByMempool protocol.TxDesc
 
 // Notification control requests
 type notificationRegisterClient WSClient
@@ -87,10 +88,12 @@ type WSNotificationManager struct {
        maxNumConcurrentReqs int
        status               statusInfo
        chain                *protocol.Chain
+       eventDispatcher      *event.Dispatcher
+       txMsgSub             *event.Subscription
 }
 
 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
-func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
        // init status
        var status statusInfo
        header := chain.BestBlockHeader()
@@ -106,6 +109,7 @@ func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, ch
                maxNumConcurrentReqs: maxNumConcurrentReqs,
                status:               status,
                chain:                chain,
+               eventDispatcher:      dispatcher,
        }
 }
 
@@ -170,7 +174,8 @@ func (m *WSNotificationManager) sendNotification(typ NotificationType, data inte
                        log.WithField("module", logModule).Error("Chain connected notification is not a block")
                        break
                }
-
+               m.status.BestHeight = block.Height
+               m.status.BestHash = block.Hash()
                // Notify registered websocket clients of incoming block.
                m.NotifyBlockConnected(block)
 
@@ -180,7 +185,8 @@ func (m *WSNotificationManager) sendNotification(typ NotificationType, data inte
                        log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
                        break
                }
-
+               m.status.BestHeight = block.Height - 1
+               m.status.BestHash = block.PreviousBlockHash
                // Notify registered websocket clients.
                m.NotifyBlockDisconnected(block)
        }
@@ -212,15 +218,36 @@ func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
        }
 }
 
-// NotifyMempoolTx passes a transaction accepted by mempool to the
-// notification manager for transaction notification processing.  If
-// isNew is true, the tx is is a new transaction, rather than one
-// added to the mempool during a reorg.
-func (m *WSNotificationManager) NotifyMempoolTx(tx *types.Tx) {
-       select {
-       case m.queueNotification <- (*notificationTxAcceptedByMempool)(tx):
-       case <-m.quit:
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+out:
+       for {
+               select {
+               case obj, ok := <-m.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+                               break out
+                       }
+
+                       ev, ok := obj.Data.(protocol.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if ev.TxMsg.MsgType == protocol.MsgNewTx {
+                               select {
+                               case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+                               default:
+                               }
+                       }
+               case <-m.quit:
+                       break out
+               }
        }
+
+       m.wg.Done()
 }
 
 // notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
@@ -250,10 +277,10 @@ out:
                                        m.notifyBlockDisconnected(blockNotifications, block)
                                }
 
-                       case *notificationTxAcceptedByMempool:
-                               tx := (*types.Tx)(n)
+                       case *notificationTxDescAcceptedByMempool:
+                               txDesc := (*protocol.TxDesc)(n)
                                if len(txNotifications) != 0 {
-                                       m.notifyForNewTx(txNotifications, tx)
+                                       m.notifyForNewTx(txNotifications, txDesc)
                                }
 
                        case *notificationRegisterBlocks:
@@ -366,8 +393,8 @@ func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
 
 // notifyForNewTx notifies websocket clients that have registered for updates
 // when a new transaction is added to the memory pool.
-func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, tx *types.Tx) {
-       resp := NewWSResponse(NTNewTransaction.String(), tx, nil)
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
+       resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
        marshalledJSON, err := json.Marshal(resp)
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
@@ -407,7 +434,7 @@ out:
                                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
                                return
                        }
-                       m.updateStatus(block)
+
                        m.sendNotification(NTRawBlockDisconnected, block)
                }
 
@@ -422,7 +449,6 @@ out:
                        continue
                }
 
-               m.updateStatus(block)
                m.sendNotification(NTRawBlockConnected, block)
        }
        m.wg.Done()
@@ -435,17 +461,20 @@ func (m *WSNotificationManager) blockWaiter() {
        }
 }
 
-func (m *WSNotificationManager) updateStatus(block *types.Block) {
-       m.status.BestHeight = block.Height
-       m.status.BestHash = block.Hash()
-}
-
 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
-func (m *WSNotificationManager) Start() {
-       m.wg.Add(3)
+func (m *WSNotificationManager) Start() error {
+       var err error
+       m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+       if err != nil {
+               return err
+       }
+
+       m.wg.Add(4)
        go m.blockNotify()
        go m.queueHandler()
        go m.notificationHandler()
+       go m.memPoolTxQueryLoop()
+       return nil
 }
 
 // WaitForShutdown blocks until all notification manager goroutines have finished.