OSDN Git Service

Dev ws notification (#1442)
authorwz <mars@bytom.io>
Mon, 5 Nov 2018 03:45:22 +0000 (11:45 +0800)
committerPaladz <yzhu101@uottawa.ca>
Mon, 5 Nov 2018 03:45:22 +0000 (11:45 +0800)
* add websocket

* Implementing block notifications

* Code adjustment structure

* Remove the dependency on protocol

* Add transaction notice

* modify Message notification type

* modify func name

* Roll back file content

* add error for websocket

* add the package to vendor

* Trimming code

* Adjust the code

api/api.go
api/websocket.go [new file with mode: 0644]
cmd/bytomd/commands/run_node.go
config/config.go
net/websocket/wsclient.go [new file with mode: 0644]
net/websocket/wsjson.go [new file with mode: 0644]
net/websocket/wsnotificationmaneger.go [new file with mode: 0644]
node/node.go

index 8ba2fc3..7cb7959 100644 (file)
@@ -23,6 +23,7 @@ import (
        "github.com/bytom/net/http/gzip"
        "github.com/bytom/net/http/httpjson"
        "github.com/bytom/net/http/static"
+       "github.com/bytom/net/websocket"
        "github.com/bytom/netsync"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
@@ -104,17 +105,17 @@ func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 // API is the scheduling center for server
 type API struct {
-       sync          *netsync.SyncManager
-       wallet        *wallet.Wallet
-       accessTokens  *accesstoken.CredentialStore
-       chain         *protocol.Chain
-       server        *http.Server
-       handler       http.Handler
-       txFeedTracker *txfeed.Tracker
-       cpuMiner      *cpuminer.CPUMiner
-       miningPool    *miningpool.MiningPool
-
-       newBlockCh chan *bc.Hash
+       sync            *netsync.SyncManager
+       wallet          *wallet.Wallet
+       accessTokens    *accesstoken.CredentialStore
+       chain           *protocol.Chain
+       server          *http.Server
+       handler         http.Handler
+       txFeedTracker   *txfeed.Tracker
+       cpuMiner        *cpuminer.CPUMiner
+       miningPool      *miningpool.MiningPool
+       notificationMgr *websocket.WSNotificationManager
+       newBlockCh      chan *bc.Hash
 }
 
 func (a *API) initServer(config *cfg.Config) {
@@ -168,7 +169,7 @@ func (a *API) StartServer(address string) {
 }
 
 // NewAPI create and initialize the API
-func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, newBlockCh chan *bc.Hash) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore, newBlockCh chan *bc.Hash, notificationMgr *websocket.WSNotificationManager) *API {
        api := &API{
                sync:          sync,
                wallet:        wallet,
@@ -178,7 +179,8 @@ func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tr
                cpuMiner:      cpuMiner,
                miningPool:    miningPool,
 
-               newBlockCh: newBlockCh,
+               newBlockCh:      newBlockCh,
+               notificationMgr: notificationMgr,
        }
        api.buildHandler()
        api.initServer(config)
@@ -296,6 +298,8 @@ func (a *API) buildHandler() {
 
        m.Handle("/get-merkle-proof", jsonHandler(a.getMerkleProof))
 
+       m.HandleFunc("/websocket-subscribe", a.websocketHandler)
+
        handler := latencyHandler(m, walletEnable)
        handler = webAssetsHandler(handler)
        handler = gzip.Handler{Handler: handler}
diff --git a/api/websocket.go b/api/websocket.go
new file mode 100644 (file)
index 0000000..8b3548d
--- /dev/null
@@ -0,0 +1,32 @@
+package api
+
+import (
+       "net/http"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/net/websocket"
+)
+
+// timeZeroVal is simply the zero value for a time.Time and is used to avoid
+// creating multiple instances.
+var timeZeroVal time.Time
+
+// WebsocketHandler handles connections and requests from websocket client
+func (a *API) websocketHandler(w http.ResponseWriter, r *http.Request) {
+       log.WithField("remoteAddress", r.RemoteAddr).Info("New websocket client")
+
+       client, err := websocket.NewWebsocketClient(w, r, a.notificationMgr)
+       if err != nil {
+               log.WithField("error", err).Error("Failed to new websocket client")
+               http.Error(w, "400 Bad Request.", http.StatusBadRequest)
+               return
+       }
+
+       a.notificationMgr.AddClient(client)
+       client.Start()
+       client.WaitForShutdown()
+       a.notificationMgr.RemoveClient(client)
+       log.WithField("address", r.RemoteAddr).Infoln("Disconnected websocket client")
+}
index 8ea64f2..d73f6b1 100644 (file)
@@ -44,6 +44,10 @@ func init() {
        // log flags
        runNodeCmd.Flags().String("log_file", config.LogFile, "Log output file")
 
+       // websocket flags
+       runNodeCmd.Flags().Int("ws.max_num_websockets", config.Websocket.MaxNumWebsockets, "Max number of websocket connections")
+       runNodeCmd.Flags().Int("ws.max_num_concurrent_reqs", config.Websocket.MaxNumConcurrentReqs, "Max number of concurrent websocket requests that may be processed concurrently")
+
        RootCmd.AddCommand(runNodeCmd)
 }
 
index 41d483e..f5f478a 100644 (file)
@@ -18,11 +18,12 @@ type Config struct {
        // Top level options use an anonymous struct
        BaseConfig `mapstructure:",squash"`
        // Options for services
-       P2P    *P2PConfig     `mapstructure:"p2p"`
-       Wallet *WalletConfig  `mapstructure:"wallet"`
-       Auth   *RPCAuthConfig `mapstructure:"auth"`
-       Web    *WebConfig     `mapstructure:"web"`
-       Simd   *SimdConfig    `mapstructure:"simd"`
+       P2P       *P2PConfig       `mapstructure:"p2p"`
+       Wallet    *WalletConfig    `mapstructure:"wallet"`
+       Auth      *RPCAuthConfig   `mapstructure:"auth"`
+       Web       *WebConfig       `mapstructure:"web"`
+       Simd      *SimdConfig      `mapstructure:"simd"`
+       Websocket *WebsocketConfig `mapstructure:"ws"`
 }
 
 // Default configurable parameters.
@@ -34,6 +35,7 @@ func DefaultConfig() *Config {
                Auth:       DefaultRPCAuthConfig(),
                Web:        DefaultWebConfig(),
                Simd:       DefaultSimdConfig(),
+               Websocket:  DefaultWebsocketConfig(),
        }
 }
 
@@ -141,6 +143,11 @@ type SimdConfig struct {
        Enable bool `mapstructure:"enable"`
 }
 
+type WebsocketConfig struct {
+       MaxNumWebsockets     int `mapstructure:"max_num_websockets"`
+       MaxNumConcurrentReqs int `mapstructure:"max_num_concurrent_reqs"`
+}
+
 // Default configurable rpc's auth parameters.
 func DefaultRPCAuthConfig() *RPCAuthConfig {
        return &RPCAuthConfig{
@@ -171,6 +178,13 @@ func DefaultSimdConfig() *SimdConfig {
        }
 }
 
+func DefaultWebsocketConfig() *WebsocketConfig {
+       return &WebsocketConfig{
+               MaxNumWebsockets:     25,
+               MaxNumConcurrentReqs: 20,
+       }
+}
+
 //-----------------------------------------------------------------------------
 // Utils
 
diff --git a/net/websocket/wsclient.go b/net/websocket/wsclient.go
new file mode 100644 (file)
index 0000000..61fd749
--- /dev/null
@@ -0,0 +1,372 @@
+package websocket
+
+import (
+       "container/list"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "sync"
+       "time"
+
+       "github.com/gorilla/websocket"
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/errors"
+)
+
+// websocketSendBufferSize is the number of elements the send channel
+// can queue before blocking.  Note that this only applies to requests
+// handled directly in the websocket client input handler or the async
+// handler since notifications have their own queuing mechanism
+// independent of the send channel buffer.
+const (
+       logModule               = "websocket"
+       websocketSendBufferSize = 50
+)
+
+var (
+       // ErrWSParse means a request parsing error
+       ErrWSParse = errors.New("Websocket request parsing error")
+       // ErrWSInternal means service handling errors
+       ErrWSInternal = errors.New("Websocket Internal error")
+       // ErrWSClientQuit means the websocket client is disconnected
+       ErrWSClientQuit = errors.New("Websocket client quit")
+
+       // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
+       timeZeroVal time.Time
+)
+
+type semaphore chan struct{}
+
+func makeSemaphore(n int) semaphore {
+       return make(chan struct{}, n)
+}
+
+func (s semaphore) acquire() { s <- struct{}{} }
+func (s semaphore) release() { <-s }
+
+// wsTopicHandler describes a callback function used to handle a specific topic.
+type wsTopicHandler func(*WSClient)
+
+// wsHandlers maps websocket topic strings to appropriate websocket handler
+// functions.  This is set by init because help references wsHandlers and thus
+// causes a dependency loop.
+var wsHandlers = map[string]wsTopicHandler{
+       "notify_raw_blocks":            handleNotifyBlocks,
+       "notify_new_transactions":      handleNotifyNewTransactions,
+       "stop_notify_raw_blocks":       handleStopNotifyBlocks,
+       "stop_notify_new_transactions": handleStopNotifyNewTransactions,
+}
+
+// responseMessage houses a message to send to a connected websocket client as
+// well as a channel to reply on when the message is sent.
+type responseMessage struct {
+       msg      []byte
+       doneChan chan bool
+}
+
+// WSClient provides an abstraction for handling a websocket client.  The
+// overall data flow is split into 3 main goroutines, a possible 4th goroutine
+// for long-running operations (only started if request is made), and a
+// websocket manager which is used to allow things such as broadcasting
+// requested notifications to all connected websocket clients.   Inbound
+// messages are read via the inHandler goroutine and generally dispatched to
+// their own handler.  However, certain potentially long-running operations such
+// as rescans, are sent to the asyncHander goroutine and are limited to one at a
+// time.  There are two outbound message types - one for responding to client
+// requests and another for async notifications.  Responses to client requests
+// use SendMessage which employs a buffered channel thereby limiting the number
+// of outstanding requests that can be made.  Notifications are sent via
+// QueueNotification which implements a queue via notificationQueueHandler to
+// ensure sending notifications from other subsystems can't block.  Ultimately,
+// all messages are sent via the outHandler.
+type WSClient struct {
+       sync.Mutex
+       conn *websocket.Conn
+       // disconnected indicated whether or not the websocket client is disconnected.
+       disconnected bool
+       // addr is the remote address of the client.
+       addr              string
+       serviceRequestSem semaphore
+       ntfnChan          chan []byte
+       sendChan          chan responseMessage
+       quit              chan struct{}
+       wg                sync.WaitGroup
+       notificationMgr   *WSNotificationManager
+}
+
+// NewWebsocketClient means to create a new object to the connected websocket client
+func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) {
+       // Limit max number of websocket clients.
+       if notificationMgr.IsMaxConnect() {
+               return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr)
+       }
+
+       // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers.
+       conn, err := websocket.Upgrade(w, r, nil, 0, 0)
+       if err != nil {
+               return nil, err
+       }
+
+       conn.SetReadDeadline(timeZeroVal)
+
+       client := &WSClient{
+               conn:              conn,
+               addr:              r.RemoteAddr,
+               serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs),
+               ntfnChan:          make(chan []byte, 1), // nonblocking sync
+               sendChan:          make(chan responseMessage, websocketSendBufferSize),
+               quit:              make(chan struct{}),
+               notificationMgr:   notificationMgr,
+       }
+       return client, nil
+}
+
+// inHandler handles all incoming messages for the websocket connection.
+func (c *WSClient) inHandler() {
+out:
+       for {
+               // Break out of the loop once the quit channel has been closed.
+               // Use a non-blocking select here so we fall through otherwise.
+               select {
+               case <-c.quit:
+                       break out
+               default:
+               }
+
+               _, msg, err := c.conn.ReadMessage()
+               if err != nil {
+                       if err != io.EOF {
+                               log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error")
+                       }
+                       break out
+               }
+
+               var request WSRequest
+               if err = json.Unmarshal(msg, &request); err != nil {
+                       respError := errors.Wrap(err, ErrWSParse)
+                       resp := NewWSResponse(NTRequestStatus.String(), nil, respError)
+                       reply, err := json.Marshal(resp)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply")
+                               continue
+                       }
+
+                       c.SendMessage(reply, nil)
+                       continue
+               }
+
+               c.serviceRequestSem.acquire()
+               go func() {
+                       c.serviceRequest(request.Topic)
+                       c.serviceRequestSem.release()
+               }()
+       }
+
+       // Ensure the connection is closed.
+       c.Disconnect()
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done")
+}
+
+func (c *WSClient) serviceRequest(topic string) {
+       var respErr error
+
+       if wsHandler, ok := wsHandlers[topic]; ok {
+               wsHandler(c)
+       } else {
+               err := fmt.Errorf("There is not this topic: %s", topic)
+               respErr = errors.Wrap(err, ErrWSInternal)
+               log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic")
+       }
+
+       resp := NewWSResponse(NTRequestStatus.String(), nil, respErr)
+       reply, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply")
+               return
+       }
+
+       c.SendMessage(reply, nil)
+}
+
+// notificationQueueHandler handles the queuing of outgoing notifications for  the websocket client.
+func (c *WSClient) notificationQueueHandler() {
+       ntfnSentChan := make(chan bool, 1) // nonblocking sync
+
+       // pendingNtfns is used as a queue for notifications that are ready to
+       // be sent once there are no outstanding notifications currently being
+       // sent.
+       pendingNtfns := list.New()
+       waiting := false
+out:
+       for {
+               select {
+               // This channel is notified when a message is being queued to
+               // be sent across the network socket.  It will either send the
+               // message immediately if a send is not already in progress, or
+               // queue the message to be sent once the other pending messages
+               // are sent.
+               case msg := <-c.ntfnChan:
+                       if !waiting {
+                               c.SendMessage(msg, ntfnSentChan)
+                       } else {
+                               pendingNtfns.PushBack(msg)
+                       }
+                       waiting = true
+               // This channel is notified when a notification has been sent across the network socket.
+               case <-ntfnSentChan:
+                       // No longer waiting if there are no more messages in the pending messages queue.
+                       next := pendingNtfns.Front()
+                       if next == nil {
+                               waiting = false
+                               continue
+                       }
+
+                       // Notify the outHandler about the next item to asynchronously send.
+                       msg := pendingNtfns.Remove(next).([]byte)
+                       c.SendMessage(msg, ntfnSentChan)
+               case <-c.quit:
+                       break out
+               }
+       }
+
+       // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+       for {
+               select {
+               case <-c.ntfnChan:
+               case <-ntfnSentChan:
+               default:
+                       break cleanup
+               }
+       }
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done")
+}
+
+// outHandler handles all outgoing messages for the websocket connection.
+func (c *WSClient) outHandler() {
+out:
+       for {
+               // Send any messages ready for send until the quit channel is closed.
+               select {
+               case r := <-c.sendChan:
+                       if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client")
+                               c.Disconnect()
+                               break out
+                       }
+                       if r.doneChan != nil {
+                               r.doneChan <- true
+                       }
+               case <-c.quit:
+                       break out
+               }
+       }
+
+       // Drain any wait channels before exiting so nothing is left waiting around to send.
+cleanup:
+       for {
+               select {
+               case r := <-c.sendChan:
+                       if r.doneChan != nil {
+                               r.doneChan <- false
+                       }
+               default:
+                       break cleanup
+               }
+       }
+       c.wg.Done()
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done")
+}
+
+// SendMessage sends the passed json to the websocket client.  It is backed
+// by a buffered channel, so it will not block until the send channel is full.
+// Note however that QueueNotification must be used for sending async
+// notifications instead of the this function.  This approach allows a limit to
+// the number of outstanding requests a client can make without preventing or
+// blocking on async notifications.
+func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
+       // Don't send the message if disconnected.
+       if c.Disconnected() {
+               if doneChan != nil {
+                       doneChan <- false
+               }
+               return
+       }
+
+       c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan}
+}
+
+// QueueNotification queues the passed notification to be sent to the websocket client.
+func (c *WSClient) QueueNotification(marshalledJSON []byte) error {
+       // Don't queue the message if disconnected.
+       if c.Disconnected() {
+               return ErrWSClientQuit
+       }
+
+       c.ntfnChan <- marshalledJSON
+       return nil
+}
+
+// Disconnected returns whether or not the websocket client is disconnected.
+func (c *WSClient) Disconnected() bool {
+       c.Lock()
+       defer c.Unlock()
+
+       return c.disconnected
+}
+
+// Disconnect disconnects the websocket client.
+func (c *WSClient) Disconnect() {
+       c.Lock()
+       defer c.Unlock()
+
+       // Nothing to do if already disconnected.
+       if c.disconnected {
+               return
+       }
+
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client")
+
+       close(c.quit)
+       c.conn.Close()
+       c.disconnected = true
+}
+
+// Start begins processing input and output messages.
+func (c *WSClient) Start() {
+       log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client")
+
+       c.wg.Add(3)
+       go c.inHandler()
+       go c.notificationQueueHandler()
+       go c.outHandler()
+}
+
+// WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
+func (c *WSClient) WaitForShutdown() {
+       c.wg.Wait()
+}
+
+// handleNotifyBlocks implements the notifyblocks topic extension for websocket connections.
+func handleNotifyBlocks(wsc *WSClient) {
+       wsc.notificationMgr.RegisterBlockUpdates(wsc)
+}
+
+// handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections.
+func handleStopNotifyBlocks(wsc *WSClient) {
+       wsc.notificationMgr.UnregisterBlockUpdates(wsc)
+}
+
+// handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections.
+func handleNotifyNewTransactions(wsc *WSClient) {
+       wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc)
+}
+
+// handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections.
+func handleStopNotifyNewTransactions(wsc *WSClient) {
+       wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc)
+}
diff --git a/net/websocket/wsjson.go b/net/websocket/wsjson.go
new file mode 100644 (file)
index 0000000..5515be0
--- /dev/null
@@ -0,0 +1,34 @@
+package websocket
+
+// WSRequest means the data structure of the request
+type WSRequest struct {
+       Topic string `json:"topic"`
+}
+
+// NewWSRequest creates a request data object
+func NewWSRequest(topic string) *WSRequest {
+       return &WSRequest{
+               Topic: topic,
+       }
+}
+
+// WSResponse means the returned data structure
+type WSResponse struct {
+       NotificationType string      `json:"notification_type"`
+       Data             interface{} `json:"data"`
+       ErrorDetail      string      `json:"error_detail,omitempty"`
+}
+
+// NewWSResponse creates a return data object
+func NewWSResponse(notificationType string, data interface{}, err error) *WSResponse {
+       wsResp := &WSResponse{
+               NotificationType: notificationType,
+               Data:             data,
+       }
+
+       if err != nil {
+               wsResp.ErrorDetail = err.Error()
+       }
+
+       return wsResp
+}
diff --git a/net/websocket/wsnotificationmaneger.go b/net/websocket/wsnotificationmaneger.go
new file mode 100644 (file)
index 0000000..6030d9a
--- /dev/null
@@ -0,0 +1,459 @@
+package websocket
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
+)
+
+// Notification types
+type notificationBlockConnected types.Block
+type notificationBlockDisconnected types.Block
+type notificationTxAcceptedByMempool types.Tx
+
+// Notification control requests
+type notificationRegisterClient WSClient
+type notificationUnregisterClient WSClient
+type notificationRegisterBlocks WSClient
+type notificationUnregisterBlocks WSClient
+type notificationRegisterNewMempoolTxs WSClient
+type notificationUnregisterNewMempoolTxs WSClient
+
+// NotificationType represents the type of a notification message.
+type NotificationType int
+
+// Constants for the type of a notification message.
+const (
+       // NTBlockConnected indicates the associated block was connected to the main chain.
+       NTRawBlockConnected NotificationType = iota
+       // NTBlockDisconnected indicates the associated block was disconnected  from the main chain.
+       NTRawBlockDisconnected
+       NTNewTransaction
+       NTRequestStatus
+)
+
+// notificationTypeStrings is a map of notification types back to their constant
+// names for pretty printing.
+var notificationTypeStrings = map[NotificationType]string{
+       NTRawBlockConnected:    "raw_blocks_connected",
+       NTRawBlockDisconnected: "raw_blocks_disconnected",
+       NTNewTransaction:       "new_transaction",
+       NTRequestStatus:        "request_status",
+}
+
+// String returns the NotificationType in human-readable form.
+func (n NotificationType) String() string {
+       if s, ok := notificationTypeStrings[n]; ok {
+               return s
+       }
+       return fmt.Sprintf("Unknown Notification Type (%d)", int(n))
+}
+
+type statusInfo struct {
+       BestHeight uint64
+       BestHash   bc.Hash
+}
+
+// WSNotificationManager is a connection and notification manager used for
+// websockets.  It allows websocket clients to register for notifications they
+// are interested in.  When an event happens elsewhere in the code such as
+// transactions being added to the memory pool or block connects/disconnects,
+// the notification manager is provided with the relevant details needed to
+// figure out which websocket clients need to be notified based on what they
+// have registered for and notifies them accordingly.  It is also used to keep
+// track of all connected websocket clients.
+type WSNotificationManager struct {
+       // queueNotification queues a notification for handling.
+       queueNotification chan interface{}
+
+       // notificationMsgs feeds notificationHandler with notifications
+       // and client (un)registeration requests from a queue as well as
+       // registeration and unregisteration requests from clients.
+       notificationMsgs chan interface{}
+
+       // Access channel for current number of connected clients.
+       numClients chan int
+
+       // Shutdown handling
+       wg                   sync.WaitGroup
+       quit                 chan struct{}
+       MaxNumWebsockets     int
+       maxNumConcurrentReqs int
+       status               statusInfo
+       chain                *protocol.Chain
+}
+
+// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+       // init status
+       var status statusInfo
+       header := chain.BestBlockHeader()
+       status.BestHeight = header.Height
+       status.BestHash = header.Hash()
+
+       return &WSNotificationManager{
+               queueNotification:    make(chan interface{}),
+               notificationMsgs:     make(chan interface{}),
+               numClients:           make(chan int),
+               quit:                 make(chan struct{}),
+               MaxNumWebsockets:     maxNumWebsockets,
+               maxNumConcurrentReqs: maxNumConcurrentReqs,
+               status:               status,
+               chain:                chain,
+       }
+}
+
+// queueHandler manages a queue of empty interfaces, reading from in and
+// sending the oldest unsent to out.  This handler stops when either of the
+// in or quit channels are closed, and closes out before returning, without
+// waiting to send any variables still remaining in the queue.
+func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
+       var (
+               q       []interface{}
+               next    interface{}
+               dequeue chan<- interface{}
+       )
+
+       skipQueue := out
+
+out:
+       for {
+               select {
+               case n, ok := <-in:
+                       if !ok {
+                               // Sender closed input channel.
+                               break out
+                       }
+
+                       // Either send to out immediately if skipQueue is
+                       // non-nil (queue is empty) and reader is ready,
+                       // or append to the queue and send later.
+                       select {
+                       case skipQueue <- n:
+
+                       default:
+                               q = append(q, n)
+                               dequeue = out
+                               skipQueue = nil
+                               next = q[0]
+                       }
+
+               case dequeue <- next:
+                       copy(q, q[1:])
+                       q[len(q)-1] = nil // avoid leak
+                       q = q[:len(q)-1]
+                       if len(q) == 0 {
+                               dequeue = nil
+                               skipQueue = out
+                       } else {
+                               next = q[0]
+                       }
+
+               case <-quit:
+                       break out
+               }
+       }
+       close(out)
+}
+
+func (m *WSNotificationManager) sendNotification(typ NotificationType, data interface{}) {
+       switch typ {
+       case NTRawBlockConnected:
+               block, ok := data.(*types.Block)
+               if !ok {
+                       log.WithField("module", logModule).Error("Chain connected notification is not a block")
+                       break
+               }
+
+               // Notify registered websocket clients of incoming block.
+               m.NotifyBlockConnected(block)
+
+       case NTRawBlockDisconnected:
+               block, ok := data.(*types.Block)
+               if !ok {
+                       log.WithField("module", logModule).Error("Chain disconnected notification is not a block")
+                       break
+               }
+
+               // Notify registered websocket clients.
+               m.NotifyBlockDisconnected(block)
+       }
+}
+
+// queueHandler maintains a queue of notifications and notification handler
+// control messages.
+func (m *WSNotificationManager) queueHandler() {
+       queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
+       m.wg.Done()
+}
+
+// NotifyBlockConnected passes a block newly-connected to the best chain
+// to the notification manager for block and transaction notification
+// processing.
+func (m *WSNotificationManager) NotifyBlockConnected(block *types.Block) {
+       select {
+       case m.queueNotification <- (*notificationBlockConnected)(block):
+       case <-m.quit:
+       }
+}
+
+// NotifyBlockDisconnected passes a block disconnected from the best chain
+// to the notification manager for block notification processing.
+func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
+       select {
+       case m.queueNotification <- (*notificationBlockDisconnected)(block):
+       case <-m.quit:
+       }
+}
+
+// NotifyMempoolTx passes a transaction accepted by mempool to the
+// notification manager for transaction notification processing.  If
+// isNew is true, the tx is is a new transaction, rather than one
+// added to the mempool during a reorg.
+func (m *WSNotificationManager) NotifyMempoolTx(tx *types.Tx) {
+       select {
+       case m.queueNotification <- (*notificationTxAcceptedByMempool)(tx):
+       case <-m.quit:
+       }
+}
+
+// notificationHandler reads notifications and control messages from the queue handler and processes one at a time.
+func (m *WSNotificationManager) notificationHandler() {
+       // clients is a map of all currently connected websocket clients.
+       clients := make(map[chan struct{}]*WSClient)
+       blockNotifications := make(map[chan struct{}]*WSClient)
+       txNotifications := make(map[chan struct{}]*WSClient)
+
+out:
+       for {
+               select {
+               case n, ok := <-m.notificationMsgs:
+                       if !ok {
+                               break out
+                       }
+                       switch n := n.(type) {
+                       case *notificationBlockConnected:
+                               block := (*types.Block)(n)
+                               if len(blockNotifications) != 0 {
+                                       m.notifyBlockConnected(blockNotifications, block)
+                               }
+
+                       case *notificationBlockDisconnected:
+                               block := (*types.Block)(n)
+                               if len(blockNotifications) != 0 {
+                                       m.notifyBlockDisconnected(blockNotifications, block)
+                               }
+
+                       case *notificationTxAcceptedByMempool:
+                               tx := (*types.Tx)(n)
+                               if len(txNotifications) != 0 {
+                                       m.notifyForNewTx(txNotifications, tx)
+                               }
+
+                       case *notificationRegisterBlocks:
+                               wsc := (*WSClient)(n)
+                               blockNotifications[wsc.quit] = wsc
+
+                       case *notificationUnregisterBlocks:
+                               wsc := (*WSClient)(n)
+                               delete(blockNotifications, wsc.quit)
+
+                       case *notificationRegisterNewMempoolTxs:
+                               wsc := (*WSClient)(n)
+                               txNotifications[wsc.quit] = wsc
+
+                       case *notificationUnregisterNewMempoolTxs:
+                               wsc := (*WSClient)(n)
+                               delete(txNotifications, wsc.quit)
+
+                       case *notificationRegisterClient:
+                               wsc := (*WSClient)(n)
+                               clients[wsc.quit] = wsc
+
+                       case *notificationUnregisterClient:
+                               wsc := (*WSClient)(n)
+                               delete(blockNotifications, wsc.quit)
+                               delete(txNotifications, wsc.quit)
+                               delete(clients, wsc.quit)
+
+                       default:
+                               log.Warnf("Unhandled notification type")
+                       }
+
+               case m.numClients <- len(clients):
+
+               case <-m.quit:
+                       break out
+               }
+       }
+
+       for _, c := range clients {
+               c.Disconnect()
+       }
+       m.wg.Done()
+}
+
+// NumClients returns the number of clients actively being served.
+func (m *WSNotificationManager) NumClients() (n int) {
+       select {
+       case n = <-m.numClients:
+       case <-m.quit:
+       }
+       return
+}
+
+// IsMaxConnect returns whether the maximum connection is exceeded
+func (m *WSNotificationManager) IsMaxConnect() bool {
+       return m.NumClients() >= m.MaxNumWebsockets
+}
+
+// RegisterBlockUpdates requests block update notifications to the passed websocket client.
+func (m *WSNotificationManager) RegisterBlockUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterBlocks)(wsc)
+}
+
+// UnregisterBlockUpdates removes block update notifications for the passed websocket client.
+func (m *WSNotificationManager) UnregisterBlockUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
+}
+
+// notifyBlockConnected notifies websocket clients that have registered for block updates when a block is connected to the main chain.
+func (*WSNotificationManager) notifyBlockConnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+       resp := NewWSResponse(NTRawBlockConnected.String(), block, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal block connected notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// notifyBlockDisconnected notifies websocket clients that have registered for block updates
+// when a block is disconnected from the main chain (due to a reorganize).
+func (*WSNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*WSClient, block *types.Block) {
+       resp := NewWSResponse(NTRawBlockDisconnected.String(), block, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithField("error", err).Error("Failed to marshal block Disconnected notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
+// client when new transactions are added to the memory pool.
+func (m *WSNotificationManager) RegisterNewMempoolTxsUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
+}
+
+// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
+// client when new transaction are added to the memory pool.
+func (m *WSNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *WSClient) {
+       m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
+}
+
+// notifyForNewTx notifies websocket clients that have registered for updates
+// when a new transaction is added to the memory pool.
+func (m *WSNotificationManager) notifyForNewTx(clients map[chan struct{}]*WSClient, tx *types.Tx) {
+       resp := NewWSResponse(NTNewTransaction.String(), tx, nil)
+       marshalledJSON, err := json.Marshal(resp)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal tx notification")
+               return
+       }
+
+       for _, wsc := range clients {
+               wsc.QueueNotification(marshalledJSON)
+       }
+}
+
+// AddClient adds the passed websocket client to the notification manager.
+func (m *WSNotificationManager) AddClient(wsc *WSClient) {
+       m.queueNotification <- (*notificationRegisterClient)(wsc)
+}
+
+// RemoveClient removes the passed websocket client and all notifications registered for it.
+func (m *WSNotificationManager) RemoveClient(wsc *WSClient) {
+       select {
+       case m.queueNotification <- (*notificationUnregisterClient)(wsc):
+       case <-m.quit:
+       }
+}
+
+func (m *WSNotificationManager) blockNotify() {
+out:
+       for {
+               select {
+               case <-m.quit:
+                       break out
+
+               default:
+               }
+               for !m.chain.InMainChain(m.status.BestHash) {
+                       block, err := m.chain.GetBlockByHash(&m.status.BestHash)
+                       if err != nil {
+                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockNotify GetBlockByHash")
+                               return
+                       }
+                       m.updateStatus(block)
+                       m.sendNotification(NTRawBlockDisconnected, block)
+               }
+
+               block, _ := m.chain.GetBlockByHeight(m.status.BestHeight + 1)
+               if block == nil {
+                       m.blockWaiter()
+                       continue
+               }
+
+               if m.status.BestHash != block.PreviousBlockHash {
+                       log.WithFields(log.Fields{"module": logModule, "blockHeight": block.Height, "previousBlockHash": m.status.BestHash, "rcvBlockPrevHash": block.PreviousBlockHash}).Warning("The previousBlockHash of the received block is not the same as the hash of the previous block")
+                       continue
+               }
+
+               m.updateStatus(block)
+               m.sendNotification(NTRawBlockConnected, block)
+       }
+       m.wg.Done()
+}
+
+func (m *WSNotificationManager) blockWaiter() {
+       select {
+       case <-m.chain.BlockWaiter(m.status.BestHeight + 1):
+       case <-m.quit:
+       }
+}
+
+func (m *WSNotificationManager) updateStatus(block *types.Block) {
+       m.status.BestHeight = block.Height
+       m.status.BestHash = block.Hash()
+}
+
+// Start starts the goroutines required for the manager to queue and process websocket client notifications.
+func (m *WSNotificationManager) Start() {
+       m.wg.Add(3)
+       go m.blockNotify()
+       go m.queueHandler()
+       go m.notificationHandler()
+}
+
+// WaitForShutdown blocks until all notification manager goroutines have finished.
+func (m *WSNotificationManager) WaitForShutdown() {
+       m.wg.Wait()
+}
+
+// Shutdown shuts down the manager, stopping the notification queue and notification handler goroutines.
+func (m *WSNotificationManager) Shutdown() {
+       close(m.quit)
+}
index 8ea25c2..22f93dd 100644 (file)
@@ -28,6 +28,7 @@ import (
        "github.com/bytom/mining/cpuminer"
        "github.com/bytom/mining/miningpool"
        "github.com/bytom/mining/tensority"
+       "github.com/bytom/net/websocket"
        "github.com/bytom/netsync"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
@@ -48,14 +49,15 @@ type Node struct {
        syncManager *netsync.SyncManager
 
        //bcReactor    *bc.BlockchainReactor
-       wallet       *w.Wallet
-       accessTokens *accesstoken.CredentialStore
-       api          *api.API
-       chain        *protocol.Chain
-       txfeed       *txfeed.Tracker
-       cpuMiner     *cpuminer.CPUMiner
-       miningPool   *miningpool.MiningPool
-       miningEnable bool
+       wallet          *w.Wallet
+       accessTokens    *accesstoken.CredentialStore
+       notificationMgr *websocket.WSNotificationManager
+       api             *api.API
+       chain           *protocol.Chain
+       txfeed          *txfeed.Tracker
+       cpuMiner        *cpuminer.CPUMiner
+       miningPool      *miningpool.MiningPool
+       miningEnable    bool
 
        newBlockCh chan *bc.Hash
 }
@@ -121,8 +123,10 @@ func NewNode(config *cfg.Config) *Node {
 
        syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
+       notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
+
        // get transaction from txPool and send it to syncManager and wallet
-       go newPoolTxListener(txPool, syncManager, wallet)
+       go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -145,7 +149,8 @@ func NewNode(config *cfg.Config) *Node {
                txfeed:       txFeed,
                miningEnable: config.Mining,
 
-               newBlockCh: newBlockCh,
+               newBlockCh:      newBlockCh,
+               notificationMgr: notificationMgr,
        }
 
        node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
@@ -161,7 +166,7 @@ func NewNode(config *cfg.Config) *Node {
 }
 
 // newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
        txMsgCh := txPool.GetMsgCh()
        syncManagerTxCh := syncManager.GetNewTxCh()
 
@@ -173,6 +178,7 @@ func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager
                        if wallet != nil {
                                wallet.AddUnconfirmedTx(msg.TxDesc)
                        }
+                       notificationMgr.NotifyMempoolTx(msg.Tx)
                case protocol.MsgRemoveTx:
                        if wallet != nil {
                                wallet.RemoveUnconfirmedTx(msg.TxDesc)
@@ -229,7 +235,7 @@ func launchWebBrowser(port string) {
 }
 
 func (n *Node) initAndstartApiServer() {
-       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh)
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens, n.newBlockCh, n.notificationMgr)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
        env.Parse()
@@ -249,6 +255,7 @@ func (n *Node) OnStart() error {
                n.syncManager.Start()
        }
        n.initAndstartApiServer()
+       n.notificationMgr.Start()
        if !n.config.Web.Closed {
                _, port, err := net.SplitHostPort(n.config.ApiAddress)
                if err != nil {
@@ -261,6 +268,8 @@ func (n *Node) OnStart() error {
 }
 
 func (n *Node) OnStop() {
+       n.notificationMgr.Shutdown()
+       n.notificationMgr.WaitForShutdown()
        n.BaseService.OnStop()
        if n.miningEnable {
                n.cpuMiner.Stop()