+++ /dev/null
-// Copyright (c) 2013-2017 The btcsuite developers
-// Copyright (c) 2015-2017 The Decred developers
-// Use of this source code is governed by an ISC
-// license that can be found in the LICENSE file.
-
-package main
-
-import (
- "bytes"
- "crypto/rand"
- "crypto/tls"
- "encoding/binary"
- "errors"
- "fmt"
- "math"
- "net"
- "runtime"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/btcsuite/btcd/addrmgr"
- "github.com/btcsuite/btcd/blockchain"
- "github.com/btcsuite/btcd/blockchain/indexers"
- "github.com/btcsuite/btcd/chaincfg"
- "github.com/btcsuite/btcd/chaincfg/chainhash"
- "github.com/btcsuite/btcd/connmgr"
- "github.com/btcsuite/btcd/database"
- "github.com/btcsuite/btcd/mempool"
- "github.com/btcsuite/btcd/mining"
- "github.com/btcsuite/btcd/mining/cpuminer"
- "github.com/btcsuite/btcd/netsync"
- "github.com/btcsuite/btcd/peer"
- "github.com/btcsuite/btcd/txscript"
- "github.com/btcsuite/btcd/wire"
- "github.com/btcsuite/btcutil"
- "github.com/btcsuite/btcutil/bloom"
-)
-
-const (
- // defaultServices describes the default services that are supported by
- // the server.
- defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | wire.SFNodeWitness
-
- // defaultRequiredServices describes the default services that are
- // required to be supported by outbound peers.
- defaultRequiredServices = wire.SFNodeNetwork
-
- // defaultTargetOutbound is the default number of outbound peers to target.
- defaultTargetOutbound = 8
-
- // connectionRetryInterval is the base amount of time to wait in between
- // retries when connecting to persistent peers. It is adjusted by the
- // number of retries such that there is a retry backoff.
- connectionRetryInterval = time.Second * 5
-)
-
-var (
- // userAgentName is the user agent name and is used to help identify
- // ourselves to other bitcoin peers.
- userAgentName = "btcd"
-
- // userAgentVersion is the user agent version and is used to help
- // identify ourselves to other bitcoin peers.
- userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
-)
-
-// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
-var zeroHash chainhash.Hash
-
-// onionAddr implements the net.Addr interface and represents a tor address.
-type onionAddr struct {
- addr string
-}
-
-// String returns the onion address.
-//
-// This is part of the net.Addr interface.
-func (oa *onionAddr) String() string {
- return oa.addr
-}
-
-// Network returns "onion".
-//
-// This is part of the net.Addr interface.
-func (oa *onionAddr) Network() string {
- return "onion"
-}
-
-// Ensure onionAddr implements the net.Addr interface.
-var _ net.Addr = (*onionAddr)(nil)
-
-// broadcastMsg provides the ability to house a bitcoin message to be broadcast
-// to all connected peers except specified excluded peers.
-type broadcastMsg struct {
- message wire.Message
- excludePeers []*serverPeer
-}
-
-// broadcastInventoryAdd is a type used to declare that the InvVect it contains
-// needs to be added to the rebroadcast map
-type broadcastInventoryAdd relayMsg
-
-// broadcastInventoryDel is a type used to declare that the InvVect it contains
-// needs to be removed from the rebroadcast map
-type broadcastInventoryDel *wire.InvVect
-
-// relayMsg packages an inventory vector along with the newly discovered
-// inventory so the relay has access to that information.
-type relayMsg struct {
- invVect *wire.InvVect
- data interface{}
-}
-
-// updatePeerHeightsMsg is a message sent from the blockmanager to the server
-// after a new block has been accepted. The purpose of the message is to update
-// the heights of peers that were known to announce the block before we
-// connected it to the main chain or recognized it as an orphan. With these
-// updates, peer heights will be kept up to date, allowing for fresh data when
-// selecting sync peer candidacy.
-type updatePeerHeightsMsg struct {
- newHash *chainhash.Hash
- newHeight int32
- originPeer *peer.Peer
-}
-
-// peerState maintains state of inbound, persistent, outbound peers as well
-// as banned peers and outbound groups.
-type peerState struct {
- inboundPeers map[int32]*serverPeer
- outboundPeers map[int32]*serverPeer
- persistentPeers map[int32]*serverPeer
- banned map[string]time.Time
- outboundGroups map[string]int
-}
-
-// Count returns the count of all known peers.
-func (ps *peerState) Count() int {
- return len(ps.inboundPeers) + len(ps.outboundPeers) +
- len(ps.persistentPeers)
-}
-
-// forAllOutboundPeers is a helper function that runs closure on all outbound
-// peers known to peerState.
-func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
- for _, e := range ps.outboundPeers {
- closure(e)
- }
- for _, e := range ps.persistentPeers {
- closure(e)
- }
-}
-
-// forAllPeers is a helper function that runs closure on all peers known to
-// peerState.
-func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
- for _, e := range ps.inboundPeers {
- closure(e)
- }
- ps.forAllOutboundPeers(closure)
-}
-
-// server provides a bitcoin server for handling communications to and from
-// bitcoin peers.
-type server struct {
- // The following variables must only be used atomically.
- // Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
- bytesReceived uint64 // Total bytes received from all peers since start.
- bytesSent uint64 // Total bytes sent by all peers since start.
- started int32
- shutdown int32
- shutdownSched int32
- startupTime int64
-
- chainParams *chaincfg.Params
- addrManager *addrmgr.AddrManager
- connManager *connmgr.ConnManager
- sigCache *txscript.SigCache
- hashCache *txscript.HashCache
- rpcServer *rpcServer
- syncManager *netsync.SyncManager
- chain *blockchain.BlockChain
- txMemPool *mempool.TxPool
- cpuMiner *cpuminer.CPUMiner
- modifyRebroadcastInv chan interface{}
- newPeers chan *serverPeer
- donePeers chan *serverPeer
- banPeers chan *serverPeer
- query chan interface{}
- relayInv chan relayMsg
- broadcast chan broadcastMsg
- peerHeightsUpdate chan updatePeerHeightsMsg
- wg sync.WaitGroup
- quit chan struct{}
- nat NAT
- db database.DB
- timeSource blockchain.MedianTimeSource
- services wire.ServiceFlag
-
- // The following fields are used for optional indexes. They will be nil
- // if the associated index is not enabled. These fields are set during
- // initial creation of the server and never changed afterwards, so they
- // do not need to be protected for concurrent access.
- txIndex *indexers.TxIndex
- addrIndex *indexers.AddrIndex
-}
-
-// serverPeer extends the peer to maintain state shared by the server and
-// the blockmanager.
-type serverPeer struct {
- // The following variables must only be used atomically
- feeFilter int64
-
- *peer.Peer
-
- connReq *connmgr.ConnReq
- server *server
- persistent bool
- continueHash *chainhash.Hash
- relayMtx sync.Mutex
- disableRelayTx bool
- sentAddrs bool
- isWhitelisted bool
- filter *bloom.Filter
- knownAddresses map[string]struct{}
- banScore connmgr.DynamicBanScore
- quit chan struct{}
- // The following chans are used to sync blockmanager and server.
- txProcessed chan struct{}
- blockProcessed chan struct{}
-}
-
-// newServerPeer returns a new serverPeer instance. The peer needs to be set by
-// the caller.
-func newServerPeer(s *server, isPersistent bool) *serverPeer {
- return &serverPeer{
- server: s,
- persistent: isPersistent,
- filter: bloom.LoadFilter(nil),
- knownAddresses: make(map[string]struct{}),
- quit: make(chan struct{}),
- txProcessed: make(chan struct{}, 1),
- blockProcessed: make(chan struct{}, 1),
- }
-}
-
-// newestBlock returns the current best block hash and height using the format
-// required by the configuration for the peer package.
-func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) {
- best := sp.server.chain.BestSnapshot()
- return &best.Hash, best.Height, nil
-}
-
-// addKnownAddresses adds the given addresses to the set of known addresses to
-// the peer to prevent sending duplicate addresses.
-func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) {
- for _, na := range addresses {
- sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
- }
-}
-
-// addressKnown true if the given address is already known to the peer.
-func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool {
- _, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)]
- return exists
-}
-
-// setDisableRelayTx toggles relaying of transactions for the given peer.
-// It is safe for concurrent access.
-func (sp *serverPeer) setDisableRelayTx(disable bool) {
- sp.relayMtx.Lock()
- sp.disableRelayTx = disable
- sp.relayMtx.Unlock()
-}
-
-// relayTxDisabled returns whether or not relaying of transactions for the given
-// peer is disabled.
-// It is safe for concurrent access.
-func (sp *serverPeer) relayTxDisabled() bool {
- sp.relayMtx.Lock()
- isDisabled := sp.disableRelayTx
- sp.relayMtx.Unlock()
-
- return isDisabled
-}
-
-// pushAddrMsg sends an addr message to the connected peer using the provided
-// addresses.
-func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
- // Filter addresses already known to the peer.
- addrs := make([]*wire.NetAddress, 0, len(addresses))
- for _, addr := range addresses {
- if !sp.addressKnown(addr) {
- addrs = append(addrs, addr)
- }
- }
- known, err := sp.PushAddrMsg(addrs)
- if err != nil {
- peerLog.Errorf("Can't push address message to %s: %v", sp.Peer, err)
- sp.Disconnect()
- return
- }
- sp.addKnownAddresses(known)
-}
-
-// addBanScore increases the persistent and decaying ban score fields by the
-// values passed as parameters. If the resulting score exceeds half of the ban
-// threshold, a warning is logged including the reason provided. Further, if
-// the score is above the ban threshold, the peer will be banned and
-// disconnected.
-func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
- // No warning is logged and no score is calculated if banning is disabled.
- if cfg.DisableBanning {
- return
- }
- if sp.isWhitelisted {
- peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
- return
- }
-
- warnThreshold := cfg.BanThreshold >> 1
- if transient == 0 && persistent == 0 {
- // The score is not being increased, but a warning message is still
- // logged if the score is above the warn threshold.
- score := sp.banScore.Int()
- if score > warnThreshold {
- peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
- "it was not increased this time", sp, reason, score)
- }
- return
- }
- score := sp.banScore.Increase(persistent, transient)
- if score > warnThreshold {
- peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
- sp, reason, score)
- if score > cfg.BanThreshold {
- peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting",
- sp)
- sp.server.BanPeer(sp)
- sp.Disconnect()
- }
- }
-}
-
-// OnVersion is invoked when a peer receives a version bitcoin message
-// and is used to negotiate the protocol version details as well as kick start
-// the communications.
-func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
- // Add the remote peer time as a sample for creating an offset against
- // the local clock to keep the network time in sync.
- sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
-
- // Signal the sync manager this peer is a new sync candidate.
- sp.server.syncManager.NewPeer(sp.Peer)
-
- // Choose whether or not to relay transactions before a filter command
- // is received.
- sp.setDisableRelayTx(msg.DisableRelayTx)
-
- // Update the address manager and request known addresses from the
- // remote peer for outbound connections. This is skipped when running
- // on the simulation test network since it is only intended to connect
- // to specified peers and actively avoids advertising and connecting to
- // discovered peers.
- if !cfg.SimNet {
- addrManager := sp.server.addrManager
-
- // Outbound connections.
- if !sp.Inbound() {
- // After soft-fork activation, only make outbound
- // connection to peers if they flag that they're segwit
- // enabled.
- chain := sp.server.chain
- segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
- if err != nil {
- peerLog.Errorf("Unable to query for segwit "+
- "soft-fork state: %v", err)
- return
- }
-
- if segwitActive && !sp.IsWitnessEnabled() {
- peerLog.Infof("Disconnecting non-segwit "+
- "peer %v, isn't segwit enabled and "+
- "we need more segwit enabled peers", sp)
- sp.Disconnect()
- return
- }
-
- // TODO(davec): Only do this if not doing the initial block
- // download and the local address is routable.
- if !cfg.DisableListen /* && isCurrent? */ {
- // Get address that best matches.
- lna := addrManager.GetBestLocalAddress(sp.NA())
- if addrmgr.IsRoutable(lna) {
- // Filter addresses the peer already knows about.
- addresses := []*wire.NetAddress{lna}
- sp.pushAddrMsg(addresses)
- }
- }
-
- // Request known addresses if the server address manager needs
- // more and the peer has a protocol version new enough to
- // include a timestamp with addresses.
- hasTimestamp := sp.ProtocolVersion() >=
- wire.NetAddressTimeVersion
- if addrManager.NeedMoreAddresses() && hasTimestamp {
- sp.QueueMessage(wire.NewMsgGetAddr(), nil)
- }
-
- // Mark the address as a known good address.
- addrManager.Good(sp.NA())
- }
- }
-
- // Add valid peer to the server.
- sp.server.AddPeer(sp)
-}
-
-// OnMemPool is invoked when a peer receives a mempool bitcoin message.
-// It creates and sends an inventory message with the contents of the memory
-// pool up to the maximum inventory allowed per message. When the peer has a
-// bloom filter loaded, the contents are filtered accordingly.
-func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
- // Only allow mempool requests if the server has bloom filtering
- // enabled.
- if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
- peerLog.Debugf("peer %v sent mempool request with bloom "+
- "filtering disabled -- disconnecting", sp)
- sp.Disconnect()
- return
- }
-
- // A decaying ban score increase is applied to prevent flooding.
- // The ban score accumulates and passes the ban threshold if a burst of
- // mempool messages comes from a peer. The score decays each minute to
- // half of its value.
- sp.addBanScore(0, 33, "mempool")
-
- // Generate inventory message with the available transactions in the
- // transaction memory pool. Limit it to the max allowed inventory
- // per message. The NewMsgInvSizeHint function automatically limits
- // the passed hint to the maximum allowed, so it's safe to pass it
- // without double checking it here.
- txMemPool := sp.server.txMemPool
- txDescs := txMemPool.TxDescs()
- invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
-
- for _, txDesc := range txDescs {
- // Either add all transactions when there is no bloom filter,
- // or only the transactions that match the filter when there is
- // one.
- if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
- iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
- invMsg.AddInvVect(iv)
- if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
- break
- }
- }
- }
-
- // Send the inventory message if there is anything to send.
- if len(invMsg.InvList) > 0 {
- sp.QueueMessage(invMsg, nil)
- }
-}
-
-// OnTx is invoked when a peer receives a tx bitcoin message. It blocks
-// until the bitcoin transaction has been fully processed. Unlock the block
-// handler this does not serialize all transactions through a single thread
-// transactions don't rely on the previous one in a linear fashion like blocks.
-func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
- if cfg.BlocksOnly {
- peerLog.Tracef("Ignoring tx %v from %v - blocksonly enabled",
- msg.TxHash(), sp)
- return
- }
-
- // Add the transaction to the known inventory for the peer.
- // Convert the raw MsgTx to a btcutil.Tx which provides some convenience
- // methods and things such as hash caching.
- tx := btcutil.NewTx(msg)
- iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
- sp.AddKnownInventory(iv)
-
- // Queue the transaction up to be handled by the sync manager and
- // intentionally block further receives until the transaction is fully
- // processed and known good or bad. This helps prevent a malicious peer
- // from queuing up a bunch of bad transactions before disconnecting (or
- // being disconnected) and wasting memory.
- sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
- <-sp.txProcessed
-}
-
-// OnBlock is invoked when a peer receives a block bitcoin message. It
-// blocks until the bitcoin block has been fully processed.
-func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
- // Convert the raw MsgBlock to a btcutil.Block which provides some
- // convenience methods and things such as hash caching.
- block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
-
- // Add the block to the known inventory for the peer.
- iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
- sp.AddKnownInventory(iv)
-
- // Queue the block up to be handled by the block
- // manager and intentionally block further receives
- // until the bitcoin block is fully processed and known
- // good or bad. This helps prevent a malicious peer
- // from queuing up a bunch of bad blocks before
- // disconnecting (or being disconnected) and wasting
- // memory. Additionally, this behavior is depended on
- // by at least the block acceptance test tool as the
- // reference implementation processes blocks in the same
- // thread and therefore blocks further messages until
- // the bitcoin block has been fully processed.
- sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
- <-sp.blockProcessed
-}
-
-// OnInv is invoked when a peer receives an inv bitcoin message and is
-// used to examine the inventory being advertised by the remote peer and react
-// accordingly. We pass the message down to blockmanager which will call
-// QueueMessage with any appropriate responses.
-func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
- if !cfg.BlocksOnly {
- if len(msg.InvList) > 0 {
- sp.server.syncManager.QueueInv(msg, sp.Peer)
- }
- return
- }
-
- newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
- for _, invVect := range msg.InvList {
- if invVect.Type == wire.InvTypeTx {
- peerLog.Tracef("Ignoring tx %v in inv from %v -- "+
- "blocksonly enabled", invVect.Hash, sp)
- if sp.ProtocolVersion() >= wire.BIP0037Version {
- peerLog.Infof("Peer %v is announcing "+
- "transactions -- disconnecting", sp)
- sp.Disconnect()
- return
- }
- continue
- }
- err := newInv.AddInvVect(invVect)
- if err != nil {
- peerLog.Errorf("Failed to add inventory vector: %v", err)
- break
- }
- }
-
- if len(newInv.InvList) > 0 {
- sp.server.syncManager.QueueInv(newInv, sp.Peer)
- }
-}
-
-// OnHeaders is invoked when a peer receives a headers bitcoin
-// message. The message is passed down to the sync manager.
-func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
- sp.server.syncManager.QueueHeaders(msg, sp.Peer)
-}
-
-// handleGetData is invoked when a peer receives a getdata bitcoin message and
-// is used to deliver block and transaction information.
-func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
- numAdded := 0
- notFound := wire.NewMsgNotFound()
-
- length := len(msg.InvList)
- // A decaying ban score increase is applied to prevent exhausting resources
- // with unusually large inventory queries.
- // Requesting more than the maximum inventory vector length within a short
- // period of time yields a score above the default ban threshold. Sustained
- // bursts of small requests are not penalized as that would potentially ban
- // peers performing IBD.
- // This incremental score decays each minute to half of its value.
- sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
-
- // We wait on this wait channel periodically to prevent queuing
- // far more data than we can send in a reasonable time, wasting memory.
- // The waiting occurs after the database fetch for the next one to
- // provide a little pipelining.
- var waitChan chan struct{}
- doneChan := make(chan struct{}, 1)
-
- for i, iv := range msg.InvList {
- var c chan struct{}
- // If this will be the last message we send.
- if i == length-1 && len(notFound.InvList) == 0 {
- c = doneChan
- } else if (i+1)%3 == 0 {
- // Buffered so as to not make the send goroutine block.
- c = make(chan struct{}, 1)
- }
- var err error
- switch iv.Type {
- case wire.InvTypeWitnessTx:
- err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeTx:
- err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- case wire.InvTypeWitnessBlock:
- err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeBlock:
- err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- case wire.InvTypeFilteredWitnessBlock:
- err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
- case wire.InvTypeFilteredBlock:
- err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
- default:
- peerLog.Warnf("Unknown type in inventory request %d",
- iv.Type)
- continue
- }
- if err != nil {
- notFound.AddInvVect(iv)
-
- // When there is a failure fetching the final entry
- // and the done channel was sent in due to there
- // being no outstanding not found inventory, consume
- // it here because there is now not found inventory
- // that will use the channel momentarily.
- if i == len(msg.InvList)-1 && c != nil {
- <-c
- }
- }
- numAdded++
- waitChan = c
- }
- if len(notFound.InvList) != 0 {
- sp.QueueMessage(notFound, doneChan)
- }
-
- // Wait for messages to be sent. We can send quite a lot of data at this
- // point and this will keep the peer busy for a decent amount of time.
- // We don't process anything else by them in this time so that we
- // have an idea of when we should hear back from them - else the idle
- // timeout could fire when we were only half done sending the blocks.
- if numAdded > 0 {
- <-doneChan
- }
-}
-
-// OnGetBlocks is invoked when a peer receives a getblocks bitcoin
-// message.
-func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
- // Find the most recent known block in the best chain based on the block
- // locator and fetch all of the block hashes after it until either
- // wire.MaxBlocksPerMsg have been fetched or the provided stop hash is
- // encountered.
- //
- // Use the block after the genesis block if no other blocks in the
- // provided locator are known. This does mean the client will start
- // over with the genesis block if unknown block locators are provided.
- //
- // This mirrors the behavior in the reference implementation.
- chain := sp.server.chain
- hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
- wire.MaxBlocksPerMsg)
-
- // Generate inventory message.
- invMsg := wire.NewMsgInv()
- for i := range hashList {
- iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
- invMsg.AddInvVect(iv)
- }
-
- // Send the inventory message if there is anything to send.
- if len(invMsg.InvList) > 0 {
- invListLen := len(invMsg.InvList)
- if invListLen == wire.MaxBlocksPerMsg {
- // Intentionally use a copy of the final hash so there
- // is not a reference into the inventory slice which
- // would prevent the entire slice from being eligible
- // for GC as soon as it's sent.
- continueHash := invMsg.InvList[invListLen-1].Hash
- sp.continueHash = &continueHash
- }
- sp.QueueMessage(invMsg, nil)
- }
-}
-
-// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
-// message.
-func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
- // Ignore getheaders requests if not in sync.
- if !sp.server.syncManager.IsCurrent() {
- return
- }
-
- // Find the most recent known block in the best chain based on the block
- // locator and fetch all of the headers after it until either
- // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
- // hash is encountered.
- //
- // Use the block after the genesis block if no other blocks in the
- // provided locator are known. This does mean the client will start
- // over with the genesis block if unknown block locators are provided.
- //
- // This mirrors the behavior in the reference implementation.
- chain := sp.server.chain
- headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
- if len(headers) == 0 {
- // Nothing to send.
- return
- }
-
- // Send found headers to the requesting peer.
- blockHeaders := make([]*wire.BlockHeader, len(headers))
- for i := range headers {
- blockHeaders[i] = &headers[i]
- }
- sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
-}
-
-// enforceNodeBloomFlag disconnects the peer if the server is not configured to
-// allow bloom filters. Additionally, if the peer has negotiated to a protocol
-// version that is high enough to observe the bloom filter service support bit,
-// it will be banned since it is intentionally violating the protocol.
-func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool {
- if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
- // Ban the peer if the protocol version is high enough that the
- // peer is knowingly violating the protocol and banning is
- // enabled.
- //
- // NOTE: Even though the addBanScore function already examines
- // whether or not banning is enabled, it is checked here as well
- // to ensure the violation is logged and the peer is
- // disconnected regardless.
- if sp.ProtocolVersion() >= wire.BIP0111Version &&
- !cfg.DisableBanning {
-
- // Disconnect the peer regardless of whether it was
- // banned.
- sp.addBanScore(100, 0, cmd)
- sp.Disconnect()
- return false
- }
-
- // Disconnect the peer regardless of protocol version or banning
- // state.
- peerLog.Debugf("%s sent an unsupported %s request -- "+
- "disconnecting", sp, cmd)
- sp.Disconnect()
- return false
- }
-
- return true
-}
-
-// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
-// is used by remote peers to request that no transactions which have a fee rate
-// lower than provided value are inventoried to them. The peer will be
-// disconnected if an invalid fee filter value is provided.
-func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
- // Check that the passed minimum fee is a valid amount.
- if msg.MinFee < 0 || msg.MinFee > btcutil.MaxSatoshi {
- peerLog.Debugf("Peer %v sent an invalid feefilter '%v' -- "+
- "disconnecting", sp, btcutil.Amount(msg.MinFee))
- sp.Disconnect()
- return
- }
-
- atomic.StoreInt64(&sp.feeFilter, msg.MinFee)
-}
-
-// OnFilterAdd is invoked when a peer receives a filteradd bitcoin
-// message and is used by remote peers to add data to an already loaded bloom
-// filter. The peer will be disconnected if a filter is not loaded when this
-// message is received or the server is not configured to allow bloom filters.
-func (sp *serverPeer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
-
- if sp.filter.IsLoaded() {
- peerLog.Debugf("%s sent a filteradd request with no filter "+
- "loaded -- disconnecting", sp)
- sp.Disconnect()
- return
- }
-
- sp.filter.Add(msg.Data)
-}
-
-// OnFilterClear is invoked when a peer receives a filterclear bitcoin
-// message and is used by remote peers to clear an already loaded bloom filter.
-// The peer will be disconnected if a filter is not loaded when this message is
-// received or the server is not configured to allow bloom filters.
-func (sp *serverPeer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
-
- if !sp.filter.IsLoaded() {
- peerLog.Debugf("%s sent a filterclear request with no "+
- "filter loaded -- disconnecting", sp)
- sp.Disconnect()
- return
- }
-
- sp.filter.Unload()
-}
-
-// OnFilterLoad is invoked when a peer receives a filterload bitcoin
-// message and it used to load a bloom filter that should be used for
-// delivering merkle blocks and associated transactions that match the filter.
-// The peer will be disconnected if the server is not configured to allow bloom
-// filters.
-func (sp *serverPeer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
- // Disconnect and/or ban depending on the node bloom services flag and
- // negotiated protocol version.
- if !sp.enforceNodeBloomFlag(msg.Command()) {
- return
- }
-
- sp.setDisableRelayTx(false)
-
- sp.filter.Reload(msg)
-}
-
-// OnGetAddr is invoked when a peer receives a getaddr bitcoin message
-// and is used to provide the peer with known addresses from the address
-// manager.
-func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
- // Don't return any addresses when running on the simulation test
- // network. This helps prevent the network from becoming another
- // public test network since it will not be able to learn about other
- // peers that have not specifically been provided.
- if cfg.SimNet {
- return
- }
-
- // Do not accept getaddr requests from outbound peers. This reduces
- // fingerprinting attacks.
- if !sp.Inbound() {
- peerLog.Debugf("Ignoring getaddr request from outbound peer ",
- "%v", sp)
- return
- }
-
- // Only allow one getaddr request per connection to discourage
- // address stamping of inv announcements.
- if sp.sentAddrs {
- peerLog.Debugf("Ignoring repeated getaddr request from peer ",
- "%v", sp)
- return
- }
- sp.sentAddrs = true
-
- // Get the current known addresses from the address manager.
- addrCache := sp.server.addrManager.AddressCache()
-
- // Push the addresses.
- sp.pushAddrMsg(addrCache)
-}
-
-// OnAddr is invoked when a peer receives an addr bitcoin message and is
-// used to notify the server about advertised addresses.
-func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
- // Ignore addresses when running on the simulation test network. This
- // helps prevent the network from becoming another public test network
- // since it will not be able to learn about other peers that have not
- // specifically been provided.
- if cfg.SimNet {
- return
- }
-
- // Ignore old style addresses which don't include a timestamp.
- if sp.ProtocolVersion() < wire.NetAddressTimeVersion {
- return
- }
-
- // A message that has no addresses is invalid.
- if len(msg.AddrList) == 0 {
- peerLog.Errorf("Command [%s] from %s does not contain any addresses",
- msg.Command(), sp)
- sp.Disconnect()
- return
- }
-
- for _, na := range msg.AddrList {
- // Don't add more address if we're disconnecting.
- if !sp.Connected() {
- return
- }
-
- // Set the timestamp to 5 days ago if it's more than 24 hours
- // in the future so this address is one of the first to be
- // removed when space is needed.
- now := time.Now()
- if na.Timestamp.After(now.Add(time.Minute * 10)) {
- na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
- }
-
- // Add address to known addresses for this peer.
- sp.addKnownAddresses([]*wire.NetAddress{na})
- }
-
- // Add addresses to server address manager. The address manager handles
- // the details of things such as preventing duplicate addresses, max
- // addresses, and last seen updates.
- // XXX bitcoind gives a 2 hour time penalty here, do we want to do the
- // same?
- sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
-}
-
-// OnRead is invoked when a peer receives a message and it is used to update
-// the bytes received by the server.
-func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {
- sp.server.AddBytesReceived(uint64(bytesRead))
-}
-
-// OnWrite is invoked when a peer sends a message and it is used to update
-// the bytes sent by the server.
-func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err error) {
- sp.server.AddBytesSent(uint64(bytesWritten))
-}
-
-// randomUint16Number returns a random uint16 in a specified input range. Note
-// that the range is in zeroth ordering; if you pass it 1800, you will get
-// values from 0 to 1800.
-func randomUint16Number(max uint16) uint16 {
- // In order to avoid modulo bias and ensure every possible outcome in
- // [0, max) has equal probability, the random number must be sampled
- // from a random source that has a range limited to a multiple of the
- // modulus.
- var randomNumber uint16
- var limitRange = (math.MaxUint16 / max) * max
- for {
- binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
- if randomNumber < limitRange {
- return (randomNumber % max)
- }
- }
-}
-
-// AddRebroadcastInventory adds 'iv' to the list of inventories to be
-// rebroadcasted at random intervals until they show up in a block.
-func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
- // Ignore if shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- return
- }
-
- s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}
-}
-
-// RemoveRebroadcastInventory removes 'iv' from the list of items to be
-// rebroadcasted if present.
-func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
- // Ignore if shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- return
- }
-
- s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
-}
-
-// relayTransactions generates and relays inventory vectors for all of the
-// passed transactions to all connected peers.
-func (s *server) relayTransactions(txns []*mempool.TxDesc) {
- for _, txD := range txns {
- iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
- s.RelayInventory(iv, txD)
- }
-}
-
-// AnnounceNewTransactions generates and relays inventory vectors and notifies
-// both websocket and getblocktemplate long poll clients of the passed
-// transactions. This function should be called whenever new transactions
-// are added to the mempool.
-func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) {
- // Generate and relay inventory vectors for all newly accepted
- // transactions.
- s.relayTransactions(txns)
-
- // Notify both websocket and getblocktemplate long poll clients of all
- // newly accepted transactions.
- if s.rpcServer != nil {
- s.rpcServer.NotifyNewTransactions(txns)
- }
-}
-
-// Transaction has one confirmation on the main chain. Now we can mark it as no
-// longer needing rebroadcasting.
-func (s *server) TransactionConfirmed(tx *btcutil.Tx) {
- // Rebroadcasting is only necessary when the RPC server is active.
- if s.rpcServer == nil {
- return
- }
-
- iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
- s.RemoveRebroadcastInventory(iv)
-}
-
-// pushTxMsg sends a tx message for the provided transaction hash to the
-// connected peer. An error is returned if the transaction hash is not known.
-func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
- waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
-
- // Attempt to fetch the requested transaction from the pool. A
- // call could be made to check for existence first, but simply trying
- // to fetch a missing transaction results in the same behavior.
- tx, err := s.txMemPool.FetchTransaction(hash)
- if err != nil {
- peerLog.Tracef("Unable to fetch tx %v from transaction "+
- "pool: %v", hash, err)
-
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
-
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
-
- sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
-
- return nil
-}
-
-// pushBlockMsg sends a block message for the provided block hash to the
-// connected peer. An error is returned if the block hash is not known.
-func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
- waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
-
- // Fetch the raw block bytes from the database.
- var blockBytes []byte
- err := sp.server.db.View(func(dbTx database.Tx) error {
- var err error
- blockBytes, err = dbTx.FetchBlock(hash)
- return err
- })
- if err != nil {
- peerLog.Tracef("Unable to fetch requested block hash %v: %v",
- hash, err)
-
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
-
- // Deserialize the block.
- var msgBlock wire.MsgBlock
- err = msgBlock.Deserialize(bytes.NewReader(blockBytes))
- if err != nil {
- peerLog.Tracef("Unable to deserialize requested block hash "+
- "%v: %v", hash, err)
-
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
-
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
-
- // We only send the channel for this message if we aren't sending
- // an inv straight after.
- var dc chan<- struct{}
- continueHash := sp.continueHash
- sendInv := continueHash != nil && continueHash.IsEqual(hash)
- if !sendInv {
- dc = doneChan
- }
- sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
-
- // When the peer requests the final block that was advertised in
- // response to a getblocks message which requested more blocks than
- // would fit into a single message, send it a new inventory message
- // to trigger it to issue another getblocks message for the next
- // batch of inventory.
- if sendInv {
- best := sp.server.chain.BestSnapshot()
- invMsg := wire.NewMsgInvSizeHint(1)
- iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
- invMsg.AddInvVect(iv)
- sp.QueueMessage(invMsg, doneChan)
- sp.continueHash = nil
- }
- return nil
-}
-
-// pushMerkleBlockMsg sends a merkleblock message for the provided block hash to
-// the connected peer. Since a merkle block requires the peer to have a filter
-// loaded, this call will simply be ignored if there is no filter loaded. An
-// error is returned if the block hash is not known.
-func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
- doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
-
- // Do not send a response if the peer doesn't have a filter loaded.
- if !sp.filter.IsLoaded() {
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return nil
- }
-
- // Fetch the raw block bytes from the database.
- blk, err := sp.server.chain.BlockByHash(hash)
- if err != nil {
- peerLog.Tracef("Unable to fetch requested block hash %v: %v",
- hash, err)
-
- if doneChan != nil {
- doneChan <- struct{}{}
- }
- return err
- }
-
- // Generate a merkle block by filtering the requested block according
- // to the filter for the peer.
- merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter)
-
- // Once we have fetched data wait for any previous operation to finish.
- if waitChan != nil {
- <-waitChan
- }
-
- // Send the merkleblock. Only send the done channel with this message
- // if no transactions will be sent afterwards.
- var dc chan<- struct{}
- if len(matchedTxIndices) == 0 {
- dc = doneChan
- }
- sp.QueueMessage(merkle, dc)
-
- // Finally, send any matched transactions.
- blkTransactions := blk.MsgBlock().Transactions
- for i, txIndex := range matchedTxIndices {
- // Only send the done channel on the final transaction.
- var dc chan<- struct{}
- if i == len(matchedTxIndices)-1 {
- dc = doneChan
- }
- if txIndex < uint32(len(blkTransactions)) {
- sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc,
- encoding)
- }
- }
-
- return nil
-}
-
-// handleUpdatePeerHeight updates the heights of all peers who were known to
-// announce a block we recently accepted.
-func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- // The origin peer should already have the updated height.
- if sp.Peer == umsg.originPeer {
- return
- }
-
- // This is a pointer to the underlying memory which doesn't
- // change.
- latestBlkHash := sp.LastAnnouncedBlock()
-
- // Skip this peer if it hasn't recently announced any new blocks.
- if latestBlkHash == nil {
- return
- }
-
- // If the peer has recently announced a block, and this block
- // matches our newly accepted block, then update their block
- // height.
- if *latestBlkHash == *umsg.newHash {
- sp.UpdateLastBlockHeight(umsg.newHeight)
- sp.UpdateLastAnnouncedBlock(nil)
- }
- })
-}
-
-// handleAddPeerMsg deals with adding new peers. It is invoked from the
-// peerHandler goroutine.
-func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
- if sp == nil {
- return false
- }
-
- // Ignore new peers if we're shutting down.
- if atomic.LoadInt32(&s.shutdown) != 0 {
- srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
- sp.Disconnect()
- return false
- }
-
- // Disconnect banned peers.
- host, _, err := net.SplitHostPort(sp.Addr())
- if err != nil {
- srvrLog.Debugf("can't split hostport %v", err)
- sp.Disconnect()
- return false
- }
- if banEnd, ok := state.banned[host]; ok {
- if time.Now().Before(banEnd) {
- srvrLog.Debugf("Peer %s is banned for another %v - disconnecting",
- host, time.Until(banEnd))
- sp.Disconnect()
- return false
- }
-
- srvrLog.Infof("Peer %s is no longer banned", host)
- delete(state.banned, host)
- }
-
- // TODO: Check for max peers from a single IP.
-
- // Limit max number of total peers.
- if state.Count() >= cfg.MaxPeers {
- srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
- cfg.MaxPeers, sp)
- sp.Disconnect()
- // TODO: how to handle permanent peers here?
- // they should be rescheduled.
- return false
- }
-
- // Add the new peer and start it.
- srvrLog.Debugf("New peer %s", sp)
- if sp.Inbound() {
- state.inboundPeers[sp.ID()] = sp
- } else {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
- if sp.persistent {
- state.persistentPeers[sp.ID()] = sp
- } else {
- state.outboundPeers[sp.ID()] = sp
- }
- }
-
- return true
-}
-
-// handleDonePeerMsg deals with peers that have signalled they are done. It is
-// invoked from the peerHandler goroutine.
-func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
- var list map[int32]*serverPeer
- if sp.persistent {
- list = state.persistentPeers
- } else if sp.Inbound() {
- list = state.inboundPeers
- } else {
- list = state.outboundPeers
- }
- if _, ok := list[sp.ID()]; ok {
- if !sp.Inbound() && sp.VersionKnown() {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- }
- if !sp.Inbound() && sp.connReq != nil {
- s.connManager.Disconnect(sp.connReq.ID())
- }
- delete(list, sp.ID())
- srvrLog.Debugf("Removed peer %s", sp)
- return
- }
-
- if sp.connReq != nil {
- s.connManager.Disconnect(sp.connReq.ID())
- }
-
- // Update the address' last seen time if the peer has acknowledged
- // our version and has sent us its version as well.
- if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
- s.addrManager.Connected(sp.NA())
- }
-
- // If we get here it means that either we didn't know about the peer
- // or we purposefully deleted it.
-}
-
-// handleBanPeerMsg deals with banning peers. It is invoked from the
-// peerHandler goroutine.
-func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) {
- host, _, err := net.SplitHostPort(sp.Addr())
- if err != nil {
- srvrLog.Debugf("can't split ban peer %s %v", sp.Addr(), err)
- return
- }
- direction := directionString(sp.Inbound())
- srvrLog.Infof("Banned peer %s (%s) for %v", host, direction,
- cfg.BanDuration)
- state.banned[host] = time.Now().Add(cfg.BanDuration)
-}
-
-// handleRelayInvMsg deals with relaying inventory to peers that are not already
-// known to have it. It is invoked from the peerHandler goroutine.
-func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
-
- // If the inventory is a block and the peer prefers headers,
- // generate and send a headers message instead of an inventory
- // message.
- if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
- blockHeader, ok := msg.data.(wire.BlockHeader)
- if !ok {
- peerLog.Warnf("Underlying data for headers" +
- " is not a block header")
- return
- }
- msgHeaders := wire.NewMsgHeaders()
- if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
- peerLog.Errorf("Failed to add block"+
- " header: %v", err)
- return
- }
- sp.QueueMessage(msgHeaders, nil)
- return
- }
-
- if msg.invVect.Type == wire.InvTypeTx {
- // Don't relay the transaction to the peer when it has
- // transaction relaying disabled.
- if sp.relayTxDisabled() {
- return
- }
-
- txD, ok := msg.data.(*mempool.TxDesc)
- if !ok {
- peerLog.Warnf("Underlying data for tx inv "+
- "relay is not a *mempool.TxDesc: %T",
- msg.data)
- return
- }
-
- // Don't relay the transaction if the transaction fee-per-kb
- // is less than the peer's feefilter.
- feeFilter := atomic.LoadInt64(&sp.feeFilter)
- if feeFilter > 0 && txD.FeePerKB < feeFilter {
- return
- }
-
- // Don't relay the transaction if there is a bloom
- // filter loaded and the transaction doesn't match it.
- if sp.filter.IsLoaded() {
- if !sp.filter.MatchTxAndUpdate(txD.Tx) {
- return
- }
- }
- }
-
- // Queue the inventory to be relayed with the next batch.
- // It will be ignored if the peer is already known to
- // have the inventory.
- sp.QueueInventory(msg.invVect)
- })
-}
-
-// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
-// from the peerHandler goroutine.
-func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
-
- for _, ep := range bmsg.excludePeers {
- if sp == ep {
- return
- }
- }
-
- sp.QueueMessage(bmsg.message, nil)
- })
-}
-
-type getConnCountMsg struct {
- reply chan int32
-}
-
-type getPeersMsg struct {
- reply chan []*serverPeer
-}
-
-type getOutboundGroup struct {
- key string
- reply chan int
-}
-
-type getAddedNodesMsg struct {
- reply chan []*serverPeer
-}
-
-type disconnectNodeMsg struct {
- cmp func(*serverPeer) bool
- reply chan error
-}
-
-type connectNodeMsg struct {
- addr string
- permanent bool
- reply chan error
-}
-
-type removeNodeMsg struct {
- cmp func(*serverPeer) bool
- reply chan error
-}
-
-// handleQuery is the central handler for all queries and commands from other
-// goroutines related to peer state.
-func (s *server) handleQuery(state *peerState, querymsg interface{}) {
- switch msg := querymsg.(type) {
- case getConnCountMsg:
- nconnected := int32(0)
- state.forAllPeers(func(sp *serverPeer) {
- if sp.Connected() {
- nconnected++
- }
- })
- msg.reply <- nconnected
-
- case getPeersMsg:
- peers := make([]*serverPeer, 0, state.Count())
- state.forAllPeers(func(sp *serverPeer) {
- if !sp.Connected() {
- return
- }
- peers = append(peers, sp)
- })
- msg.reply <- peers
-
- case connectNodeMsg:
- // TODO: duplicate oneshots?
- // Limit max number of total peers.
- if state.Count() >= cfg.MaxPeers {
- msg.reply <- errors.New("max peers reached")
- return
- }
- for _, peer := range state.persistentPeers {
- if peer.Addr() == msg.addr {
- if msg.permanent {
- msg.reply <- errors.New("peer already connected")
- } else {
- msg.reply <- errors.New("peer exists as a permanent peer")
- }
- return
- }
- }
-
- netAddr, err := addrStringToNetAddr(msg.addr)
- if err != nil {
- msg.reply <- err
- return
- }
-
- // TODO: if too many, nuke a non-perm peer.
- go s.connManager.Connect(&connmgr.ConnReq{
- Addr: netAddr,
- Permanent: msg.permanent,
- })
- msg.reply <- nil
- case removeNodeMsg:
- found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
- // Keep group counts ok since we remove from
- // the list now.
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
-
- if found {
- msg.reply <- nil
- } else {
- msg.reply <- errors.New("peer not found")
- }
- case getOutboundGroup:
- count, ok := state.outboundGroups[msg.key]
- if ok {
- msg.reply <- count
- } else {
- msg.reply <- 0
- }
- // Request a list of the persistent (added) peers.
- case getAddedNodesMsg:
- // Respond with a slice of the relevant peers.
- peers := make([]*serverPeer, 0, len(state.persistentPeers))
- for _, sp := range state.persistentPeers {
- peers = append(peers, sp)
- }
- msg.reply <- peers
- case disconnectNodeMsg:
- // Check inbound peers. We pass a nil callback since we don't
- // require any additional actions on disconnect for inbound peers.
- found := disconnectPeer(state.inboundPeers, msg.cmp, nil)
- if found {
- msg.reply <- nil
- return
- }
-
- // Check outbound peers.
- found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
- // Keep group counts ok since we remove from
- // the list now.
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
- if found {
- // If there are multiple outbound connections to the same
- // ip:port, continue disconnecting them all until no such
- // peers are found.
- for found {
- found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
- state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
- })
- }
- msg.reply <- nil
- return
- }
-
- msg.reply <- errors.New("peer not found")
- }
-}
-
-// disconnectPeer attempts to drop the connection of a targeted peer in the
-// passed peer list. Targets are identified via usage of the passed
-// `compareFunc`, which should return `true` if the passed peer is the target
-// peer. This function returns true on success and false if the peer is unable
-// to be located. If the peer is found, and the passed callback: `whenFound'
-// isn't nil, we call it with the peer as the argument before it is removed
-// from the peerList, and is disconnected from the server.
-func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool {
- for addr, peer := range peerList {
- if compareFunc(peer) {
- if whenFound != nil {
- whenFound(peer)
- }
-
- // This is ok because we are not continuing
- // to iterate so won't corrupt the loop.
- delete(peerList, addr)
- peer.Disconnect()
- return true
- }
- }
- return false
-}
-
-// newPeerConfig returns the configuration for the given serverPeer.
-func newPeerConfig(sp *serverPeer) *peer.Config {
- return &peer.Config{
- Listeners: peer.MessageListeners{
- OnVersion: sp.OnVersion,
- OnMemPool: sp.OnMemPool,
- OnTx: sp.OnTx,
- OnBlock: sp.OnBlock,
- OnInv: sp.OnInv,
- OnHeaders: sp.OnHeaders,
- OnGetData: sp.OnGetData,
- OnGetBlocks: sp.OnGetBlocks,
- OnGetHeaders: sp.OnGetHeaders,
- OnFeeFilter: sp.OnFeeFilter,
- OnFilterAdd: sp.OnFilterAdd,
- OnFilterClear: sp.OnFilterClear,
- OnFilterLoad: sp.OnFilterLoad,
- OnGetAddr: sp.OnGetAddr,
- OnAddr: sp.OnAddr,
- OnRead: sp.OnRead,
- OnWrite: sp.OnWrite,
-
- // Note: The reference client currently bans peers that send alerts
- // not signed with its key. We could verify against their key, but
- // since the reference client is currently unwilling to support
- // other implementations' alert messages, we will not relay theirs.
- OnAlert: nil,
- },
- NewestBlock: sp.newestBlock,
- HostToNetAddress: sp.server.addrManager.HostToNetAddress,
- Proxy: cfg.Proxy,
- UserAgentName: userAgentName,
- UserAgentVersion: userAgentVersion,
- UserAgentComments: cfg.UserAgentComments,
- ChainParams: sp.server.chainParams,
- Services: sp.server.services,
- DisableRelayTx: cfg.BlocksOnly,
- ProtocolVersion: peer.MaxProtocolVersion,
- }
-}
-
-// inboundPeerConnected is invoked by the connection manager when a new inbound
-// connection is established. It initializes a new inbound server peer
-// instance, associates it with the connection, and starts a goroutine to wait
-// for disconnection.
-func (s *server) inboundPeerConnected(conn net.Conn) {
- sp := newServerPeer(s, false)
- sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
- sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
- sp.AssociateConnection(conn)
- go s.peerDoneHandler(sp)
-}
-
-// outboundPeerConnected is invoked by the connection manager when a new
-// outbound connection is established. It initializes a new outbound server
-// peer instance, associates it with the relevant state such as the connection
-// request instance and the connection itself, and finally notifies the address
-// manager of the attempt.
-func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
- sp := newServerPeer(s, c.Permanent)
- p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
- if err != nil {
- srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
- s.connManager.Disconnect(c.ID())
- }
- sp.Peer = p
- sp.connReq = c
- sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
- sp.AssociateConnection(conn)
- go s.peerDoneHandler(sp)
- s.addrManager.Attempt(sp.NA())
-}
-
-// peerDoneHandler handles peer disconnects by notifiying the server that it's
-// done along with other performing other desirable cleanup.
-func (s *server) peerDoneHandler(sp *serverPeer) {
- sp.WaitForDisconnect()
- s.donePeers <- sp
-
- // Only tell sync manager we are gone if we ever told it we existed.
- if sp.VersionKnown() {
- s.syncManager.DonePeer(sp.Peer)
-
- // Evict any remaining orphans that were sent by the peer.
- numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
- if numEvicted > 0 {
- txmpLog.Debugf("Evicted %d %s from peer %v (id %d)",
- numEvicted, pickNoun(numEvicted, "orphan",
- "orphans"), sp, sp.ID())
- }
- }
- close(sp.quit)
-}
-
-// peerHandler is used to handle peer operations such as adding and removing
-// peers to and from the server, banning peers, and broadcasting messages to
-// peers. It must be run in a goroutine.
-func (s *server) peerHandler() {
- // Start the address manager and sync manager, both of which are needed
- // by peers. This is done here since their lifecycle is closely tied
- // to this handler and rather than adding more channels to sychronize
- // things, it's easier and slightly faster to simply start and stop them
- // in this handler.
- s.addrManager.Start()
- s.syncManager.Start()
-
- srvrLog.Tracef("Starting peer handler")
-
- state := &peerState{
- inboundPeers: make(map[int32]*serverPeer),
- persistentPeers: make(map[int32]*serverPeer),
- outboundPeers: make(map[int32]*serverPeer),
- banned: make(map[string]time.Time),
- outboundGroups: make(map[string]int),
- }
-
- if !cfg.DisableDNSSeed {
- // Add peers discovered through DNS to the address manager.
- connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
- btcdLookup, func(addrs []*wire.NetAddress) {
- // Bitcoind uses a lookup of the dns seeder here. This
- // is rather strange since the values looked up by the
- // DNS seed lookups will vary quite a lot.
- // to replicate this behaviour we put all addresses as
- // having come from the first one.
- s.addrManager.AddAddresses(addrs, addrs[0])
- })
- }
- go s.connManager.Start()
-
-out:
- for {
- select {
- // New peers connected to the server.
- case p := <-s.newPeers:
- s.handleAddPeerMsg(state, p)
-
- // Disconnected peers.
- case p := <-s.donePeers:
- s.handleDonePeerMsg(state, p)
-
- // Block accepted in mainchain or orphan, update peer height.
- case umsg := <-s.peerHeightsUpdate:
- s.handleUpdatePeerHeights(state, umsg)
-
- // Peer to ban.
- case p := <-s.banPeers:
- s.handleBanPeerMsg(state, p)
-
- // New inventory to potentially be relayed to other peers.
- case invMsg := <-s.relayInv:
- s.handleRelayInvMsg(state, invMsg)
-
- // Message to broadcast to all connected peers except those
- // which are excluded by the message.
- case bmsg := <-s.broadcast:
- s.handleBroadcastMsg(state, &bmsg)
-
- case qmsg := <-s.query:
- s.handleQuery(state, qmsg)
-
- case <-s.quit:
- // Disconnect all peers on server shutdown.
- state.forAllPeers(func(sp *serverPeer) {
- srvrLog.Tracef("Shutdown peer %s", sp)
- sp.Disconnect()
- })
- break out
- }
- }
-
- s.connManager.Stop()
- s.syncManager.Stop()
- s.addrManager.Stop()
-
- // Drain channels before exiting so nothing is left waiting around
- // to send.
-cleanup:
- for {
- select {
- case <-s.newPeers:
- case <-s.donePeers:
- case <-s.peerHeightsUpdate:
- case <-s.relayInv:
- case <-s.broadcast:
- case <-s.query:
- default:
- break cleanup
- }
- }
- s.wg.Done()
- srvrLog.Tracef("Peer handler done")
-}
-
-// AddPeer adds a new peer that has already been connected to the server.
-func (s *server) AddPeer(sp *serverPeer) {
- s.newPeers <- sp
-}
-
-// BanPeer bans a peer that has already been connected to the server by ip.
-func (s *server) BanPeer(sp *serverPeer) {
- s.banPeers <- sp
-}
-
-// RelayInventory relays the passed inventory vector to all connected peers
-// that are not already known to have it.
-func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
- s.relayInv <- relayMsg{invVect: invVect, data: data}
-}
-
-// BroadcastMessage sends msg to all peers currently connected to the server
-// except those in the passed peers to exclude.
-func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) {
- // XXX: Need to determine if this is an alert that has already been
- // broadcast and refrain from broadcasting again.
- bmsg := broadcastMsg{message: msg, excludePeers: exclPeers}
- s.broadcast <- bmsg
-}
-
-// ConnectedCount returns the number of currently connected peers.
-func (s *server) ConnectedCount() int32 {
- replyChan := make(chan int32)
-
- s.query <- getConnCountMsg{reply: replyChan}
-
- return <-replyChan
-}
-
-// OutboundGroupCount returns the number of peers connected to the given
-// outbound group key.
-func (s *server) OutboundGroupCount(key string) int {
- replyChan := make(chan int)
- s.query <- getOutboundGroup{key: key, reply: replyChan}
- return <-replyChan
-}
-
-// AddBytesSent adds the passed number of bytes to the total bytes sent counter
-// for the server. It is safe for concurrent access.
-func (s *server) AddBytesSent(bytesSent uint64) {
- atomic.AddUint64(&s.bytesSent, bytesSent)
-}
-
-// AddBytesReceived adds the passed number of bytes to the total bytes received
-// counter for the server. It is safe for concurrent access.
-func (s *server) AddBytesReceived(bytesReceived uint64) {
- atomic.AddUint64(&s.bytesReceived, bytesReceived)
-}
-
-// NetTotals returns the sum of all bytes received and sent across the network
-// for all peers. It is safe for concurrent access.
-func (s *server) NetTotals() (uint64, uint64) {
- return atomic.LoadUint64(&s.bytesReceived),
- atomic.LoadUint64(&s.bytesSent)
-}
-
-// UpdatePeerHeights updates the heights of all peers who have have announced
-// the latest connected main chain block, or a recognized orphan. These height
-// updates allow us to dynamically refresh peer heights, ensuring sync peer
-// selection has access to the latest block heights for each peer.
-func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) {
- s.peerHeightsUpdate <- updatePeerHeightsMsg{
- newHash: latestBlkHash,
- newHeight: latestHeight,
- originPeer: updateSource,
- }
-}
-
-// rebroadcastHandler keeps track of user submitted inventories that we have
-// sent out but have not yet made it into a block. We periodically rebroadcast
-// them in case our peers restarted or otherwise lost track of them.
-func (s *server) rebroadcastHandler() {
- // Wait 5 min before first tx rebroadcast.
- timer := time.NewTimer(5 * time.Minute)
- pendingInvs := make(map[wire.InvVect]interface{})
-
-out:
- for {
- select {
- case riv := <-s.modifyRebroadcastInv:
- switch msg := riv.(type) {
- // Incoming InvVects are added to our map of RPC txs.
- case broadcastInventoryAdd:
- pendingInvs[*msg.invVect] = msg.data
-
- // When an InvVect has been added to a block, we can
- // now remove it, if it was present.
- case broadcastInventoryDel:
- if _, ok := pendingInvs[*msg]; ok {
- delete(pendingInvs, *msg)
- }
- }
-
- case <-timer.C:
- // Any inventory we have has not made it into a block
- // yet. We periodically resubmit them until they have.
- for iv, data := range pendingInvs {
- ivCopy := iv
- s.RelayInventory(&ivCopy, data)
- }
-
- // Process at a random time up to 30mins (in seconds)
- // in the future.
- timer.Reset(time.Second *
- time.Duration(randomUint16Number(1800)))
-
- case <-s.quit:
- break out
- }
- }
-
- timer.Stop()
-
- // Drain channels before exiting so nothing is left waiting around
- // to send.
-cleanup:
- for {
- select {
- case <-s.modifyRebroadcastInv:
- default:
- break cleanup
- }
- }
- s.wg.Done()
-}
-
-// Start begins accepting connections from peers.
-func (s *server) Start() {
- // Already started?
- if atomic.AddInt32(&s.started, 1) != 1 {
- return
- }
-
- srvrLog.Trace("Starting server")
-
- // Server startup time. Used for the uptime command for uptime calculation.
- s.startupTime = time.Now().Unix()
-
- // Start the peer handler which in turn starts the address and block
- // managers.
- s.wg.Add(1)
- go s.peerHandler()
-
- if s.nat != nil {
- s.wg.Add(1)
- go s.upnpUpdateThread()
- }
-
- if !cfg.DisableRPC {
- s.wg.Add(1)
-
- // Start the rebroadcastHandler, which ensures user tx received by
- // the RPC server are rebroadcast until being included in a block.
- go s.rebroadcastHandler()
-
- s.rpcServer.Start()
- }
-
- // Start the CPU miner if generation is enabled.
- if cfg.Generate {
- s.cpuMiner.Start()
- }
-}
-
-// Stop gracefully shuts down the server by stopping and disconnecting all
-// peers and the main listener.
-func (s *server) Stop() error {
- // Make sure this only happens once.
- if atomic.AddInt32(&s.shutdown, 1) != 1 {
- srvrLog.Infof("Server is already in the process of shutting down")
- return nil
- }
-
- srvrLog.Warnf("Server shutting down")
-
- // Stop the CPU miner if needed
- s.cpuMiner.Stop()
-
- // Shutdown the RPC server if it's not disabled.
- if !cfg.DisableRPC {
- s.rpcServer.Stop()
- }
-
- // Signal the remaining goroutines to quit.
- close(s.quit)
- return nil
-}
-
-// WaitForShutdown blocks until the main listener and peer handlers are stopped.
-func (s *server) WaitForShutdown() {
- s.wg.Wait()
-}
-
-// ScheduleShutdown schedules a server shutdown after the specified duration.
-// It also dynamically adjusts how often to warn the server is going down based
-// on remaining duration.
-func (s *server) ScheduleShutdown(duration time.Duration) {
- // Don't schedule shutdown more than once.
- if atomic.AddInt32(&s.shutdownSched, 1) != 1 {
- return
- }
- srvrLog.Warnf("Server shutdown in %v", duration)
- go func() {
- remaining := duration
- tickDuration := dynamicTickDuration(remaining)
- done := time.After(remaining)
- ticker := time.NewTicker(tickDuration)
- out:
- for {
- select {
- case <-done:
- ticker.Stop()
- s.Stop()
- break out
- case <-ticker.C:
- remaining = remaining - tickDuration
- if remaining < time.Second {
- continue
- }
-
- // Change tick duration dynamically based on remaining time.
- newDuration := dynamicTickDuration(remaining)
- if tickDuration != newDuration {
- tickDuration = newDuration
- ticker.Stop()
- ticker = time.NewTicker(tickDuration)
- }
- srvrLog.Warnf("Server shutdown in %v", remaining)
- }
- }
- }()
-}
-
-// parseListeners splits the list of listen addresses passed in addrs into
-// IPv4 and IPv6 slices and returns them. This allows easy creation of the
-// listeners on the correct interface "tcp4" and "tcp6". It also properly
-// detects addresses which apply to "all interfaces" and adds the address to
-// both slices.
-func parseListeners(addrs []string) ([]string, []string, bool, error) {
- ipv4ListenAddrs := make([]string, 0, len(addrs)*2)
- ipv6ListenAddrs := make([]string, 0, len(addrs)*2)
- haveWildcard := false
-
- for _, addr := range addrs {
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- // Shouldn't happen due to already being normalized.
- return nil, nil, false, err
- }
-
- // Empty host or host of * on plan9 is both IPv4 and IPv6.
- if host == "" || (host == "*" && runtime.GOOS == "plan9") {
- ipv4ListenAddrs = append(ipv4ListenAddrs, addr)
- ipv6ListenAddrs = append(ipv6ListenAddrs, addr)
- haveWildcard = true
- continue
- }
-
- // Strip IPv6 zone id if present since net.ParseIP does not
- // handle it.
- zoneIndex := strings.LastIndex(host, "%")
- if zoneIndex > 0 {
- host = host[:zoneIndex]
- }
-
- // Parse the IP.
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, nil, false, fmt.Errorf("'%s' is not a "+
- "valid IP address", host)
- }
-
- // To4 returns nil when the IP is not an IPv4 address, so use
- // this determine the address type.
- if ip.To4() == nil {
- ipv6ListenAddrs = append(ipv6ListenAddrs, addr)
- } else {
- ipv4ListenAddrs = append(ipv4ListenAddrs, addr)
- }
- }
- return ipv4ListenAddrs, ipv6ListenAddrs, haveWildcard, nil
-}
-
-func (s *server) upnpUpdateThread() {
- // Go off immediately to prevent code duplication, thereafter we renew
- // lease every 15 minutes.
- timer := time.NewTimer(0 * time.Second)
- lport, _ := strconv.ParseInt(activeNetParams.DefaultPort, 10, 16)
- first := true
-out:
- for {
- select {
- case <-timer.C:
- // TODO: pick external port more cleverly
- // TODO: know which ports we are listening to on an external net.
- // TODO: if specific listen port doesn't work then ask for wildcard
- // listen port?
- // XXX this assumes timeout is in seconds.
- listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport),
- "btcd listen port", 20*60)
- if err != nil {
- srvrLog.Warnf("can't add UPnP port mapping: %v", err)
- }
- if first && err == nil {
- // TODO: look this up periodically to see if upnp domain changed
- // and so did ip.
- externalip, err := s.nat.GetExternalAddress()
- if err != nil {
- srvrLog.Warnf("UPnP can't get external address: %v", err)
- continue out
- }
- na := wire.NewNetAddressIPPort(externalip, uint16(listenPort),
- s.services)
- err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
- if err != nil {
- // XXX DeletePortMapping?
- }
- srvrLog.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na))
- first = false
- }
- timer.Reset(time.Minute * 15)
- case <-s.quit:
- break out
- }
- }
-
- timer.Stop()
-
- if err := s.nat.DeletePortMapping("tcp", int(lport), int(lport)); err != nil {
- srvrLog.Warnf("unable to remove UPnP port mapping: %v", err)
- } else {
- srvrLog.Debugf("successfully disestablished UPnP port mapping")
- }
-
- s.wg.Done()
-}
-
-// setupRPCListeners returns a slice of listners that are configured for use
-// with the RPC server depending on the configuration settings for listen
-// addresses and TLS.
-func setupRPCListeners() ([]net.Listener, error) {
- // Setup TLS if not disabled.
- listenFunc := net.Listen
- if !cfg.DisableTLS {
- // Generate the TLS cert and key file if both don't already
- // exist.
- if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
- err := genCertPair(cfg.RPCCert, cfg.RPCKey)
- if err != nil {
- return nil, err
- }
- }
- keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
- if err != nil {
- return nil, err
- }
-
- tlsConfig := tls.Config{
- Certificates: []tls.Certificate{keypair},
- MinVersion: tls.VersionTLS12,
- }
-
- // Change the standard net.Listen function to the tls one.
- listenFunc = func(net string, laddr string) (net.Listener, error) {
- return tls.Listen(net, laddr, &tlsConfig)
- }
- }
-
- // TODO: This code is similar to the peer listener code. It should be
- // factored into something shared.
- ipv4Addrs, ipv6Addrs, _, err := parseListeners(cfg.RPCListeners)
- if err != nil {
- return nil, err
- }
- listeners := make([]net.Listener, 0, len(ipv4Addrs)+len(ipv4Addrs))
- for _, addr := range ipv4Addrs {
- listener, err := listenFunc("tcp4", addr)
- if err != nil {
- rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
- continue
- }
- listeners = append(listeners, listener)
- }
-
- for _, addr := range ipv6Addrs {
- listener, err := listenFunc("tcp6", addr)
- if err != nil {
- rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
- continue
- }
- listeners = append(listeners, listener)
- }
-
- return listeners, nil
-}
-
-// newServer returns a new btcd server configured to listen on addr for the
-// bitcoin network type specified by chainParams. Use start to begin accepting
-// connections from peers.
-func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Params) (*server, error) {
- services := defaultServices
- if cfg.NoPeerBloomFilters {
- services &^= wire.SFNodeBloom
- }
-
- amgr := addrmgr.New(cfg.DataDir, btcdLookup)
-
- var listeners []net.Listener
- var nat NAT
- if !cfg.DisableListen {
- ipv4Addrs, ipv6Addrs, wildcard, err :=
- parseListeners(listenAddrs)
- if err != nil {
- return nil, err
- }
- listeners = make([]net.Listener, 0, len(ipv4Addrs)+len(ipv6Addrs))
- discover := true
- if len(cfg.ExternalIPs) != 0 {
- discover = false
- // if this fails we have real issues.
- port, _ := strconv.ParseUint(
- activeNetParams.DefaultPort, 10, 16)
-
- for _, sip := range cfg.ExternalIPs {
- eport := uint16(port)
- host, portstr, err := net.SplitHostPort(sip)
- if err != nil {
- // no port, use default.
- host = sip
- } else {
- port, err := strconv.ParseUint(
- portstr, 10, 16)
- if err != nil {
- srvrLog.Warnf("Can not parse "+
- "port from %s for "+
- "externalip: %v", sip,
- err)
- continue
- }
- eport = uint16(port)
- }
- na, err := amgr.HostToNetAddress(host, eport,
- services)
- if err != nil {
- srvrLog.Warnf("Not adding %s as "+
- "externalip: %v", sip, err)
- continue
- }
-
- err = amgr.AddLocalAddress(na, addrmgr.ManualPrio)
- if err != nil {
- amgrLog.Warnf("Skipping specified external IP: %v", err)
- }
- }
- } else if cfg.Upnp {
- nat, err = Discover()
- if err != nil {
- srvrLog.Warnf("Can't discover upnp: %v", err)
- }
- // nil nat here is fine, just means no upnp on network.
- }
-
- // TODO: nonstandard port...
- if wildcard {
- port, err :=
- strconv.ParseUint(activeNetParams.DefaultPort,
- 10, 16)
- if err != nil {
- // I can't think of a cleaner way to do this...
- goto nowc
- }
- addrs, err := net.InterfaceAddrs()
- for _, a := range addrs {
- ip, _, err := net.ParseCIDR(a.String())
- if err != nil {
- continue
- }
- na := wire.NewNetAddressIPPort(ip,
- uint16(port), services)
- if discover {
- err = amgr.AddLocalAddress(na, addrmgr.InterfacePrio)
- if err != nil {
- amgrLog.Debugf("Skipping local address: %v", err)
- }
- }
- }
- }
- nowc:
-
- for _, addr := range ipv4Addrs {
- listener, err := net.Listen("tcp4", addr)
- if err != nil {
- srvrLog.Warnf("Can't listen on %s: %v", addr,
- err)
- continue
- }
- listeners = append(listeners, listener)
-
- if discover {
- if na, err := amgr.DeserializeNetAddress(addr); err == nil {
- err = amgr.AddLocalAddress(na, addrmgr.BoundPrio)
- if err != nil {
- amgrLog.Warnf("Skipping bound address: %v", err)
- }
- }
- }
- }
-
- for _, addr := range ipv6Addrs {
- listener, err := net.Listen("tcp6", addr)
- if err != nil {
- srvrLog.Warnf("Can't listen on %s: %v", addr,
- err)
- continue
- }
- listeners = append(listeners, listener)
- if discover {
- if na, err := amgr.DeserializeNetAddress(addr); err == nil {
- err = amgr.AddLocalAddress(na, addrmgr.BoundPrio)
- if err != nil {
- amgrLog.Debugf("Skipping bound address: %v", err)
- }
- }
- }
- }
-
- if len(listeners) == 0 {
- return nil, errors.New("no valid listen address")
- }
- }
-
- s := server{
- chainParams: chainParams,
- addrManager: amgr,
- newPeers: make(chan *serverPeer, cfg.MaxPeers),
- donePeers: make(chan *serverPeer, cfg.MaxPeers),
- banPeers: make(chan *serverPeer, cfg.MaxPeers),
- query: make(chan interface{}),
- relayInv: make(chan relayMsg, cfg.MaxPeers),
- broadcast: make(chan broadcastMsg, cfg.MaxPeers),
- quit: make(chan struct{}),
- modifyRebroadcastInv: make(chan interface{}),
- peerHeightsUpdate: make(chan updatePeerHeightsMsg),
- nat: nat,
- db: db,
- timeSource: blockchain.NewMedianTime(),
- services: services,
- sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
- hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize),
- }
-
- // Create the transaction and address indexes if needed.
- //
- // CAUTION: the txindex needs to be first in the indexes array because
- // the addrindex uses data from the txindex during catchup. If the
- // addrindex is run first, it may not have the transactions from the
- // current block indexed.
- var indexes []indexers.Indexer
- if cfg.TxIndex || cfg.AddrIndex {
- // Enable transaction index if address index is enabled since it
- // requires it.
- if !cfg.TxIndex {
- indxLog.Infof("Transaction index enabled because it " +
- "is required by the address index")
- cfg.TxIndex = true
- } else {
- indxLog.Info("Transaction index is enabled")
- }
-
- s.txIndex = indexers.NewTxIndex(db)
- indexes = append(indexes, s.txIndex)
- }
- if cfg.AddrIndex {
- indxLog.Info("Address index is enabled")
- s.addrIndex = indexers.NewAddrIndex(db, chainParams)
- indexes = append(indexes, s.addrIndex)
- }
-
- // Create an index manager if any of the optional indexes are enabled.
- var indexManager blockchain.IndexManager
- if len(indexes) > 0 {
- indexManager = indexers.NewManager(db, indexes)
- }
-
- // Merge given checkpoints with the default ones unless they are disabled.
- var checkpoints []chaincfg.Checkpoint
- if !cfg.DisableCheckpoints {
- checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
- }
-
- // Create a new block chain instance with the appropriate configuration.
- var err error
- s.chain, err = blockchain.New(&blockchain.Config{
- DB: s.db,
- ChainParams: s.chainParams,
- Checkpoints: checkpoints,
- TimeSource: s.timeSource,
- SigCache: s.sigCache,
- IndexManager: indexManager,
- HashCache: s.hashCache,
- })
- if err != nil {
- return nil, err
- }
-
- txC := mempool.Config{
- Policy: mempool.Policy{
- DisableRelayPriority: cfg.NoRelayPriority,
- AcceptNonStd: cfg.RelayNonStd,
- FreeTxRelayLimit: cfg.FreeTxRelayLimit,
- MaxOrphanTxs: cfg.MaxOrphanTxs,
- MaxOrphanTxSize: defaultMaxOrphanTxSize,
- MaxSigOpCostPerTx: blockchain.MaxBlockSigOpsCost / 4,
- MinRelayTxFee: cfg.minRelayTxFee,
- MaxTxVersion: 2,
- },
- ChainParams: chainParams,
- FetchUtxoView: s.chain.FetchUtxoView,
- BestHeight: func() int32 { return s.chain.BestSnapshot().Height },
- MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime },
- CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) {
- return s.chain.CalcSequenceLock(tx, view, true)
- },
- IsDeploymentActive: s.chain.IsDeploymentActive,
- SigCache: s.sigCache,
- HashCache: s.hashCache,
- AddrIndex: s.addrIndex,
- }
- s.txMemPool = mempool.New(&txC)
-
- s.syncManager, err = netsync.New(&netsync.Config{
- PeerNotifier: &s,
- Chain: s.chain,
- TxMemPool: s.txMemPool,
- ChainParams: s.chainParams,
- DisableCheckpoints: cfg.DisableCheckpoints,
- MaxPeers: cfg.MaxPeers,
- })
- if err != nil {
- return nil, err
- }
-
- // Create the mining policy and block template generator based on the
- // configuration options.
- //
- // NOTE: The CPU miner relies on the mempool, so the mempool has to be
- // created before calling the function to create the CPU miner.
- policy := mining.Policy{
- BlockMinWeight: cfg.BlockMinWeight,
- BlockMaxWeight: cfg.BlockMaxWeight,
- BlockMinSize: cfg.BlockMinSize,
- BlockMaxSize: cfg.BlockMaxSize,
- BlockPrioritySize: cfg.BlockPrioritySize,
- TxMinFreeFee: cfg.minRelayTxFee,
- }
- blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
- s.chainParams, s.txMemPool, s.chain, s.timeSource,
- s.sigCache, s.hashCache)
- s.cpuMiner = cpuminer.New(&cpuminer.Config{
- ChainParams: chainParams,
- BlockTemplateGenerator: blockTemplateGenerator,
- MiningAddrs: cfg.miningAddrs,
- ProcessBlock: s.syncManager.ProcessBlock,
- ConnectedCount: s.ConnectedCount,
- IsCurrent: s.syncManager.IsCurrent,
- })
-
- // Only setup a function to return new addresses to connect to when
- // not running in connect-only mode. The simulation network is always
- // in connect-only mode since it is only intended to connect to
- // specified peers and actively avoid advertising and connecting to
- // discovered peers in order to prevent it from becoming a public test
- // network.
- var newAddressFunc func() (net.Addr, error)
- if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {
- newAddressFunc = func() (net.Addr, error) {
- for tries := 0; tries < 100; tries++ {
- addr := s.addrManager.GetAddress()
- if addr == nil {
- break
- }
-
- // Address will not be invalid, local or unroutable
- // because addrmanager rejects those on addition.
- // Just check that we don't already have an address
- // in the same group so that we are not connecting
- // to the same network segment at the expense of
- // others.
- key := addrmgr.GroupKey(addr.NetAddress())
- if s.OutboundGroupCount(key) != 0 {
- continue
- }
-
- // only allow recent nodes (10mins) after we failed 30
- // times
- if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
- continue
- }
-
- // allow nondefault ports after 50 failed tries.
- if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
- activeNetParams.DefaultPort {
- continue
- }
-
- addrString := addrmgr.NetAddressKey(addr.NetAddress())
- return addrStringToNetAddr(addrString)
- }
-
- return nil, errors.New("no valid connect address")
- }
- }
-
- // Create a connection manager.
- targetOutbound := defaultTargetOutbound
- if cfg.MaxPeers < targetOutbound {
- targetOutbound = cfg.MaxPeers
- }
- cmgr, err := connmgr.New(&connmgr.Config{
- Listeners: listeners,
- OnAccept: s.inboundPeerConnected,
- RetryDuration: connectionRetryInterval,
- TargetOutbound: uint32(targetOutbound),
- Dial: btcdDial,
- OnConnection: s.outboundPeerConnected,
- GetNewAddress: newAddressFunc,
- })
- if err != nil {
- return nil, err
- }
- s.connManager = cmgr
-
- // Start up persistent peers.
- permanentPeers := cfg.ConnectPeers
- if len(permanentPeers) == 0 {
- permanentPeers = cfg.AddPeers
- }
- for _, addr := range permanentPeers {
- netAddr, err := addrStringToNetAddr(addr)
- if err != nil {
- return nil, err
- }
-
- go s.connManager.Connect(&connmgr.ConnReq{
- Addr: netAddr,
- Permanent: true,
- })
- }
-
- if !cfg.DisableRPC {
- // Setup listeners for the configured RPC listen addresses and
- // TLS settings.
- rpcListeners, err := setupRPCListeners()
- if err != nil {
- return nil, err
- }
- if len(rpcListeners) == 0 {
- return nil, errors.New("RPCS: No valid listen address")
- }
-
- s.rpcServer, err = newRPCServer(&rpcserverConfig{
- Listeners: rpcListeners,
- StartupTime: s.startupTime,
- ConnMgr: &rpcConnManager{&s},
- SyncMgr: &rpcSyncMgr{&s, s.syncManager},
- TimeSource: s.timeSource,
- Chain: s.chain,
- ChainParams: chainParams,
- DB: db,
- TxMemPool: s.txMemPool,
- Generator: blockTemplateGenerator,
- CPUMiner: s.cpuMiner,
- TxIndex: s.txIndex,
- AddrIndex: s.addrIndex,
- })
- if err != nil {
- return nil, err
- }
-
- // Signal process shutdown when the RPC server requests it.
- go func() {
- <-s.rpcServer.RequestedProcessShutdown()
- shutdownRequestChannel <- struct{}{}
- }()
- }
-
- return &s, nil
-}
-
-// addrStringToNetAddr takes an address in the form of 'host:port' and returns
-// a net.Addr which maps to the original address with any host names resolved
-// to IP addresses. It also handles tor addresses properly by returning a
-// net.Addr that encapsulates the address.
-func addrStringToNetAddr(addr string) (net.Addr, error) {
- host, strPort, err := net.SplitHostPort(addr)
- if err != nil {
- return nil, err
- }
-
- port, err := strconv.Atoi(strPort)
- if err != nil {
- return nil, err
- }
-
- // Skip if host is already an IP address.
- if ip := net.ParseIP(host); ip != nil {
- return &net.TCPAddr{
- IP: ip,
- Port: port,
- }, nil
- }
-
- // Tor addresses cannot be resolved to an IP, so just return an onion
- // address instead.
- if strings.HasSuffix(host, ".onion") {
- if cfg.NoOnion {
- return nil, errors.New("tor has been disabled")
- }
-
- return &onionAddr{addr: addr}, nil
- }
-
- // Attempt to look up an IP address associated with the parsed host.
- ips, err := btcdLookup(host)
- if err != nil {
- return nil, err
- }
- if len(ips) == 0 {
- return nil, fmt.Errorf("no addresses found for %s", host)
- }
-
- return &net.TCPAddr{
- IP: ips[0],
- Port: port,
- }, nil
-}
-
-// dynamicTickDuration is a convenience function used to dynamically choose a
-// tick duration based on remaining time. It is primarily used during
-// server shutdown to make shutdown warnings more frequent as the shutdown time
-// approaches.
-func dynamicTickDuration(remaining time.Duration) time.Duration {
- switch {
- case remaining <= time.Second*5:
- return time.Second
- case remaining <= time.Second*15:
- return time.Second * 5
- case remaining <= time.Minute:
- return time.Second * 15
- case remaining <= time.Minute*5:
- return time.Minute
- case remaining <= time.Minute*15:
- return time.Minute * 5
- case remaining <= time.Hour:
- return time.Minute * 15
- }
- return time.Hour
-}
-
-// isWhitelisted returns whether the IP address is included in the whitelisted
-// networks and IPs.
-func isWhitelisted(addr net.Addr) bool {
- if len(cfg.whitelists) == 0 {
- return false
- }
-
- host, _, err := net.SplitHostPort(addr.String())
- if err != nil {
- srvrLog.Warnf("Unable to SplitHostPort on '%s': %v", addr, err)
- return false
- }
- ip := net.ParseIP(host)
- if ip == nil {
- srvrLog.Warnf("Unable to parse IP '%s'", addr)
- return false
- }
-
- for _, ipnet := range cfg.whitelists {
- if ipnet.Contains(ip) {
- return true
- }
- }
- return false
-}
-
-// checkpointSorter implements sort.Interface to allow a slice of checkpoints to
-// be sorted.
-type checkpointSorter []chaincfg.Checkpoint
-
-// Len returns the number of checkpoints in the slice. It is part of the
-// sort.Interface implementation.
-func (s checkpointSorter) Len() int {
- return len(s)
-}
-
-// Swap swaps the checkpoints at the passed indices. It is part of the
-// sort.Interface implementation.
-func (s checkpointSorter) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
-}
-
-// Less returns whether the checkpoint with index i should sort before the
-// checkpoint with index j. It is part of the sort.Interface implementation.
-func (s checkpointSorter) Less(i, j int) bool {
- return s[i].Height < s[j].Height
-}
-
-// mergeCheckpoints returns two slices of checkpoints merged into one slice
-// such that the checkpoints are sorted by height. In the case the additional
-// checkpoints contain a checkpoint with the same height as a checkpoint in the
-// default checkpoints, the additional checkpoint will take precedence and
-// overwrite the default one.
-func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
- // Create a map of the additional checkpoints to remove duplicates while
- // leaving the most recently-specified checkpoint.
- extra := make(map[int32]chaincfg.Checkpoint)
- for _, checkpoint := range additional {
- extra[checkpoint.Height] = checkpoint
- }
-
- // Add all default checkpoints that do not have an override in the
- // additional checkpoints.
- numDefault := len(defaultCheckpoints)
- checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
- for _, checkpoint := range defaultCheckpoints {
- if _, exists := extra[checkpoint.Height]; !exists {
- checkpoints = append(checkpoints, checkpoint)
- }
- }
-
- // Append the additional checkpoints and return the sorted results.
- for _, checkpoint := range extra {
- checkpoints = append(checkpoints, checkpoint)
- }
- sort.Sort(checkpointSorter(checkpoints))
- return checkpoints
-}