1 // Copyright (c) 2013-2017 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
14 "github.com/btcsuite/btcd/blockchain"
15 "github.com/btcsuite/btcd/chaincfg"
16 "github.com/btcsuite/btcd/chaincfg/chainhash"
17 "github.com/btcsuite/btcd/database"
18 "github.com/btcsuite/btcd/mempool"
19 peerpkg "github.com/btcsuite/btcd/peer"
20 "github.com/btcsuite/btcd/wire"
21 "github.com/btcsuite/btcutil"
25 // minInFlightBlocks is the minimum number of blocks that should be
26 // in the request queue for headers-first mode before requesting more.
28 minInFlightBlocks = 10
30 // maxRejectedTxns is the maximum number of rejected transaction
31 // hashes to store in memory.
32 maxRejectedTxns = 1000
34 // maxRequestedBlocks is the maximum number of requested block
35 // hashes to store in memory.
36 maxRequestedBlocks = wire.MaxInvPerMsg
38 // maxRequestedTxns is the maximum number of requested transaction
39 // hashes to store in memory.
40 maxRequestedTxns = wire.MaxInvPerMsg
43 // zeroHash is the zero value hash (all zeros). It is defined as a convenience.
44 var zeroHash chainhash.Hash
46 // newPeerMsg signifies a newly connected peer to the block handler.
47 type newPeerMsg struct {
51 // blockMsg packages a bitcoin block message and the peer it came from together
52 // so the block handler has access to that information.
53 type blockMsg struct {
59 // invMsg packages a bitcoin inv message and the peer it came from together
60 // so the block handler has access to that information.
66 // headersMsg packages a bitcoin headers message and the peer it came from
67 // together so the block handler has access to that information.
68 type headersMsg struct {
69 headers *wire.MsgHeaders
73 // donePeerMsg signifies a newly disconnected peer to the block handler.
74 type donePeerMsg struct {
78 // txMsg packages a bitcoin tx message and the peer it came from together
79 // so the block handler has access to that information.
86 // getSyncPeerMsg is a message type to be sent across the message channel for
87 // retrieving the current sync peer.
88 type getSyncPeerMsg struct {
92 // processBlockResponse is a response sent to the reply channel of a processBlockMsg.
94 type processBlockResponse struct {
99 // processBlockMsg is a message type to be sent across the message channel
100 // for requesting that a block be processed. Note this call differs from blockMsg
101 // above in that blockMsg is intended for blocks that came from peers and have
102 // extra handling whereas this message essentially is just a concurrent safe
103 // way to call ProcessBlock on the internal block chain instance.
104 type processBlockMsg struct {
106 flags blockchain.BehaviorFlags
107 reply chan processBlockResponse
110 // isCurrentMsg is a message type to be sent across the message channel for
111 // requesting whether or not the sync manager believes it is synced with the
112 // currently connected peers.
113 type isCurrentMsg struct {
117 // pauseMsg is a message type to be sent across the message channel for
118 // pausing the sync manager. This effectively provides the caller with
119 // exclusive access over the manager until a receive is performed on the unpause channel.
121 type pauseMsg struct {
122 unpause <-chan struct{}
125 // headerNode is used as a node in a list of headers that are linked together
126 // between checkpoints.
127 type headerNode struct {
132 // peerSyncState stores additional information that the SyncManager tracks about a peer.
134 type peerSyncState struct {
136 requestQueue []*wire.InvVect
137 requestedTxns map[chainhash.Hash]struct{}
138 requestedBlocks map[chainhash.Hash]struct{}
141 // SyncManager is used to communicate block related messages with peers. The
142 // SyncManager is started by executing Start() in a goroutine. Once started,
143 // it selects peers to sync from and starts the initial block download. Once the
144 // chain is in sync, the SyncManager handles incoming block and header
145 // notifications and relays announcements of new blocks to peers.
146 type SyncManager struct {
147 peerNotifier PeerNotifier
150 chain *blockchain.BlockChain
151 txMemPool *mempool.TxPool
152 chainParams *chaincfg.Params
153 progressLogger *blockProgressLogger
154 msgChan chan interface{}
158 // These fields should only be accessed from the blockHandler thread
159 rejectedTxns map[chainhash.Hash]struct{}
160 requestedTxns map[chainhash.Hash]struct{}
161 requestedBlocks map[chainhash.Hash]struct{}
162 syncPeer *peerpkg.Peer
163 peerStates map[*peerpkg.Peer]*peerSyncState
165 // The following fields are used for headers-first mode.
166 headersFirstMode bool
167 headerList *list.List
168 startHeader *list.Element
169 nextCheckpoint *chaincfg.Checkpoint
172 // resetHeaderState sets the headers-first mode state to values appropriate for
173 // syncing from a new peer.
174 func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
175 sm.headersFirstMode = false
179 // When there is a next checkpoint, add an entry for the latest known
180 // block into the header pool. This allows the next downloaded header
181 // to prove it links to the chain properly.
182 if sm.nextCheckpoint != nil {
183 node := headerNode{height: newestHeight, hash: newestHash}
184 sm.headerList.PushBack(&node)
188 // findNextHeaderCheckpoint returns the next checkpoint after the passed height.
189 // It returns nil when there is not one either because the height is already
190 // later than the final checkpoint or some other reason such as disabled checkpoints.
192 func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
193 checkpoints := sm.chain.Checkpoints()
194 if len(checkpoints) == 0 {
198 // There is no next checkpoint if the height is already after the final checkpoint.
200 finalCheckpoint := &checkpoints[len(checkpoints)-1]
201 if height >= finalCheckpoint.Height {
205 // Find the next checkpoint.
206 nextCheckpoint := finalCheckpoint
207 for i := len(checkpoints) - 2; i >= 0; i-- {
208 if height >= checkpoints[i].Height {
211 nextCheckpoint = &checkpoints[i]
213 return nextCheckpoint
216 // startSync will choose the best peer among the available candidate peers to
217 // download/sync the blockchain from. When syncing is already running, it
218 // simply returns. It also examines the candidates for any which are no longer
219 // candidates and removes them as needed.
220 func (sm *SyncManager) startSync() {
221 // Return now if we're already syncing.
222 if sm.syncPeer != nil {
226 // Once the segwit soft-fork package has activated, we only
227 // want to sync from peers which are witness enabled to ensure
228 // that we fully validate all blockchain data.
229 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
231 log.Errorf("Unable to query for segwit soft-fork state: %v", err)
235 best := sm.chain.BestSnapshot()
236 var bestPeer *peerpkg.Peer
237 for peer, state := range sm.peerStates {
238 if !state.syncCandidate {
242 if segwitActive && !peer.IsWitnessEnabled() {
243 log.Debugf("peer %v not witness enabled, skipping", peer)
247 // Remove sync candidate peers that are no longer candidates due
248 // to passing their latest known block. NOTE: The < is
249 // intentional as opposed to <=. While technically the peer
250 // doesn't have a later block when it's equal, it will likely
251 // have one soon so it is a reasonable choice. It also allows
252 // the case where both are at 0 such as during regression test.
253 if peer.LastBlock() < best.Height {
254 state.syncCandidate = false
258 // TODO(davec): Use a better algorithm to choose the best peer.
259 // For now, just pick the first available candidate.
263 // Start syncing from the best peer if one was selected.
265 // Clear the requestedBlocks if the sync peer changes, otherwise
266 // we may ignore blocks we need that the last sync peer failed to send.
268 sm.requestedBlocks = make(map[chainhash.Hash]struct{})
270 locator, err := sm.chain.LatestBlockLocator()
272 log.Errorf("Failed to get block locator for the "+
273 "latest block: %v", err)
277 log.Infof("Syncing to block height %d from peer %v",
278 bestPeer.LastBlock(), bestPeer.Addr())
280 // When the current height is less than a known checkpoint we
281 // can use block headers to learn about which blocks comprise
282 // the chain up to the checkpoint and perform less validation
283 // for them. This is possible since each header contains the
284 // hash of the previous header and a merkle root. Therefore if
285 // we validate all of the received headers link together
286 // properly and the checkpoint hashes match, we can be sure the
287 // hashes for the blocks in between are accurate. Further, once
288 // the full blocks are downloaded, the merkle root is computed
289 // and compared against the value in the header which proves the
290 // full block hasn't been tampered with.
292 // Once we have passed the final checkpoint, or checkpoints are
293 // disabled, use standard inv messages to learn about the blocks
294 // and fully validate them. Finally, regression test mode does
295 // not support the headers-first approach so do normal block
296 // downloads when in regression test mode.
297 if sm.nextCheckpoint != nil &&
298 best.Height < sm.nextCheckpoint.Height &&
299 sm.chainParams != &chaincfg.RegressionNetParams {
301 bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
302 sm.headersFirstMode = true
303 log.Infof("Downloading headers for blocks %d to "+
304 "%d from peer %s", best.Height+1,
305 sm.nextCheckpoint.Height, bestPeer.Addr())
307 bestPeer.PushGetBlocksMsg(locator, &zeroHash)
309 sm.syncPeer = bestPeer
311 log.Warnf("No sync peer candidates available")
315 // isSyncCandidate returns whether or not the peer is a candidate to consider syncing from.
317 func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
318 // Typically a peer is not a candidate for sync if it's not a full node,
319 // however regression test is special in that the regression tool is
320 // not a full node and still needs to be considered a sync candidate.
321 if sm.chainParams == &chaincfg.RegressionNetParams {
322 // The peer is not a candidate if it's not coming from localhost
323 // or the hostname can't be determined for some reason.
324 host, _, err := net.SplitHostPort(peer.Addr())
329 if host != "127.0.0.1" && host != "localhost" {
333 // The peer is not a candidate for sync if it's not a full
334 // node. Additionally, if the segwit soft-fork package has
335 // activated, then the peer must also be upgraded.
336 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
338 log.Errorf("Unable to query for segwit "+
339 "soft-fork state: %v", err)
341 nodeServices := peer.Services()
342 if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
343 (segwitActive && !peer.IsWitnessEnabled()) {
348 // Candidate if all checks passed.
352 // handleNewPeerMsg deals with new peers that have signalled they may
353 // be considered as a sync peer (they have already successfully negotiated). It
354 // also starts syncing if needed. It is invoked from the syncHandler goroutine.
355 func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
356 // Ignore if in the process of shutting down.
357 if atomic.LoadInt32(&sm.shutdown) != 0 {
361 log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
363 // Initialize the peer state
364 isSyncCandidate := sm.isSyncCandidate(peer)
365 sm.peerStates[peer] = &peerSyncState{
366 syncCandidate: isSyncCandidate,
367 requestedTxns: make(map[chainhash.Hash]struct{}),
368 requestedBlocks: make(map[chainhash.Hash]struct{}),
371 // Start syncing by choosing the best candidate if needed.
372 if isSyncCandidate && sm.syncPeer == nil {
377 // handleDonePeerMsg deals with peers that have signalled they are done. It
378 // removes the peer as a candidate for syncing and in the case where it was
379 // the current sync peer, attempts to select a new best peer to sync from. It
380 // is invoked from the syncHandler goroutine.
381 func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
382 state, exists := sm.peerStates[peer]
384 log.Warnf("Received done peer message for unknown peer %s", peer)
388 // Remove the peer from the list of candidate peers.
389 delete(sm.peerStates, peer)
391 log.Infof("Lost peer %s", peer)
393 // Remove requested transactions from the global map so that they will
394 // be fetched from elsewhere next time we get an inv.
395 for txHash := range state.requestedTxns {
396 delete(sm.requestedTxns, txHash)
399 // Remove requested blocks from the global map so that they will be
400 // fetched from elsewhere next time we get an inv.
401 // TODO: we could possibly here check which peers have these blocks
402 // and request them now to speed things up a little.
403 for blockHash := range state.requestedBlocks {
404 delete(sm.requestedBlocks, blockHash)
407 // Attempt to find a new peer to sync from if the quitting peer is the
408 // sync peer. Also, reset the headers-first state if in headers-first mode.
410 if sm.syncPeer == peer {
412 if sm.headersFirstMode {
413 best := sm.chain.BestSnapshot()
414 sm.resetHeaderState(&best.Hash, best.Height)
420 // handleTxMsg handles transaction messages from all peers.
421 func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
423 state, exists := sm.peerStates[peer]
425 log.Warnf("Received tx message from unknown peer %s", peer)
429 // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of
430 // sending an inventory message and allowing the remote peer to decide
431 // whether or not they want to request the transaction via a getdata
432 // message. Unfortunately, the reference implementation permits
433 // unrequested data, so it has allowed wallets that don't follow the
434 // spec to proliferate. While this is not ideal, there is no check here
435 // to disconnect peers for sending unsolicited transactions to provide
437 txHash := tmsg.tx.Hash()
439 // Ignore transactions that we have already rejected. Do not
440 // send a reject message here because if the transaction was already
441 // rejected, the transaction was unsolicited.
442 if _, exists = sm.rejectedTxns[*txHash]; exists {
443 log.Debugf("Ignoring unsolicited previously rejected "+
444 "transaction %v from %s", txHash, peer)
448 // Process the transaction to include validation, insertion in the
449 // memory pool, orphan handling, etc.
450 acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
451 true, true, mempool.Tag(peer.ID()))
453 // Remove transaction from request maps. Either the mempool/chain
454 // already knows about it and as such we shouldn't have any more
455 // instances of trying to fetch it, or we failed to insert and thus
456 // we'll retry next time we get an inv.
457 delete(state.requestedTxns, *txHash)
458 delete(sm.requestedTxns, *txHash)
461 // Do not request this transaction again until a new block
462 // has been processed.
463 sm.rejectedTxns[*txHash] = struct{}{}
464 sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
466 // When the error is a rule error, it means the transaction was
467 // simply rejected as opposed to something actually going wrong,
468 // so log it as such. Otherwise, something really did go wrong,
469 // so log it as an actual error.
470 if _, ok := err.(mempool.RuleError); ok {
471 log.Debugf("Rejected transaction %v from %s: %v",
474 log.Errorf("Failed to process transaction %v: %v",
478 // Convert the error into an appropriate reject message and send it.
480 code, reason := mempool.ErrToRejectErr(err)
481 peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
485 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
488 // current returns true if we believe we are synced with our peers, false if we
489 // still have blocks to check
490 func (sm *SyncManager) current() bool {
491 if !sm.chain.IsCurrent() {
495 // if blockChain thinks we are current and we have no syncPeer it
496 // is probably right.
497 if sm.syncPeer == nil {
501 // No matter what chain thinks, if we are below the block we are syncing
502 // to we are not current.
503 if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
509 // handleBlockMsg handles block messages from all peers.
510 func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
512 state, exists := sm.peerStates[peer]
514 log.Warnf("Received block message from unknown peer %s", peer)
518 // If we didn't ask for this block then the peer is misbehaving.
519 blockHash := bmsg.block.Hash()
520 if _, exists = state.requestedBlocks[*blockHash]; !exists {
521 // The regression test intentionally sends some blocks twice
522 // to test duplicate block insertion fails. Don't disconnect
523 // the peer or ignore the block when we're in regression test
524 // mode in this case so the chain code is actually fed the duplicate blocks.
526 if sm.chainParams != &chaincfg.RegressionNetParams {
527 log.Warnf("Got unrequested block %v from %s -- "+
528 "disconnecting", blockHash, peer.Addr())
534 // When in headers-first mode, if the block matches the hash of the
535 // first header in the list of headers that are being fetched, it's
536 // eligible for less validation since the headers have already been
537 // verified to link together and are valid up to the next checkpoint.
538 // Also, remove the list entry for all blocks except the checkpoint
539 // since it is needed to verify the next round of headers links properly.
541 isCheckpointBlock := false
542 behaviorFlags := blockchain.BFNone
543 if sm.headersFirstMode {
544 firstNodeEl := sm.headerList.Front()
545 if firstNodeEl != nil {
546 firstNode := firstNodeEl.Value.(*headerNode)
547 if blockHash.IsEqual(firstNode.hash) {
548 behaviorFlags |= blockchain.BFFastAdd
549 if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
550 isCheckpointBlock = true
552 sm.headerList.Remove(firstNodeEl)
558 // Remove block from request maps. Either chain will know about it and
559 // so we shouldn't have any more instances of trying to fetch it, or we
560 // will fail the insert and thus we'll retry next time we get an inv.
561 delete(state.requestedBlocks, *blockHash)
562 delete(sm.requestedBlocks, *blockHash)
564 // Process the block to include validation, best chain selection, orphan handling, etc.
566 _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
568 // When the error is a rule error, it means the block was simply
569 // rejected as opposed to something actually going wrong, so log
570 // it as such. Otherwise, something really did go wrong, so log
571 // it as an actual error.
572 if _, ok := err.(blockchain.RuleError); ok {
573 log.Infof("Rejected block %v from %s: %v", blockHash,
576 log.Errorf("Failed to process block %v: %v",
579 if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
580 database.ErrCorruption {
584 // Convert the error into an appropriate reject message and send it.
586 code, reason := mempool.ErrToRejectErr(err)
587 peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
591 // Meta-data about the new block this peer is reporting. We use this
592 // below to update this peer's latest block height and the heights of
593 // other peers based on their last announced block hash. This allows us
594 // to dynamically update the block heights of peers, avoiding stale
595 // heights when looking for a new sync peer. Upon acceptance of a block
596 // or recognition of an orphan, we also use this information to update
597 // the block heights of other peers whose invs may have been ignored
598 // if we are actively syncing while the chain is not yet current or
599 // who may have lost the block announcement race.
600 var heightUpdate int32
601 var blkHashUpdate *chainhash.Hash
603 // Request the parents for the orphan block from the peer that sent it.
605 // We've just received an orphan block from a peer. In order
606 // to update the height of the peer, we try to extract the
607 // block height from the scriptSig of the coinbase transaction.
608 // Extraction is only attempted if the block's version is
609 // high enough (ver 2+).
610 header := &bmsg.block.MsgBlock().Header
611 if blockchain.ShouldHaveSerializedBlockHeight(header) {
612 coinbaseTx := bmsg.block.Transactions()[0]
613 cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx)
615 log.Warnf("Unable to extract height from "+
616 "coinbase tx: %v", err)
618 log.Debugf("Extracted height of %v from "+
619 "orphan block", cbHeight)
620 heightUpdate = cbHeight
621 blkHashUpdate = blockHash
625 orphanRoot := sm.chain.GetOrphanRoot(blockHash)
626 locator, err := sm.chain.LatestBlockLocator()
628 log.Warnf("Failed to get block locator for the "+
629 "latest block: %v", err)
631 peer.PushGetBlocksMsg(locator, orphanRoot)
634 // When the block is not an orphan, log information about it and
635 // update the chain state.
636 sm.progressLogger.LogBlockHeight(bmsg.block)
638 // Update this peer's latest block height, for future
639 // potential sync node candidacy.
640 best := sm.chain.BestSnapshot()
641 heightUpdate = best.Height
642 blkHashUpdate = &best.Hash
644 // Clear the rejected transactions.
645 sm.rejectedTxns = make(map[chainhash.Hash]struct{})
648 // Update the block height for this peer. But only send a message to
649 // the server for updating peer heights if this is an orphan or our
650 // chain is "current". This avoids sending a spammy amount of messages
651 // if we're syncing the chain from scratch.
652 if blkHashUpdate != nil && heightUpdate != 0 {
653 peer.UpdateLastBlockHeight(heightUpdate)
654 if isOrphan || sm.current() {
655 go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
660 // Nothing more to do if we aren't in headers-first mode.
661 if !sm.headersFirstMode {
665 // This is headers-first mode, so if the block is not a checkpoint
666 // request more blocks using the header list when the request queue is getting short.
668 if !isCheckpointBlock {
669 if sm.startHeader != nil &&
670 len(state.requestedBlocks) < minInFlightBlocks {
671 sm.fetchHeaderBlocks()
676 // This is headers-first mode and the block is a checkpoint. When
677 // there is a next checkpoint, get the next round of headers by asking
678 // for headers starting from the block after this one up to the next checkpoint.
680 prevHeight := sm.nextCheckpoint.Height
681 prevHash := sm.nextCheckpoint.Hash
682 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
683 if sm.nextCheckpoint != nil {
684 locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
685 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
687 log.Warnf("Failed to send getheaders message to "+
688 "peer %s: %v", peer.Addr(), err)
691 log.Infof("Downloading headers for blocks %d to %d from "+
692 "peer %s", prevHeight+1, sm.nextCheckpoint.Height,
697 // This is headers-first mode, the block is a checkpoint, and there are
698 // no more checkpoints, so switch to normal mode by requesting blocks
699 // from the block after this one up to the end of the chain (zero hash).
700 sm.headersFirstMode = false
702 log.Infof("Reached the final checkpoint -- switching to normal mode")
703 locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
704 err = peer.PushGetBlocksMsg(locator, &zeroHash)
706 log.Warnf("Failed to send getblocks message to peer %s: %v",
712 // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
713 // list of blocks to be downloaded based on the current list of headers.
714 func (sm *SyncManager) fetchHeaderBlocks() {
715 // Nothing to do if there is no start header.
716 if sm.startHeader == nil {
717 log.Warnf("fetchHeaderBlocks called with no start header")
721 // Build up a getdata request for the list of blocks the headers
722 // describe. The size hint will be limited to wire.MaxInvPerMsg by
723 // the function, so no need to double check it here.
724 gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
726 for e := sm.startHeader; e != nil; e = e.Next() {
727 node, ok := e.Value.(*headerNode)
729 log.Warn("Header list node type is not a headerNode")
733 iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
734 haveInv, err := sm.haveInventory(iv)
736 log.Warnf("Unexpected failure when checking for "+
737 "existing inventory during header block "+
741 syncPeerState := sm.peerStates[sm.syncPeer]
743 sm.requestedBlocks[*node.hash] = struct{}{}
744 syncPeerState.requestedBlocks[*node.hash] = struct{}{}
746 // If we're fetching from a witness enabled peer
747 // post-fork, then ensure that we receive all the
748 // witness data in the blocks.
749 if sm.syncPeer.IsWitnessEnabled() {
750 iv.Type = wire.InvTypeWitnessBlock
756 sm.startHeader = e.Next()
757 if numRequested >= wire.MaxInvPerMsg {
761 if len(gdmsg.InvList) > 0 {
762 sm.syncPeer.QueueMessage(gdmsg, nil)
766 // handleHeadersMsg handles block header messages from all peers. Headers are
767 // requested when performing a headers-first sync.
768 func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
770 _, exists := sm.peerStates[peer]
772 log.Warnf("Received headers message from unknown peer %s", peer)
776 // The remote peer is misbehaving if we didn't request headers.
778 numHeaders := len(msg.Headers)
779 if !sm.headersFirstMode {
780 log.Warnf("Got %d unrequested headers from %s -- "+
781 "disconnecting", numHeaders, peer.Addr())
786 // Nothing to do for an empty headers message.
791 // Process all of the received headers ensuring each one connects to the
792 // previous and that checkpoints match.
793 receivedCheckpoint := false
794 var finalHash *chainhash.Hash
795 for _, blockHeader := range msg.Headers {
796 blockHash := blockHeader.BlockHash()
797 finalHash = &blockHash
799 // Ensure there is a previous header to compare against.
800 prevNodeEl := sm.headerList.Back()
801 if prevNodeEl == nil {
802 log.Warnf("Header list does not contain a previous" +
803 "element as expected -- disconnecting peer")
808 // Ensure the header properly connects to the previous one and
809 // add it to the list of headers.
810 node := headerNode{hash: &blockHash}
811 prevNode := prevNodeEl.Value.(*headerNode)
812 if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
813 node.height = prevNode.height + 1
814 e := sm.headerList.PushBack(&node)
815 if sm.startHeader == nil {
819 log.Warnf("Received block header that does not "+
820 "properly connect to the chain from peer %s "+
821 "-- disconnecting", peer.Addr())
826 // Verify the header at the next checkpoint height matches.
827 if node.height == sm.nextCheckpoint.Height {
828 if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
829 receivedCheckpoint = true
830 log.Infof("Verified downloaded block "+
831 "header against checkpoint at height "+
832 "%d/hash %s", node.height, node.hash)
834 log.Warnf("Block header at height %d/hash "+
835 "%s from peer %s does NOT match "+
836 "expected checkpoint hash of %s -- "+
837 "disconnecting", node.height,
838 node.hash, peer.Addr(),
839 sm.nextCheckpoint.Hash)
847 // When this header is a checkpoint, switch to fetching the blocks for
848 // all of the headers since the last checkpoint.
849 if receivedCheckpoint {
850 // Since the first entry of the list is always the final block
851 // that is already in the database and is only used to ensure
852 // the next header links properly, it must be removed before
853 // fetching the blocks.
854 sm.headerList.Remove(sm.headerList.Front())
855 log.Infof("Received %v block headers: Fetching blocks",
857 sm.progressLogger.SetLastLogTime(time.Now())
858 sm.fetchHeaderBlocks()
862 // This header is not a checkpoint, so request the next batch of
863 // headers starting from the latest known header and ending with the next checkpoint.
865 locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
866 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
868 log.Warnf("Failed to send getheaders message to "+
869 "peer %s: %v", peer.Addr(), err)
874 // haveInventory returns whether or not the inventory represented by the passed
875 // inventory vector is known. This includes checking all of the various places
876 // inventory can be when it is in different states such as blocks that are part
877 // of the main chain, on a side chain, in the orphan pool, and transactions that
878 // are in the memory pool (either the main pool or orphan pool).
879 func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
880 switch invVect.Type {
881 case wire.InvTypeWitnessBlock:
883 case wire.InvTypeBlock:
884 // Ask chain if the block is known to it in any form (main
885 // chain, side chain, or orphan).
886 return sm.chain.HaveBlock(&invVect.Hash)
888 case wire.InvTypeWitnessTx:
891 // Ask the transaction memory pool if the transaction is known
892 // to it in any form (main pool or orphan).
893 if sm.txMemPool.HaveTransaction(&invVect.Hash) {
897 // Check if the transaction exists from the point of view of the
898 // end of the main chain.
899 entry, err := sm.chain.FetchUtxoEntry(&invVect.Hash)
903 return entry != nil && !entry.IsFullySpent(), nil
906 // The requested inventory is an unsupported type, so just claim
907 // it is known to avoid requesting it.
911 // handleInvMsg handles inv messages from all peers.
912 // We examine the inventory advertised by the remote peer and act accordingly.
913 func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
915 state, exists := sm.peerStates[peer]
917 log.Warnf("Received inv message from unknown peer %s", peer)
921 // Attempt to find the final block in the inventory list. There may not be one.
924 invVects := imsg.inv.InvList
925 for i := len(invVects) - 1; i >= 0; i-- {
926 if invVects[i].Type == wire.InvTypeBlock {
932 // If this inv contains a block announcement, and this isn't coming from
933 // our current sync peer or we're current, then update the last
934 // announced block for this peer. We'll use this information later to
935 // update the heights of peers based on blocks we've accepted that they
936 // previously announced.
937 if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
938 peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
941 // Ignore invs from peers that aren't the sync peer if we are not current.
942 // Helps prevent fetching a mass of orphans.
943 if peer != sm.syncPeer && !sm.current() {
947 // If our chain is current and a peer announces a block we already
948 // know of, then update their current block height.
949 if lastBlock != -1 && sm.current() {
950 blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
952 peer.UpdateLastBlockHeight(blkHeight)
956 // Request the advertised inventory if we don't already have it. Also,
957 // request parent blocks of orphans if we receive one we already have.
958 // Finally, attempt to detect potential stalls due to long side chains
959 // we already have and request more blocks to prevent them.
960 for i, iv := range invVects {
961 // Ignore unsupported inventory types.
963 case wire.InvTypeBlock:
965 case wire.InvTypeWitnessBlock:
966 case wire.InvTypeWitnessTx:
971 // Add the inventory to the cache of known inventory for the peer.
973 peer.AddKnownInventory(iv)
975 // Ignore inventory when we're in headers-first mode.
976 if sm.headersFirstMode {
980 // Request the inventory if we don't already have it.
981 haveInv, err := sm.haveInventory(iv)
983 log.Warnf("Unexpected failure when checking for "+
984 "existing inventory during inv message "+
985 "processing: %v", err)
989 if iv.Type == wire.InvTypeTx {
990 // Skip the transaction if it has already been rejected.
992 if _, exists := sm.rejectedTxns[iv.Hash]; exists {
997 // Ignore block invs from non-witness enabled
998 // peers, as after segwit activation we only want to
999 // download from peers that can provide us full witness data.
1001 if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
1005 // Add it to the request queue.
1006 state.requestQueue = append(state.requestQueue, iv)
1010 if iv.Type == wire.InvTypeBlock {
1011 // The block is an orphan block that we already have.
1012 // When the existing orphan was processed, it requested
1013 // the missing parent blocks. When this scenario
1014 // happens, it means there were more blocks missing
1015 // than are allowed into a single inventory message. As
1016 // a result, once this peer requested the final
1017 // advertised block, the remote peer noticed and is now
1018 // resending the orphan block as an available block
1019 // to signal there are more missing blocks that need to be requested.
1021 if sm.chain.IsKnownOrphan(&iv.Hash) {
1022 // Request blocks starting at the latest known
1023 // up to the root of the orphan that just came
1025 orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1026 locator, err := sm.chain.LatestBlockLocator()
1028 log.Errorf("PEER: Failed to get block "+
1029 "locator for the latest block: "+
1033 peer.PushGetBlocksMsg(locator, orphanRoot)
1037 // We already have the final block advertised by this
1038 // inventory message, so force a request for more. This
1039 // should only happen if we're on a really long side
1042 // Request blocks after this one up to the
1043 // final one the remote peer knows about (zero
1045 locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1046 peer.PushGetBlocksMsg(locator, &zeroHash)
1051 // Request as much as possible at once. Anything that won't fit into
1052 // the request will be requested on the next inv message.
1054 gdmsg := wire.NewMsgGetData()
1055 requestQueue := state.requestQueue
1056 for len(requestQueue) != 0 {
1057 iv := requestQueue[0]
1058 requestQueue[0] = nil
1059 requestQueue = requestQueue[1:]
1062 case wire.InvTypeWitnessBlock:
1064 case wire.InvTypeBlock:
1065 // Request the block if there is not already a pending
1067 if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1068 sm.requestedBlocks[iv.Hash] = struct{}{}
1069 sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1070 state.requestedBlocks[iv.Hash] = struct{}{}
1072 if peer.IsWitnessEnabled() {
1073 iv.Type = wire.InvTypeWitnessBlock
1076 gdmsg.AddInvVect(iv)
1080 case wire.InvTypeWitnessTx:
1082 case wire.InvTypeTx:
1083 // Request the transaction if there is not already a
1085 if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1086 sm.requestedTxns[iv.Hash] = struct{}{}
1087 sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1088 state.requestedTxns[iv.Hash] = struct{}{}
1090 // If the peer is capable, request the txn
1091 // including all witness data.
1092 if peer.IsWitnessEnabled() {
1093 iv.Type = wire.InvTypeWitnessTx
1096 gdmsg.AddInvVect(iv)
1101 if numRequested >= wire.MaxInvPerMsg {
1105 state.requestQueue = requestQueue
1106 if len(gdmsg.InvList) > 0 {
1107 peer.QueueMessage(gdmsg, nil)
1111 // limitMap is a helper function for maps that require a maximum limit by
1112 // evicting a random transaction if adding a new value would cause it to
1113 // overflow the maximum allowed.
1114 func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1115 if len(m)+1 > limit {
1116 // Remove a random entry from the map. For most compilers, Go's
1117 // range statement iterates starting at a random item although
1118 // that is not 100% guaranteed by the spec. The iteration order
1119 // is not important here because an adversary would have to be
1120 // able to pull off preimage attacks on the hashing function in
1121 // order to target eviction of specific entries anyways.
1122 for txHash := range m {
1129 // blockHandler is the main handler for the sync manager. It must be run as a
1130 // goroutine. It processes block and inv messages in a separate goroutine
1131 // from the peer handlers so the block (MsgBlock) messages are handled by a
1132 // single thread without needing to lock memory data structures. This is
1133 // important because the sync manager controls which blocks are needed and how
1134 // the fetching should proceed.
1135 func (sm *SyncManager) blockHandler() {
1139 case m := <-sm.msgChan:
1140 switch msg := m.(type) {
1142 sm.handleNewPeerMsg(msg.peer)
1146 msg.reply <- struct{}{}
1149 sm.handleBlockMsg(msg)
1150 msg.reply <- struct{}{}
1153 sm.handleInvMsg(msg)
1156 sm.handleHeadersMsg(msg)
1159 sm.handleDonePeerMsg(msg.peer)
1161 case getSyncPeerMsg:
1163 if sm.syncPeer != nil {
1164 peerID = sm.syncPeer.ID()
1168 case processBlockMsg:
1169 _, isOrphan, err := sm.chain.ProcessBlock(
1170 msg.block, msg.flags)
1172 msg.reply <- processBlockResponse{
1178 msg.reply <- processBlockResponse{
1184 msg.reply <- sm.current()
1187 // Wait until the sender unpauses the manager.
1191 log.Warnf("Invalid message type in block "+
1201 log.Trace("Block handler done")
1204 // handleBlockchainNotification handles notifications from blockchain. It does
1205 // things such as request orphan block parents and relay accepted blocks to
1207 func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
1208 switch notification.Type {
1209 // A block has been accepted into the block chain. Relay it to other
1211 case blockchain.NTBlockAccepted:
1212 // Don't relay if we are not current. Other peers that are
1213 // current should already know about it.
1218 block, ok := notification.Data.(*btcutil.Block)
1220 log.Warnf("Chain accepted notification is not a block.")
1224 // Generate the inventory vector and relay it.
1225 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1226 sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
1228 // A block has been connected to the main block chain.
1229 case blockchain.NTBlockConnected:
1230 block, ok := notification.Data.(*btcutil.Block)
1232 log.Warnf("Chain connected notification is not a block.")
1236 // Remove all of the transactions (except the coinbase) in the
1237 // connected block from the transaction pool. Secondly, remove any
1238 // transactions which are now double spends as a result of these
1239 // new transactions. Finally, remove any transaction that is
1240 // no longer an orphan. Transactions which depend on a confirmed
1241 // transaction are NOT removed recursively because they are still
1243 for _, tx := range block.Transactions()[1:] {
1244 sm.txMemPool.RemoveTransaction(tx, false)
1245 sm.txMemPool.RemoveDoubleSpends(tx)
1246 sm.txMemPool.RemoveOrphan(tx)
1247 sm.peerNotifier.TransactionConfirmed(tx)
1248 acceptedTxs := sm.txMemPool.ProcessOrphans(tx)
1249 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1252 // A block has been disconnected from the main block chain.
1253 case blockchain.NTBlockDisconnected:
1254 block, ok := notification.Data.(*btcutil.Block)
1256 log.Warnf("Chain disconnected notification is not a block.")
1260 // Reinsert all of the transactions (except the coinbase) into
1261 // the transaction pool.
1262 for _, tx := range block.Transactions()[1:] {
1263 _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
1266 // Remove the transaction and all transactions
1267 // that depend on it if it wasn't accepted into
1268 // the transaction pool.
1269 sm.txMemPool.RemoveTransaction(tx, true)
1275 // NewPeer informs the sync manager of a newly active peer.
1276 func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
1277 // Ignore if we are shutting down.
1278 if atomic.LoadInt32(&sm.shutdown) != 0 {
1281 sm.msgChan <- &newPeerMsg{peer: peer}
1284 // QueueTx adds the passed transaction message and peer to the block handling
1285 // queue. Responds to the done channel argument after the tx message is
1287 func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
1288 // Don't accept more transactions if we're shutting down.
1289 if atomic.LoadInt32(&sm.shutdown) != 0 {
1294 sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
1297 // QueueBlock adds the passed block message and peer to the block handling
1298 // queue. Responds to the done channel argument after the block message is
1300 func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) {
1301 // Don't accept more blocks if we're shutting down.
1302 if atomic.LoadInt32(&sm.shutdown) != 0 {
1307 sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
1310 // QueueInv adds the passed inv message and peer to the block handling queue.
1311 func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
1312 // No channel handling here because peers do not need to block on inv
1314 if atomic.LoadInt32(&sm.shutdown) != 0 {
1318 sm.msgChan <- &invMsg{inv: inv, peer: peer}
1321 // QueueHeaders adds the passed headers message and peer to the block handling
1323 func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
1324 // No channel handling here because peers do not need to block on
1325 // headers messages.
1326 if atomic.LoadInt32(&sm.shutdown) != 0 {
1330 sm.msgChan <- &headersMsg{headers: headers, peer: peer}
1333 // DonePeer informs the blockmanager that a peer has disconnected.
1334 func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
1335 // Ignore if we are shutting down.
1336 if atomic.LoadInt32(&sm.shutdown) != 0 {
1340 sm.msgChan <- &donePeerMsg{peer: peer}
1343 // Start begins the core block handler which processes block and inv messages.
1344 func (sm *SyncManager) Start() {
1346 if atomic.AddInt32(&sm.started, 1) != 1 {
1350 log.Trace("Starting sync manager")
1352 go sm.blockHandler()
1355 // Stop gracefully shuts down the sync manager by stopping all asynchronous
1356 // handlers and waiting for them to finish.
1357 func (sm *SyncManager) Stop() error {
1358 if atomic.AddInt32(&sm.shutdown, 1) != 1 {
1359 log.Warnf("Sync manager is already in the process of " +
1364 log.Infof("Sync manager shutting down")
1370 // SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
1371 func (sm *SyncManager) SyncPeerID() int32 {
1372 reply := make(chan int32)
1373 sm.msgChan <- getSyncPeerMsg{reply: reply}
1377 // ProcessBlock makes use of ProcessBlock on an internal instance of a block
1379 func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
1380 reply := make(chan processBlockResponse, 1)
1381 sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
1383 return response.isOrphan, response.err
1386 // IsCurrent returns whether or not the sync manager believes it is synced with
1387 // the connected peers.
1388 func (sm *SyncManager) IsCurrent() bool {
1389 reply := make(chan bool)
1390 sm.msgChan <- isCurrentMsg{reply: reply}
1394 // Pause pauses the sync manager until the returned channel is closed.
1396 // Note that while paused, all peer and block processing is halted. The
1397 // message sender should avoid pausing the sync manager for long durations.
1398 func (sm *SyncManager) Pause() chan<- struct{} {
1399 c := make(chan struct{})
1400 sm.msgChan <- pauseMsg{c}
1404 // New constructs a new SyncManager. Use Start to begin processing asynchronous
1405 // block, tx, and inv updates.
1406 func New(config *Config) (*SyncManager, error) {
1408 peerNotifier: config.PeerNotifier,
1409 chain: config.Chain,
1410 txMemPool: config.TxMemPool,
1411 chainParams: config.ChainParams,
1412 rejectedTxns: make(map[chainhash.Hash]struct{}),
1413 requestedTxns: make(map[chainhash.Hash]struct{}),
1414 requestedBlocks: make(map[chainhash.Hash]struct{}),
1415 peerStates: make(map[*peerpkg.Peer]*peerSyncState),
1416 progressLogger: newBlockProgressLogger("Processed", log),
1417 msgChan: make(chan interface{}, config.MaxPeers*3),
1418 headerList: list.New(),
1419 quit: make(chan struct{}),
1422 best := sm.chain.BestSnapshot()
1423 if !config.DisableCheckpoints {
1424 // Initialize the next checkpoint based on the current height.
1425 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1426 if sm.nextCheckpoint != nil {
1427 sm.resetHeaderState(&best.Hash, best.Height)
1430 log.Info("Checkpoints are disabled")
1433 sm.chain.Subscribe(sm.handleBlockchainNotification)