log "github.com/sirupsen/logrus"
+ "github.com/vapor/event"
"github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
// Notification types
type notificationBlockConnected types.Block
type notificationBlockDisconnected types.Block
-type notificationTxAcceptedByMempool types.Tx
+type notificationTxDescAcceptedByMempool protocol.TxDesc
// Notification control requests
type notificationRegisterClient WSClient
maxNumConcurrentReqs int
status statusInfo
chain *protocol.Chain
+ eventDispatcher *event.Dispatcher
+ txMsgSub *event.Subscription
}
// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
-func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
// init status
var status statusInfo
header := chain.BestBlockHeader()
maxNumConcurrentReqs: maxNumConcurrentReqs,
status: status,
chain: chain,
+ eventDispatcher: dispatcher,
}
}
log.WithField("module", logModule).Error("Chain connected notification is not a block")
break
}
-
+ m.status.BestHeight = block.Height
+ m.status.BestHash = block.Hash()
// Notify registered websocket clients of incoming block.
m.NotifyBlockConnected(block)
log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
break
}
-
+ m.status.BestHeight = block.Height - 1
+ m.status.BestHash = block.PreviousBlockHash
// Notify registered websocket clients.
m.NotifyBlockDisconnected(block)
}
}
}
-// NotifyMempoolTx passes a transaction accepted by mempool to the
-// notification manager for transaction notification processing. If
-// isNew is true, the tx is is a new transaction, rather than one
-// added to the mempool during a reorg.
-func (m *WSNotificationManager) NotifyMempoolTx(tx *types.Tx) {
- select {
- case m.queueNotification <- (*notificationTxAcceptedByMempool)(tx):
- case <-m.quit:
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+out:
+ for {
+ select {
+ case obj, ok := <-m.txMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+ break out
+ }
+
+ ev, ok := obj.Data.(protocol.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ if ev.TxMsg.MsgType == protocol.MsgNewTx {
+ select {
+ case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+ default:
+ }
+ }
+ case <-m.quit:
+ break out
+ }
}
+
+ m.wg.Done()
}
// notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
m.notifyBlockDisconnected(blockNotifications, block)
}
- case *notificationTxAcceptedByMempool:
- tx := (*types.Tx)(n)
+ case *notificationTxDescAcceptedByMempool:
+ txDesc := (*protocol.TxDesc)(n)
if len(txNotifications) != 0 {
- m.notifyForNewTx(txNotifications, tx)
+ m.notifyForNewTx(txNotifications, txDesc)
}
case *notificationRegisterBlocks:
// notifyForNewTx notifies websocket clients that have registered for updates
// when a new transaction is added to the memory pool.
-func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, tx *types.Tx) {
- resp := NewWSResponse(NTNewTransaction.String(), tx, nil)
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, txDesc *protocol.TxDesc) {
+ resp := NewWSResponse(NTNewTransaction.String(), txDesc, nil)
marshalledJSON, err := json.Marshal(resp)
if err != nil {
log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
return
}
- m.updateStatus(block)
+
m.sendNotification(NTRawBlockDisconnected, block)
}
continue
}
- m.updateStatus(block)
m.sendNotification(NTRawBlockConnected, block)
}
m.wg.Done()
}
}
-func (m *WSNotificationManager) updateStatus(block *types.Block) {
- m.status.BestHeight = block.Height
- m.status.BestHash = block.Hash()
-}
-
// Start starts the goroutines required for the manager to queue and process websocket client notifications.
-func (m *WSNotificationManager) Start() {
- m.wg.Add(3)
+func (m *WSNotificationManager) Start() error {
+ var err error
+ m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+ if err != nil {
+ return err
+ }
+
+ m.wg.Add(4)
go m.blockNotify()
go m.queueHandler()
go m.notificationHandler()
+ go m.memPoolTxQueryLoop()
+ return nil
}
// WaitForShutdown blocks until all notification manager goroutines have finished.