OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / rpcclient / infrastructure.go
1 // Copyright (c) 2014-2017 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package rpcclient
6
7 import (
8         "bytes"
9         "container/list"
10         "crypto/tls"
11         "crypto/x509"
12         "encoding/base64"
13         "encoding/json"
14         "errors"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "math"
19         "net"
20         "net/http"
21         "net/url"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         "github.com/btcsuite/btcd/btcjson"
27         "github.com/btcsuite/go-socks/socks"
28         "github.com/btcsuite/websocket"
29 )
30
31 var (
32         // ErrInvalidAuth is an error to describe the condition where the client
33         // is either unable to authenticate or the specified endpoint is
34         // incorrect.
35         ErrInvalidAuth = errors.New("authentication failure")
36
37         // ErrInvalidEndpoint is an error to describe the condition where the
38         // websocket handshake failed with the specified endpoint.
39         ErrInvalidEndpoint = errors.New("the endpoint either does not support " +
40                 "websockets or does not exist")
41
42         // ErrClientNotConnected is an error to describe the condition where a
43         // websocket client has been created, but the connection was never
44         // established.  This condition differs from ErrClientDisconnect, which
45         // represents an established connection that was lost.
46         ErrClientNotConnected = errors.New("the client was never connected")
47
48         // ErrClientDisconnect is an error to describe the condition where the
49         // client has been disconnected from the RPC server.  When the
50         // DisableAutoReconnect option is not set, any outstanding futures
51         // when a client disconnect occurs will return this error as will
52         // any new requests.
53         ErrClientDisconnect = errors.New("the client has been disconnected")
54
55         // ErrClientShutdown is an error to describe the condition where the
56         // client is either already shutdown, or in the process of shutting
57         // down.  Any outstanding futures when a client shutdown occurs will
58         // return this error as will any new requests.
59         ErrClientShutdown = errors.New("the client has been shutdown")
60
61         // ErrNotWebsocketClient is an error to describe the condition of
62         // calling a Client method intended for a websocket client when the
63         // client has been configured to run in HTTP POST mode instead.
64         ErrNotWebsocketClient = errors.New("client is not configured for " +
65                 "websockets")
66
67         // ErrClientAlreadyConnected is an error to describe the condition where
68         // a new client connection cannot be established due to a websocket
69         // client having already connected to the RPC server.
70         ErrClientAlreadyConnected = errors.New("websocket client has already " +
71                 "connected")
72 )
73
74 const (
75         // sendBufferSize is the number of elements the websocket send channel
76         // can queue before blocking.
77         sendBufferSize = 50
78
79         // sendPostBufferSize is the number of elements the HTTP POST send
80         // channel can queue before blocking.
81         sendPostBufferSize = 100
82
83         // connectionRetryInterval is the amount of time to wait in between
84         // retries when automatically reconnecting to an RPC server.
85         connectionRetryInterval = time.Second * 5
86 )
87
88 // sendPostDetails houses an HTTP POST request to send to an RPC server as well
89 // as the original JSON-RPC command and a channel to reply on when the server
90 // responds with the result.
91 type sendPostDetails struct {
92         httpRequest *http.Request
93         jsonRequest *jsonRequest
94 }
95
96 // jsonRequest holds information about a json request that is used to properly
97 // detect, interpret, and deliver a reply to it.
98 type jsonRequest struct {
99         id             uint64
100         method         string
101         cmd            interface{}
102         marshalledJSON []byte
103         responseChan   chan *response
104 }
105
106 // Client represents a Bitcoin RPC client which allows easy access to the
107 // various RPC methods available on a Bitcoin RPC server.  Each of the wrapper
108 // functions handle the details of converting the passed and return types to and
109 // from the underlying JSON types which are required for the JSON-RPC
110 // invocations
111 //
112 // The client provides each RPC in both synchronous (blocking) and asynchronous
113 // (non-blocking) forms.  The asynchronous forms are based on the concept of
114 // futures where they return an instance of a type that promises to deliver the
115 // result of the invocation at some future time.  Invoking the Receive method on
116 // the returned future will block until the result is available if it's not
117 // already.
118 type Client struct {
119         id uint64 // atomic, so must stay 64-bit aligned
120
121         // config holds the connection configuration assoiated with this client.
122         config *ConnConfig
123
124         // wsConn is the underlying websocket connection when not in HTTP POST
125         // mode.
126         wsConn *websocket.Conn
127
128         // httpClient is the underlying HTTP client to use when running in HTTP
129         // POST mode.
130         httpClient *http.Client
131
132         // mtx is a mutex to protect access to connection related fields.
133         mtx sync.Mutex
134
135         // disconnected indicated whether or not the server is disconnected.
136         disconnected bool
137
138         // retryCount holds the number of times the client has tried to
139         // reconnect to the RPC server.
140         retryCount int64
141
142         // Track command and their response channels by ID.
143         requestLock sync.Mutex
144         requestMap  map[uint64]*list.Element
145         requestList *list.List
146
147         // Notifications.
148         ntfnHandlers  *NotificationHandlers
149         ntfnStateLock sync.Mutex
150         ntfnState     *notificationState
151
152         // Networking infrastructure.
153         sendChan        chan []byte
154         sendPostChan    chan *sendPostDetails
155         connEstablished chan struct{}
156         disconnect      chan struct{}
157         shutdown        chan struct{}
158         wg              sync.WaitGroup
159 }
160
161 // NextID returns the next id to be used when sending a JSON-RPC message.  This
162 // ID allows responses to be associated with particular requests per the
163 // JSON-RPC specification.  Typically the consumer of the client does not need
164 // to call this function, however, if a custom request is being created and used
165 // this function should be used to ensure the ID is unique amongst all requests
166 // being made.
167 func (c *Client) NextID() uint64 {
168         return atomic.AddUint64(&c.id, 1)
169 }
170
171 // addRequest associates the passed jsonRequest with its id.  This allows the
172 // response from the remote server to be unmarshalled to the appropriate type
173 // and sent to the specified channel when it is received.
174 //
175 // If the client has already begun shutting down, ErrClientShutdown is returned
176 // and the request is not added.
177 //
178 // This function is safe for concurrent access.
179 func (c *Client) addRequest(jReq *jsonRequest) error {
180         c.requestLock.Lock()
181         defer c.requestLock.Unlock()
182
183         // A non-blocking read of the shutdown channel with the request lock
184         // held avoids adding the request to the client's internal data
185         // structures if the client is in the process of shutting down (and
186         // has not yet grabbed the request lock), or has finished shutdown
187         // already (responding to each outstanding request with
188         // ErrClientShutdown).
189         select {
190         case <-c.shutdown:
191                 return ErrClientShutdown
192         default:
193         }
194
195         element := c.requestList.PushBack(jReq)
196         c.requestMap[jReq.id] = element
197         return nil
198 }
199
200 // removeRequest returns and removes the jsonRequest which contains the response
201 // channel and original method associated with the passed id or nil if there is
202 // no association.
203 //
204 // This function is safe for concurrent access.
205 func (c *Client) removeRequest(id uint64) *jsonRequest {
206         c.requestLock.Lock()
207         defer c.requestLock.Unlock()
208
209         element := c.requestMap[id]
210         if element != nil {
211                 delete(c.requestMap, id)
212                 request := c.requestList.Remove(element).(*jsonRequest)
213                 return request
214         }
215
216         return nil
217 }
218
219 // removeAllRequests removes all the jsonRequests which contain the response
220 // channels for outstanding requests.
221 //
222 // This function MUST be called with the request lock held.
223 func (c *Client) removeAllRequests() {
224         c.requestMap = make(map[uint64]*list.Element)
225         c.requestList.Init()
226 }
227
228 // trackRegisteredNtfns examines the passed command to see if it is one of
229 // the notification commands and updates the notification state that is used
230 // to automatically re-establish registered notifications on reconnects.
231 func (c *Client) trackRegisteredNtfns(cmd interface{}) {
232         // Nothing to do if the caller is not interested in notifications.
233         if c.ntfnHandlers == nil {
234                 return
235         }
236
237         c.ntfnStateLock.Lock()
238         defer c.ntfnStateLock.Unlock()
239
240         switch bcmd := cmd.(type) {
241         case *btcjson.NotifyBlocksCmd:
242                 c.ntfnState.notifyBlocks = true
243
244         case *btcjson.NotifyNewTransactionsCmd:
245                 if bcmd.Verbose != nil && *bcmd.Verbose {
246                         c.ntfnState.notifyNewTxVerbose = true
247                 } else {
248                         c.ntfnState.notifyNewTx = true
249
250                 }
251
252         case *btcjson.NotifySpentCmd:
253                 for _, op := range bcmd.OutPoints {
254                         c.ntfnState.notifySpent[op] = struct{}{}
255                 }
256
257         case *btcjson.NotifyReceivedCmd:
258                 for _, addr := range bcmd.Addresses {
259                         c.ntfnState.notifyReceived[addr] = struct{}{}
260                 }
261         }
262 }
263
264 type (
265         // inMessage is the first type that an incoming message is unmarshaled
266         // into. It supports both requests (for notification support) and
267         // responses.  The partially-unmarshaled message is a notification if
268         // the embedded ID (from the response) is nil.  Otherwise, it is a
269         // response.
270         inMessage struct {
271                 ID *float64 `json:"id"`
272                 *rawNotification
273                 *rawResponse
274         }
275
276         // rawNotification is a partially-unmarshaled JSON-RPC notification.
277         rawNotification struct {
278                 Method string            `json:"method"`
279                 Params []json.RawMessage `json:"params"`
280         }
281
282         // rawResponse is a partially-unmarshaled JSON-RPC response.  For this
283         // to be valid (according to JSON-RPC 1.0 spec), ID may not be nil.
284         rawResponse struct {
285                 Result json.RawMessage   `json:"result"`
286                 Error  *btcjson.RPCError `json:"error"`
287         }
288 )
289
290 // response is the raw bytes of a JSON-RPC result, or the error if the response
291 // error object was non-null.
292 type response struct {
293         result []byte
294         err    error
295 }
296
297 // result checks whether the unmarshaled response contains a non-nil error,
298 // returning an unmarshaled btcjson.RPCError (or an unmarshaling error) if so.
299 // If the response is not an error, the raw bytes of the request are
300 // returned for further unmashaling into specific result types.
301 func (r rawResponse) result() (result []byte, err error) {
302         if r.Error != nil {
303                 return nil, r.Error
304         }
305         return r.Result, nil
306 }
307
308 // handleMessage is the main handler for incoming notifications and responses.
309 func (c *Client) handleMessage(msg []byte) {
310         // Attempt to unmarshal the message as either a notification or
311         // response.
312         var in inMessage
313         err := json.Unmarshal(msg, &in)
314         if err != nil {
315                 log.Warnf("Remote server sent invalid message: %v", err)
316                 return
317         }
318
319         // JSON-RPC 1.0 notifications are requests with a null id.
320         if in.ID == nil {
321                 ntfn := in.rawNotification
322                 if ntfn == nil {
323                         log.Warn("Malformed notification: missing " +
324                                 "method and parameters")
325                         return
326                 }
327                 if ntfn.Method == "" {
328                         log.Warn("Malformed notification: missing method")
329                         return
330                 }
331                 // params are not optional: nil isn't valid (but len == 0 is)
332                 if ntfn.Params == nil {
333                         log.Warn("Malformed notification: missing params")
334                         return
335                 }
336                 // Deliver the notification.
337                 log.Tracef("Received notification [%s]", in.Method)
338                 c.handleNotification(in.rawNotification)
339                 return
340         }
341
342         // ensure that in.ID can be converted to an integer without loss of precision
343         if *in.ID < 0 || *in.ID != math.Trunc(*in.ID) {
344                 log.Warn("Malformed response: invalid identifier")
345                 return
346         }
347
348         if in.rawResponse == nil {
349                 log.Warn("Malformed response: missing result and error")
350                 return
351         }
352
353         id := uint64(*in.ID)
354         log.Tracef("Received response for id %d (result %s)", id, in.Result)
355         request := c.removeRequest(id)
356
357         // Nothing more to do if there is no request associated with this reply.
358         if request == nil || request.responseChan == nil {
359                 log.Warnf("Received unexpected reply: %s (id %d)", in.Result,
360                         id)
361                 return
362         }
363
364         // Since the command was successful, examine it to see if it's a
365         // notification, and if is, add it to the notification state so it
366         // can automatically be re-established on reconnect.
367         c.trackRegisteredNtfns(request.cmd)
368
369         // Deliver the response.
370         result, err := in.rawResponse.result()
371         request.responseChan <- &response{result: result, err: err}
372 }
373
374 // shouldLogReadError returns whether or not the passed error, which is expected
375 // to have come from reading from the websocket connection in wsInHandler,
376 // should be logged.
377 func (c *Client) shouldLogReadError(err error) bool {
378         // No logging when the connetion is being forcibly disconnected.
379         select {
380         case <-c.shutdown:
381                 return false
382         default:
383         }
384
385         // No logging when the connection has been disconnected.
386         if err == io.EOF {
387                 return false
388         }
389         if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
390                 return false
391         }
392
393         return true
394 }
395
396 // wsInHandler handles all incoming messages for the websocket connection
397 // associated with the client.  It must be run as a goroutine.
398 func (c *Client) wsInHandler() {
399 out:
400         for {
401                 // Break out of the loop once the shutdown channel has been
402                 // closed.  Use a non-blocking select here so we fall through
403                 // otherwise.
404                 select {
405                 case <-c.shutdown:
406                         break out
407                 default:
408                 }
409
410                 _, msg, err := c.wsConn.ReadMessage()
411                 if err != nil {
412                         // Log the error if it's not due to disconnecting.
413                         if c.shouldLogReadError(err) {
414                                 log.Errorf("Websocket receive error from "+
415                                         "%s: %v", c.config.Host, err)
416                         }
417                         break out
418                 }
419                 c.handleMessage(msg)
420         }
421
422         // Ensure the connection is closed.
423         c.Disconnect()
424         c.wg.Done()
425         log.Tracef("RPC client input handler done for %s", c.config.Host)
426 }
427
428 // disconnectChan returns a copy of the current disconnect channel.  The channel
429 // is read protected by the client mutex, and is safe to call while the channel
430 // is being reassigned during a reconnect.
431 func (c *Client) disconnectChan() <-chan struct{} {
432         c.mtx.Lock()
433         ch := c.disconnect
434         c.mtx.Unlock()
435         return ch
436 }
437
438 // wsOutHandler handles all outgoing messages for the websocket connection.  It
439 // uses a buffered channel to serialize output messages while allowing the
440 // sender to continue running asynchronously.  It must be run as a goroutine.
441 func (c *Client) wsOutHandler() {
442 out:
443         for {
444                 // Send any messages ready for send until the client is
445                 // disconnected closed.
446                 select {
447                 case msg := <-c.sendChan:
448                         err := c.wsConn.WriteMessage(websocket.TextMessage, msg)
449                         if err != nil {
450                                 c.Disconnect()
451                                 break out
452                         }
453
454                 case <-c.disconnectChan():
455                         break out
456                 }
457         }
458
459         // Drain any channels before exiting so nothing is left waiting around
460         // to send.
461 cleanup:
462         for {
463                 select {
464                 case <-c.sendChan:
465                 default:
466                         break cleanup
467                 }
468         }
469         c.wg.Done()
470         log.Tracef("RPC client output handler done for %s", c.config.Host)
471 }
472
473 // sendMessage sends the passed JSON to the connected server using the
474 // websocket connection.  It is backed by a buffered channel, so it will not
475 // block until the send channel is full.
476 func (c *Client) sendMessage(marshalledJSON []byte) {
477         // Don't send the message if disconnected.
478         select {
479         case c.sendChan <- marshalledJSON:
480         case <-c.disconnectChan():
481                 return
482         }
483 }
484
485 // reregisterNtfns creates and sends commands needed to re-establish the current
486 // notification state associated with the client.  It should only be called on
487 // on reconnect by the resendRequests function.
488 func (c *Client) reregisterNtfns() error {
489         // Nothing to do if the caller is not interested in notifications.
490         if c.ntfnHandlers == nil {
491                 return nil
492         }
493
494         // In order to avoid holding the lock on the notification state for the
495         // entire time of the potentially long running RPCs issued below, make a
496         // copy of it and work from that.
497         //
498         // Also, other commands will be running concurrently which could modify
499         // the notification state (while not under the lock of course) which
500         // also register it with the remote RPC server, so this prevents double
501         // registrations.
502         c.ntfnStateLock.Lock()
503         stateCopy := c.ntfnState.Copy()
504         c.ntfnStateLock.Unlock()
505
506         // Reregister notifyblocks if needed.
507         if stateCopy.notifyBlocks {
508                 log.Debugf("Reregistering [notifyblocks]")
509                 if err := c.NotifyBlocks(); err != nil {
510                         return err
511                 }
512         }
513
514         // Reregister notifynewtransactions if needed.
515         if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
516                 log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)",
517                         stateCopy.notifyNewTxVerbose)
518                 err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
519                 if err != nil {
520                         return err
521                 }
522         }
523
524         // Reregister the combination of all previously registered notifyspent
525         // outpoints in one command if needed.
526         nslen := len(stateCopy.notifySpent)
527         if nslen > 0 {
528                 outpoints := make([]btcjson.OutPoint, 0, nslen)
529                 for op := range stateCopy.notifySpent {
530                         outpoints = append(outpoints, op)
531                 }
532                 log.Debugf("Reregistering [notifyspent] outpoints: %v", outpoints)
533                 if err := c.notifySpentInternal(outpoints).Receive(); err != nil {
534                         return err
535                 }
536         }
537
538         // Reregister the combination of all previously registered
539         // notifyreceived addresses in one command if needed.
540         nrlen := len(stateCopy.notifyReceived)
541         if nrlen > 0 {
542                 addresses := make([]string, 0, nrlen)
543                 for addr := range stateCopy.notifyReceived {
544                         addresses = append(addresses, addr)
545                 }
546                 log.Debugf("Reregistering [notifyreceived] addresses: %v", addresses)
547                 if err := c.notifyReceivedInternal(addresses).Receive(); err != nil {
548                         return err
549                 }
550         }
551
552         return nil
553 }
554
555 // ignoreResends is a set of all methods for requests that are "long running"
556 // are not be reissued by the client on reconnect.
557 var ignoreResends = map[string]struct{}{
558         "rescan": {},
559 }
560
561 // resendRequests resends any requests that had not completed when the client
562 // disconnected.  It is intended to be called once the client has reconnected as
563 // a separate goroutine.
564 func (c *Client) resendRequests() {
565         // Set the notification state back up.  If anything goes wrong,
566         // disconnect the client.
567         if err := c.reregisterNtfns(); err != nil {
568                 log.Warnf("Unable to re-establish notification state: %v", err)
569                 c.Disconnect()
570                 return
571         }
572
573         // Since it's possible to block on send and more requests might be
574         // added by the caller while resending, make a copy of all of the
575         // requests that need to be resent now and work from the copy.  This
576         // also allows the lock to be released quickly.
577         c.requestLock.Lock()
578         resendReqs := make([]*jsonRequest, 0, c.requestList.Len())
579         var nextElem *list.Element
580         for e := c.requestList.Front(); e != nil; e = nextElem {
581                 nextElem = e.Next()
582
583                 jReq := e.Value.(*jsonRequest)
584                 if _, ok := ignoreResends[jReq.method]; ok {
585                         // If a request is not sent on reconnect, remove it
586                         // from the request structures, since no reply is
587                         // expected.
588                         delete(c.requestMap, jReq.id)
589                         c.requestList.Remove(e)
590                 } else {
591                         resendReqs = append(resendReqs, jReq)
592                 }
593         }
594         c.requestLock.Unlock()
595
596         for _, jReq := range resendReqs {
597                 // Stop resending commands if the client disconnected again
598                 // since the next reconnect will handle them.
599                 if c.Disconnected() {
600                         return
601                 }
602
603                 log.Tracef("Sending command [%s] with id %d", jReq.method,
604                         jReq.id)
605                 c.sendMessage(jReq.marshalledJSON)
606         }
607 }
608
609 // wsReconnectHandler listens for client disconnects and automatically tries
610 // to reconnect with retry interval that scales based on the number of retries.
611 // It also resends any commands that had not completed when the client
612 // disconnected so the disconnect/reconnect process is largely transparent to
613 // the caller.  This function is not run when the DisableAutoReconnect config
614 // options is set.
615 //
616 // This function must be run as a goroutine.
617 func (c *Client) wsReconnectHandler() {
618 out:
619         for {
620                 select {
621                 case <-c.disconnect:
622                         // On disconnect, fallthrough to reestablish the
623                         // connection.
624
625                 case <-c.shutdown:
626                         break out
627                 }
628
629         reconnect:
630                 for {
631                         select {
632                         case <-c.shutdown:
633                                 break out
634                         default:
635                         }
636
637                         wsConn, err := dial(c.config)
638                         if err != nil {
639                                 c.retryCount++
640                                 log.Infof("Failed to connect to %s: %v",
641                                         c.config.Host, err)
642
643                                 // Scale the retry interval by the number of
644                                 // retries so there is a backoff up to a max
645                                 // of 1 minute.
646                                 scaledInterval := connectionRetryInterval.Nanoseconds() * c.retryCount
647                                 scaledDuration := time.Duration(scaledInterval)
648                                 if scaledDuration > time.Minute {
649                                         scaledDuration = time.Minute
650                                 }
651                                 log.Infof("Retrying connection to %s in "+
652                                         "%s", c.config.Host, scaledDuration)
653                                 time.Sleep(scaledDuration)
654                                 continue reconnect
655                         }
656
657                         log.Infof("Reestablished connection to RPC server %s",
658                                 c.config.Host)
659
660                         // Reset the connection state and signal the reconnect
661                         // has happened.
662                         c.wsConn = wsConn
663                         c.retryCount = 0
664
665                         c.mtx.Lock()
666                         c.disconnect = make(chan struct{})
667                         c.disconnected = false
668                         c.mtx.Unlock()
669
670                         // Start processing input and output for the
671                         // new connection.
672                         c.start()
673
674                         // Reissue pending requests in another goroutine since
675                         // the send can block.
676                         go c.resendRequests()
677
678                         // Break out of the reconnect loop back to wait for
679                         // disconnect again.
680                         break reconnect
681                 }
682         }
683         c.wg.Done()
684         log.Tracef("RPC client reconnect handler done for %s", c.config.Host)
685 }
686
687 // handleSendPostMessage handles performing the passed HTTP request, reading the
688 // result, unmarshalling it, and delivering the unmarshalled result to the
689 // provided response channel.
690 func (c *Client) handleSendPostMessage(details *sendPostDetails) {
691         jReq := details.jsonRequest
692         log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
693         httpResponse, err := c.httpClient.Do(details.httpRequest)
694         if err != nil {
695                 jReq.responseChan <- &response{err: err}
696                 return
697         }
698
699         // Read the raw bytes and close the response.
700         respBytes, err := ioutil.ReadAll(httpResponse.Body)
701         httpResponse.Body.Close()
702         if err != nil {
703                 err = fmt.Errorf("error reading json reply: %v", err)
704                 jReq.responseChan <- &response{err: err}
705                 return
706         }
707
708         // Try to unmarshal the response as a regular JSON-RPC response.
709         var resp rawResponse
710         err = json.Unmarshal(respBytes, &resp)
711         if err != nil {
712                 // When the response itself isn't a valid JSON-RPC response
713                 // return an error which includes the HTTP status code and raw
714                 // response bytes.
715                 err = fmt.Errorf("status code: %d, response: %q",
716                         httpResponse.StatusCode, string(respBytes))
717                 jReq.responseChan <- &response{err: err}
718                 return
719         }
720
721         res, err := resp.result()
722         jReq.responseChan <- &response{result: res, err: err}
723 }
724
725 // sendPostHandler handles all outgoing messages when the client is running
726 // in HTTP POST mode.  It uses a buffered channel to serialize output messages
727 // while allowing the sender to continue running asynchronously.  It must be run
728 // as a goroutine.
729 func (c *Client) sendPostHandler() {
730 out:
731         for {
732                 // Send any messages ready for send until the shutdown channel
733                 // is closed.
734                 select {
735                 case details := <-c.sendPostChan:
736                         c.handleSendPostMessage(details)
737
738                 case <-c.shutdown:
739                         break out
740                 }
741         }
742
743         // Drain any wait channels before exiting so nothing is left waiting
744         // around to send.
745 cleanup:
746         for {
747                 select {
748                 case details := <-c.sendPostChan:
749                         details.jsonRequest.responseChan <- &response{
750                                 result: nil,
751                                 err:    ErrClientShutdown,
752                         }
753
754                 default:
755                         break cleanup
756                 }
757         }
758         c.wg.Done()
759         log.Tracef("RPC client send handler done for %s", c.config.Host)
760
761 }
762
763 // sendPostRequest sends the passed HTTP request to the RPC server using the
764 // HTTP client associated with the client.  It is backed by a buffered channel,
765 // so it will not block until the send channel is full.
766 func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
767         // Don't send the message if shutting down.
768         select {
769         case <-c.shutdown:
770                 jReq.responseChan <- &response{result: nil, err: ErrClientShutdown}
771         default:
772         }
773
774         c.sendPostChan <- &sendPostDetails{
775                 jsonRequest: jReq,
776                 httpRequest: httpReq,
777         }
778 }
779
780 // newFutureError returns a new future result channel that already has the
781 // passed error waitin on the channel with the reply set to nil.  This is useful
782 // to easily return errors from the various Async functions.
783 func newFutureError(err error) chan *response {
784         responseChan := make(chan *response, 1)
785         responseChan <- &response{err: err}
786         return responseChan
787 }
788
789 // receiveFuture receives from the passed futureResult channel to extract a
790 // reply or any errors.  The examined errors include an error in the
791 // futureResult and the error in the reply from the server.  This will block
792 // until the result is available on the passed channel.
793 func receiveFuture(f chan *response) ([]byte, error) {
794         // Wait for a response on the returned channel.
795         r := <-f
796         return r.result, r.err
797 }
798
799 // sendPost sends the passed request to the server by issuing an HTTP POST
800 // request using the provided response channel for the reply.  Typically a new
801 // connection is opened and closed for each command when using this method,
802 // however, the underlying HTTP client might coalesce multiple commands
803 // depending on several factors including the remote server configuration.
804 func (c *Client) sendPost(jReq *jsonRequest) {
805         // Generate a request to the configured RPC server.
806         protocol := "http"
807         if !c.config.DisableTLS {
808                 protocol = "https"
809         }
810         url := protocol + "://" + c.config.Host
811         bodyReader := bytes.NewReader(jReq.marshalledJSON)
812         httpReq, err := http.NewRequest("POST", url, bodyReader)
813         if err != nil {
814                 jReq.responseChan <- &response{result: nil, err: err}
815                 return
816         }
817         httpReq.Close = true
818         httpReq.Header.Set("Content-Type", "application/json")
819
820         // Configure basic access authorization.
821         httpReq.SetBasicAuth(c.config.User, c.config.Pass)
822
823         log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
824         c.sendPostRequest(httpReq, jReq)
825 }
826
827 // sendRequest sends the passed json request to the associated server using the
828 // provided response channel for the reply.  It handles both websocket and HTTP
829 // POST mode depending on the configuration of the client.
830 func (c *Client) sendRequest(jReq *jsonRequest) {
831         // Choose which marshal and send function to use depending on whether
832         // the client running in HTTP POST mode or not.  When running in HTTP
833         // POST mode, the command is issued via an HTTP client.  Otherwise,
834         // the command is issued via the asynchronous websocket channels.
835         if c.config.HTTPPostMode {
836                 c.sendPost(jReq)
837                 return
838         }
839
840         // Check whether the websocket connection has never been established,
841         // in which case the handler goroutines are not running.
842         select {
843         case <-c.connEstablished:
844         default:
845                 jReq.responseChan <- &response{err: ErrClientNotConnected}
846                 return
847         }
848
849         // Add the request to the internal tracking map so the response from the
850         // remote server can be properly detected and routed to the response
851         // channel.  Then send the marshalled request via the websocket
852         // connection.
853         if err := c.addRequest(jReq); err != nil {
854                 jReq.responseChan <- &response{err: err}
855                 return
856         }
857         log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
858         c.sendMessage(jReq.marshalledJSON)
859 }
860
861 // sendCmd sends the passed command to the associated server and returns a
862 // response channel on which the reply will be delivered at some point in the
863 // future.  It handles both websocket and HTTP POST mode depending on the
864 // configuration of the client.
865 func (c *Client) sendCmd(cmd interface{}) chan *response {
866         // Get the method associated with the command.
867         method, err := btcjson.CmdMethod(cmd)
868         if err != nil {
869                 return newFutureError(err)
870         }
871
872         // Marshal the command.
873         id := c.NextID()
874         marshalledJSON, err := btcjson.MarshalCmd(id, cmd)
875         if err != nil {
876                 return newFutureError(err)
877         }
878
879         // Generate the request and send it along with a channel to respond on.
880         responseChan := make(chan *response, 1)
881         jReq := &jsonRequest{
882                 id:             id,
883                 method:         method,
884                 cmd:            cmd,
885                 marshalledJSON: marshalledJSON,
886                 responseChan:   responseChan,
887         }
888         c.sendRequest(jReq)
889
890         return responseChan
891 }
892
893 // sendCmdAndWait sends the passed command to the associated server, waits
894 // for the reply, and returns the result from it.  It will return the error
895 // field in the reply if there is one.
896 func (c *Client) sendCmdAndWait(cmd interface{}) (interface{}, error) {
897         // Marshal the command to JSON-RPC, send it to the connected server, and
898         // wait for a response on the returned channel.
899         return receiveFuture(c.sendCmd(cmd))
900 }
901
902 // Disconnected returns whether or not the server is disconnected.  If a
903 // websocket client was created but never connected, this also returns false.
904 func (c *Client) Disconnected() bool {
905         c.mtx.Lock()
906         defer c.mtx.Unlock()
907
908         select {
909         case <-c.connEstablished:
910                 return c.disconnected
911         default:
912                 return false
913         }
914 }
915
916 // doDisconnect disconnects the websocket associated with the client if it
917 // hasn't already been disconnected.  It will return false if the disconnect is
918 // not needed or the client is running in HTTP POST mode.
919 //
920 // This function is safe for concurrent access.
921 func (c *Client) doDisconnect() bool {
922         if c.config.HTTPPostMode {
923                 return false
924         }
925
926         c.mtx.Lock()
927         defer c.mtx.Unlock()
928
929         // Nothing to do if already disconnected.
930         if c.disconnected {
931                 return false
932         }
933
934         log.Tracef("Disconnecting RPC client %s", c.config.Host)
935         close(c.disconnect)
936         if c.wsConn != nil {
937                 c.wsConn.Close()
938         }
939         c.disconnected = true
940         return true
941 }
942
943 // doShutdown closes the shutdown channel and logs the shutdown unless shutdown
944 // is already in progress.  It will return false if the shutdown is not needed.
945 //
946 // This function is safe for concurrent access.
947 func (c *Client) doShutdown() bool {
948         // Ignore the shutdown request if the client is already in the process
949         // of shutting down or already shutdown.
950         select {
951         case <-c.shutdown:
952                 return false
953         default:
954         }
955
956         log.Tracef("Shutting down RPC client %s", c.config.Host)
957         close(c.shutdown)
958         return true
959 }
960
961 // Disconnect disconnects the current websocket associated with the client.  The
962 // connection will automatically be re-established unless the client was
963 // created with the DisableAutoReconnect flag.
964 //
965 // This function has no effect when the client is running in HTTP POST mode.
966 func (c *Client) Disconnect() {
967         // Nothing to do if already disconnected or running in HTTP POST mode.
968         if !c.doDisconnect() {
969                 return
970         }
971
972         c.requestLock.Lock()
973         defer c.requestLock.Unlock()
974
975         // When operating without auto reconnect, send errors to any pending
976         // requests and shutdown the client.
977         if c.config.DisableAutoReconnect {
978                 for e := c.requestList.Front(); e != nil; e = e.Next() {
979                         req := e.Value.(*jsonRequest)
980                         req.responseChan <- &response{
981                                 result: nil,
982                                 err:    ErrClientDisconnect,
983                         }
984                 }
985                 c.removeAllRequests()
986                 c.doShutdown()
987         }
988 }
989
990 // Shutdown shuts down the client by disconnecting any connections associated
991 // with the client and, when automatic reconnect is enabled, preventing future
992 // attempts to reconnect.  It also stops all goroutines.
993 func (c *Client) Shutdown() {
994         // Do the shutdown under the request lock to prevent clients from
995         // adding new requests while the client shutdown process is initiated.
996         c.requestLock.Lock()
997         defer c.requestLock.Unlock()
998
999         // Ignore the shutdown request if the client is already in the process
1000         // of shutting down or already shutdown.
1001         if !c.doShutdown() {
1002                 return
1003         }
1004
1005         // Send the ErrClientShutdown error to any pending requests.
1006         for e := c.requestList.Front(); e != nil; e = e.Next() {
1007                 req := e.Value.(*jsonRequest)
1008                 req.responseChan <- &response{
1009                         result: nil,
1010                         err:    ErrClientShutdown,
1011                 }
1012         }
1013         c.removeAllRequests()
1014
1015         // Disconnect the client if needed.
1016         c.doDisconnect()
1017 }
1018
1019 // start begins processing input and output messages.
1020 func (c *Client) start() {
1021         log.Tracef("Starting RPC client %s", c.config.Host)
1022
1023         // Start the I/O processing handlers depending on whether the client is
1024         // in HTTP POST mode or the default websocket mode.
1025         if c.config.HTTPPostMode {
1026                 c.wg.Add(1)
1027                 go c.sendPostHandler()
1028         } else {
1029                 c.wg.Add(3)
1030                 go func() {
1031                         if c.ntfnHandlers != nil {
1032                                 if c.ntfnHandlers.OnClientConnected != nil {
1033                                         c.ntfnHandlers.OnClientConnected()
1034                                 }
1035                         }
1036                         c.wg.Done()
1037                 }()
1038                 go c.wsInHandler()
1039                 go c.wsOutHandler()
1040         }
1041 }
1042
1043 // WaitForShutdown blocks until the client goroutines are stopped and the
1044 // connection is closed.
1045 func (c *Client) WaitForShutdown() {
1046         c.wg.Wait()
1047 }
1048
1049 // ConnConfig describes the connection configuration parameters for the client.
1050 // This
1051 type ConnConfig struct {
1052         // Host is the IP address and port of the RPC server you want to connect
1053         // to.
1054         Host string
1055
1056         // Endpoint is the websocket endpoint on the RPC server.  This is
1057         // typically "ws".
1058         Endpoint string
1059
1060         // User is the username to use to authenticate to the RPC server.
1061         User string
1062
1063         // Pass is the passphrase to use to authenticate to the RPC server.
1064         Pass string
1065
1066         // DisableTLS specifies whether transport layer security should be
1067         // disabled.  It is recommended to always use TLS if the RPC server
1068         // supports it as otherwise your username and password is sent across
1069         // the wire in cleartext.
1070         DisableTLS bool
1071
1072         // Certificates are the bytes for a PEM-encoded certificate chain used
1073         // for the TLS connection.  It has no effect if the DisableTLS parameter
1074         // is true.
1075         Certificates []byte
1076
1077         // Proxy specifies to connect through a SOCKS 5 proxy server.  It may
1078         // be an empty string if a proxy is not required.
1079         Proxy string
1080
1081         // ProxyUser is an optional username to use for the proxy server if it
1082         // requires authentication.  It has no effect if the Proxy parameter
1083         // is not set.
1084         ProxyUser string
1085
1086         // ProxyPass is an optional password to use for the proxy server if it
1087         // requires authentication.  It has no effect if the Proxy parameter
1088         // is not set.
1089         ProxyPass string
1090
1091         // DisableAutoReconnect specifies the client should not automatically
1092         // try to reconnect to the server when it has been disconnected.
1093         DisableAutoReconnect bool
1094
1095         // DisableConnectOnNew specifies that a websocket client connection
1096         // should not be tried when creating the client with New.  Instead, the
1097         // client is created and returned unconnected, and Connect must be
1098         // called manually.
1099         DisableConnectOnNew bool
1100
1101         // HTTPPostMode instructs the client to run using multiple independent
1102         // connections issuing HTTP POST requests instead of using the default
1103         // of websockets.  Websockets are generally preferred as some of the
1104         // features of the client such notifications only work with websockets,
1105         // however, not all servers support the websocket extensions, so this
1106         // flag can be set to true to use basic HTTP POST requests instead.
1107         HTTPPostMode bool
1108
1109         // EnableBCInfoHacks is an option provided to enable compatiblity hacks
1110         // when connecting to blockchain.info RPC server
1111         EnableBCInfoHacks bool
1112 }
1113
1114 // newHTTPClient returns a new http client that is configured according to the
1115 // proxy and TLS settings in the associated connection configuration.
1116 func newHTTPClient(config *ConnConfig) (*http.Client, error) {
1117         // Set proxy function if there is a proxy configured.
1118         var proxyFunc func(*http.Request) (*url.URL, error)
1119         if config.Proxy != "" {
1120                 proxyURL, err := url.Parse(config.Proxy)
1121                 if err != nil {
1122                         return nil, err
1123                 }
1124                 proxyFunc = http.ProxyURL(proxyURL)
1125         }
1126
1127         // Configure TLS if needed.
1128         var tlsConfig *tls.Config
1129         if !config.DisableTLS {
1130                 if len(config.Certificates) > 0 {
1131                         pool := x509.NewCertPool()
1132                         pool.AppendCertsFromPEM(config.Certificates)
1133                         tlsConfig = &tls.Config{
1134                                 RootCAs: pool,
1135                         }
1136                 }
1137         }
1138
1139         client := http.Client{
1140                 Transport: &http.Transport{
1141                         Proxy:           proxyFunc,
1142                         TLSClientConfig: tlsConfig,
1143                 },
1144         }
1145
1146         return &client, nil
1147 }
1148
1149 // dial opens a websocket connection using the passed connection configuration
1150 // details.
1151 func dial(config *ConnConfig) (*websocket.Conn, error) {
1152         // Setup TLS if not disabled.
1153         var tlsConfig *tls.Config
1154         var scheme = "ws"
1155         if !config.DisableTLS {
1156                 tlsConfig = &tls.Config{
1157                         MinVersion: tls.VersionTLS12,
1158                 }
1159                 if len(config.Certificates) > 0 {
1160                         pool := x509.NewCertPool()
1161                         pool.AppendCertsFromPEM(config.Certificates)
1162                         tlsConfig.RootCAs = pool
1163                 }
1164                 scheme = "wss"
1165         }
1166
1167         // Create a websocket dialer that will be used to make the connection.
1168         // It is modified by the proxy setting below as needed.
1169         dialer := websocket.Dialer{TLSClientConfig: tlsConfig}
1170
1171         // Setup the proxy if one is configured.
1172         if config.Proxy != "" {
1173                 proxy := &socks.Proxy{
1174                         Addr:     config.Proxy,
1175                         Username: config.ProxyUser,
1176                         Password: config.ProxyPass,
1177                 }
1178                 dialer.NetDial = proxy.Dial
1179         }
1180
1181         // The RPC server requires basic authorization, so create a custom
1182         // request header with the Authorization header set.
1183         login := config.User + ":" + config.Pass
1184         auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
1185         requestHeader := make(http.Header)
1186         requestHeader.Add("Authorization", auth)
1187
1188         // Dial the connection.
1189         url := fmt.Sprintf("%s://%s/%s", scheme, config.Host, config.Endpoint)
1190         wsConn, resp, err := dialer.Dial(url, requestHeader)
1191         if err != nil {
1192                 if err != websocket.ErrBadHandshake || resp == nil {
1193                         return nil, err
1194                 }
1195
1196                 // Detect HTTP authentication error status codes.
1197                 if resp.StatusCode == http.StatusUnauthorized ||
1198                         resp.StatusCode == http.StatusForbidden {
1199                         return nil, ErrInvalidAuth
1200                 }
1201
1202                 // The connection was authenticated and the status response was
1203                 // ok, but the websocket handshake still failed, so the endpoint
1204                 // is invalid in some way.
1205                 if resp.StatusCode == http.StatusOK {
1206                         return nil, ErrInvalidEndpoint
1207                 }
1208
1209                 // Return the status text from the server if none of the special
1210                 // cases above apply.
1211                 return nil, errors.New(resp.Status)
1212         }
1213         return wsConn, nil
1214 }
1215
1216 // New creates a new RPC client based on the provided connection configuration
1217 // details.  The notification handlers parameter may be nil if you are not
1218 // interested in receiving notifications and will be ignored if the
1219 // configuration is set to run in HTTP POST mode.
1220 func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error) {
1221         // Either open a websocket connection or create an HTTP client depending
1222         // on the HTTP POST mode.  Also, set the notification handlers to nil
1223         // when running in HTTP POST mode.
1224         var wsConn *websocket.Conn
1225         var httpClient *http.Client
1226         connEstablished := make(chan struct{})
1227         var start bool
1228         if config.HTTPPostMode {
1229                 ntfnHandlers = nil
1230                 start = true
1231
1232                 var err error
1233                 httpClient, err = newHTTPClient(config)
1234                 if err != nil {
1235                         return nil, err
1236                 }
1237         } else {
1238                 if !config.DisableConnectOnNew {
1239                         var err error
1240                         wsConn, err = dial(config)
1241                         if err != nil {
1242                                 return nil, err
1243                         }
1244                         start = true
1245                 }
1246         }
1247
1248         client := &Client{
1249                 config:          config,
1250                 wsConn:          wsConn,
1251                 httpClient:      httpClient,
1252                 requestMap:      make(map[uint64]*list.Element),
1253                 requestList:     list.New(),
1254                 ntfnHandlers:    ntfnHandlers,
1255                 ntfnState:       newNotificationState(),
1256                 sendChan:        make(chan []byte, sendBufferSize),
1257                 sendPostChan:    make(chan *sendPostDetails, sendPostBufferSize),
1258                 connEstablished: connEstablished,
1259                 disconnect:      make(chan struct{}),
1260                 shutdown:        make(chan struct{}),
1261         }
1262
1263         if start {
1264                 log.Infof("Established connection to RPC server %s",
1265                         config.Host)
1266                 close(connEstablished)
1267                 client.start()
1268                 if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect {
1269                         client.wg.Add(1)
1270                         go client.wsReconnectHandler()
1271                 }
1272         }
1273
1274         return client, nil
1275 }
1276
1277 // Connect establishes the initial websocket connection.  This is necessary when
1278 // a client was created after setting the DisableConnectOnNew field of the
1279 // Config struct.
1280 //
1281 // Up to tries number of connections (each after an increasing backoff) will
1282 // be tried if the connection can not be established.  The special value of 0
1283 // indicates an unlimited number of connection attempts.
1284 //
1285 // This method will error if the client is not configured for websockets, if the
1286 // connection has already been established, or if none of the connection
1287 // attempts were successful.
1288 func (c *Client) Connect(tries int) error {
1289         c.mtx.Lock()
1290         defer c.mtx.Unlock()
1291
1292         if c.config.HTTPPostMode {
1293                 return ErrNotWebsocketClient
1294         }
1295         if c.wsConn != nil {
1296                 return ErrClientAlreadyConnected
1297         }
1298
1299         // Begin connection attempts.  Increase the backoff after each failed
1300         // attempt, up to a maximum of one minute.
1301         var err error
1302         var backoff time.Duration
1303         for i := 0; tries == 0 || i < tries; i++ {
1304                 var wsConn *websocket.Conn
1305                 wsConn, err = dial(c.config)
1306                 if err != nil {
1307                         backoff = connectionRetryInterval * time.Duration(i+1)
1308                         if backoff > time.Minute {
1309                                 backoff = time.Minute
1310                         }
1311                         time.Sleep(backoff)
1312                         continue
1313                 }
1314
1315                 // Connection was established.  Set the websocket connection
1316                 // member of the client and start the goroutines necessary
1317                 // to run the client.
1318                 log.Infof("Established connection to RPC server %s",
1319                         c.config.Host)
1320                 c.wsConn = wsConn
1321                 close(c.connEstablished)
1322                 c.start()
1323                 if !c.config.DisableAutoReconnect {
1324                         c.wg.Add(1)
1325                         go c.wsReconnectHandler()
1326                 }
1327                 return nil
1328         }
1329
1330         // All connection attempts failed, so return the last error.
1331         return err
1332 }