OSDN Git Service

rename (#465)
[bytom/vapor.git] / net / websocket / wsclient.go
1 package websocket
2
3 import (
4         "container/list"
5         "encoding/json"
6         "fmt"
7         "io"
8         "net/http"
9         "sync"
10         "time"
11
12         "github.com/gorilla/websocket"
13         log "github.com/sirupsen/logrus"
14
15         "github.com/bytom/vapor/errors"
16 )
17
18 // websocketSendBufferSize is the number of elements the send channel
19 // can queue before blocking.  Note that this only applies to requests
20 // handled directly in the websocket client input handler or the async
21 // handler since notifications have their own queuing mechanism
22 // independent of the send channel buffer.
23 const (
24         logModule               = "websocket"
25         websocketSendBufferSize = 50
26 )
27
28 var (
29         // ErrWSParse means a request parsing error
30         ErrWSParse = errors.New("Websocket request parsing error")
31         // ErrWSInternal means service handling errors
32         ErrWSInternal = errors.New("Websocket Internal error")
33         // ErrWSClientQuit means the websocket client is disconnected
34         ErrWSClientQuit = errors.New("Websocket client quit")
35
36         // timeZeroVal is simply the zero value for a time.Time and is used to avoid creating multiple instances.
37         timeZeroVal time.Time
38 )
39
40 type semaphore chan struct{}
41
42 func makeSemaphore(n int) semaphore {
43         return make(chan struct{}, n)
44 }
45
46 func (s semaphore) acquire() { s <- struct{}{} }
47 func (s semaphore) release() { <-s }
48
49 // wsTopicHandler describes a callback function used to handle a specific topic.
50 type wsTopicHandler func(*WSClient)
51
52 // wsHandlers maps websocket topic strings to appropriate websocket handler
53 // functions.  This is set by init because help references wsHandlers and thus
54 // causes a dependency loop.
55 var wsHandlers = map[string]wsTopicHandler{
56         "notify_raw_blocks":            handleNotifyBlocks,
57         "notify_new_transactions":      handleNotifyNewTransactions,
58         "stop_notify_raw_blocks":       handleStopNotifyBlocks,
59         "stop_notify_new_transactions": handleStopNotifyNewTransactions,
60 }
61
62 // responseMessage houses a message to send to a connected websocket client as
63 // well as a channel to reply on when the message is sent.
64 type responseMessage struct {
65         msg      []byte
66         doneChan chan bool
67 }
68
69 // WSClient provides an abstraction for handling a websocket client.  The
70 // overall data flow is split into 3 main goroutines, a possible 4th goroutine
71 // for long-running operations (only started if request is made), and a
72 // websocket manager which is used to allow things such as broadcasting
73 // requested notifications to all connected websocket clients.   Inbound
74 // messages are read via the inHandler goroutine and generally dispatched to
75 // their own handler.  However, certain potentially long-running operations such
76 // as rescans, are sent to the asyncHander goroutine and are limited to one at a
77 // time.  There are two outbound message types - one for responding to client
78 // requests and another for async notifications.  Responses to client requests
79 // use SendMessage which employs a buffered channel thereby limiting the number
80 // of outstanding requests that can be made.  Notifications are sent via
81 // QueueNotification which implements a queue via notificationQueueHandler to
82 // ensure sending notifications from other subsystems can't block.  Ultimately,
83 // all messages are sent via the outHandler.
84 type WSClient struct {
85         sync.Mutex
86         conn *websocket.Conn
87         // disconnected indicated whether or not the websocket client is disconnected.
88         disconnected bool
89         // addr is the remote address of the client.
90         addr              string
91         serviceRequestSem semaphore
92         ntfnChan          chan []byte
93         sendChan          chan responseMessage
94         quit              chan struct{}
95         wg                sync.WaitGroup
96         notificationMgr   *WSNotificationManager
97 }
98
99 // NewWebsocketClient means to create a new object to the connected websocket client
100 func NewWebsocketClient(w http.ResponseWriter, r *http.Request, notificationMgr *WSNotificationManager) (*WSClient, error) {
101         // Limit max number of websocket clients.
102         if notificationMgr.IsMaxConnect() {
103                 return nil, fmt.Errorf("numOfMaxWS: %d, disconnecting: %s", notificationMgr.MaxNumWebsockets, r.RemoteAddr)
104         }
105
106         // Attempt to upgrade the connection to a websocket connection using the default size for read/write buffers.
107         conn, err := websocket.Upgrade(w, r, nil, 0, 0)
108         if err != nil {
109                 return nil, err
110         }
111
112         conn.SetReadDeadline(timeZeroVal)
113
114         client := &WSClient{
115                 conn:              conn,
116                 addr:              r.RemoteAddr,
117                 serviceRequestSem: makeSemaphore(notificationMgr.maxNumConcurrentReqs),
118                 ntfnChan:          make(chan []byte, 1), // nonblocking sync
119                 sendChan:          make(chan responseMessage, websocketSendBufferSize),
120                 quit:              make(chan struct{}),
121                 notificationMgr:   notificationMgr,
122         }
123         return client, nil
124 }
125
126 // inHandler handles all incoming messages for the websocket connection.
127 func (c *WSClient) inHandler() {
128 out:
129         for {
130                 // Break out of the loop once the quit channel has been closed.
131                 // Use a non-blocking select here so we fall through otherwise.
132                 select {
133                 case <-c.quit:
134                         break out
135                 default:
136                 }
137
138                 _, msg, err := c.conn.ReadMessage()
139                 if err != nil {
140                         if err != io.EOF {
141                                 log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr, "error": err}).Error("Websocket receive error")
142                         }
143                         break out
144                 }
145
146                 var request WSRequest
147                 if err = json.Unmarshal(msg, &request); err != nil {
148                         respError := errors.Wrap(err, ErrWSParse)
149                         resp := NewWSResponse(NTRequestStatus.String(), nil, respError)
150                         reply, err := json.Marshal(resp)
151                         if err != nil {
152                                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to marshal parse failure reply")
153                                 continue
154                         }
155
156                         c.SendMessage(reply, nil)
157                         continue
158                 }
159
160                 c.serviceRequestSem.acquire()
161                 go func() {
162                         c.serviceRequest(request.Topic)
163                         c.serviceRequestSem.release()
164                 }()
165         }
166
167         // Ensure the connection is closed.
168         c.Disconnect()
169         c.wg.Done()
170         log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client input handler done")
171 }
172
173 func (c *WSClient) serviceRequest(topic string) {
174         var respErr error
175
176         if wsHandler, ok := wsHandlers[topic]; ok {
177                 wsHandler(c)
178         } else {
179                 err := fmt.Errorf("There is not this topic: %s", topic)
180                 respErr = errors.Wrap(err, ErrWSInternal)
181                 log.WithFields(log.Fields{"module": logModule, "topic": topic}).Debug("There is not this topic")
182         }
183
184         resp := NewWSResponse(NTRequestStatus.String(), nil, respErr)
185         reply, err := json.Marshal(resp)
186         if err != nil {
187                 log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to marshal parse failure reply")
188                 return
189         }
190
191         c.SendMessage(reply, nil)
192 }
193
194 // notificationQueueHandler handles the queuing of outgoing notifications for  the websocket client.
195 func (c *WSClient) notificationQueueHandler() {
196         ntfnSentChan := make(chan bool, 1) // nonblocking sync
197
198         // pendingNtfns is used as a queue for notifications that are ready to
199         // be sent once there are no outstanding notifications currently being
200         // sent.
201         pendingNtfns := list.New()
202         waiting := false
203 out:
204         for {
205                 select {
206                 // This channel is notified when a message is being queued to
207                 // be sent across the network socket.  It will either send the
208                 // message immediately if a send is not already in progress, or
209                 // queue the message to be sent once the other pending messages
210                 // are sent.
211                 case msg := <-c.ntfnChan:
212                         if !waiting {
213                                 c.SendMessage(msg, ntfnSentChan)
214                         } else {
215                                 pendingNtfns.PushBack(msg)
216                         }
217                         waiting = true
218                 // This channel is notified when a notification has been sent across the network socket.
219                 case <-ntfnSentChan:
220                         // No longer waiting if there are no more messages in the pending messages queue.
221                         next := pendingNtfns.Front()
222                         if next == nil {
223                                 waiting = false
224                                 continue
225                         }
226
227                         // Notify the outHandler about the next item to asynchronously send.
228                         msg := pendingNtfns.Remove(next).([]byte)
229                         c.SendMessage(msg, ntfnSentChan)
230                 case <-c.quit:
231                         break out
232                 }
233         }
234
235         // Drain any wait channels before exiting so nothing is left waiting around to send.
236 cleanup:
237         for {
238                 select {
239                 case <-c.ntfnChan:
240                 case <-ntfnSentChan:
241                 default:
242                         break cleanup
243                 }
244         }
245         c.wg.Done()
246         log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client notification queue handler done")
247 }
248
249 // outHandler handles all outgoing messages for the websocket connection.
250 func (c *WSClient) outHandler() {
251 out:
252         for {
253                 // Send any messages ready for send until the quit channel is closed.
254                 select {
255                 case r := <-c.sendChan:
256                         if err := c.conn.WriteMessage(websocket.TextMessage, r.msg); err != nil {
257                                 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("Failed to send message to wesocket client")
258                                 c.Disconnect()
259                                 break out
260                         }
261                         if r.doneChan != nil {
262                                 r.doneChan <- true
263                         }
264                 case <-c.quit:
265                         break out
266                 }
267         }
268
269         // Drain any wait channels before exiting so nothing is left waiting around to send.
270 cleanup:
271         for {
272                 select {
273                 case r := <-c.sendChan:
274                         if r.doneChan != nil {
275                                 r.doneChan <- false
276                         }
277                 default:
278                         break cleanup
279                 }
280         }
281         c.wg.Done()
282         log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Debug("Websocket client output handler done")
283 }
284
285 // SendMessage sends the passed json to the websocket client.  It is backed
286 // by a buffered channel, so it will not block until the send channel is full.
287 // Note however that QueueNotification must be used for sending async
288 // notifications instead of the this function.  This approach allows a limit to
289 // the number of outstanding requests a client can make without preventing or
290 // blocking on async notifications.
291 func (c *WSClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
292         // Don't send the message if disconnected.
293         if c.Disconnected() {
294                 if doneChan != nil {
295                         doneChan <- false
296                 }
297                 return
298         }
299
300         c.sendChan <- responseMessage{msg: marshalledJSON, doneChan: doneChan}
301 }
302
303 // QueueNotification queues the passed notification to be sent to the websocket client.
304 func (c *WSClient) QueueNotification(marshalledJSON []byte) error {
305         // Don't queue the message if disconnected.
306         if c.Disconnected() {
307                 return ErrWSClientQuit
308         }
309
310         c.ntfnChan <- marshalledJSON
311         return nil
312 }
313
314 // Disconnected returns whether or not the websocket client is disconnected.
315 func (c *WSClient) Disconnected() bool {
316         c.Lock()
317         defer c.Unlock()
318
319         return c.disconnected
320 }
321
322 // Disconnect disconnects the websocket client.
323 func (c *WSClient) Disconnect() {
324         c.Lock()
325         defer c.Unlock()
326
327         // Nothing to do if already disconnected.
328         if c.disconnected {
329                 return
330         }
331
332         log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Disconnecting websocket client")
333
334         close(c.quit)
335         c.conn.Close()
336         c.disconnected = true
337 }
338
339 // Start begins processing input and output messages.
340 func (c *WSClient) Start() {
341         log.WithFields(log.Fields{"module": logModule, "remoteAddress": c.addr}).Info("Starting websocket client")
342
343         c.wg.Add(3)
344         go c.inHandler()
345         go c.notificationQueueHandler()
346         go c.outHandler()
347 }
348
349 // WaitForShutdown blocks until the websocket client goroutines are stopped and the connection is closed.
350 func (c *WSClient) WaitForShutdown() {
351         c.wg.Wait()
352 }
353
354 // handleNotifyBlocks implements the notifyblocks topic extension for websocket connections.
355 func handleNotifyBlocks(wsc *WSClient) {
356         wsc.notificationMgr.RegisterBlockUpdates(wsc)
357 }
358
359 // handleStopNotifyBlocks implements the stopnotifyblocks topic extension for websocket connections.
360 func handleStopNotifyBlocks(wsc *WSClient) {
361         wsc.notificationMgr.UnregisterBlockUpdates(wsc)
362 }
363
364 // handleNotifyNewTransations implements the notifynewtransactions topic extension for websocket connections.
365 func handleNotifyNewTransactions(wsc *WSClient) {
366         wsc.notificationMgr.RegisterNewMempoolTxsUpdates(wsc)
367 }
368
369 // handleStopNotifyNewTransations implements the stopnotifynewtransactions topic extension for websocket connections.
370 func handleStopNotifyNewTransactions(wsc *WSClient) {
371         wsc.notificationMgr.UnregisterNewMempoolTxsUpdates(wsc)
372 }