1 // Copyright (c) 2014-2017 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
26 "github.com/btcsuite/btcd/btcjson"
27 "github.com/btcsuite/go-socks/socks"
28 "github.com/btcsuite/websocket"
var (
	// ErrInvalidAuth is an error to describe the condition where the client
	// is either unable to authenticate or the specified endpoint is
	// incorrect. dial reports it when the server answers the websocket
	// handshake with an HTTP 401 or 403 status.
	ErrInvalidAuth = errors.New("authentication failure")

	// ErrInvalidEndpoint is an error to describe the condition where the
	// websocket handshake failed with the specified endpoint. dial reports
	// it when the handshake fails even though the server replied with an
	// HTTP 200 status.
	ErrInvalidEndpoint = errors.New("the endpoint either does not support " +
		"websockets or does not exist")

	// ErrClientNotConnected is an error to describe the condition where a
	// websocket client has been created, but the connection was never
	// established. This condition differs from ErrClientDisconnect, which
	// represents an established connection that was lost.
	ErrClientNotConnected = errors.New("the client was never connected")

	// ErrClientDisconnect is an error to describe the condition where the
	// client has been disconnected from the RPC server. When the
	// DisableAutoReconnect option is set, any requests still outstanding
	// when Disconnect runs are completed with this error.
	ErrClientDisconnect = errors.New("the client has been disconnected")

	// ErrClientShutdown is an error to describe the condition where the
	// client is either already shutdown, or in the process of shutting
	// down. Any outstanding futures when a client shutdown occurs will
	// return this error as will any new requests.
	ErrClientShutdown = errors.New("the client has been shutdown")

	// ErrNotWebsocketClient is an error to describe the condition of
	// calling a Client method intended for a websocket client when the
	// client has been configured to run in HTTP POST mode instead.
	ErrNotWebsocketClient = errors.New("client is not configured for " +
		"websockets")

	// ErrClientAlreadyConnected is an error to describe the condition where
	// a new client connection cannot be established due to a websocket
	// client having already connected to the RPC server.
	ErrClientAlreadyConnected = errors.New("websocket client has already " +
		"connected")
)
const (
	// sendBufferSize is the number of elements the websocket send channel
	// can queue before blocking.
	sendBufferSize = 50

	// sendPostBufferSize is the number of elements the HTTP POST send
	// channel can queue before blocking.
	sendPostBufferSize = 100

	// connectionRetryInterval is the amount of time to wait in between
	// retries when automatically reconnecting to an RPC server. The
	// reconnect handler multiplies it by the current retry count to obtain
	// the actual delay.
	connectionRetryInterval = time.Second * 5
)
// sendPostDetails houses an HTTP POST request to send to an RPC server as well
// as the original JSON-RPC command and a channel to reply on when the server
// responds with the result.
type sendPostDetails struct {
	// httpRequest is the fully prepared HTTP request to perform.
	httpRequest *http.Request

	// jsonRequest is the JSON-RPC request carried by httpRequest; its
	// response channel receives the outcome.
	jsonRequest *jsonRequest
}
// jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it.
type jsonRequest struct {
	// id is the JSON-RPC identifier used to match a reply to this request.
	id uint64

	// method is the JSON-RPC method name of the command.
	method string

	// cmd is the original command value the request was built from.
	cmd interface{}

	// marshalledJSON is the serialized request exactly as sent on the wire.
	marshalledJSON []byte

	// responseChan receives the single reply (or error) for the request.
	responseChan chan *response
}
// Client represents a Bitcoin RPC client which allows easy access to the
// various RPC methods available on a Bitcoin RPC server. Each of the wrapper
// functions handle the details of converting the passed and return types to and
// from the underlying JSON types which are required for the JSON-RPC
// invocations
//
// The client provides each RPC in both synchronous (blocking) and asynchronous
// (non-blocking) forms. The asynchronous forms are based on the concept of
// futures where they return an instance of a type that promises to deliver the
// result of the invocation at some future time. Invoking the Receive method on
// the returned future will block until the result is available if it's not
// already.
type Client struct {
	id uint64 // atomic, so must stay 64-bit aligned

	// config holds the connection configuration associated with this client.
	config *ConnConfig

	// wsConn is the underlying websocket connection when not in HTTP POST
	// mode.
	wsConn *websocket.Conn

	// httpClient is the underlying HTTP client to use when running in HTTP
	// POST mode.
	httpClient *http.Client

	// mtx is a mutex to protect access to connection related fields.
	mtx sync.Mutex

	// disconnected indicates whether or not the server is disconnected.
	disconnected bool

	// retryCount holds the number of times the client has tried to
	// reconnect to the RPC server.
	retryCount int64

	// Track command and their response channels by ID. requestLock guards
	// both requestMap and requestList.
	requestLock sync.Mutex
	requestMap  map[uint64]*list.Element
	requestList *list.List

	// Notifications. ntfnStateLock guards ntfnState.
	ntfnHandlers  *NotificationHandlers
	ntfnStateLock sync.Mutex
	ntfnState     *notificationState

	// Networking infrastructure.
	sendChan        chan []byte
	sendPostChan    chan *sendPostDetails
	connEstablished chan struct{}
	disconnect      chan struct{}
	shutdown        chan struct{}
	wg              sync.WaitGroup
}
161 // NextID returns the next id to be used when sending a JSON-RPC message. This
162 // ID allows responses to be associated with particular requests per the
163 // JSON-RPC specification. Typically the consumer of the client does not need
164 // to call this function, however, if a custom request is being created and used
165 // this function should be used to ensure the ID is unique amongst all requests
167 func (c *Client) NextID() uint64 {
168 return atomic.AddUint64(&c.id, 1)
171 // addRequest associates the passed jsonRequest with its id. This allows the
172 // response from the remote server to be unmarshalled to the appropriate type
173 // and sent to the specified channel when it is received.
175 // If the client has already begun shutting down, ErrClientShutdown is returned
176 // and the request is not added.
178 // This function is safe for concurrent access.
179 func (c *Client) addRequest(jReq *jsonRequest) error {
181 defer c.requestLock.Unlock()
183 // A non-blocking read of the shutdown channel with the request lock
184 // held avoids adding the request to the client's internal data
185 // structures if the client is in the process of shutting down (and
186 // has not yet grabbed the request lock), or has finished shutdown
187 // already (responding to each outstanding request with
188 // ErrClientShutdown).
191 return ErrClientShutdown
195 element := c.requestList.PushBack(jReq)
196 c.requestMap[jReq.id] = element
200 // removeRequest returns and removes the jsonRequest which contains the response
201 // channel and original method associated with the passed id or nil if there is
204 // This function is safe for concurrent access.
205 func (c *Client) removeRequest(id uint64) *jsonRequest {
207 defer c.requestLock.Unlock()
209 element := c.requestMap[id]
211 delete(c.requestMap, id)
212 request := c.requestList.Remove(element).(*jsonRequest)
219 // removeAllRequests removes all the jsonRequests which contain the response
220 // channels for outstanding requests.
222 // This function MUST be called with the request lock held.
223 func (c *Client) removeAllRequests() {
224 c.requestMap = make(map[uint64]*list.Element)
228 // trackRegisteredNtfns examines the passed command to see if it is one of
229 // the notification commands and updates the notification state that is used
230 // to automatically re-establish registered notifications on reconnects.
231 func (c *Client) trackRegisteredNtfns(cmd interface{}) {
232 // Nothing to do if the caller is not interested in notifications.
233 if c.ntfnHandlers == nil {
237 c.ntfnStateLock.Lock()
238 defer c.ntfnStateLock.Unlock()
240 switch bcmd := cmd.(type) {
241 case *btcjson.NotifyBlocksCmd:
242 c.ntfnState.notifyBlocks = true
244 case *btcjson.NotifyNewTransactionsCmd:
245 if bcmd.Verbose != nil && *bcmd.Verbose {
246 c.ntfnState.notifyNewTxVerbose = true
248 c.ntfnState.notifyNewTx = true
252 case *btcjson.NotifySpentCmd:
253 for _, op := range bcmd.OutPoints {
254 c.ntfnState.notifySpent[op] = struct{}{}
257 case *btcjson.NotifyReceivedCmd:
258 for _, addr := range bcmd.Addresses {
259 c.ntfnState.notifyReceived[addr] = struct{}{}
type (
	// inMessage is the first type that an incoming message is unmarshaled
	// into. It supports both requests (for notification support) and
	// responses. The partially-unmarshaled message is a notification if
	// the embedded ID (from the response) is nil. Otherwise, it is a
	// response.
	inMessage struct {
		// ID is decoded as a float64 because that is how JSON numbers
		// arrive; handleMessage validates it before converting it to an
		// integer request id.
		ID *float64 `json:"id"`
		*rawNotification
		*rawResponse
	}

	// rawNotification is a partially-unmarshaled JSON-RPC notification.
	// Params stays as raw JSON so each notification handler can decode it
	// into the concrete types it expects.
	rawNotification struct {
		Method string            `json:"method"`
		Params []json.RawMessage `json:"params"`
	}

	// rawResponse is a partially-unmarshaled JSON-RPC response. For this
	// to be valid (according to JSON-RPC 1.0 spec), ID may not be nil.
	rawResponse struct {
		Result json.RawMessage   `json:"result"`
		Error  *btcjson.RPCError `json:"error"`
	}
)
// response is the raw bytes of a JSON-RPC result, or the error if the response
// error object was non-null. It is the value delivered on a request's response
// channel.
type response struct {
	// result holds the still-encoded JSON result.
	result []byte

	// err is the failure to report instead of a result, if any.
	err error
}
297 // result checks whether the unmarshaled response contains a non-nil error,
298 // returning an unmarshaled btcjson.RPCError (or an unmarshaling error) if so.
299 // If the response is not an error, the raw bytes of the request are
300 // returned for further unmashaling into specific result types.
301 func (r rawResponse) result() (result []byte, err error) {
308 // handleMessage is the main handler for incoming notifications and responses.
309 func (c *Client) handleMessage(msg []byte) {
310 // Attempt to unmarshal the message as either a notification or
313 err := json.Unmarshal(msg, &in)
315 log.Warnf("Remote server sent invalid message: %v", err)
319 // JSON-RPC 1.0 notifications are requests with a null id.
321 ntfn := in.rawNotification
323 log.Warn("Malformed notification: missing " +
324 "method and parameters")
327 if ntfn.Method == "" {
328 log.Warn("Malformed notification: missing method")
331 // params are not optional: nil isn't valid (but len == 0 is)
332 if ntfn.Params == nil {
333 log.Warn("Malformed notification: missing params")
336 // Deliver the notification.
337 log.Tracef("Received notification [%s]", in.Method)
338 c.handleNotification(in.rawNotification)
342 // ensure that in.ID can be converted to an integer without loss of precision
343 if *in.ID < 0 || *in.ID != math.Trunc(*in.ID) {
344 log.Warn("Malformed response: invalid identifier")
348 if in.rawResponse == nil {
349 log.Warn("Malformed response: missing result and error")
354 log.Tracef("Received response for id %d (result %s)", id, in.Result)
355 request := c.removeRequest(id)
357 // Nothing more to do if there is no request associated with this reply.
358 if request == nil || request.responseChan == nil {
359 log.Warnf("Received unexpected reply: %s (id %d)", in.Result,
364 // Since the command was successful, examine it to see if it's a
365 // notification, and if is, add it to the notification state so it
366 // can automatically be re-established on reconnect.
367 c.trackRegisteredNtfns(request.cmd)
369 // Deliver the response.
370 result, err := in.rawResponse.result()
371 request.responseChan <- &response{result: result, err: err}
374 // shouldLogReadError returns whether or not the passed error, which is expected
375 // to have come from reading from the websocket connection in wsInHandler,
377 func (c *Client) shouldLogReadError(err error) bool {
378 // No logging when the connetion is being forcibly disconnected.
385 // No logging when the connection has been disconnected.
389 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
396 // wsInHandler handles all incoming messages for the websocket connection
397 // associated with the client. It must be run as a goroutine.
398 func (c *Client) wsInHandler() {
401 // Break out of the loop once the shutdown channel has been
402 // closed. Use a non-blocking select here so we fall through
410 _, msg, err := c.wsConn.ReadMessage()
412 // Log the error if it's not due to disconnecting.
413 if c.shouldLogReadError(err) {
414 log.Errorf("Websocket receive error from "+
415 "%s: %v", c.config.Host, err)
422 // Ensure the connection is closed.
425 log.Tracef("RPC client input handler done for %s", c.config.Host)
428 // disconnectChan returns a copy of the current disconnect channel. The channel
429 // is read protected by the client mutex, and is safe to call while the channel
430 // is being reassigned during a reconnect.
431 func (c *Client) disconnectChan() <-chan struct{} {
438 // wsOutHandler handles all outgoing messages for the websocket connection. It
439 // uses a buffered channel to serialize output messages while allowing the
440 // sender to continue running asynchronously. It must be run as a goroutine.
441 func (c *Client) wsOutHandler() {
444 // Send any messages ready for send until the client is
445 // disconnected closed.
447 case msg := <-c.sendChan:
448 err := c.wsConn.WriteMessage(websocket.TextMessage, msg)
454 case <-c.disconnectChan():
459 // Drain any channels before exiting so nothing is left waiting around
470 log.Tracef("RPC client output handler done for %s", c.config.Host)
473 // sendMessage sends the passed JSON to the connected server using the
474 // websocket connection. It is backed by a buffered channel, so it will not
475 // block until the send channel is full.
476 func (c *Client) sendMessage(marshalledJSON []byte) {
477 // Don't send the message if disconnected.
479 case c.sendChan <- marshalledJSON:
480 case <-c.disconnectChan():
485 // reregisterNtfns creates and sends commands needed to re-establish the current
486 // notification state associated with the client. It should only be called on
487 // on reconnect by the resendRequests function.
488 func (c *Client) reregisterNtfns() error {
489 // Nothing to do if the caller is not interested in notifications.
490 if c.ntfnHandlers == nil {
494 // In order to avoid holding the lock on the notification state for the
495 // entire time of the potentially long running RPCs issued below, make a
496 // copy of it and work from that.
498 // Also, other commands will be running concurrently which could modify
499 // the notification state (while not under the lock of course) which
500 // also register it with the remote RPC server, so this prevents double
502 c.ntfnStateLock.Lock()
503 stateCopy := c.ntfnState.Copy()
504 c.ntfnStateLock.Unlock()
506 // Reregister notifyblocks if needed.
507 if stateCopy.notifyBlocks {
508 log.Debugf("Reregistering [notifyblocks]")
509 if err := c.NotifyBlocks(); err != nil {
514 // Reregister notifynewtransactions if needed.
515 if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
516 log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)",
517 stateCopy.notifyNewTxVerbose)
518 err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose)
524 // Reregister the combination of all previously registered notifyspent
525 // outpoints in one command if needed.
526 nslen := len(stateCopy.notifySpent)
528 outpoints := make([]btcjson.OutPoint, 0, nslen)
529 for op := range stateCopy.notifySpent {
530 outpoints = append(outpoints, op)
532 log.Debugf("Reregistering [notifyspent] outpoints: %v", outpoints)
533 if err := c.notifySpentInternal(outpoints).Receive(); err != nil {
538 // Reregister the combination of all previously registered
539 // notifyreceived addresses in one command if needed.
540 nrlen := len(stateCopy.notifyReceived)
542 addresses := make([]string, 0, nrlen)
543 for addr := range stateCopy.notifyReceived {
544 addresses = append(addresses, addr)
546 log.Debugf("Reregistering [notifyreceived] addresses: %v", addresses)
547 if err := c.notifyReceivedInternal(addresses).Receive(); err != nil {
// ignoreResends is a set of all methods for requests that are "long running"
// and are therefore not reissued by the client on reconnect. resendRequests
// drops any pending request whose method appears here instead of resending it.
var ignoreResends = map[string]struct{}{
	"rescan": {},
}
561 // resendRequests resends any requests that had not completed when the client
562 // disconnected. It is intended to be called once the client has reconnected as
563 // a separate goroutine.
564 func (c *Client) resendRequests() {
565 // Set the notification state back up. If anything goes wrong,
566 // disconnect the client.
567 if err := c.reregisterNtfns(); err != nil {
568 log.Warnf("Unable to re-establish notification state: %v", err)
573 // Since it's possible to block on send and more requests might be
574 // added by the caller while resending, make a copy of all of the
575 // requests that need to be resent now and work from the copy. This
576 // also allows the lock to be released quickly.
578 resendReqs := make([]*jsonRequest, 0, c.requestList.Len())
579 var nextElem *list.Element
580 for e := c.requestList.Front(); e != nil; e = nextElem {
583 jReq := e.Value.(*jsonRequest)
584 if _, ok := ignoreResends[jReq.method]; ok {
585 // If a request is not sent on reconnect, remove it
586 // from the request structures, since no reply is
588 delete(c.requestMap, jReq.id)
589 c.requestList.Remove(e)
591 resendReqs = append(resendReqs, jReq)
594 c.requestLock.Unlock()
596 for _, jReq := range resendReqs {
597 // Stop resending commands if the client disconnected again
598 // since the next reconnect will handle them.
599 if c.Disconnected() {
603 log.Tracef("Sending command [%s] with id %d", jReq.method,
605 c.sendMessage(jReq.marshalledJSON)
609 // wsReconnectHandler listens for client disconnects and automatically tries
610 // to reconnect with retry interval that scales based on the number of retries.
611 // It also resends any commands that had not completed when the client
612 // disconnected so the disconnect/reconnect process is largely transparent to
613 // the caller. This function is not run when the DisableAutoReconnect config
616 // This function must be run as a goroutine.
617 func (c *Client) wsReconnectHandler() {
622 // On disconnect, fallthrough to reestablish the
637 wsConn, err := dial(c.config)
640 log.Infof("Failed to connect to %s: %v",
643 // Scale the retry interval by the number of
644 // retries so there is a backoff up to a max
646 scaledInterval := connectionRetryInterval.Nanoseconds() * c.retryCount
647 scaledDuration := time.Duration(scaledInterval)
648 if scaledDuration > time.Minute {
649 scaledDuration = time.Minute
651 log.Infof("Retrying connection to %s in "+
652 "%s", c.config.Host, scaledDuration)
653 time.Sleep(scaledDuration)
657 log.Infof("Reestablished connection to RPC server %s",
660 // Reset the connection state and signal the reconnect
666 c.disconnect = make(chan struct{})
667 c.disconnected = false
670 // Start processing input and output for the
674 // Reissue pending requests in another goroutine since
675 // the send can block.
676 go c.resendRequests()
678 // Break out of the reconnect loop back to wait for
684 log.Tracef("RPC client reconnect handler done for %s", c.config.Host)
687 // handleSendPostMessage handles performing the passed HTTP request, reading the
688 // result, unmarshalling it, and delivering the unmarshalled result to the
689 // provided response channel.
690 func (c *Client) handleSendPostMessage(details *sendPostDetails) {
691 jReq := details.jsonRequest
692 log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
693 httpResponse, err := c.httpClient.Do(details.httpRequest)
695 jReq.responseChan <- &response{err: err}
699 // Read the raw bytes and close the response.
700 respBytes, err := ioutil.ReadAll(httpResponse.Body)
701 httpResponse.Body.Close()
703 err = fmt.Errorf("error reading json reply: %v", err)
704 jReq.responseChan <- &response{err: err}
708 // Try to unmarshal the response as a regular JSON-RPC response.
710 err = json.Unmarshal(respBytes, &resp)
712 // When the response itself isn't a valid JSON-RPC response
713 // return an error which includes the HTTP status code and raw
715 err = fmt.Errorf("status code: %d, response: %q",
716 httpResponse.StatusCode, string(respBytes))
717 jReq.responseChan <- &response{err: err}
721 res, err := resp.result()
722 jReq.responseChan <- &response{result: res, err: err}
725 // sendPostHandler handles all outgoing messages when the client is running
726 // in HTTP POST mode. It uses a buffered channel to serialize output messages
727 // while allowing the sender to continue running asynchronously. It must be run
729 func (c *Client) sendPostHandler() {
732 // Send any messages ready for send until the shutdown channel
735 case details := <-c.sendPostChan:
736 c.handleSendPostMessage(details)
743 // Drain any wait channels before exiting so nothing is left waiting
748 case details := <-c.sendPostChan:
749 details.jsonRequest.responseChan <- &response{
751 err: ErrClientShutdown,
759 log.Tracef("RPC client send handler done for %s", c.config.Host)
763 // sendPostRequest sends the passed HTTP request to the RPC server using the
764 // HTTP client associated with the client. It is backed by a buffered channel,
765 // so it will not block until the send channel is full.
766 func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
767 // Don't send the message if shutting down.
770 jReq.responseChan <- &response{result: nil, err: ErrClientShutdown}
774 c.sendPostChan <- &sendPostDetails{
776 httpRequest: httpReq,
780 // newFutureError returns a new future result channel that already has the
781 // passed error waitin on the channel with the reply set to nil. This is useful
782 // to easily return errors from the various Async functions.
783 func newFutureError(err error) chan *response {
784 responseChan := make(chan *response, 1)
785 responseChan <- &response{err: err}
789 // receiveFuture receives from the passed futureResult channel to extract a
790 // reply or any errors. The examined errors include an error in the
791 // futureResult and the error in the reply from the server. This will block
792 // until the result is available on the passed channel.
793 func receiveFuture(f chan *response) ([]byte, error) {
794 // Wait for a response on the returned channel.
796 return r.result, r.err
799 // sendPost sends the passed request to the server by issuing an HTTP POST
800 // request using the provided response channel for the reply. Typically a new
801 // connection is opened and closed for each command when using this method,
802 // however, the underlying HTTP client might coalesce multiple commands
803 // depending on several factors including the remote server configuration.
804 func (c *Client) sendPost(jReq *jsonRequest) {
805 // Generate a request to the configured RPC server.
807 if !c.config.DisableTLS {
810 url := protocol + "://" + c.config.Host
811 bodyReader := bytes.NewReader(jReq.marshalledJSON)
812 httpReq, err := http.NewRequest("POST", url, bodyReader)
814 jReq.responseChan <- &response{result: nil, err: err}
818 httpReq.Header.Set("Content-Type", "application/json")
820 // Configure basic access authorization.
821 httpReq.SetBasicAuth(c.config.User, c.config.Pass)
823 log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
824 c.sendPostRequest(httpReq, jReq)
827 // sendRequest sends the passed json request to the associated server using the
828 // provided response channel for the reply. It handles both websocket and HTTP
829 // POST mode depending on the configuration of the client.
830 func (c *Client) sendRequest(jReq *jsonRequest) {
831 // Choose which marshal and send function to use depending on whether
832 // the client running in HTTP POST mode or not. When running in HTTP
833 // POST mode, the command is issued via an HTTP client. Otherwise,
834 // the command is issued via the asynchronous websocket channels.
835 if c.config.HTTPPostMode {
840 // Check whether the websocket connection has never been established,
841 // in which case the handler goroutines are not running.
843 case <-c.connEstablished:
845 jReq.responseChan <- &response{err: ErrClientNotConnected}
849 // Add the request to the internal tracking map so the response from the
850 // remote server can be properly detected and routed to the response
851 // channel. Then send the marshalled request via the websocket
853 if err := c.addRequest(jReq); err != nil {
854 jReq.responseChan <- &response{err: err}
857 log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
858 c.sendMessage(jReq.marshalledJSON)
861 // sendCmd sends the passed command to the associated server and returns a
862 // response channel on which the reply will be delivered at some point in the
863 // future. It handles both websocket and HTTP POST mode depending on the
864 // configuration of the client.
865 func (c *Client) sendCmd(cmd interface{}) chan *response {
866 // Get the method associated with the command.
867 method, err := btcjson.CmdMethod(cmd)
869 return newFutureError(err)
872 // Marshal the command.
874 marshalledJSON, err := btcjson.MarshalCmd(id, cmd)
876 return newFutureError(err)
879 // Generate the request and send it along with a channel to respond on.
880 responseChan := make(chan *response, 1)
881 jReq := &jsonRequest{
885 marshalledJSON: marshalledJSON,
886 responseChan: responseChan,
893 // sendCmdAndWait sends the passed command to the associated server, waits
894 // for the reply, and returns the result from it. It will return the error
895 // field in the reply if there is one.
896 func (c *Client) sendCmdAndWait(cmd interface{}) (interface{}, error) {
897 // Marshal the command to JSON-RPC, send it to the connected server, and
898 // wait for a response on the returned channel.
899 return receiveFuture(c.sendCmd(cmd))
902 // Disconnected returns whether or not the server is disconnected. If a
903 // websocket client was created but never connected, this also returns false.
904 func (c *Client) Disconnected() bool {
909 case <-c.connEstablished:
910 return c.disconnected
916 // doDisconnect disconnects the websocket associated with the client if it
917 // hasn't already been disconnected. It will return false if the disconnect is
918 // not needed or the client is running in HTTP POST mode.
920 // This function is safe for concurrent access.
921 func (c *Client) doDisconnect() bool {
922 if c.config.HTTPPostMode {
929 // Nothing to do if already disconnected.
934 log.Tracef("Disconnecting RPC client %s", c.config.Host)
939 c.disconnected = true
943 // doShutdown closes the shutdown channel and logs the shutdown unless shutdown
944 // is already in progress. It will return false if the shutdown is not needed.
946 // This function is safe for concurrent access.
947 func (c *Client) doShutdown() bool {
948 // Ignore the shutdown request if the client is already in the process
949 // of shutting down or already shutdown.
956 log.Tracef("Shutting down RPC client %s", c.config.Host)
961 // Disconnect disconnects the current websocket associated with the client. The
962 // connection will automatically be re-established unless the client was
963 // created with the DisableAutoReconnect flag.
965 // This function has no effect when the client is running in HTTP POST mode.
966 func (c *Client) Disconnect() {
967 // Nothing to do if already disconnected or running in HTTP POST mode.
968 if !c.doDisconnect() {
973 defer c.requestLock.Unlock()
975 // When operating without auto reconnect, send errors to any pending
976 // requests and shutdown the client.
977 if c.config.DisableAutoReconnect {
978 for e := c.requestList.Front(); e != nil; e = e.Next() {
979 req := e.Value.(*jsonRequest)
980 req.responseChan <- &response{
982 err: ErrClientDisconnect,
985 c.removeAllRequests()
990 // Shutdown shuts down the client by disconnecting any connections associated
991 // with the client and, when automatic reconnect is enabled, preventing future
992 // attempts to reconnect. It also stops all goroutines.
993 func (c *Client) Shutdown() {
994 // Do the shutdown under the request lock to prevent clients from
995 // adding new requests while the client shutdown process is initiated.
997 defer c.requestLock.Unlock()
999 // Ignore the shutdown request if the client is already in the process
1000 // of shutting down or already shutdown.
1001 if !c.doShutdown() {
1005 // Send the ErrClientShutdown error to any pending requests.
1006 for e := c.requestList.Front(); e != nil; e = e.Next() {
1007 req := e.Value.(*jsonRequest)
1008 req.responseChan <- &response{
1010 err: ErrClientShutdown,
1013 c.removeAllRequests()
1015 // Disconnect the client if needed.
1019 // start begins processing input and output messages.
1020 func (c *Client) start() {
1021 log.Tracef("Starting RPC client %s", c.config.Host)
1023 // Start the I/O processing handlers depending on whether the client is
1024 // in HTTP POST mode or the default websocket mode.
1025 if c.config.HTTPPostMode {
1027 go c.sendPostHandler()
1031 if c.ntfnHandlers != nil {
1032 if c.ntfnHandlers.OnClientConnected != nil {
1033 c.ntfnHandlers.OnClientConnected()
// WaitForShutdown blocks until the client goroutines are stopped and the
// connection is closed. It does so by waiting on the client's wait group,
// which every handler goroutine marks done as it exits.
func (c *Client) WaitForShutdown() {
	c.wg.Wait()
}
// ConnConfig describes the connection configuration parameters for the client.
type ConnConfig struct {
	// Host is the IP address and port of the RPC server you want to connect
	// to.
	Host string

	// Endpoint is the websocket endpoint on the RPC server. This is
	// typically "ws".
	Endpoint string

	// User is the username to use to authenticate to the RPC server.
	User string

	// Pass is the passphrase to use to authenticate to the RPC server.
	Pass string

	// DisableTLS specifies whether transport layer security should be
	// disabled. It is recommended to always use TLS if the RPC server
	// supports it as otherwise your username and password is sent across
	// the wire in cleartext.
	DisableTLS bool

	// Certificates are the bytes for a PEM-encoded certificate chain used
	// for the TLS connection. It has no effect if the DisableTLS parameter
	// is true.
	Certificates []byte

	// Proxy specifies to connect through a SOCKS 5 proxy server. It may
	// be an empty string if a proxy is not required.
	Proxy string

	// ProxyUser is an optional username to use for the proxy server if it
	// requires authentication. It has no effect if the Proxy parameter
	// is not set.
	ProxyUser string

	// ProxyPass is an optional password to use for the proxy server if it
	// requires authentication. It has no effect if the Proxy parameter
	// is not set.
	ProxyPass string

	// DisableAutoReconnect specifies the client should not automatically
	// try to reconnect to the server when it has been disconnected.
	DisableAutoReconnect bool

	// DisableConnectOnNew specifies that a websocket client connection
	// should not be tried when creating the client with New. Instead, the
	// client is created and returned unconnected, and Connect must be
	// called manually.
	DisableConnectOnNew bool

	// HTTPPostMode instructs the client to run using multiple independent
	// connections issuing HTTP POST requests instead of using the default
	// of websockets. Websockets are generally preferred as some of the
	// features of the client such as notifications only work with
	// websockets, however, not all servers support the websocket
	// extensions, so this flag can be set to true to use basic HTTP POST
	// requests instead.
	HTTPPostMode bool

	// EnableBCInfoHacks is an option provided to enable compatibility hacks
	// when connecting to blockchain.info RPC server
	EnableBCInfoHacks bool
}
1114 // newHTTPClient returns a new http client that is configured according to the
1115 // proxy and TLS settings in the associated connection configuration.
1116 func newHTTPClient(config *ConnConfig) (*http.Client, error) {
1117 // Set proxy function if there is a proxy configured.
1118 var proxyFunc func(*http.Request) (*url.URL, error)
1119 if config.Proxy != "" {
1120 proxyURL, err := url.Parse(config.Proxy)
1124 proxyFunc = http.ProxyURL(proxyURL)
1127 // Configure TLS if needed.
1128 var tlsConfig *tls.Config
1129 if !config.DisableTLS {
1130 if len(config.Certificates) > 0 {
1131 pool := x509.NewCertPool()
1132 pool.AppendCertsFromPEM(config.Certificates)
1133 tlsConfig = &tls.Config{
1139 client := http.Client{
1140 Transport: &http.Transport{
1142 TLSClientConfig: tlsConfig,
1149 // dial opens a websocket connection using the passed connection configuration
1151 func dial(config *ConnConfig) (*websocket.Conn, error) {
1152 // Setup TLS if not disabled.
1153 var tlsConfig *tls.Config
1155 if !config.DisableTLS {
1156 tlsConfig = &tls.Config{
1157 MinVersion: tls.VersionTLS12,
1159 if len(config.Certificates) > 0 {
1160 pool := x509.NewCertPool()
1161 pool.AppendCertsFromPEM(config.Certificates)
1162 tlsConfig.RootCAs = pool
1167 // Create a websocket dialer that will be used to make the connection.
1168 // It is modified by the proxy setting below as needed.
1169 dialer := websocket.Dialer{TLSClientConfig: tlsConfig}
1171 // Setup the proxy if one is configured.
1172 if config.Proxy != "" {
1173 proxy := &socks.Proxy{
1175 Username: config.ProxyUser,
1176 Password: config.ProxyPass,
1178 dialer.NetDial = proxy.Dial
1181 // The RPC server requires basic authorization, so create a custom
1182 // request header with the Authorization header set.
1183 login := config.User + ":" + config.Pass
1184 auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
1185 requestHeader := make(http.Header)
1186 requestHeader.Add("Authorization", auth)
1188 // Dial the connection.
1189 url := fmt.Sprintf("%s://%s/%s", scheme, config.Host, config.Endpoint)
1190 wsConn, resp, err := dialer.Dial(url, requestHeader)
1192 if err != websocket.ErrBadHandshake || resp == nil {
1196 // Detect HTTP authentication error status codes.
1197 if resp.StatusCode == http.StatusUnauthorized ||
1198 resp.StatusCode == http.StatusForbidden {
1199 return nil, ErrInvalidAuth
1202 // The connection was authenticated and the status response was
1203 // ok, but the websocket handshake still failed, so the endpoint
1204 // is invalid in some way.
1205 if resp.StatusCode == http.StatusOK {
1206 return nil, ErrInvalidEndpoint
1209 // Return the status text from the server if none of the special
1210 // cases above apply.
1211 return nil, errors.New(resp.Status)
1216 // New creates a new RPC client based on the provided connection configuration
1217 // details. The notification handlers parameter may be nil if you are not
1218 // interested in receiving notifications and will be ignored if the
1219 // configuration is set to run in HTTP POST mode.
1220 func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error) {
1221 // Either open a websocket connection or create an HTTP client depending
1222 // on the HTTP POST mode. Also, set the notification handlers to nil
1223 // when running in HTTP POST mode.
1224 var wsConn *websocket.Conn
1225 var httpClient *http.Client
1226 connEstablished := make(chan struct{})
1228 if config.HTTPPostMode {
1233 httpClient, err = newHTTPClient(config)
1238 if !config.DisableConnectOnNew {
1240 wsConn, err = dial(config)
1251 httpClient: httpClient,
1252 requestMap: make(map[uint64]*list.Element),
1253 requestList: list.New(),
1254 ntfnHandlers: ntfnHandlers,
1255 ntfnState: newNotificationState(),
1256 sendChan: make(chan []byte, sendBufferSize),
1257 sendPostChan: make(chan *sendPostDetails, sendPostBufferSize),
1258 connEstablished: connEstablished,
1259 disconnect: make(chan struct{}),
1260 shutdown: make(chan struct{}),
1264 log.Infof("Established connection to RPC server %s",
1266 close(connEstablished)
1268 if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect {
1270 go client.wsReconnectHandler()
1277 // Connect establishes the initial websocket connection. This is necessary when
1278 // a client was created after setting the DisableConnectOnNew field of the
1281 // Up to tries number of connections (each after an increasing backoff) will
1282 // be tried if the connection can not be established. The special value of 0
1283 // indicates an unlimited number of connection attempts.
1285 // This method will error if the client is not configured for websockets, if the
1286 // connection has already been established, or if none of the connection
1287 // attempts were successful.
1288 func (c *Client) Connect(tries int) error {
1290 defer c.mtx.Unlock()
1292 if c.config.HTTPPostMode {
1293 return ErrNotWebsocketClient
1295 if c.wsConn != nil {
1296 return ErrClientAlreadyConnected
1299 // Begin connection attempts. Increase the backoff after each failed
1300 // attempt, up to a maximum of one minute.
1302 var backoff time.Duration
1303 for i := 0; tries == 0 || i < tries; i++ {
1304 var wsConn *websocket.Conn
1305 wsConn, err = dial(c.config)
1307 backoff = connectionRetryInterval * time.Duration(i+1)
1308 if backoff > time.Minute {
1309 backoff = time.Minute
1315 // Connection was established. Set the websocket connection
1316 // member of the client and start the goroutines necessary
1317 // to run the client.
1318 log.Infof("Established connection to RPC server %s",
1321 close(c.connEstablished)
1323 if !c.config.DisableAutoReconnect {
1325 go c.wsReconnectHandler()
1330 // All connection attempts failed, so return the last error.