"github.com/bytom/net/http/gzip"
"github.com/bytom/net/http/httpjson"
"github.com/bytom/net/http/static"
+ "github.com/bytom/net/websocket"
"github.com/bytom/netsync"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
// API is the scheduling center for server
type API struct {
- sync *netsync.SyncManager
- wallet *wallet.Wallet
- accessTokens *accesstoken.CredentialStore
- chain *protocol.Chain
- server *http.Server
- handler http.Handler
- txFeedTracker *txfeed.Tracker
- cpuMiner *cpuminer.CPUMiner
- miningPool *miningpool.MiningPool
-
- newBlockCh chan *bc.Hash
+ sync *netsync.SyncManager
+ wallet *wallet.Wallet
+ accessTokens *accesstoken.CredentialStore
+ chain *protocol.Chain
+ server *http.Server
+ handler http.Handler
+ txFeedTracker *txfeed.Tracker
+ cpuMiner *cpuminer.CPUMiner
+ miningPool *miningpool.MiningPool
+ notificationMgr *websocket.WSNotificationManager
+ newBlockCh chan *bc.Hash
}
func (a *API) initServer(config *cfg.Config) {
}
// NewAPI create and initialize the API
-func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, newBlockCh chan *bc.Hash) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, newBlockCh chan *bc.Hash, notificationMgr *websocket.WSNotificationManager) *API {
api := &API{
sync: sync,
wallet: wallet,
cpuMiner: cpuMiner,
miningPool: miningPool,
- newBlockCh: newBlockCh,
+ newBlockCh: newBlockCh,
+ notificationMgr: notificationMgr,
}
api.buildHandler()
api.initServer(config)
m.Handle("/get-merkle-proof", jsonHandler(a.getMerkleProof))
+ m.HandleFunc("/websocket-subscribe", a.websocketHandler)
+
handler := latencyHandler(m, walletEnable)
handler = webAssetsHandler(handler)
handler = gzip.Handler{Handler: handler}
--- /dev/null
+package api
+
+import (
+ "net/http"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/net/websocket"
+)
+
+// timeZeroVal is simply the zero value for a time.Time and is used to avoid
+// creating multiple instances.
+var timeZeroVal time.Time
+
+// WebsocketHandler handles connections and requests from websocket client
+func (a *API) websocketHandler(w http.ResponseWriter, r *http.Request) {
+ log.WithField("remoteAddress", r.RemoteAddr).Info("New websocket client")
+
+ client, err := websocket.NewWebsocketClient(w, r, a.notificationMgr)
+ if err != nil {
+ log.WithField("error", err).Error("Failed to new websocket client")
+ http.Error(w, "400 Bad Request.", http.StatusBadRequest)
+ return
+ }
+
+ a.notificationMgr.AddClient(client)
+ client.Start()
+ client.WaitForShutdown()
+ a.notificationMgr.RemoveClient(client)
+ log.WithField("address", r.RemoteAddr).Infoln("Disconnected websocket client")
+}
// log flags
runNodeCmd.Flags().String("log_file", config.LogFile, "Log output file")
+ // websocket flags
+ runNodeCmd.Flags().Int("ws.max_num_websockets", config.Websocket.MaxNumWebsockets, "Max number of websocket connections")
+ runNodeCmd.Flags().Int("ws.max_num_concurrent_reqs", config.Websocket.MaxNumConcurrentReqs, "Max number of concurrent websocket requests that may be processed concurrently")
+
RootCmd.AddCommand(runNodeCmd)
}
// Top level options use an anonymous struct
BaseConfig `mapstructure:",squash"`
// Options for services
- P2P *P2PConfig `mapstructure:"p2p"`
- Wallet *WalletConfig `mapstructure:"wallet"`
- Auth *RPCAuthConfig `mapstructure:"auth"`
- Web *WebConfig `mapstructure:"web"`
- Simd *SimdConfig `mapstructure:"simd"`
+ P2P *P2PConfig `mapstructure:"p2p"`
+ Wallet *WalletConfig `mapstructure:"wallet"`
+ Auth *RPCAuthConfig `mapstructure:"auth"`
+ Web *WebConfig `mapstructure:"web"`
+ Simd *SimdConfig `mapstructure:"simd"`
+ Websocket *WebsocketConfig `mapstructure:"ws"`
}
// Default configurable parameters.
Auth: DefaultRPCAuthConfig(),
Web: DefaultWebConfig(),
Simd: DefaultSimdConfig(),
+ Websocket: DefaultWebsocketConfig(),
}
}
Enable bool `mapstructure:"enable"`
}
+type WebsocketConfig struct {
+ MaxNumWebsockets int `mapstructure:"max_num_websockets"`
+ MaxNumConcurrentReqs int `mapstructure:"max_num_concurrent_reqs"`
+}
+
// Default configurable rpc's auth parameters.
func DefaultRPCAuthConfig() *RPCAuthConfig {
return &RPCAuthConfig{
}
}
+func DefaultWebsocketConfig() *WebsocketConfig {
+ return &WebsocketConfig{
+ MaxNumWebsockets: 25,
+ MaxNumConcurrentReqs: 20,
+ }
+}
+
//-----------------------------------------------------------------------------
// Utils
--- /dev/null
+package websocket
+
+import (
+ "container/list"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/gorilla/websocket"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/errors"
+)
+
+// websocketSendBufferSize is the number of elements the send channel
+// can queue before blocking. Note that this only applies to requests
+// handled directly in the websocket client input handler or the async
+// handler since notifications have their own queuing mechanism
+// independent of the send channel buffer.
+const (
+ logModule = "websocket"
+ websocketSendBufferSize = 50
+)
+
+var (
+ // ErrWSParse means a request parsing error
+ ErrWSParse = errors.New("Websocket request parsing error")
+ // ErrWSInternal means service handling errors
+ ErrWSInternal = errors.New("Websocket Internal error")
+ // ErrWSClientQuit means the websocket client is disconnected
+ ErrWSClientQuit = errors.New("Websocket client quit")
+
+ // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
+ timeZeroVal time.Time
+)
+
+type semaphore chan struct{}
+
+func makeSemaphore(n int) semaphore {
+ return make(chan struct{}, n)
+}
+
+func (s semaphore) acquire() { s <- struct{}{} }
+func (s semaphore) release() { <-s }
+
+// wsTopicHandler describes a callback function used to handle a specific topic.
+type wsTopicHandler func(*WSClient)
+
+// wsHandlers maps websocket topic strings to appropriate websocket handler
+// functions. This is set by init because help references wsHandlers and thus
+// causes a dependency loop.
+var wsHandlers = map[string]wsTopicHandler{
+ "notify_raw_blocks": handleNotifyBlocks,
+ "notify_new_transactions": handleNotifyNewTransactions,
+ "stop_notify_raw_blocks": handleStopNotifyBlocks,
+ "stop_notify_new_transactions": handleStopNotifyNewTransactions,
+}
+
+// responseMessage houses a message to send to a connected websocket client as
+// well as a channel to reply on when the message is sent.
+type responseMessage struct {
+ msg []byte
+ doneChan chan bool
+}
+
+// WSClient provides an abstraction for handling a websocket client. The
+// overall data flow is split into 3 main goroutines, a possible 4th goroutine
+// for long-running operations (only started if request is made), and a
+// websocket manager which is used to allow things such as broadcasting
+// requested notifications to all connected websocket clients. Inbound
+// messages are read via the inHandler goroutine and generally dispatched to
+// their own handler. However, certain potentially long-running operations such
+// as rescans, are sent to the asyncHander goroutine and are limited to one at a
+// time. There are two outbound message types - one for responding to client
+// requests and another for async notifications. Responses to client requests
+// use SendMessage which employs a buffered channel thereby limiting the number
+// of outstanding requests that can be made. Notifications are sent via
+// QueueNotification which implements a queue via notificationQueueHandler to
+// ensure sending notifications from other subsystems can't block. Ultimately,
+// all messages are sent via the outHandler.
+type WSClient struct {
+ sync.Mutex
+ conn *websocket.Conn
+ // disconnected indicated whether or not the websocket client is disconnected.
+ disconnected bool
+ // addr is the remote address of the client.
+ addr string
+ serviceRequestSem semaphore
+ ntfnChan chan []byte
+ sendChan chan responseMessage
+ quit chan struct{}
+ wg sync.WaitGroup
+ notificationMgr *WSNotificationManager
+}
+
+// NewWebsocketClient means to create a new object to the connected websocket client
+func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) {
+ // Limit max number of websocket clients.
+ if notificationMgr.IsMaxConnect() {
+ return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr)
+ }
+
+ // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers.
+ conn, err := websocket.Upgrade(w, r, nil, 0, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ conn.SetReadDeadline(timeZeroVal)
+
+ client := &WSClient{
+ conn: conn,
+ addr: r.RemoteAddr,
+ serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs),
+ ntfnChan: make(chan []byte, 1), // nonblocking sync
+ sendChan: make(chan responseMessage, websocketSendBufferSize),
+ quit: make(chan struct{}),
+ notificationMgr: notificationMgr,
+ }
+ return client, nil
+}
+
+// inHandler handles all incoming messages for the websocket connection.
+func (c *WSClient) inHandler() {
+out:
+ for {
+ // Break out of the loop once the quit channel has been closed.
+ // Use a non-blocking select here so we fall through otherwise.
+ select {
+ case <-c.quit:
+ break out
+ default:
+ }
+
+ _, msg, err := c.conn.ReadMessage()
+ if err != nil {
+ if err != io.EOF {
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error")
+ }
+ break out
+ }
+
+ var request WSRequest
+ if err = json.Unmarshal(msg, &request); err != nil {
+ respError := errors.Wrap(err, ErrWSParse)
+ resp := NewWSResponse(NTRequestStatus.String(), nil, respError)
+ reply, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply")
+ continue
+ }
+
+ c.SendMessage(reply, nil)
+ continue
+ }
+
+ c.serviceRequestSem.acquire()
+ go func() {
+ c.serviceRequest(request.Topic)
+ c.serviceRequestSem.release()
+ }()
+ }
+
+ // Ensure the connection is closed.
+ c.Disconnect()
+ c.wg.Done()
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done")
+}
+
+func (c *WSClient) serviceRequest(topic string) {
+ var respErr error
+
+ if wsHandler, ok := wsHandlers[topic]; ok {
+ wsHandler(c)
+ } else {
+ err := fmt.Errorf("There is not this topic: %s", topic)
+ respErr = errors.Wrap(err, ErrWSInternal)
+ log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic")
+ }
+
+ resp := NewWSResponse(NTRequestStatus.String(), nil, respErr)
+ reply, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply")
+ return
+ }
+
+ c.SendMessage(reply, nil)
+}
+
+// notificationQueueHandler handles the queuing of outgoing notifications for the websocket client.
+func (c *WSClient) notificationQueueHandler() {
+ ntfnSentChan := make(chan bool, 1) // nonblocking sync
+
+ // pendingNtfns is used as a queue for notifications that are ready to
+ // be sent once there are no outstanding notifications currently being
+ // sent.
+ pendingNtfns := list.New()
+ waiting := false
+out:
+ for {
+ select {
+ // This channel is notified when a message is being queued to
+ // be sent across the network socket. It will either send the
+ // message immediately if a send is not already in progress, or
+ // queue the message to be sent once the other pending messages
+ // are sent.
+ case msg := <-c.ntfnChan:
+ if !waiting {
+ c.SendMessage(msg, ntfnSentChan)
+ } else {
+ pendingNtfns.PushBack(msg)
+ }
+ waiting = true
+ // This channel is notified when a notification has been sent across the network socket.
+ case <-ntfnSentChan:
+ // No longer waiting if there are no more messages in the pending messages queue.
+ next := pendingNtfns.Front()
+ if next == nil {
+ waiting = false
+ continue
+ }
+
+ // Notify the outHandler about the next item to asynchronously send.
+ msg := pendingNtfns.Remove(next).([]byte)
+ c.SendMessage(msg, ntfnSentChan)
+ case <-c.quit:
+ break out
+ }
+ }
+
+ // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+ for {
+ select {
+ case <-c.ntfnChan:
+ case <-ntfnSentChan:
+ default:
+ break cleanup
+ }
+ }
+ c.wg.Done()
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done")
+}
+
+// outHandler handles all outgoing messages for the websocket connection.
+func (c *WSClient) outHandler() {
+out:
+ for {
+ // Send any messages ready for send until the quit channel is closed.
+ select {
+ case r := <-c.sendChan:
+ if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client")
+ c.Disconnect()
+ break out
+ }
+ if r.doneChan != nil {
+ r.doneChan <- true
+ }
+ case <-c.quit:
+ break out
+ }
+ }
+
+ // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+ for {
+ select {
+ case r := <-c.sendChan:
+ if r.doneChan != nil {
+ r.doneChan <- false
+ }
+ default:
+ break cleanup
+ }
+ }
+ c.wg.Done()
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done")
+}
+
+// SendMessage sends the passed json to the websocket client. It is backed
+// by a buffered channel, so it will not block until the send channel is full.
+// Note however that QueueNotification must be used for sending async
+// notifications instead of the this function. This approach allows a limit to
+// the number of outstanding requests a client can make without preventing or
+// blocking on async notifications.
+func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
+ // Don't send the message if disconnected.
+ if c.Disconnected() {
+ if doneChan != nil {
+ doneChan <- false
+ }
+ return
+ }
+
+ c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan}
+}
+
+// QueueNotification queues the passed notification to be sent to the websocket client.
+func (c *WSClient) QueueNotification(marshalledJSON []byte) error {
+ // Don't queue the message if disconnected.
+ if c.Disconnected() {
+ return ErrWSClientQuit
+ }
+
+ c.ntfnChan <- marshalledJSON
+ return nil
+}
+
+// Disconnected returns whether or not the websocket client is disconnected.
+func (c *WSClient) Disconnected() bool {
+ c.Lock()
+ defer c.Unlock()
+
+ return c.disconnected
+}
+
+// Disconnect disconnects the websocket client.
+func (c *WSClient) Disconnect() {
+ c.Lock()
+ defer c.Unlock()
+
+ // Nothing to do if already disconnected.
+ if c.disconnected {
+ return
+ }
+
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client")
+
+ close(c.quit)
+ c.conn.Close()
+ c.disconnected = true
+}
+
+// Start begins processing input and output messages.
+func (c *WSClient) Start() {
+ log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client")
+
+ c.wg.Add(3)
+ go c.inHandler()
+ go c.notificationQueueHandler()
+ go c.outHandler()
+}
+
+// WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
+func (c *WSClient) WaitForShutdown() {
+ c.wg.Wait()
+}
+
+// handleNotifyBlocks implements the notifyblocks topic extension for websocket connections.
+func handleNotifyBlocks(wsc *WSClient) {
+ wsc.notificationMgr.RegisterBlockUpdates(wsc)
+}
+
+// handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections.
+func handleStopNotifyBlocks(wsc *WSClient) {
+ wsc.notificationMgr.UnregisterBlockUpdates(wsc)
+}
+
+// handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections.
+func handleNotifyNewTransactions(wsc *WSClient) {
+ wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc)
+}
+
+// handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections.
+func handleStopNotifyNewTransactions(wsc *WSClient) {
+ wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc)
+}
--- /dev/null
+package websocket
+
+// WSRequest means the data structure of the request
+type WSRequest struct {
+ Topic string `json:"topic"`
+}
+
+// NewWSRequest creates a request data object
+func NewWSRequest(topic string) *WSRequest {
+ return &WSRequest{
+ Topic: topic,
+ }
+}
+
+// WSResponse means the returned data structure
+type WSResponse struct {
+ NotificationType string `json:"notification_type"`
+ Data interface{} `json:"data"`
+ ErrorDetail string `json:"error_detail,omitempty"`
+}
+
+// NewWSResponse creates a return data object
+func NewWSResponse(notificationType string, data interface{}, err error) *WSResponse {
+ wsResp := &WSResponse{
+ NotificationType: notificationType,
+ Data: data,
+ }
+
+ if err != nil {
+ wsResp.ErrorDetail = err.Error()
+ }
+
+ return wsResp
+}
--- /dev/null
+package websocket
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
+)
+
+// Notification types
+type notificationBlockConnected types.Block
+type notificationBlockDisconnected types.Block
+type notificationTxAcceptedByMempool types.Tx
+
+// Notification control requests
+type notificationRegisterClient WSClient
+type notificationUnregisterClient WSClient
+type notificationRegisterBlocks WSClient
+type notificationUnregisterBlocks WSClient
+type notificationRegisterNewMempoolTxs WSClient
+type notificationUnregisterNewMempoolTxs WSClient
+
+// NotificationType represents the type of a notification message.
+type NotificationType int
+
+// Constants for the type of a notification message.
+const (
+ // NTBlockConnected indicates the associated block was connected to the main chain.
+ NTRawBlockConnected NotificationType = iota
+ // NTBlockDisconnected indicates the associated block was disconnected from the main chain.
+ NTRawBlockDisconnected
+ NTNewTransaction
+ NTRequestStatus
+)
+
+// notificationTypeStrings is a map of notification types back to their constant
+// names for pretty printing.
+var notificationTypeStrings = map[NotificationType]string{
+ NTRawBlockConnected: "raw_blocks_connected",
+ NTRawBlockDisconnected: "raw_blocks_disconnected",
+ NTNewTransaction: "new_transaction",
+ NTRequestStatus: "request_status",
+}
+
+// String returns the NotificationType in human-readable form.
+func (n NotificationType) String() string {
+ if s, ok := notificationTypeStrings[n]; ok {
+ return s
+ }
+ return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
+}
+
+type statusInfo struct {
+ BestHeight uint64
+ BestHash bc.Hash
+}
+
+// WSNotificationManager is a connection and notification manager used for
+// websockets. It allows websocket clients to register for notifications they
+// are interested in. When an event happens elsewhere in the code such as
+// transactions being added to the memory pool or block connects/disconnects,
+// the notification manager is provided with the relevant details needed to
+// figure out which websocket clients need to be notified based on what they
+// have registered for and notifies them accordingly. It is also used to keep
+// track of all connected websocket clients.
+type WSNotificationManager struct {
+ // queueNotification queues a notification for handling.
+ queueNotification chan interface{}
+
+ // notificationMsgs feeds notificationHandler with notifications
+ // and client (un)registeration requests from a queue as well as
+ // registeration and unregisteration requests from clients.
+ notificationMsgs chan interface{}
+
+ // Access channel for current number of connected clients.
+ numClients chan int
+
+ // Shutdown handling
+ wg sync.WaitGroup
+ quit chan struct{}
+ MaxNumWebsockets int
+ maxNumConcurrentReqs int
+ status statusInfo
+ chain *protocol.Chain
+}
+
+// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+ // init status
+ var status statusInfo
+ header := chain.BestBlockHeader()
+ status.BestHeight = header.Height
+ status.BestHash = header.Hash()
+
+ return &WSNotificationManager{
+ queueNotification: make(chan interface{}),
+ notificationMsgs: make(chan interface{}),
+ numClients: make(chan int),
+ quit: make(chan struct{}),
+ MaxNumWebsockets: maxNumWebsockets,
+ maxNumConcurrentReqs: maxNumConcurrentReqs,
+ status: status,
+ chain: chain,
+ }
+}
+
+// queueHandler manages a queue of empty interfaces, reading from in and
+// sending the oldest unsent to out. This handler stops when either of the
+// in or quit channels are closed, and closes out before returning, without
+// waiting to send any variables still remaining in the queue.
+func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
+ var (
+ q []interface{}
+ next interface{}
+ dequeue chan<- interface{}
+ )
+
+ skipQueue := out
+
+out:
+ for {
+ select {
+ case n, ok := <-in:
+ if !ok {
+ // Sender closed input channel.
+ break out
+ }
+
+ // Either send to out immediately if skipQueue is
+ // non-nil (queue is empty) and reader is ready,
+ // or append to the queue and send later.
+ select {
+ case skipQueue <- n:
+
+ default:
+ q = append(q, n)
+ dequeue = out
+ skipQueue = nil
+ next = q[0]
+ }
+
+ case dequeue <- next:
+ copy(q, q[1:])
+ q[len(q)-1] = nil // avoid leak
+ q = q[:len(q)-1]
+ if len(q) == 0 {
+ dequeue = nil
+ skipQueue = out
+ } else {
+ next = q[0]
+ }
+
+ case <-quit:
+ break out
+ }
+ }
+ close(out)
+}
+
+func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
+ switch typ {
+ case NTRawBlockConnected:
+ block, ok := data.(*types.Block)
+ if !ok {
+ log.WithField("module", logModule).Error("Chain connected notification is not a block")
+ break
+ }
+
+ // Notify registered websocket clients of incoming block.
+ m.NotifyBlockConnected(block)
+
+ case NTRawBlockDisconnected:
+ block, ok := data.(*types.Block)
+ if !ok {
+ log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
+ break
+ }
+
+ // Notify registered websocket clients.
+ m.NotifyBlockDisconnected(block)
+ }
+}
+
+// queueHandler maintains a queue of notifications and notification handler
+// control messages.
+func (m *WSNotificationManager) queueHandler() {
+ queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
+ m.wg.Done()
+}
+
+// NotifyBlockConnected passes a block newly-connected to the best chain
+// to the notification manager for block and transaction notification
+// processing.
+func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
+ select {
+ case m.queueNotification <- (*notificationBlockConnected)(block):
+ case <-m.quit:
+ }
+}
+
+// NotifyBlockDisconnected passes a block disconnected from the best chain
+// to the notification manager for block notification processing.
+func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
+ select {
+ case m.queueNotification <- (*notificationBlockDisconnected)(block):
+ case <-m.quit:
+ }
+}
+
+// NotifyMempoolTx passes a transaction accepted by mempool to the
+// notification manager for transaction notification processing. If
+// isNew is true, the tx is is a new transaction, rather than one
+// added to the mempool during a reorg.
+func (m *WSNotificationManager) NotifyMempoolTx(tx *types.Tx) {
+ select {
+ case m.queueNotification <- (*notificationTxAcceptedByMempool)(tx):
+ case <-m.quit:
+ }
+}
+
+// notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
+func (m *WSNotificationManager) notificationHandler() {
+ // clients is a map of all currently connected websocket clients.
+ clients := make(map[chan struct{}]*WSClient)
+ blockNotifications := make(map[chan struct{}]*WSClient)
+ txNotifications := make(map[chan struct{}]*WSClient)
+
+out:
+ for {
+ select {
+ case n, ok := <-m.notificationMsgs:
+ if !ok {
+ break out
+ }
+ switch n := n.(type) {
+ case *notificationBlockConnected:
+ block := (*types.Block)(n)
+ if len(blockNotifications) != 0 {
+ m.notifyBlockConnected(blockNotifications, block)
+ }
+
+ case *notificationBlockDisconnected:
+ block := (*types.Block)(n)
+ if len(blockNotifications) != 0 {
+ m.notifyBlockDisconnected(blockNotifications, block)
+ }
+
+ case *notificationTxAcceptedByMempool:
+ tx := (*types.Tx)(n)
+ if len(txNotifications) != 0 {
+ m.notifyForNewTx(txNotifications, tx)
+ }
+
+ case *notificationRegisterBlocks:
+ wsc := (*WSClient)(n)
+ blockNotifications[wsc.quit] = wsc
+
+ case *notificationUnregisterBlocks:
+ wsc := (*WSClient)(n)
+ delete(blockNotifications, wsc.quit)
+
+ case *notificationRegisterNewMempoolTxs:
+ wsc := (*WSClient)(n)
+ txNotifications[wsc.quit] = wsc
+
+ case *notificationUnregisterNewMempoolTxs:
+ wsc := (*WSClient)(n)
+ delete(txNotifications, wsc.quit)
+
+ case *notificationRegisterClient:
+ wsc := (*WSClient)(n)
+ clients[wsc.quit] = wsc
+
+ case *notificationUnregisterClient:
+ wsc := (*WSClient)(n)
+ delete(blockNotifications, wsc.quit)
+ delete(txNotifications, wsc.quit)
+ delete(clients, wsc.quit)
+
+ default:
+ log.Warnf("Unhandled notification type")
+ }
+
+ case m.numClients <- len(clients):
+
+ case <-m.quit:
+ break out
+ }
+ }
+
+ for _, c := range clients {
+ c.Disconnect()
+ }
+ m.wg.Done()
+}
+
+// NumClients returns the number of clients actively being served.
+func (m *WSNotificationManager) NumClients() (n int) {
+ select {
+ case n = <-m.numClients:
+ case <-m.quit:
+ }
+ return
+}
+
+// IsMaxConnect returns whether the maximum connection is exceeded
+func (m *WSNotificationManager) IsMaxConnect() bool {
+ return m.NumClients() >= m.MaxNumWebsockets
+}
+
+// RegisterBlockUpdates requests block update notifications to the passed websocket client.
+func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterBlocks)(wsc)
+}
+
+// UnregisterBlockUpdates removes block update notifications for the passed websocket client.
+func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
+}
+
+// notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
+func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+ resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// notifyBlockDisconnected notifies websocket clients that have registered for block updates
+// when a block is disconnected from the main chain (due to a reorganize).
+func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+ resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
+// client when new transactions are added to the memory pool.
+func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
+}
+
+// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
+// client when new transaction are added to the memory pool.
+func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
+ m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
+}
+
+// notifyForNewTx notifies websocket clients that have registered for updates
+// when a new transaction is added to the memory pool.
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, tx *types.Tx) {
+ resp := NewWSResponse(NTNewTransaction.String(), tx, nil)
+ marshalledJSON, err := json.Marshal(resp)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
+ return
+ }
+
+ for _, wsc := range clients {
+ wsc.QueueNotification(marshalledJSON)
+ }
+}
+
+// AddClient adds the passed websocket client to the notification manager.
+func (m *WSNotificationManager) AddClient(wsc *WSClient) {
+ m.queueNotification <- (*notificationRegisterClient)(wsc)
+}
+
+// RemoveClient removes the passed websocket client and all notifications registered for it.
+func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
+ select {
+ case m.queueNotification <- (*notificationUnregisterClient)(wsc):
+ case <-m.quit:
+ }
+}
+
+func (m *WSNotificationManager) blockNotify() {
+out:
+ for {
+ select {
+ case <-m.quit:
+ break out
+
+ default:
+ }
+ for !m.chain.InMainChain(m.status.BestHash) {
+ block, err := m.chain.GetBlockByHash(&m.status.BestHash)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
+ return
+ }
+ m.updateStatus(block)
+ m.sendNotification(NTRawBlockDisconnected, block)
+ }
+
+ block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
+ if block == nil {
+ m.blockWaiter()
+ continue
+ }
+
+ if m.status.BestHash != block.PreviousBlockHash {
+ log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
+ continue
+ }
+
+ m.updateStatus(block)
+ m.sendNotification(NTRawBlockConnected, block)
+ }
+ m.wg.Done()
+}
+
+func (m *WSNotificationManager) blockWaiter() {
+ select {
+ case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
+ case <-m.quit:
+ }
+}
+
+func (m *WSNotificationManager) updateStatus(block *types.Block) {
+ m.status.BestHeight = block.Height
+ m.status.BestHash = block.Hash()
+}
+
+// Start starts the goroutines required for the manager to queue and process websocket client notifications.
+func (m *WSNotificationManager) Start() {
+ m.wg.Add(3)
+ go m.blockNotify()
+ go m.queueHandler()
+ go m.notificationHandler()
+}
+
+// WaitForShutdown blocks until all notification manager goroutines have finished.
+func (m *WSNotificationManager) WaitForShutdown() {
+ m.wg.Wait()
+}
+
+// Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
+func (m *WSNotificationManager) Shutdown() {
+ close(m.quit)
+}
"github.com/bytom/mining/cpuminer"
"github.com/bytom/mining/miningpool"
"github.com/bytom/mining/tensority"
+ "github.com/bytom/net/websocket"
"github.com/bytom/netsync"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
syncManager *netsync.SyncManager
//bcReactor *bc.BlockchainReactor
- wallet *w.Wallet
- accessTokens *accesstoken.CredentialStore
- api *api.API
- chain *protocol.Chain
- txfeed *txfeed.Tracker
- cpuMiner *cpuminer.CPUMiner
- miningPool *miningpool.MiningPool
- miningEnable bool
+ wallet *w.Wallet
+ accessTokens *accesstoken.CredentialStore
+ notificationMgr *websocket.WSNotificationManager
+ api *api.API
+ chain *protocol.Chain
+ txfeed *txfeed.Tracker
+ cpuMiner *cpuminer.CPUMiner
+ miningPool *miningpool.MiningPool
+ miningEnable bool
newBlockCh chan *bc.Hash
}
syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
+ notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
+
// get transaction from txPool and send it to syncManager and wallet
- go newPoolTxListener(txPool, syncManager, wallet)
+ go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
// run the profile server
profileHost := config.ProfListenAddress
txfeed: txFeed,
miningEnable: config.Mining,
- newBlockCh: newBlockCh,
+ newBlockCh: newBlockCh,
+ notificationMgr: notificationMgr,
}
node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
}
// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
txMsgCh := txPool.GetMsgCh()
syncManagerTxCh := syncManager.GetNewTxCh()
if wallet != nil {
wallet.AddUnconfirmedTx(msg.TxDesc)
}
+ notificationMgr.NotifyMempoolTx(msg.Tx)
case protocol.MsgRemoveTx:
if wallet != nil {
wallet.RemoveUnconfirmedTx(msg.TxDesc)
}
func (n *Node) initAndstartApiServer() {
- n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh)
+ n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
listenAddr := env.String("LISTEN", n.config.ApiAddress)
env.Parse()
n.syncManager.Start()
}
n.initAndstartApiServer()
+ n.notificationMgr.Start()
if !n.config.Web.Closed {
_, port, err := net.SplitHostPort(n.config.ApiAddress)
if err != nil {
}
func (n *Node) OnStop() {
+ n.notificationMgr.Shutdown()
+ n.notificationMgr.WaitForShutdown()
n.BaseService.OnStop()
if n.miningEnable {
n.cpuMiner.Stop()