1 // Copyright (c) 2013-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
20 "github.com/btcsuite/btcd/blockchain"
21 "github.com/btcsuite/btcd/chaincfg"
22 "github.com/btcsuite/btcd/chaincfg/chainhash"
23 "github.com/btcsuite/btcd/wire"
24 "github.com/btcsuite/go-socks/socks"
25 "github.com/davecgh/go-spew/spew"
29 // MaxProtocolVersion is the max protocol version the peer supports.
30 MaxProtocolVersion = wire.FeeFilterVersion
32 // minAcceptableProtocolVersion is the lowest protocol version that a
33 // connected peer may support.
34 minAcceptableProtocolVersion = wire.MultipleAddressVersion
36 // outputBufferSize is the number of elements the output channels use.
39 // invTrickleSize is the maximum amount of inventory to send in a single
40 // message when trickling inventory to remote peers.
41 maxInvTrickleSize = 1000
43 // maxKnownInventory is the maximum number of items to keep in the known
45 maxKnownInventory = 1000
47 // pingInterval is the interval of time to wait in between sending ping
49 pingInterval = 2 * time.Minute
51 // negotiateTimeout is the duration of inactivity before we timeout a
52 // peer that hasn't completed the initial version negotiation.
53 negotiateTimeout = 30 * time.Second
55 // idleTimeout is the duration of inactivity before we time out a peer.
56 idleTimeout = 5 * time.Minute
58 // stallTickInterval is the interval of time between each check for
60 stallTickInterval = 15 * time.Second
62 // stallResponseTimeout is the base maximum amount of time messages that
63 // expect a response will wait before disconnecting the peer for
64 // stalling. The deadlines are adjusted for callback running times and
65 // only checked on each stall tick interval.
66 stallResponseTimeout = 30 * time.Second
68 // trickleTimeout is the duration of the ticker which trickles down the
69 // inventory to a peer.
70 trickleTimeout = 10 * time.Second
74 // nodeCount is the total number of peer connections made since startup
75 // and is used to assign an id to a peer.
78 // zeroHash is the zero value hash (all zeros). It is defined as a
80 zeroHash chainhash.Hash
82 // sentNonces houses the unique nonces that are generated when pushing
83 // version messages that are used to detect self connections.
84 sentNonces = newMruNonceMap(50)
86 // allowSelfConns is only used to allow the tests to bypass the self
87 // connection detecting and disconnect logic since they intentionally
88 // do so for testing purposes.
92 // MessageListeners defines callback function pointers to invoke with message
93 // listeners for a peer. Any listener which is not set to a concrete callback
94 // during peer initialization is ignored. Execution of multiple message
95 // listeners occurs serially, so one callback blocks the execution of the next.
97 // NOTE: Unless otherwise documented, these listeners must NOT directly call any
98 // blocking calls (such as WaitForShutdown) on the peer instance since the input
99 // handler goroutine blocks until the callback has completed. Doing so will
100 // result in a deadlock.
101 type MessageListeners struct {
102 // OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
103 OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)
105 // OnAddr is invoked when a peer receives an addr bitcoin message.
106 OnAddr func(p *Peer, msg *wire.MsgAddr)
108 // OnPing is invoked when a peer receives a ping bitcoin message.
109 OnPing func(p *Peer, msg *wire.MsgPing)
111 // OnPong is invoked when a peer receives a pong bitcoin message.
112 OnPong func(p *Peer, msg *wire.MsgPong)
114 // OnAlert is invoked when a peer receives an alert bitcoin message.
115 OnAlert func(p *Peer, msg *wire.MsgAlert)
117 // OnMemPool is invoked when a peer receives a mempool bitcoin message.
118 OnMemPool func(p *Peer, msg *wire.MsgMemPool)
120 // OnTx is invoked when a peer receives a tx bitcoin message.
121 OnTx func(p *Peer, msg *wire.MsgTx)
123 // OnBlock is invoked when a peer receives a block bitcoin message.
124 OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)
126 // OnInv is invoked when a peer receives an inv bitcoin message.
127 OnInv func(p *Peer, msg *wire.MsgInv)
129 // OnHeaders is invoked when a peer receives a headers bitcoin message.
130 OnHeaders func(p *Peer, msg *wire.MsgHeaders)
132 // OnNotFound is invoked when a peer receives a notfound bitcoin
134 OnNotFound func(p *Peer, msg *wire.MsgNotFound)
136 // OnGetData is invoked when a peer receives a getdata bitcoin message.
137 OnGetData func(p *Peer, msg *wire.MsgGetData)
139 // OnGetBlocks is invoked when a peer receives a getblocks bitcoin
141 OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)
143 // OnGetHeaders is invoked when a peer receives a getheaders bitcoin
145 OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)
147 // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
148 OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)
150 // OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
151 OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)
153 // OnFilterClear is invoked when a peer receives a filterclear bitcoin
155 OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)
157 // OnFilterLoad is invoked when a peer receives a filterload bitcoin
159 OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)
161 // OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin
163 OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)
165 // OnVersion is invoked when a peer receives a version bitcoin message.
166 OnVersion func(p *Peer, msg *wire.MsgVersion)
168 // OnVerAck is invoked when a peer receives a verack bitcoin message.
169 OnVerAck func(p *Peer, msg *wire.MsgVerAck)
171 // OnReject is invoked when a peer receives a reject bitcoin message.
172 OnReject func(p *Peer, msg *wire.MsgReject)
174 // OnSendHeaders is invoked when a peer receives a sendheaders bitcoin
176 OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)
178 // OnRead is invoked when a peer receives a bitcoin message. It
179 // consists of the number of bytes read, the message, and whether or not
180 // an error in the read occurred. Typically, callers will opt to use
181 // the callbacks for the specific message types, however this can be
182 // useful for circumstances such as keeping track of server-wide byte
183 // counts or working with custom message types for which the peer does
184 // not directly provide a callback.
185 OnRead func(p *Peer, bytesRead int, msg wire.Message, err error)
187 // OnWrite is invoked when we write a bitcoin message to a peer. It
188 // consists of the number of bytes written, the message, and whether or
189 // not an error in the write occurred. This can be useful for
190 // circumstances such as keeping track of server-wide byte counts.
191 OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error)
194 // Config is the struct to hold configuration options useful to Peer.
196 // NewestBlock specifies a callback which provides the newest block
197 // details to the peer as needed. This can be nil in which case the
198 // peer will report a block height of 0, however it is good practice for
199 // peers to specify this so their currently best known is accurately
203 // HostToNetAddress returns the netaddress for the given host. This can be
204 // nil in which case the host will be parsed as an IP address.
205 HostToNetAddress HostToNetAddrFunc
207 // Proxy indicates a proxy is being used for connections. The only
208 // effect this has is to prevent leaking the tor proxy address, so it
209 // only needs to specified if using a tor proxy.
212 // UserAgentName specifies the user agent name to advertise. It is
213 // highly recommended to specify this value.
216 // UserAgentVersion specifies the user agent version to advertise. It
217 // is highly recommended to specify this value and that it follows the
218 // form "major.minor.revision" e.g. "2.6.41".
219 UserAgentVersion string
221 // UserAgentComments specify the user agent comments to advertise. These
222 // values must not contain the illegal characters specified in BIP 14:
223 // '/', ':', '(', ')'.
224 UserAgentComments []string
226 // ChainParams identifies which chain parameters the peer is associated
227 // with. It is highly recommended to specify this field, however it can
228 // be omitted in which case the test network will be used.
229 ChainParams *chaincfg.Params
231 // Services specifies which services to advertise as supported by the
232 // local peer. This field can be omitted in which case it will be 0
233 // and therefore advertise no supported services.
234 Services wire.ServiceFlag
236 // ProtocolVersion specifies the maximum protocol version to use and
237 // advertise. This field can be omitted in which case
238 // peer.MaxProtocolVersion will be used.
239 ProtocolVersion uint32
241 // DisableRelayTx specifies if the remote peer should be informed to
242 // not send inv messages for transactions.
245 // Listeners houses callback functions to be invoked on receiving peer
247 Listeners MessageListeners
250 // minUint32 is a helper function to return the minimum of two uint32s.
251 // This avoids a math import and the need to cast to floats.
252 func minUint32(a, b uint32) uint32 {
259 // newNetAddress attempts to extract the IP address and port from the passed
260 // net.Addr interface and create a bitcoin NetAddress structure using that
262 func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
263 // addr will be a net.TCPAddr when not using a proxy.
264 if tcpAddr, ok := addr.(*net.TCPAddr); ok {
266 port := uint16(tcpAddr.Port)
267 na := wire.NewNetAddressIPPort(ip, port, services)
271 // addr will be a socks.ProxiedAddr when using a proxy.
272 if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
273 ip := net.ParseIP(proxiedAddr.Host)
275 ip = net.ParseIP("0.0.0.0")
277 port := uint16(proxiedAddr.Port)
278 na := wire.NewNetAddressIPPort(ip, port, services)
282 // For the most part, addr should be one of the two above cases, but
283 // to be safe, fall back to trying to parse the information from the
284 // address string as a last resort.
285 host, portStr, err := net.SplitHostPort(addr.String())
289 ip := net.ParseIP(host)
290 port, err := strconv.ParseUint(portStr, 10, 16)
294 na := wire.NewNetAddressIPPort(ip, uint16(port), services)
298 // outMsg is used to house a message to be sent along with a channel to signal
299 // when the message has been sent (or won't be sent due to things such as
303 doneChan chan<- struct{}
304 encoding wire.MessageEncoding
307 // stallControlCmd represents the command of a stall control message.
308 type stallControlCmd uint8
310 // Constants for the command of a stall control message.
312 // sccSendMessage indicates a message is being sent to the remote peer.
313 sccSendMessage stallControlCmd = iota
315 // sccReceiveMessage indicates a message has been received from the
319 // sccHandlerStart indicates a callback handler is about to be invoked.
322 // sccHandlerStart indicates a callback handler has completed.
326 // stallControlMsg is used to signal the stall handler about specific events
327 // so it can properly detect and handle stalled remote peers.
328 type stallControlMsg struct {
329 command stallControlCmd
333 // StatsSnap is a snapshot of peer stats at a point in time.
334 type StatsSnap struct {
337 Services wire.ServiceFlag
350 LastPingTime time.Time
354 // HashFunc is a function which returns a block hash, height and error
355 // It is used as a callback to get newest block details.
356 type HashFunc func() (hash *chainhash.Hash, height int32, err error)
358 // AddrFunc is a func which takes an address and returns a related address.
359 type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
361 // HostToNetAddrFunc is a func which takes a host, port, services and returns
363 type HostToNetAddrFunc func(host string, port uint16,
364 services wire.ServiceFlag) (*wire.NetAddress, error)
366 // NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
367 // messages are read via the inHandler goroutine and generally dispatched to
368 // their own handler. For inbound data-related messages such as blocks,
369 // transactions, and inventory, the data is handled by the corresponding
370 // message handlers. The data flow for outbound messages is split into 2
371 // goroutines, queueHandler and outHandler. The first, queueHandler, is used
372 // as a way for external entities to queue messages, by way of the QueueMessage
373 // function, quickly regardless of whether the peer is currently sending or not.
374 // It acts as the traffic cop between the external world and the actual
375 // goroutine which writes to the network socket.
377 // Peer provides a basic concurrent safe bitcoin peer for handling bitcoin
378 // communications via the peer-to-peer protocol. It provides full duplex
379 // reading and writing, automatic handling of the initial handshake process,
380 // querying of usage statistics and other information about the remote peer such
381 // as its address, user agent, and protocol version, output message queuing,
382 // inventory trickling, and the ability to dynamically register and unregister
383 // callbacks for handling bitcoin protocol messages.
385 // Outbound messages are typically queued via QueueMessage or QueueInventory.
386 // QueueMessage is intended for all messages, including responses to data such
387 // as blocks and transactions. QueueInventory, on the other hand, is only
388 // intended for relaying inventory as it employs a trickling mechanism to batch
389 // the inventory together. However, some helper functions for pushing messages
390 // of specific types that typically require common special handling are
391 // provided as a convenience.
393 // The following variables must only be used atomically.
403 // These fields are set at creation time and never modified, so they are
404 // safe to read from concurrently without a mutex.
409 flagsMtx sync.Mutex // protects the peer flags below
413 services wire.ServiceFlag
415 advertisedProtoVer uint32 // protocol version advertised by remote
416 protocolVersion uint32 // negotiated protocol version
417 sendHeadersPreferred bool // peer sent a sendheaders message
421 wireEncoding wire.MessageEncoding
423 knownInventory *mruInventoryMap
424 prevGetBlocksMtx sync.Mutex
425 prevGetBlocksBegin *chainhash.Hash
426 prevGetBlocksStop *chainhash.Hash
427 prevGetHdrsMtx sync.Mutex
428 prevGetHdrsBegin *chainhash.Hash
429 prevGetHdrsStop *chainhash.Hash
431 // These fields keep track of statistics for the peer and are protected
432 // by the statsMtx mutex.
433 statsMtx sync.RWMutex
435 timeConnected time.Time
438 lastAnnouncedBlock *chainhash.Hash
439 lastPingNonce uint64 // Set to nonce if we have a pending ping.
440 lastPingTime time.Time // Time we sent last ping.
441 lastPingMicros int64 // Time for last ping to return.
443 stallControl chan stallControlMsg
444 outputQueue chan outMsg
445 sendQueue chan outMsg
446 sendDoneQueue chan struct{}
447 outputInvChan chan *wire.InvVect
449 queueQuit chan struct{}
450 outQuit chan struct{}
454 // String returns the peer's address and directionality as a human-readable
457 // This function is safe for concurrent access.
458 func (p *Peer) String() string {
459 return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound))
462 // UpdateLastBlockHeight updates the last known block for the peer.
464 // This function is safe for concurrent access.
465 func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
467 log.Tracef("Updating last block height of peer %v from %v to %v",
468 p.addr, p.lastBlock, newHeight)
469 p.lastBlock = newHeight
473 // UpdateLastAnnouncedBlock updates meta-data about the last block hash this
474 // peer is known to have announced.
476 // This function is safe for concurrent access.
477 func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) {
478 log.Tracef("Updating last blk for peer %v, %v", p.addr, blkHash)
481 p.lastAnnouncedBlock = blkHash
485 // AddKnownInventory adds the passed inventory to the cache of known inventory
488 // This function is safe for concurrent access.
489 func (p *Peer) AddKnownInventory(invVect *wire.InvVect) {
490 p.knownInventory.Add(invVect)
493 // StatsSnapshot returns a snapshot of the current peer flags and statistics.
495 // This function is safe for concurrent access.
496 func (p *Peer) StatsSnapshot() *StatsSnap {
502 userAgent := p.userAgent
503 services := p.services
504 protocolVersion := p.advertisedProtoVer
507 // Get a copy of all relevant flags and stats.
508 statsSnap := &StatsSnap{
511 UserAgent: userAgent,
513 LastSend: p.LastSend(),
514 LastRecv: p.LastRecv(),
515 BytesSent: p.BytesSent(),
516 BytesRecv: p.BytesReceived(),
517 ConnTime: p.timeConnected,
518 TimeOffset: p.timeOffset,
519 Version: protocolVersion,
521 StartingHeight: p.startingHeight,
522 LastBlock: p.lastBlock,
523 LastPingNonce: p.lastPingNonce,
524 LastPingMicros: p.lastPingMicros,
525 LastPingTime: p.lastPingTime,
532 // ID returns the peer id.
534 // This function is safe for concurrent access.
535 func (p *Peer) ID() int32 {
543 // NA returns the peer network address.
545 // This function is safe for concurrent access.
546 func (p *Peer) NA() *wire.NetAddress {
554 // Addr returns the peer address.
556 // This function is safe for concurrent access.
557 func (p *Peer) Addr() string {
558 // The address doesn't change after initialization, therefore it is not
559 // protected by a mutex.
563 // Inbound returns whether the peer is inbound.
565 // This function is safe for concurrent access.
566 func (p *Peer) Inbound() bool {
570 // Services returns the services flag of the remote peer.
572 // This function is safe for concurrent access.
573 func (p *Peer) Services() wire.ServiceFlag {
575 services := p.services
581 // UserAgent returns the user agent of the remote peer.
583 // This function is safe for concurrent access.
584 func (p *Peer) UserAgent() string {
586 userAgent := p.userAgent
592 // LastAnnouncedBlock returns the last announced block of the remote peer.
594 // This function is safe for concurrent access.
595 func (p *Peer) LastAnnouncedBlock() *chainhash.Hash {
597 lastAnnouncedBlock := p.lastAnnouncedBlock
600 return lastAnnouncedBlock
603 // LastPingNonce returns the last ping nonce of the remote peer.
605 // This function is safe for concurrent access.
606 func (p *Peer) LastPingNonce() uint64 {
608 lastPingNonce := p.lastPingNonce
614 // LastPingTime returns the last ping time of the remote peer.
616 // This function is safe for concurrent access.
617 func (p *Peer) LastPingTime() time.Time {
619 lastPingTime := p.lastPingTime
625 // LastPingMicros returns the last ping micros of the remote peer.
627 // This function is safe for concurrent access.
628 func (p *Peer) LastPingMicros() int64 {
630 lastPingMicros := p.lastPingMicros
633 return lastPingMicros
636 // VersionKnown returns the whether or not the version of a peer is known
639 // This function is safe for concurrent access.
640 func (p *Peer) VersionKnown() bool {
642 versionKnown := p.versionKnown
648 // VerAckReceived returns whether or not a verack message was received by the
651 // This function is safe for concurrent access.
652 func (p *Peer) VerAckReceived() bool {
654 verAckReceived := p.verAckReceived
657 return verAckReceived
660 // ProtocolVersion returns the negotiated peer protocol version.
662 // This function is safe for concurrent access.
663 func (p *Peer) ProtocolVersion() uint32 {
665 protocolVersion := p.protocolVersion
668 return protocolVersion
671 // LastBlock returns the last block of the peer.
673 // This function is safe for concurrent access.
674 func (p *Peer) LastBlock() int32 {
676 lastBlock := p.lastBlock
682 // LastSend returns the last send time of the peer.
684 // This function is safe for concurrent access.
685 func (p *Peer) LastSend() time.Time {
686 return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
689 // LastRecv returns the last recv time of the peer.
691 // This function is safe for concurrent access.
692 func (p *Peer) LastRecv() time.Time {
693 return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
696 // LocalAddr returns the local address of the connection.
698 // This function is safe fo concurrent access.
699 func (p *Peer) LocalAddr() net.Addr {
700 var localAddr net.Addr
701 if atomic.LoadInt32(&p.connected) != 0 {
702 localAddr = p.conn.LocalAddr()
707 // BytesSent returns the total number of bytes sent by the peer.
709 // This function is safe for concurrent access.
710 func (p *Peer) BytesSent() uint64 {
711 return atomic.LoadUint64(&p.bytesSent)
714 // BytesReceived returns the total number of bytes received by the peer.
716 // This function is safe for concurrent access.
717 func (p *Peer) BytesReceived() uint64 {
718 return atomic.LoadUint64(&p.bytesReceived)
721 // TimeConnected returns the time at which the peer connected.
723 // This function is safe for concurrent access.
724 func (p *Peer) TimeConnected() time.Time {
726 timeConnected := p.timeConnected
732 // TimeOffset returns the number of seconds the local time was offset from the
733 // time the peer reported during the initial negotiation phase. Negative values
734 // indicate the remote peer's time is before the local time.
736 // This function is safe for concurrent access.
737 func (p *Peer) TimeOffset() int64 {
739 timeOffset := p.timeOffset
745 // StartingHeight returns the last known height the peer reported during the
746 // initial negotiation phase.
748 // This function is safe for concurrent access.
749 func (p *Peer) StartingHeight() int32 {
751 startingHeight := p.startingHeight
754 return startingHeight
757 // WantsHeaders returns if the peer wants header messages instead of
758 // inventory vectors for blocks.
760 // This function is safe for concurrent access.
761 func (p *Peer) WantsHeaders() bool {
763 sendHeadersPreferred := p.sendHeadersPreferred
766 return sendHeadersPreferred
769 // IsWitnessEnabled returns true if the peer has signalled that it supports
770 // segregated witness.
772 // This function is safe for concurrent access.
773 func (p *Peer) IsWitnessEnabled() bool {
775 witnessEnabled := p.witnessEnabled
778 return witnessEnabled
781 // localVersionMsg creates a version message that can be used to send to the
783 func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
785 if p.cfg.NewestBlock != nil {
787 _, blockNum, err = p.cfg.NewestBlock()
795 // If we are behind a proxy and the connection comes from the proxy then
796 // we return an unroutable address as their address. This is to prevent
797 // leaking the tor proxy address.
798 if p.cfg.Proxy != "" {
799 proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
800 // invalid proxy means poorly configured, be on the safe side.
801 if err != nil || p.na.IP.String() == proxyaddress {
802 theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
806 // Create a wire.NetAddress with only the services set to use as the
807 // "addrme" in the version message.
809 // Older nodes previously added the IP and port information to the
810 // address manager which proved to be unreliable as an inbound
811 // connection from a peer didn't necessarily mean the peer itself
812 // accepted inbound connections.
814 // Also, the timestamp is unused in the version message.
815 ourNA := &wire.NetAddress{
816 Services: p.cfg.Services,
819 // Generate a unique nonce for this peer so self connections can be
820 // detected. This is accomplished by adding it to a size-limited map of
821 // recently seen nonces.
822 nonce := uint64(rand.Int63())
823 sentNonces.Add(nonce)
826 msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
827 msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
828 p.cfg.UserAgentComments...)
830 // XXX: bitcoind appears to always enable the full node services flag
831 // of the remote peer netaddress field in the version message regardless
832 // of whether it knows it supports it or not. Also, bitcoind sets
833 // the services field of the local peer to 0 regardless of support.
835 // Realistically, this should be set as follows:
836 // - For outgoing connections:
837 // - Set the local netaddress services to what the local peer
839 // - Set the remote netaddress services to 0 to indicate no services
840 // as they are still unknown
841 // - For incoming connections:
842 // - Set the local netaddress services to what the local peer
844 // - Set the remote netaddress services to the what was advertised by
845 // by the remote peer in its version message
846 msg.AddrYou.Services = wire.SFNodeNetwork
848 // Advertise the services flag
849 msg.Services = p.cfg.Services
851 // Advertise our max supported protocol version.
852 msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
854 // Advertise if inv messages for transactions are desired.
855 msg.DisableRelayTx = p.cfg.DisableRelayTx
860 // PushAddrMsg sends an addr message to the connected peer using the provided
861 // addresses. This function is useful over manually sending the message via
862 // QueueMessage since it automatically limits the addresses to the maximum
863 // number allowed by the message and randomizes the chosen addresses when there
864 // are too many. It returns the addresses that were actually sent and no
865 // message will be sent if there are no entries in the provided addresses slice.
867 // This function is safe for concurrent access.
868 func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
869 addressCount := len(addresses)
872 if addressCount == 0 {
876 msg := wire.NewMsgAddr()
877 msg.AddrList = make([]*wire.NetAddress, addressCount)
878 copy(msg.AddrList, addresses)
880 // Randomize the addresses sent if there are more than the maximum allowed.
881 if addressCount > wire.MaxAddrPerMsg {
882 // Shuffle the address list.
883 for i := 0; i < wire.MaxAddrPerMsg; i++ {
884 j := i + rand.Intn(addressCount-i)
885 msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
888 // Truncate it to the maximum size.
889 msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
892 p.QueueMessage(msg, nil)
893 return msg.AddrList, nil
896 // PushGetBlocksMsg sends a getblocks message for the provided block locator
897 // and stop hash. It will ignore back-to-back duplicate requests.
899 // This function is safe for concurrent access.
900 func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
901 // Extract the begin hash from the block locator, if one was specified,
902 // to use for filtering duplicate getblocks requests.
903 var beginHash *chainhash.Hash
904 if len(locator) > 0 {
905 beginHash = locator[0]
908 // Filter duplicate getblocks requests.
909 p.prevGetBlocksMtx.Lock()
910 isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
911 beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
912 beginHash.IsEqual(p.prevGetBlocksBegin)
913 p.prevGetBlocksMtx.Unlock()
916 log.Tracef("Filtering duplicate [getblocks] with begin "+
917 "hash %v, stop hash %v", beginHash, stopHash)
921 // Construct the getblocks request and queue it to be sent.
922 msg := wire.NewMsgGetBlocks(stopHash)
923 for _, hash := range locator {
924 err := msg.AddBlockLocatorHash(hash)
929 p.QueueMessage(msg, nil)
931 // Update the previous getblocks request information for filtering
933 p.prevGetBlocksMtx.Lock()
934 p.prevGetBlocksBegin = beginHash
935 p.prevGetBlocksStop = stopHash
936 p.prevGetBlocksMtx.Unlock()
940 // PushGetHeadersMsg sends a getblocks message for the provided block locator
941 // and stop hash. It will ignore back-to-back duplicate requests.
943 // This function is safe for concurrent access.
944 func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
945 // Extract the begin hash from the block locator, if one was specified,
946 // to use for filtering duplicate getheaders requests.
947 var beginHash *chainhash.Hash
948 if len(locator) > 0 {
949 beginHash = locator[0]
952 // Filter duplicate getheaders requests.
953 p.prevGetHdrsMtx.Lock()
954 isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
955 beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
956 beginHash.IsEqual(p.prevGetHdrsBegin)
957 p.prevGetHdrsMtx.Unlock()
960 log.Tracef("Filtering duplicate [getheaders] with begin hash %v",
965 // Construct the getheaders request and queue it to be sent.
966 msg := wire.NewMsgGetHeaders()
967 msg.HashStop = *stopHash
968 for _, hash := range locator {
969 err := msg.AddBlockLocatorHash(hash)
974 p.QueueMessage(msg, nil)
976 // Update the previous getheaders request information for filtering
978 p.prevGetHdrsMtx.Lock()
979 p.prevGetHdrsBegin = beginHash
980 p.prevGetHdrsStop = stopHash
981 p.prevGetHdrsMtx.Unlock()
985 // PushRejectMsg sends a reject message for the provided command, reject code,
986 // reject reason, and hash. The hash will only be used when the command is a tx
987 // or block and should be nil in other cases. The wait parameter will cause the
988 // function to block until the reject message has actually been sent.
990 // This function is safe for concurrent access.
991 func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) {
992 // Don't bother sending the reject message if the protocol version
994 if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion {
998 msg := wire.NewMsgReject(command, code, reason)
999 if command == wire.CmdTx || command == wire.CmdBlock {
1001 log.Warnf("Sending a reject message for command "+
1002 "type %v which should have specified a hash "+
1003 "but does not", command)
1009 // Send the message without waiting if the caller has not requested it.
1011 p.QueueMessage(msg, nil)
1015 // Send the message and block until it has been sent before returning.
1016 doneChan := make(chan struct{}, 1)
1017 p.QueueMessage(msg, doneChan)
1021 // handleRemoteVersionMsg is invoked when a version bitcoin message is received
1022 // from the remote peer. It will return an error if the remote peer's version
1023 // is not compatible with ours.
1024 func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
1025 // Detect self connections.
1026 if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
1027 return errors.New("disconnecting peer connected to self")
1030 // Notify and disconnect clients that have a protocol version that is
1033 // NOTE: If minAcceptableProtocolVersion is raised to be higher than
1034 // wire.RejectVersion, this should send a reject packet before
1036 if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
1037 reason := fmt.Sprintf("protocol version must be %d or greater",
1038 minAcceptableProtocolVersion)
1039 return errors.New(reason)
1042 // Updating a bunch of stats including block based stats, and the
1043 // peer's time offset.
1045 p.lastBlock = msg.LastBlock
1046 p.startingHeight = msg.LastBlock
1047 p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
1050 // Negotiate the protocol version.
1052 p.advertisedProtoVer = uint32(msg.ProtocolVersion)
1053 p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
1054 p.versionKnown = true
1055 log.Debugf("Negotiated protocol version %d for peer %s",
1056 p.protocolVersion, p)
1058 // Set the peer's ID.
1059 p.id = atomic.AddInt32(&nodeCount, 1)
1061 // Set the supported services for the peer to what the remote peer
1063 p.services = msg.Services
1065 // Set the remote peer's user agent.
1066 p.userAgent = msg.UserAgent
1068 // Determine if the peer would like to receive witness data with
1069 // transactions, or not.
1070 if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1071 p.witnessEnabled = true
1075 // Once the version message has been exchanged, we're able to determine
1076 // if this peer knows how to encode witness data over the wire
1077 // protocol. If so, then we'll switch to a decoding mode which is
1078 // prepared for the new transaction format introduced as part of
1080 if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1081 p.wireEncoding = wire.WitnessEncoding
1087 // handlePingMsg is invoked when a peer receives a ping bitcoin message. For
1088 // recent clients (protocol version > BIP0031Version), it replies with a pong
1089 // message. For older clients, it does nothing and anything other than failure
1090 // is considered a successful ping.
1091 func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
1092 // Only reply with pong if the message is from a new enough client.
1093 if p.ProtocolVersion() > wire.BIP0031Version {
1094 // Include nonce from ping so pong can be identified.
1095 p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
1099 // handlePongMsg is invoked when a peer receives a pong bitcoin message. It
1100 // updates the ping statistics as required for recent clients (protocol
1101 // version > BIP0031Version). There is no effect for older clients or when a
1102 // ping was not previously sent.
1103 func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
1104 // Arguably we could use a buffered channel here sending data
1105 // in a fifo manner whenever we send a ping, or a list keeping track of
1106 // the times of each ping. For now we just make a best effort and
1107 // only record stats if it was for the last ping sent. Any preceding
1108 // and overlapping pings will be ignored. It is unlikely to occur
1109 // without large usage of the ping rpc call since we ping infrequently
1110 // enough that if they overlap we would have timed out the peer.
1111 if p.ProtocolVersion() > wire.BIP0031Version {
1113 if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
1114 p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
1115 p.lastPingMicros /= 1000 // convert to usec.
1122 // readMessage reads the next bitcoin message from the peer with logging.
1123 func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
1124 n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn,
1125 p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding)
1126 atomic.AddUint64(&p.bytesReceived, uint64(n))
1127 if p.cfg.Listeners.OnRead != nil {
1128 p.cfg.Listeners.OnRead(p, n, msg, err)
1131 return nil, nil, err
1134 // Use closures to log expensive operations so they are only run when
1135 // the logging level requires it.
1136 log.Debugf("%v", newLogClosure(func() string {
1137 // Debug summary of message.
1138 summary := messageSummary(msg)
1139 if len(summary) > 0 {
1140 summary = " (" + summary + ")"
1142 return fmt.Sprintf("Received %v%s from %s",
1143 msg.Command(), summary, p)
1145 log.Tracef("%v", newLogClosure(func() string {
1146 return spew.Sdump(msg)
1148 log.Tracef("%v", newLogClosure(func() string {
1149 return spew.Sdump(buf)
1152 return msg, buf, nil
1155 // writeMessage sends a bitcoin message to the peer with logging.
1156 func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
1157 // Don't do anything if we're disconnecting.
1158 if atomic.LoadInt32(&p.disconnect) != 0 {
1162 // Use closures to log expensive operations so they are only run when
1163 // the logging level requires it.
1164 log.Debugf("%v", newLogClosure(func() string {
1165 // Debug summary of message.
1166 summary := messageSummary(msg)
1167 if len(summary) > 0 {
1168 summary = " (" + summary + ")"
1170 return fmt.Sprintf("Sending %v%s to %s", msg.Command(),
1173 log.Tracef("%v", newLogClosure(func() string {
1174 return spew.Sdump(msg)
1176 log.Tracef("%v", newLogClosure(func() string {
1177 var buf bytes.Buffer
1178 _, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(),
1179 p.cfg.ChainParams.Net, enc)
1183 return spew.Sdump(buf.Bytes())
1186 // Write the message to the peer.
1187 n, err := wire.WriteMessageWithEncodingN(p.conn, msg,
1188 p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
1189 atomic.AddUint64(&p.bytesSent, uint64(n))
1190 if p.cfg.Listeners.OnWrite != nil {
1191 p.cfg.Listeners.OnWrite(p, n, msg, err)
1196 // isAllowedReadError returns whether or not the passed error is allowed without
1197 // disconnecting the peer. In particular, regression tests need to be allowed
1198 // to send malformed messages without the peer being disconnected.
1199 func (p *Peer) isAllowedReadError(err error) bool {
1200 // Only allow read errors in regression test mode.
1201 if p.cfg.ChainParams.Net != wire.TestNet {
1205 // Don't allow the error if it's not specifically a malformed message error.
1206 if _, ok := err.(*wire.MessageError); !ok {
1210 // Don't allow the error if it's not coming from localhost or the
1211 // hostname can't be determined for some reason.
1212 host, _, err := net.SplitHostPort(p.addr)
1217 if host != "127.0.0.1" && host != "localhost" {
1221 // Allowed if all checks passed.
1225 // shouldHandleReadError returns whether or not the passed error, which is
1226 // expected to have come from reading from the remote peer in the inHandler,
1227 // should be logged and responded to with a reject message.
1228 func (p *Peer) shouldHandleReadError(err error) bool {
1229 // No logging or reject message when the peer is being forcibly
1231 if atomic.LoadInt32(&p.disconnect) != 0 {
1235 // No logging or reject message when the remote peer has been
1240 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1247 // maybeAddDeadline potentially adds a deadline for the appropriate expected
1248 // response for the passed wire protocol command to the pending responses map.
1249 func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
1250 // Setup a deadline for each message being sent that expects a response.
1252 // NOTE: Pings are intentionally ignored here since they are typically
1253 // sent asynchronously and as a result of a long backlock of messages,
1254 // such as is typical in the case of initial block download, the
1255 // response won't be received in time.
1256 deadline := time.Now().Add(stallResponseTimeout)
1258 case wire.CmdVersion:
1259 // Expects a verack message.
1260 pendingResponses[wire.CmdVerAck] = deadline
1262 case wire.CmdMemPool:
1263 // Expects an inv message.
1264 pendingResponses[wire.CmdInv] = deadline
1266 case wire.CmdGetBlocks:
1267 // Expects an inv message.
1268 pendingResponses[wire.CmdInv] = deadline
1270 case wire.CmdGetData:
1271 // Expects a block, merkleblock, tx, or notfound message.
1272 pendingResponses[wire.CmdBlock] = deadline
1273 pendingResponses[wire.CmdMerkleBlock] = deadline
1274 pendingResponses[wire.CmdTx] = deadline
1275 pendingResponses[wire.CmdNotFound] = deadline
1277 case wire.CmdGetHeaders:
1278 // Expects a headers message. Use a longer deadline since it
1279 // can take a while for the remote peer to load all of the
1281 deadline = time.Now().Add(stallResponseTimeout * 3)
1282 pendingResponses[wire.CmdHeaders] = deadline
1286 // stallHandler handles stall detection for the peer. This entails keeping
1287 // track of expected responses and assigning them deadlines while accounting for
1288 // the time spent in callbacks. It must be run as a goroutine.
1289 func (p *Peer) stallHandler() {
1290 // These variables are used to adjust the deadline times forward by the
1291 // time it takes callbacks to execute. This is done because new
1292 // messages aren't read until the previous one is finished processing
1293 // (which includes callbacks), so the deadline for receiving a response
1294 // for a given message must account for the processing time as well.
1295 var handlerActive bool
1296 var handlersStartTime time.Time
1297 var deadlineOffset time.Duration
1299 // pendingResponses tracks the expected response deadline times.
1300 pendingResponses := make(map[string]time.Time)
1302 // stallTicker is used to periodically check pending responses that have
1303 // exceeded the expected deadline and disconnect the peer due to
1305 stallTicker := time.NewTicker(stallTickInterval)
1306 defer stallTicker.Stop()
1308 // ioStopped is used to detect when both the input and output handler
1309 // goroutines are done.
1314 case msg := <-p.stallControl:
1315 switch msg.command {
1316 case sccSendMessage:
1317 // Add a deadline for the expected response
1318 // message if needed.
1319 p.maybeAddDeadline(pendingResponses,
1320 msg.message.Command())
1322 case sccReceiveMessage:
1323 // Remove received messages from the expected
1324 // response map. Since certain commands expect
1325 // one of a group of responses, remove
1326 // everything in the expected group accordingly.
1327 switch msgCmd := msg.message.Command(); msgCmd {
1330 case wire.CmdMerkleBlock:
1334 case wire.CmdNotFound:
1335 delete(pendingResponses, wire.CmdBlock)
1336 delete(pendingResponses, wire.CmdMerkleBlock)
1337 delete(pendingResponses, wire.CmdTx)
1338 delete(pendingResponses, wire.CmdNotFound)
1341 delete(pendingResponses, msgCmd)
1344 case sccHandlerStart:
1345 // Warn on unbalanced callback signalling.
1347 log.Warn("Received handler start " +
1348 "control command while a " +
1349 "handler is already active")
1353 handlerActive = true
1354 handlersStartTime = time.Now()
1356 case sccHandlerDone:
1357 // Warn on unbalanced callback signalling.
1359 log.Warn("Received handler done " +
1360 "control command when a " +
1361 "handler is not already active")
1365 // Extend active deadlines by the time it took
1366 // to execute the callback.
1367 duration := time.Since(handlersStartTime)
1368 deadlineOffset += duration
1369 handlerActive = false
1372 log.Warnf("Unsupported message command %v",
1376 case <-stallTicker.C:
1377 // Calculate the offset to apply to the deadline based
1378 // on how long the handlers have taken to execute since
1381 offset := deadlineOffset
1383 offset += now.Sub(handlersStartTime)
1386 // Disconnect the peer if any of the pending responses
1387 // don't arrive by their adjusted deadline.
1388 for command, deadline := range pendingResponses {
1389 if now.Before(deadline.Add(offset)) {
1393 log.Debugf("Peer %s appears to be stalled or "+
1394 "misbehaving, %s timeout -- "+
1395 "disconnecting", p, command)
1400 // Reset the deadline offset for the next tick.
1404 // The stall handler can exit once both the input and
1405 // output handler goroutines are done.
1412 // The stall handler can exit once both the input and
1413 // output handler goroutines are done.
1421 // Drain any wait channels before going away so there is nothing left
1422 // waiting on this goroutine.
1426 case <-p.stallControl:
1431 log.Tracef("Peer stall handler done for %s", p)
1434 // inHandler handles all incoming messages for the peer. It must be run as a
1436 func (p *Peer) inHandler() {
1437 // The timer is stopped when a new message is received and reset after it
1439 idleTimer := time.AfterFunc(idleTimeout, func() {
1440 log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
1445 for atomic.LoadInt32(&p.disconnect) == 0 {
1446 // Read a message and stop the idle timer as soon as the read
1447 // is done. The timer is reset below for the next iteration if
1449 rmsg, buf, err := p.readMessage(p.wireEncoding)
1452 // In order to allow regression tests with malformed messages, don't
1453 // disconnect the peer when we're in regression test mode and the
1454 // error is one of the allowed errors.
1455 if p.isAllowedReadError(err) {
1456 log.Errorf("Allowed test error from %s: %v", p, err)
1457 idleTimer.Reset(idleTimeout)
1461 // Only log the error and send reject message if the
1462 // local peer is not forcibly disconnecting and the
1463 // remote peer has not disconnected.
1464 if p.shouldHandleReadError(err) {
1465 errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
1466 if err != io.ErrUnexpectedEOF {
1470 // Push a reject message for the malformed message and wait for
1471 // the message to be sent before disconnecting.
1473 // NOTE: Ideally this would include the command in the header if
1474 // at least that much of the message was valid, but that is not
1475 // currently exposed by wire, so just used malformed for the
1477 p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
1482 atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
1483 p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
1485 // Handle each supported message type.
1486 p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
1487 switch msg := rmsg.(type) {
1488 case *wire.MsgVersion:
1490 p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
1491 "duplicate version message", nil, true)
1494 case *wire.MsgVerAck:
1496 // No read lock is necessary because verAckReceived is not written
1497 // to in any other goroutine.
1498 if p.verAckReceived {
1499 log.Infof("Already received 'verack' from peer %v -- "+
1504 p.verAckReceived = true
1506 if p.cfg.Listeners.OnVerAck != nil {
1507 p.cfg.Listeners.OnVerAck(p, msg)
1510 case *wire.MsgGetAddr:
1511 if p.cfg.Listeners.OnGetAddr != nil {
1512 p.cfg.Listeners.OnGetAddr(p, msg)
1516 if p.cfg.Listeners.OnAddr != nil {
1517 p.cfg.Listeners.OnAddr(p, msg)
1521 p.handlePingMsg(msg)
1522 if p.cfg.Listeners.OnPing != nil {
1523 p.cfg.Listeners.OnPing(p, msg)
1527 p.handlePongMsg(msg)
1528 if p.cfg.Listeners.OnPong != nil {
1529 p.cfg.Listeners.OnPong(p, msg)
1532 case *wire.MsgAlert:
1533 if p.cfg.Listeners.OnAlert != nil {
1534 p.cfg.Listeners.OnAlert(p, msg)
1537 case *wire.MsgMemPool:
1538 if p.cfg.Listeners.OnMemPool != nil {
1539 p.cfg.Listeners.OnMemPool(p, msg)
1543 if p.cfg.Listeners.OnTx != nil {
1544 p.cfg.Listeners.OnTx(p, msg)
1547 case *wire.MsgBlock:
1548 if p.cfg.Listeners.OnBlock != nil {
1549 p.cfg.Listeners.OnBlock(p, msg, buf)
1553 if p.cfg.Listeners.OnInv != nil {
1554 p.cfg.Listeners.OnInv(p, msg)
1557 case *wire.MsgHeaders:
1558 if p.cfg.Listeners.OnHeaders != nil {
1559 p.cfg.Listeners.OnHeaders(p, msg)
1562 case *wire.MsgNotFound:
1563 if p.cfg.Listeners.OnNotFound != nil {
1564 p.cfg.Listeners.OnNotFound(p, msg)
1567 case *wire.MsgGetData:
1568 if p.cfg.Listeners.OnGetData != nil {
1569 p.cfg.Listeners.OnGetData(p, msg)
1572 case *wire.MsgGetBlocks:
1573 if p.cfg.Listeners.OnGetBlocks != nil {
1574 p.cfg.Listeners.OnGetBlocks(p, msg)
1577 case *wire.MsgGetHeaders:
1578 if p.cfg.Listeners.OnGetHeaders != nil {
1579 p.cfg.Listeners.OnGetHeaders(p, msg)
1582 case *wire.MsgFeeFilter:
1583 if p.cfg.Listeners.OnFeeFilter != nil {
1584 p.cfg.Listeners.OnFeeFilter(p, msg)
1587 case *wire.MsgFilterAdd:
1588 if p.cfg.Listeners.OnFilterAdd != nil {
1589 p.cfg.Listeners.OnFilterAdd(p, msg)
1592 case *wire.MsgFilterClear:
1593 if p.cfg.Listeners.OnFilterClear != nil {
1594 p.cfg.Listeners.OnFilterClear(p, msg)
1597 case *wire.MsgFilterLoad:
1598 if p.cfg.Listeners.OnFilterLoad != nil {
1599 p.cfg.Listeners.OnFilterLoad(p, msg)
1602 case *wire.MsgMerkleBlock:
1603 if p.cfg.Listeners.OnMerkleBlock != nil {
1604 p.cfg.Listeners.OnMerkleBlock(p, msg)
1607 case *wire.MsgReject:
1608 if p.cfg.Listeners.OnReject != nil {
1609 p.cfg.Listeners.OnReject(p, msg)
1612 case *wire.MsgSendHeaders:
1614 p.sendHeadersPreferred = true
1617 if p.cfg.Listeners.OnSendHeaders != nil {
1618 p.cfg.Listeners.OnSendHeaders(p, msg)
1622 log.Debugf("Received unhandled message of type %v "+
1623 "from %v", rmsg.Command(), p)
1625 p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}
1627 // A message was received so reset the idle timer.
1628 idleTimer.Reset(idleTimeout)
1631 // Ensure the idle timer is stopped to avoid leaking the resource.
1634 // Ensure connection is closed.
1638 log.Tracef("Peer input handler done for %s", p)
1641 // queueHandler handles the queuing of outgoing data for the peer. This runs as
1642 // a muxer for various sources of input so we can ensure that server and peer
1643 // handlers will not block on us sending a message. That data is then passed on
1644 // to outHandler to be actually written.
1645 func (p *Peer) queueHandler() {
1646 pendingMsgs := list.New()
1647 invSendQueue := list.New()
1648 trickleTicker := time.NewTicker(trickleTimeout)
1649 defer trickleTicker.Stop()
1651 // We keep the waiting flag so that we know if we have a message queued
1652 // to the outHandler or not. We could use the presence of a head of
1653 // the list for this but then we have rather racy concerns about whether
1654 // it has gotten it at cleanup time - and thus who sends on the
1655 // message's done channel. To avoid such confusion we keep a different
1656 // flag and pendingMsgs only contains messages that we have not yet
1657 // passed to outHandler.
1660 // To avoid duplication below.
1661 queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
1667 // we are always waiting now.
1673 case msg := <-p.outputQueue:
1674 waiting = queuePacket(msg, pendingMsgs, waiting)
1676 // This channel is notified when a message has been sent across
1677 // the network socket.
1678 case <-p.sendDoneQueue:
1679 // No longer waiting if there are no more messages
1680 // in the pending messages queue.
1681 next := pendingMsgs.Front()
1687 // Notify the outHandler about the next item to
1688 // asynchronously send.
1689 val := pendingMsgs.Remove(next)
1690 p.sendQueue <- val.(outMsg)
1692 case iv := <-p.outputInvChan:
1693 // No handshake? They'll find out soon enough.
1694 if p.VersionKnown() {
1695 invSendQueue.PushBack(iv)
1698 case <-trickleTicker.C:
1699 // Don't send anything if we're disconnecting or there
1700 // is no queued inventory.
1701 // version is known if send queue has any entries.
1702 if atomic.LoadInt32(&p.disconnect) != 0 ||
1703 invSendQueue.Len() == 0 {
1707 // Create and send as many inv messages as needed to
1708 // drain the inventory send queue.
1709 invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1710 for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
1711 iv := invSendQueue.Remove(e).(*wire.InvVect)
1713 // Don't send inventory that became known after
1714 // the initial check.
1715 if p.knownInventory.Exists(iv) {
1719 invMsg.AddInvVect(iv)
1720 if len(invMsg.InvList) >= maxInvTrickleSize {
1721 waiting = queuePacket(
1722 outMsg{msg: invMsg},
1723 pendingMsgs, waiting)
1724 invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1727 // Add the inventory that is being relayed to
1728 // the known inventory for the peer.
1729 p.AddKnownInventory(iv)
1731 if len(invMsg.InvList) > 0 {
1732 waiting = queuePacket(outMsg{msg: invMsg},
1733 pendingMsgs, waiting)
1741 // Drain any wait channels before we go away so we don't leave something
1743 for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
1744 val := pendingMsgs.Remove(e)
1746 if msg.doneChan != nil {
1747 msg.doneChan <- struct{}{}
1753 case msg := <-p.outputQueue:
1754 if msg.doneChan != nil {
1755 msg.doneChan <- struct{}{}
1757 case <-p.outputInvChan:
1758 // Just drain channel
1759 // sendDoneQueue is buffered so doesn't need draining.
1765 log.Tracef("Peer queue handler done for %s", p)
1768 // shouldLogWriteError returns whether or not the passed error, which is
1769 // expected to have come from writing to the remote peer in the outHandler,
1770 // should be logged.
1771 func (p *Peer) shouldLogWriteError(err error) bool {
1772 // No logging when the peer is being forcibly disconnected.
1773 if atomic.LoadInt32(&p.disconnect) != 0 {
1777 // No logging when the remote peer has been disconnected.
1781 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1788 // outHandler handles all outgoing messages for the peer. It must be run as a
1789 // goroutine. It uses a buffered channel to serialize output messages while
1790 // allowing the sender to continue running asynchronously.
1791 func (p *Peer) outHandler() {
1795 case msg := <-p.sendQueue:
1796 switch m := msg.msg.(type) {
1798 // Only expects a pong message in later protocol
1799 // versions. Also set up statistics.
1800 if p.ProtocolVersion() > wire.BIP0031Version {
1802 p.lastPingNonce = m.Nonce
1803 p.lastPingTime = time.Now()
1808 p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
1810 err := p.writeMessage(msg.msg, msg.encoding)
1813 if p.shouldLogWriteError(err) {
1814 log.Errorf("Failed to send message to "+
1817 if msg.doneChan != nil {
1818 msg.doneChan <- struct{}{}
1823 // At this point, the message was successfully sent, so
1824 // update the last send time, signal the sender of the
1825 // message that it has been sent (if requested), and
1826 // signal the send queue to the deliver the next queued
1828 atomic.StoreInt64(&p.lastSend, time.Now().Unix())
1829 if msg.doneChan != nil {
1830 msg.doneChan <- struct{}{}
1832 p.sendDoneQueue <- struct{}{}
1841 // Drain any wait channels before we go away so we don't leave something
1842 // waiting for us. We have waited on queueQuit and thus we can be sure
1843 // that we will not miss anything sent on sendQueue.
1847 case msg := <-p.sendQueue:
1848 if msg.doneChan != nil {
1849 msg.doneChan <- struct{}{}
1851 // no need to send on sendDoneQueue since queueHandler
1852 // has been waited on and already exited.
1858 log.Tracef("Peer output handler done for %s", p)
1861 // pingHandler periodically pings the peer. It must be run as a goroutine.
1862 func (p *Peer) pingHandler() {
1863 pingTicker := time.NewTicker(pingInterval)
1864 defer pingTicker.Stop()
1869 case <-pingTicker.C:
1870 nonce, err := wire.RandomUint64()
1872 log.Errorf("Not sending ping to %s: %v", p, err)
1875 p.QueueMessage(wire.NewMsgPing(nonce), nil)
1883 // QueueMessage adds the passed bitcoin message to the peer send queue.
1885 // This function is safe for concurrent access.
1886 func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
1887 p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
1890 // QueueMessageWithEncoding adds the passed bitcoin message to the peer send
1891 // queue. This function is identical to QueueMessage, however it allows the
1892 // caller to specify the wire encoding type that should be used when
1893 // encoding/decoding blocks and transactions.
1895 // This function is safe for concurrent access.
1896 func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
1897 encoding wire.MessageEncoding) {
1899 // Avoid risk of deadlock if goroutine already exited. The goroutine
1900 // we will be sending to hangs around until it knows for a fact that
1901 // it is marked as disconnected and *then* it drains the channels.
1903 if doneChan != nil {
1905 doneChan <- struct{}{}
1910 p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
1913 // QueueInventory adds the passed inventory to the inventory send queue which
1914 // might not be sent right away, rather it is trickled to the peer in batches.
1915 // Inventory that the peer is already known to have is ignored.
1917 // This function is safe for concurrent access.
1918 func (p *Peer) QueueInventory(invVect *wire.InvVect) {
1919 // Don't add the inventory to the send queue if the peer is already
1920 // known to have it.
1921 if p.knownInventory.Exists(invVect) {
1925 // Avoid risk of deadlock if goroutine already exited. The goroutine
1926 // we will be sending to hangs around until it knows for a fact that
1927 // it is marked as disconnected and *then* it drains the channels.
1932 p.outputInvChan <- invVect
1935 // AssociateConnection associates the given conn to the peer. Calling this
1936 // function when the peer is already connected will have no effect.
1937 func (p *Peer) AssociateConnection(conn net.Conn) {
1938 // Already connected?
1939 if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
1944 p.timeConnected = time.Now()
1947 p.addr = p.conn.RemoteAddr().String()
1949 // Set up a NetAddress for the peer to be used with AddrManager. We
1950 // only do this inbound because outbound set this up at connection time
1951 // and no point recomputing.
1952 na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
1954 log.Errorf("Cannot create remote net address: %v", err)
1962 if err := p.start(); err != nil {
1963 log.Debugf("Cannot start peer %v: %v", p, err)
1969 // Connected returns whether or not the peer is currently connected.
1971 // This function is safe for concurrent access.
1972 func (p *Peer) Connected() bool {
1973 return atomic.LoadInt32(&p.connected) != 0 &&
1974 atomic.LoadInt32(&p.disconnect) == 0
1977 // Disconnect disconnects the peer by closing the connection. Calling this
1978 // function when the peer is already disconnected or in the process of
1979 // disconnecting will have no effect.
1980 func (p *Peer) Disconnect() {
1981 if atomic.AddInt32(&p.disconnect, 1) != 1 {
1985 log.Tracef("Disconnecting %s", p)
1986 if atomic.LoadInt32(&p.connected) != 0 {
1992 // start begins processing input and output messages.
1993 func (p *Peer) start() error {
1994 log.Tracef("Starting peer %s", p)
1996 negotiateErr := make(chan error)
1999 negotiateErr <- p.negotiateInboundProtocol()
2001 negotiateErr <- p.negotiateOutboundProtocol()
2005 // Negotiate the protocol within the specified negotiateTimeout.
2007 case err := <-negotiateErr:
2011 case <-time.After(negotiateTimeout):
2012 return errors.New("protocol negotiation timeout")
2014 log.Debugf("Connected to %s", p.Addr())
2016 // The protocol has been negotiated successfully so start processing input
2017 // and output messages.
2024 // Send our verack message now that the IO processing machinery has started.
2025 p.QueueMessage(wire.NewMsgVerAck(), nil)
2029 // WaitForDisconnect waits until the peer has completely disconnected and all
2030 // resources are cleaned up. This will happen if either the local or remote
2031 // side has been disconnected or the peer is forcibly disconnected via
2033 func (p *Peer) WaitForDisconnect() {
2037 // readRemoteVersionMsg waits for the next message to arrive from the remote
2038 // peer. If the next message is not a version message or the version is not
2039 // acceptable then return an error.
2040 func (p *Peer) readRemoteVersionMsg() error {
2041 // Read their version message.
2042 msg, _, err := p.readMessage(wire.LatestEncoding)
2047 remoteVerMsg, ok := msg.(*wire.MsgVersion)
2049 errStr := "A version message must precede all others"
2052 rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
2054 return p.writeMessage(rejectMsg, wire.LatestEncoding)
2057 if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
2061 if p.cfg.Listeners.OnVersion != nil {
2062 p.cfg.Listeners.OnVersion(p, remoteVerMsg)
2067 // writeLocalVersionMsg writes our version message to the remote peer.
2068 func (p *Peer) writeLocalVersionMsg() error {
2069 localVerMsg, err := p.localVersionMsg()
2074 return p.writeMessage(localVerMsg, wire.LatestEncoding)
2077 // negotiateInboundProtocol waits to receive a version message from the peer
2078 // then sends our version message. If the events do not occur in that order then
2079 // it returns an error.
2080 func (p *Peer) negotiateInboundProtocol() error {
2081 if err := p.readRemoteVersionMsg(); err != nil {
2085 return p.writeLocalVersionMsg()
2088 // negotiateOutboundProtocol sends our version message then waits to receive a
2089 // version message from the peer. If the events do not occur in that order then
2090 // it returns an error.
2091 func (p *Peer) negotiateOutboundProtocol() error {
2092 if err := p.writeLocalVersionMsg(); err != nil {
2096 return p.readRemoteVersionMsg()
2099 // newPeerBase returns a new base bitcoin peer based on the inbound flag. This
2100 // is used by the NewInboundPeer and NewOutboundPeer functions to perform base
2101 // setup needed by both types of peers.
2102 func newPeerBase(origCfg *Config, inbound bool) *Peer {
2103 // Default to the max supported protocol version if not specified by the
2105 cfg := *origCfg // Copy to avoid mutating caller.
2106 if cfg.ProtocolVersion == 0 {
2107 cfg.ProtocolVersion = MaxProtocolVersion
2110 // Set the chain parameters to testnet if the caller did not specify any.
2111 if cfg.ChainParams == nil {
2112 cfg.ChainParams = &chaincfg.TestNet3Params
2117 wireEncoding: wire.BaseEncoding,
2118 knownInventory: newMruInventoryMap(maxKnownInventory),
2119 stallControl: make(chan stallControlMsg, 1), // nonblocking sync
2120 outputQueue: make(chan outMsg, outputBufferSize),
2121 sendQueue: make(chan outMsg, 1), // nonblocking sync
2122 sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
2123 outputInvChan: make(chan *wire.InvVect, outputBufferSize),
2124 inQuit: make(chan struct{}),
2125 queueQuit: make(chan struct{}),
2126 outQuit: make(chan struct{}),
2127 quit: make(chan struct{}),
2128 cfg: cfg, // Copy so caller can't mutate.
2129 services: cfg.Services,
2130 protocolVersion: cfg.ProtocolVersion,
2135 // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
2136 // processing incoming and outgoing messages.
2137 func NewInboundPeer(cfg *Config) *Peer {
2138 return newPeerBase(cfg, true)
2141 // NewOutboundPeer returns a new outbound bitcoin peer.
2142 func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
2143 p := newPeerBase(cfg, false)
2146 host, portStr, err := net.SplitHostPort(addr)
2151 port, err := strconv.ParseUint(portStr, 10, 16)
2156 if cfg.HostToNetAddress != nil {
2157 na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
2163 p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
2171 rand.Seed(time.Now().UnixNano())