OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / netsync / manager.go
1 // Copyright (c) 2013-2017 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package netsync
6
7 import (
8         "container/list"
9         "net"
10         "sync"
11         "sync/atomic"
12         "time"
13
14         "github.com/btcsuite/btcd/blockchain"
15         "github.com/btcsuite/btcd/chaincfg"
16         "github.com/btcsuite/btcd/chaincfg/chainhash"
17         "github.com/btcsuite/btcd/database"
18         "github.com/btcsuite/btcd/mempool"
19         peerpkg "github.com/btcsuite/btcd/peer"
20         "github.com/btcsuite/btcd/wire"
21         "github.com/btcsuite/btcutil"
22 )
23
24 const (
25         // minInFlightBlocks is the minimum number of blocks that should be
26         // in the request queue for headers-first mode before requesting
27         // more.
28         minInFlightBlocks = 10
29
30         // maxRejectedTxns is the maximum number of rejected transactions
31         // hashes to store in memory.
32         maxRejectedTxns = 1000
33
34         // maxRequestedBlocks is the maximum number of requested block
35         // hashes to store in memory.
36         maxRequestedBlocks = wire.MaxInvPerMsg
37
38         // maxRequestedTxns is the maximum number of requested transactions
39         // hashes to store in memory.
40         maxRequestedTxns = wire.MaxInvPerMsg
41 )
42
43 // zeroHash is the zero value hash (all zeros).  It is defined as a convenience.
44 var zeroHash chainhash.Hash
45
46 // newPeerMsg signifies a newly connected peer to the block handler.
47 type newPeerMsg struct {
48         peer *peerpkg.Peer
49 }
50
51 // blockMsg packages a bitcoin block message and the peer it came from together
52 // so the block handler has access to that information.
53 type blockMsg struct {
54         block *btcutil.Block
55         peer  *peerpkg.Peer
56         reply chan struct{}
57 }
58
59 // invMsg packages a bitcoin inv message and the peer it came from together
60 // so the block handler has access to that information.
61 type invMsg struct {
62         inv  *wire.MsgInv
63         peer *peerpkg.Peer
64 }
65
66 // headersMsg packages a bitcoin headers message and the peer it came from
67 // together so the block handler has access to that information.
68 type headersMsg struct {
69         headers *wire.MsgHeaders
70         peer    *peerpkg.Peer
71 }
72
73 // donePeerMsg signifies a newly disconnected peer to the block handler.
74 type donePeerMsg struct {
75         peer *peerpkg.Peer
76 }
77
78 // txMsg packages a bitcoin tx message and the peer it came from together
79 // so the block handler has access to that information.
80 type txMsg struct {
81         tx    *btcutil.Tx
82         peer  *peerpkg.Peer
83         reply chan struct{}
84 }
85
86 // getSyncPeerMsg is a message type to be sent across the message channel for
87 // retrieving the current sync peer.
88 type getSyncPeerMsg struct {
89         reply chan int32
90 }
91
92 // processBlockResponse is a response sent to the reply channel of a
93 // processBlockMsg.
94 type processBlockResponse struct {
95         isOrphan bool
96         err      error
97 }
98
99 // processBlockMsg is a message type to be sent across the message channel
100 // for requested a block is processed.  Note this call differs from blockMsg
101 // above in that blockMsg is intended for blocks that came from peers and have
102 // extra handling whereas this message essentially is just a concurrent safe
103 // way to call ProcessBlock on the internal block chain instance.
104 type processBlockMsg struct {
105         block *btcutil.Block
106         flags blockchain.BehaviorFlags
107         reply chan processBlockResponse
108 }
109
110 // isCurrentMsg is a message type to be sent across the message channel for
111 // requesting whether or not the sync manager believes it is synced with the
112 // currently connected peers.
113 type isCurrentMsg struct {
114         reply chan bool
115 }
116
117 // pauseMsg is a message type to be sent across the message channel for
118 // pausing the sync manager.  This effectively provides the caller with
119 // exclusive access over the manager until a receive is performed on the
120 // unpause channel.
121 type pauseMsg struct {
122         unpause <-chan struct{}
123 }
124
125 // headerNode is used as a node in a list of headers that are linked together
126 // between checkpoints.
127 type headerNode struct {
128         height int32
129         hash   *chainhash.Hash
130 }
131
132 // peerSyncState stores additional information that the SyncManager tracks
133 // about a peer.
134 type peerSyncState struct {
135         syncCandidate   bool
136         requestQueue    []*wire.InvVect
137         requestedTxns   map[chainhash.Hash]struct{}
138         requestedBlocks map[chainhash.Hash]struct{}
139 }
140
141 // SyncManager is used to communicate block related messages with peers. The
142 // SyncManager is started as by executing Start() in a goroutine. Once started,
143 // it selects peers to sync from and starts the initial block download. Once the
144 // chain is in sync, the SyncManager handles incoming block and header
145 // notifications and relays announcements of new blocks to peers.
146 type SyncManager struct {
147         peerNotifier   PeerNotifier
148         started        int32
149         shutdown       int32
150         chain          *blockchain.BlockChain
151         txMemPool      *mempool.TxPool
152         chainParams    *chaincfg.Params
153         progressLogger *blockProgressLogger
154         msgChan        chan interface{}
155         wg             sync.WaitGroup
156         quit           chan struct{}
157
158         // These fields should only be accessed from the blockHandler thread
159         rejectedTxns    map[chainhash.Hash]struct{}
160         requestedTxns   map[chainhash.Hash]struct{}
161         requestedBlocks map[chainhash.Hash]struct{}
162         syncPeer        *peerpkg.Peer
163         peerStates      map[*peerpkg.Peer]*peerSyncState
164
165         // The following fields are used for headers-first mode.
166         headersFirstMode bool
167         headerList       *list.List
168         startHeader      *list.Element
169         nextCheckpoint   *chaincfg.Checkpoint
170 }
171
172 // resetHeaderState sets the headers-first mode state to values appropriate for
173 // syncing from a new peer.
174 func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
175         sm.headersFirstMode = false
176         sm.headerList.Init()
177         sm.startHeader = nil
178
179         // When there is a next checkpoint, add an entry for the latest known
180         // block into the header pool.  This allows the next downloaded header
181         // to prove it links to the chain properly.
182         if sm.nextCheckpoint != nil {
183                 node := headerNode{height: newestHeight, hash: newestHash}
184                 sm.headerList.PushBack(&node)
185         }
186 }
187
188 // findNextHeaderCheckpoint returns the next checkpoint after the passed height.
189 // It returns nil when there is not one either because the height is already
190 // later than the final checkpoint or some other reason such as disabled
191 // checkpoints.
192 func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
193         checkpoints := sm.chain.Checkpoints()
194         if len(checkpoints) == 0 {
195                 return nil
196         }
197
198         // There is no next checkpoint if the height is already after the final
199         // checkpoint.
200         finalCheckpoint := &checkpoints[len(checkpoints)-1]
201         if height >= finalCheckpoint.Height {
202                 return nil
203         }
204
205         // Find the next checkpoint.
206         nextCheckpoint := finalCheckpoint
207         for i := len(checkpoints) - 2; i >= 0; i-- {
208                 if height >= checkpoints[i].Height {
209                         break
210                 }
211                 nextCheckpoint = &checkpoints[i]
212         }
213         return nextCheckpoint
214 }
215
216 // startSync will choose the best peer among the available candidate peers to
217 // download/sync the blockchain from.  When syncing is already running, it
218 // simply returns.  It also examines the candidates for any which are no longer
219 // candidates and removes them as needed.
220 func (sm *SyncManager) startSync() {
221         // Return now if we're already syncing.
222         if sm.syncPeer != nil {
223                 return
224         }
225
226         // Once the segwit soft-fork package has activated, we only
227         // want to sync from peers which are witness enabled to ensure
228         // that we fully validate all blockchain data.
229         segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
230         if err != nil {
231                 log.Errorf("Unable to query for segwit soft-fork state: %v", err)
232                 return
233         }
234
235         best := sm.chain.BestSnapshot()
236         var bestPeer *peerpkg.Peer
237         for peer, state := range sm.peerStates {
238                 if !state.syncCandidate {
239                         continue
240                 }
241
242                 if segwitActive && !peer.IsWitnessEnabled() {
243                         log.Debugf("peer %v not witness enabled, skipping", peer)
244                         continue
245                 }
246
247                 // Remove sync candidate peers that are no longer candidates due
248                 // to passing their latest known block.  NOTE: The < is
249                 // intentional as opposed to <=.  While technically the peer
250                 // doesn't have a later block when it's equal, it will likely
251                 // have one soon so it is a reasonable choice.  It also allows
252                 // the case where both are at 0 such as during regression test.
253                 if peer.LastBlock() < best.Height {
254                         state.syncCandidate = false
255                         continue
256                 }
257
258                 // TODO(davec): Use a better algorithm to choose the best peer.
259                 // For now, just pick the first available candidate.
260                 bestPeer = peer
261         }
262
263         // Start syncing from the best peer if one was selected.
264         if bestPeer != nil {
265                 // Clear the requestedBlocks if the sync peer changes, otherwise
266                 // we may ignore blocks we need that the last sync peer failed
267                 // to send.
268                 sm.requestedBlocks = make(map[chainhash.Hash]struct{})
269
270                 locator, err := sm.chain.LatestBlockLocator()
271                 if err != nil {
272                         log.Errorf("Failed to get block locator for the "+
273                                 "latest block: %v", err)
274                         return
275                 }
276
277                 log.Infof("Syncing to block height %d from peer %v",
278                         bestPeer.LastBlock(), bestPeer.Addr())
279
280                 // When the current height is less than a known checkpoint we
281                 // can use block headers to learn about which blocks comprise
282                 // the chain up to the checkpoint and perform less validation
283                 // for them.  This is possible since each header contains the
284                 // hash of the previous header and a merkle root.  Therefore if
285                 // we validate all of the received headers link together
286                 // properly and the checkpoint hashes match, we can be sure the
287                 // hashes for the blocks in between are accurate.  Further, once
288                 // the full blocks are downloaded, the merkle root is computed
289                 // and compared against the value in the header which proves the
290                 // full block hasn't been tampered with.
291                 //
292                 // Once we have passed the final checkpoint, or checkpoints are
293                 // disabled, use standard inv messages learn about the blocks
294                 // and fully validate them.  Finally, regression test mode does
295                 // not support the headers-first approach so do normal block
296                 // downloads when in regression test mode.
297                 if sm.nextCheckpoint != nil &&
298                         best.Height < sm.nextCheckpoint.Height &&
299                         sm.chainParams != &chaincfg.RegressionNetParams {
300
301                         bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
302                         sm.headersFirstMode = true
303                         log.Infof("Downloading headers for blocks %d to "+
304                                 "%d from peer %s", best.Height+1,
305                                 sm.nextCheckpoint.Height, bestPeer.Addr())
306                 } else {
307                         bestPeer.PushGetBlocksMsg(locator, &zeroHash)
308                 }
309                 sm.syncPeer = bestPeer
310         } else {
311                 log.Warnf("No sync peer candidates available")
312         }
313 }
314
315 // isSyncCandidate returns whether or not the peer is a candidate to consider
316 // syncing from.
317 func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
318         // Typically a peer is not a candidate for sync if it's not a full node,
319         // however regression test is special in that the regression tool is
320         // not a full node and still needs to be considered a sync candidate.
321         if sm.chainParams == &chaincfg.RegressionNetParams {
322                 // The peer is not a candidate if it's not coming from localhost
323                 // or the hostname can't be determined for some reason.
324                 host, _, err := net.SplitHostPort(peer.Addr())
325                 if err != nil {
326                         return false
327                 }
328
329                 if host != "127.0.0.1" && host != "localhost" {
330                         return false
331                 }
332         } else {
333                 // The peer is not a candidate for sync if it's not a full
334                 // node. Additionally, if the segwit soft-fork package has
335                 // activated, then the peer must also be upgraded.
336                 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
337                 if err != nil {
338                         log.Errorf("Unable to query for segwit "+
339                                 "soft-fork state: %v", err)
340                 }
341                 nodeServices := peer.Services()
342                 if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
343                         (segwitActive && !peer.IsWitnessEnabled()) {
344                         return false
345                 }
346         }
347
348         // Candidate if all checks passed.
349         return true
350 }
351
352 // handleNewPeerMsg deals with new peers that have signalled they may
353 // be considered as a sync peer (they have already successfully negotiated).  It
354 // also starts syncing if needed.  It is invoked from the syncHandler goroutine.
355 func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
356         // Ignore if in the process of shutting down.
357         if atomic.LoadInt32(&sm.shutdown) != 0 {
358                 return
359         }
360
361         log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
362
363         // Initialize the peer state
364         isSyncCandidate := sm.isSyncCandidate(peer)
365         sm.peerStates[peer] = &peerSyncState{
366                 syncCandidate:   isSyncCandidate,
367                 requestedTxns:   make(map[chainhash.Hash]struct{}),
368                 requestedBlocks: make(map[chainhash.Hash]struct{}),
369         }
370
371         // Start syncing by choosing the best candidate if needed.
372         if isSyncCandidate && sm.syncPeer == nil {
373                 sm.startSync()
374         }
375 }
376
377 // handleDonePeerMsg deals with peers that have signalled they are done.  It
378 // removes the peer as a candidate for syncing and in the case where it was
379 // the current sync peer, attempts to select a new best peer to sync from.  It
380 // is invoked from the syncHandler goroutine.
381 func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
382         state, exists := sm.peerStates[peer]
383         if !exists {
384                 log.Warnf("Received done peer message for unknown peer %s", peer)
385                 return
386         }
387
388         // Remove the peer from the list of candidate peers.
389         delete(sm.peerStates, peer)
390
391         log.Infof("Lost peer %s", peer)
392
393         // Remove requested transactions from the global map so that they will
394         // be fetched from elsewhere next time we get an inv.
395         for txHash := range state.requestedTxns {
396                 delete(sm.requestedTxns, txHash)
397         }
398
399         // Remove requested blocks from the global map so that they will be
400         // fetched from elsewhere next time we get an inv.
401         // TODO: we could possibly here check which peers have these blocks
402         // and request them now to speed things up a little.
403         for blockHash := range state.requestedBlocks {
404                 delete(sm.requestedBlocks, blockHash)
405         }
406
407         // Attempt to find a new peer to sync from if the quitting peer is the
408         // sync peer.  Also, reset the headers-first state if in headers-first
409         // mode so
410         if sm.syncPeer == peer {
411                 sm.syncPeer = nil
412                 if sm.headersFirstMode {
413                         best := sm.chain.BestSnapshot()
414                         sm.resetHeaderState(&best.Hash, best.Height)
415                 }
416                 sm.startSync()
417         }
418 }
419
420 // handleTxMsg handles transaction messages from all peers.
421 func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
422         peer := tmsg.peer
423         state, exists := sm.peerStates[peer]
424         if !exists {
425                 log.Warnf("Received tx message from unknown peer %s", peer)
426                 return
427         }
428
429         // NOTE:  BitcoinJ, and possibly other wallets, don't follow the spec of
430         // sending an inventory message and allowing the remote peer to decide
431         // whether or not they want to request the transaction via a getdata
432         // message.  Unfortunately, the reference implementation permits
433         // unrequested data, so it has allowed wallets that don't follow the
434         // spec to proliferate.  While this is not ideal, there is no check here
435         // to disconnect peers for sending unsolicited transactions to provide
436         // interoperability.
437         txHash := tmsg.tx.Hash()
438
439         // Ignore transactions that we have already rejected.  Do not
440         // send a reject message here because if the transaction was already
441         // rejected, the transaction was unsolicited.
442         if _, exists = sm.rejectedTxns[*txHash]; exists {
443                 log.Debugf("Ignoring unsolicited previously rejected "+
444                         "transaction %v from %s", txHash, peer)
445                 return
446         }
447
448         // Process the transaction to include validation, insertion in the
449         // memory pool, orphan handling, etc.
450         acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
451                 true, true, mempool.Tag(peer.ID()))
452
453         // Remove transaction from request maps. Either the mempool/chain
454         // already knows about it and as such we shouldn't have any more
455         // instances of trying to fetch it, or we failed to insert and thus
456         // we'll retry next time we get an inv.
457         delete(state.requestedTxns, *txHash)
458         delete(sm.requestedTxns, *txHash)
459
460         if err != nil {
461                 // Do not request this transaction again until a new block
462                 // has been processed.
463                 sm.rejectedTxns[*txHash] = struct{}{}
464                 sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
465
466                 // When the error is a rule error, it means the transaction was
467                 // simply rejected as opposed to something actually going wrong,
468                 // so log it as such.  Otherwise, something really did go wrong,
469                 // so log it as an actual error.
470                 if _, ok := err.(mempool.RuleError); ok {
471                         log.Debugf("Rejected transaction %v from %s: %v",
472                                 txHash, peer, err)
473                 } else {
474                         log.Errorf("Failed to process transaction %v: %v",
475                                 txHash, err)
476                 }
477
478                 // Convert the error into an appropriate reject message and
479                 // send it.
480                 code, reason := mempool.ErrToRejectErr(err)
481                 peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
482                 return
483         }
484
485         sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
486 }
487
488 // current returns true if we believe we are synced with our peers, false if we
489 // still have blocks to check
490 func (sm *SyncManager) current() bool {
491         if !sm.chain.IsCurrent() {
492                 return false
493         }
494
495         // if blockChain thinks we are current and we have no syncPeer it
496         // is probably right.
497         if sm.syncPeer == nil {
498                 return true
499         }
500
501         // No matter what chain thinks, if we are below the block we are syncing
502         // to we are not current.
503         if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
504                 return false
505         }
506         return true
507 }
508
509 // handleBlockMsg handles block messages from all peers.
510 func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
511         peer := bmsg.peer
512         state, exists := sm.peerStates[peer]
513         if !exists {
514                 log.Warnf("Received block message from unknown peer %s", peer)
515                 return
516         }
517
518         // If we didn't ask for this block then the peer is misbehaving.
519         blockHash := bmsg.block.Hash()
520         if _, exists = state.requestedBlocks[*blockHash]; !exists {
521                 // The regression test intentionally sends some blocks twice
522                 // to test duplicate block insertion fails.  Don't disconnect
523                 // the peer or ignore the block when we're in regression test
524                 // mode in this case so the chain code is actually fed the
525                 // duplicate blocks.
526                 if sm.chainParams != &chaincfg.RegressionNetParams {
527                         log.Warnf("Got unrequested block %v from %s -- "+
528                                 "disconnecting", blockHash, peer.Addr())
529                         peer.Disconnect()
530                         return
531                 }
532         }
533
534         // When in headers-first mode, if the block matches the hash of the
535         // first header in the list of headers that are being fetched, it's
536         // eligible for less validation since the headers have already been
537         // verified to link together and are valid up to the next checkpoint.
538         // Also, remove the list entry for all blocks except the checkpoint
539         // since it is needed to verify the next round of headers links
540         // properly.
541         isCheckpointBlock := false
542         behaviorFlags := blockchain.BFNone
543         if sm.headersFirstMode {
544                 firstNodeEl := sm.headerList.Front()
545                 if firstNodeEl != nil {
546                         firstNode := firstNodeEl.Value.(*headerNode)
547                         if blockHash.IsEqual(firstNode.hash) {
548                                 behaviorFlags |= blockchain.BFFastAdd
549                                 if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
550                                         isCheckpointBlock = true
551                                 } else {
552                                         sm.headerList.Remove(firstNodeEl)
553                                 }
554                         }
555                 }
556         }
557
558         // Remove block from request maps. Either chain will know about it and
559         // so we shouldn't have any more instances of trying to fetch it, or we
560         // will fail the insert and thus we'll retry next time we get an inv.
561         delete(state.requestedBlocks, *blockHash)
562         delete(sm.requestedBlocks, *blockHash)
563
564         // Process the block to include validation, best chain selection, orphan
565         // handling, etc.
566         _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
567         if err != nil {
568                 // When the error is a rule error, it means the block was simply
569                 // rejected as opposed to something actually going wrong, so log
570                 // it as such.  Otherwise, something really did go wrong, so log
571                 // it as an actual error.
572                 if _, ok := err.(blockchain.RuleError); ok {
573                         log.Infof("Rejected block %v from %s: %v", blockHash,
574                                 peer, err)
575                 } else {
576                         log.Errorf("Failed to process block %v: %v",
577                                 blockHash, err)
578                 }
579                 if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
580                         database.ErrCorruption {
581                         panic(dbErr)
582                 }
583
584                 // Convert the error into an appropriate reject message and
585                 // send it.
586                 code, reason := mempool.ErrToRejectErr(err)
587                 peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
588                 return
589         }
590
591         // Meta-data about the new block this peer is reporting. We use this
592         // below to update this peer's lastest block height and the heights of
593         // other peers based on their last announced block hash. This allows us
594         // to dynamically update the block heights of peers, avoiding stale
595         // heights when looking for a new sync peer. Upon acceptance of a block
596         // or recognition of an orphan, we also use this information to update
597         // the block heights over other peers who's invs may have been ignored
598         // if we are actively syncing while the chain is not yet current or
599         // who may have lost the lock announcment race.
600         var heightUpdate int32
601         var blkHashUpdate *chainhash.Hash
602
603         // Request the parents for the orphan block from the peer that sent it.
604         if isOrphan {
605                 // We've just received an orphan block from a peer. In order
606                 // to update the height of the peer, we try to extract the
607                 // block height from the scriptSig of the coinbase transaction.
608                 // Extraction is only attempted if the block's version is
609                 // high enough (ver 2+).
610                 header := &bmsg.block.MsgBlock().Header
611                 if blockchain.ShouldHaveSerializedBlockHeight(header) {
612                         coinbaseTx := bmsg.block.Transactions()[0]
613                         cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx)
614                         if err != nil {
615                                 log.Warnf("Unable to extract height from "+
616                                         "coinbase tx: %v", err)
617                         } else {
618                                 log.Debugf("Extracted height of %v from "+
619                                         "orphan block", cbHeight)
620                                 heightUpdate = cbHeight
621                                 blkHashUpdate = blockHash
622                         }
623                 }
624
625                 orphanRoot := sm.chain.GetOrphanRoot(blockHash)
626                 locator, err := sm.chain.LatestBlockLocator()
627                 if err != nil {
628                         log.Warnf("Failed to get block locator for the "+
629                                 "latest block: %v", err)
630                 } else {
631                         peer.PushGetBlocksMsg(locator, orphanRoot)
632                 }
633         } else {
634                 // When the block is not an orphan, log information about it and
635                 // update the chain state.
636                 sm.progressLogger.LogBlockHeight(bmsg.block)
637
638                 // Update this peer's latest block height, for future
639                 // potential sync node candidacy.
640                 best := sm.chain.BestSnapshot()
641                 heightUpdate = best.Height
642                 blkHashUpdate = &best.Hash
643
644                 // Clear the rejected transactions.
645                 sm.rejectedTxns = make(map[chainhash.Hash]struct{})
646         }
647
648         // Update the block height for this peer. But only send a message to
649         // the server for updating peer heights if this is an orphan or our
650         // chain is "current". This avoids sending a spammy amount of messages
651         // if we're syncing the chain from scratch.
652         if blkHashUpdate != nil && heightUpdate != 0 {
653                 peer.UpdateLastBlockHeight(heightUpdate)
654                 if isOrphan || sm.current() {
655                         go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
656                                 peer)
657                 }
658         }
659
660         // Nothing more to do if we aren't in headers-first mode.
661         if !sm.headersFirstMode {
662                 return
663         }
664
665         // This is headers-first mode, so if the block is not a checkpoint
666         // request more blocks using the header list when the request queue is
667         // getting short.
668         if !isCheckpointBlock {
669                 if sm.startHeader != nil &&
670                         len(state.requestedBlocks) < minInFlightBlocks {
671                         sm.fetchHeaderBlocks()
672                 }
673                 return
674         }
675
676         // This is headers-first mode and the block is a checkpoint.  When
677         // there is a next checkpoint, get the next round of headers by asking
678         // for headers starting from the block after this one up to the next
679         // checkpoint.
680         prevHeight := sm.nextCheckpoint.Height
681         prevHash := sm.nextCheckpoint.Hash
682         sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
683         if sm.nextCheckpoint != nil {
684                 locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
685                 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
686                 if err != nil {
687                         log.Warnf("Failed to send getheaders message to "+
688                                 "peer %s: %v", peer.Addr(), err)
689                         return
690                 }
691                 log.Infof("Downloading headers for blocks %d to %d from "+
692                         "peer %s", prevHeight+1, sm.nextCheckpoint.Height,
693                         sm.syncPeer.Addr())
694                 return
695         }
696
697         // This is headers-first mode, the block is a checkpoint, and there are
698         // no more checkpoints, so switch to normal mode by requesting blocks
699         // from the block after this one up to the end of the chain (zero hash).
700         sm.headersFirstMode = false
701         sm.headerList.Init()
702         log.Infof("Reached the final checkpoint -- switching to normal mode")
703         locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
704         err = peer.PushGetBlocksMsg(locator, &zeroHash)
705         if err != nil {
706                 log.Warnf("Failed to send getblocks message to peer %s: %v",
707                         peer.Addr(), err)
708                 return
709         }
710 }
711
712 // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
713 // list of blocks to be downloaded based on the current list of headers.
714 func (sm *SyncManager) fetchHeaderBlocks() {
715         // Nothing to do if there is no start header.
716         if sm.startHeader == nil {
717                 log.Warnf("fetchHeaderBlocks called with no start header")
718                 return
719         }
720
721         // Build up a getdata request for the list of blocks the headers
722         // describe.  The size hint will be limited to wire.MaxInvPerMsg by
723         // the function, so no need to double check it here.
724         gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
725         numRequested := 0
726         for e := sm.startHeader; e != nil; e = e.Next() {
727                 node, ok := e.Value.(*headerNode)
728                 if !ok {
729                         log.Warn("Header list node type is not a headerNode")
730                         continue
731                 }
732
733                 iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
734                 haveInv, err := sm.haveInventory(iv)
735                 if err != nil {
736                         log.Warnf("Unexpected failure when checking for "+
737                                 "existing inventory during header block "+
738                                 "fetch: %v", err)
739                 }
740                 if !haveInv {
741                         syncPeerState := sm.peerStates[sm.syncPeer]
742
743                         sm.requestedBlocks[*node.hash] = struct{}{}
744                         syncPeerState.requestedBlocks[*node.hash] = struct{}{}
745
746                         // If we're fetching from a witness enabled peer
747                         // post-fork, then ensure that we receive all the
748                         // witness data in the blocks.
749                         if sm.syncPeer.IsWitnessEnabled() {
750                                 iv.Type = wire.InvTypeWitnessBlock
751                         }
752
753                         gdmsg.AddInvVect(iv)
754                         numRequested++
755                 }
756                 sm.startHeader = e.Next()
757                 if numRequested >= wire.MaxInvPerMsg {
758                         break
759                 }
760         }
761         if len(gdmsg.InvList) > 0 {
762                 sm.syncPeer.QueueMessage(gdmsg, nil)
763         }
764 }
765
766 // handleHeadersMsg handles block header messages from all peers.  Headers are
767 // requested when performing a headers-first sync.
768 func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
769         peer := hmsg.peer
770         _, exists := sm.peerStates[peer]
771         if !exists {
772                 log.Warnf("Received headers message from unknown peer %s", peer)
773                 return
774         }
775
776         // The remote peer is misbehaving if we didn't request headers.
777         msg := hmsg.headers
778         numHeaders := len(msg.Headers)
779         if !sm.headersFirstMode {
780                 log.Warnf("Got %d unrequested headers from %s -- "+
781                         "disconnecting", numHeaders, peer.Addr())
782                 peer.Disconnect()
783                 return
784         }
785
786         // Nothing to do for an empty headers message.
787         if numHeaders == 0 {
788                 return
789         }
790
791         // Process all of the received headers ensuring each one connects to the
792         // previous and that checkpoints match.
793         receivedCheckpoint := false
794         var finalHash *chainhash.Hash
795         for _, blockHeader := range msg.Headers {
796                 blockHash := blockHeader.BlockHash()
797                 finalHash = &blockHash
798
799                 // Ensure there is a previous header to compare against.
800                 prevNodeEl := sm.headerList.Back()
801                 if prevNodeEl == nil {
802                         log.Warnf("Header list does not contain a previous" +
803                                 "element as expected -- disconnecting peer")
804                         peer.Disconnect()
805                         return
806                 }
807
808                 // Ensure the header properly connects to the previous one and
809                 // add it to the list of headers.
810                 node := headerNode{hash: &blockHash}
811                 prevNode := prevNodeEl.Value.(*headerNode)
812                 if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
813                         node.height = prevNode.height + 1
814                         e := sm.headerList.PushBack(&node)
815                         if sm.startHeader == nil {
816                                 sm.startHeader = e
817                         }
818                 } else {
819                         log.Warnf("Received block header that does not "+
820                                 "properly connect to the chain from peer %s "+
821                                 "-- disconnecting", peer.Addr())
822                         peer.Disconnect()
823                         return
824                 }
825
826                 // Verify the header at the next checkpoint height matches.
827                 if node.height == sm.nextCheckpoint.Height {
828                         if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
829                                 receivedCheckpoint = true
830                                 log.Infof("Verified downloaded block "+
831                                         "header against checkpoint at height "+
832                                         "%d/hash %s", node.height, node.hash)
833                         } else {
834                                 log.Warnf("Block header at height %d/hash "+
835                                         "%s from peer %s does NOT match "+
836                                         "expected checkpoint hash of %s -- "+
837                                         "disconnecting", node.height,
838                                         node.hash, peer.Addr(),
839                                         sm.nextCheckpoint.Hash)
840                                 peer.Disconnect()
841                                 return
842                         }
843                         break
844                 }
845         }
846
847         // When this header is a checkpoint, switch to fetching the blocks for
848         // all of the headers since the last checkpoint.
849         if receivedCheckpoint {
850                 // Since the first entry of the list is always the final block
851                 // that is already in the database and is only used to ensure
852                 // the next header links properly, it must be removed before
853                 // fetching the blocks.
854                 sm.headerList.Remove(sm.headerList.Front())
855                 log.Infof("Received %v block headers: Fetching blocks",
856                         sm.headerList.Len())
857                 sm.progressLogger.SetLastLogTime(time.Now())
858                 sm.fetchHeaderBlocks()
859                 return
860         }
861
862         // This header is not a checkpoint, so request the next batch of
863         // headers starting from the latest known header and ending with the
864         // next checkpoint.
865         locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
866         err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
867         if err != nil {
868                 log.Warnf("Failed to send getheaders message to "+
869                         "peer %s: %v", peer.Addr(), err)
870                 return
871         }
872 }
873
874 // haveInventory returns whether or not the inventory represented by the passed
875 // inventory vector is known.  This includes checking all of the various places
876 // inventory can be when it is in different states such as blocks that are part
877 // of the main chain, on a side chain, in the orphan pool, and transactions that
878 // are in the memory pool (either the main pool or orphan pool).
879 func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
880         switch invVect.Type {
881         case wire.InvTypeWitnessBlock:
882                 fallthrough
883         case wire.InvTypeBlock:
884                 // Ask chain if the block is known to it in any form (main
885                 // chain, side chain, or orphan).
886                 return sm.chain.HaveBlock(&invVect.Hash)
887
888         case wire.InvTypeWitnessTx:
889                 fallthrough
890         case wire.InvTypeTx:
891                 // Ask the transaction memory pool if the transaction is known
892                 // to it in any form (main pool or orphan).
893                 if sm.txMemPool.HaveTransaction(&invVect.Hash) {
894                         return true, nil
895                 }
896
897                 // Check if the transaction exists from the point of view of the
898                 // end of the main chain.
899                 entry, err := sm.chain.FetchUtxoEntry(&invVect.Hash)
900                 if err != nil {
901                         return false, err
902                 }
903                 return entry != nil && !entry.IsFullySpent(), nil
904         }
905
906         // The requested inventory is is an unsupported type, so just claim
907         // it is known to avoid requesting it.
908         return true, nil
909 }
910
911 // handleInvMsg handles inv messages from all peers.
912 // We examine the inventory advertised by the remote peer and act accordingly.
913 func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
914         peer := imsg.peer
915         state, exists := sm.peerStates[peer]
916         if !exists {
917                 log.Warnf("Received inv message from unknown peer %s", peer)
918                 return
919         }
920
921         // Attempt to find the final block in the inventory list.  There may
922         // not be one.
923         lastBlock := -1
924         invVects := imsg.inv.InvList
925         for i := len(invVects) - 1; i >= 0; i-- {
926                 if invVects[i].Type == wire.InvTypeBlock {
927                         lastBlock = i
928                         break
929                 }
930         }
931
932         // If this inv contains a block announcement, and this isn't coming from
933         // our current sync peer or we're current, then update the last
934         // announced block for this peer. We'll use this information later to
935         // update the heights of peers based on blocks we've accepted that they
936         // previously announced.
937         if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
938                 peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
939         }
940
941         // Ignore invs from peers that aren't the sync if we are not current.
942         // Helps prevent fetching a mass of orphans.
943         if peer != sm.syncPeer && !sm.current() {
944                 return
945         }
946
947         // If our chain is current and a peer announces a block we already
948         // know of, then update their current block height.
949         if lastBlock != -1 && sm.current() {
950                 blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
951                 if err == nil {
952                         peer.UpdateLastBlockHeight(blkHeight)
953                 }
954         }
955
956         // Request the advertised inventory if we don't already have it.  Also,
957         // request parent blocks of orphans if we receive one we already have.
958         // Finally, attempt to detect potential stalls due to long side chains
959         // we already have and request more blocks to prevent them.
960         for i, iv := range invVects {
961                 // Ignore unsupported inventory types.
962                 switch iv.Type {
963                 case wire.InvTypeBlock:
964                 case wire.InvTypeTx:
965                 case wire.InvTypeWitnessBlock:
966                 case wire.InvTypeWitnessTx:
967                 default:
968                         continue
969                 }
970
971                 // Add the inventory to the cache of known inventory
972                 // for the peer.
973                 peer.AddKnownInventory(iv)
974
975                 // Ignore inventory when we're in headers-first mode.
976                 if sm.headersFirstMode {
977                         continue
978                 }
979
980                 // Request the inventory if we don't already have it.
981                 haveInv, err := sm.haveInventory(iv)
982                 if err != nil {
983                         log.Warnf("Unexpected failure when checking for "+
984                                 "existing inventory during inv message "+
985                                 "processing: %v", err)
986                         continue
987                 }
988                 if !haveInv {
989                         if iv.Type == wire.InvTypeTx {
990                                 // Skip the transaction if it has already been
991                                 // rejected.
992                                 if _, exists := sm.rejectedTxns[iv.Hash]; exists {
993                                         continue
994                                 }
995                         }
996
997                         // Ignore invs block invs from non-witness enabled
998                         // peers, as after segwit activation we only want to
999                         // download from peers that can provide us full witness
1000                         // data for blocks.
1001                         if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
1002                                 continue
1003                         }
1004
1005                         // Add it to the request queue.
1006                         state.requestQueue = append(state.requestQueue, iv)
1007                         continue
1008                 }
1009
1010                 if iv.Type == wire.InvTypeBlock {
1011                         // The block is an orphan block that we already have.
1012                         // When the existing orphan was processed, it requested
1013                         // the missing parent blocks.  When this scenario
1014                         // happens, it means there were more blocks missing
1015                         // than are allowed into a single inventory message.  As
1016                         // a result, once this peer requested the final
1017                         // advertised block, the remote peer noticed and is now
1018                         // resending the orphan block as an available block
1019                         // to signal there are more missing blocks that need to
1020                         // be requested.
1021                         if sm.chain.IsKnownOrphan(&iv.Hash) {
1022                                 // Request blocks starting at the latest known
1023                                 // up to the root of the orphan that just came
1024                                 // in.
1025                                 orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1026                                 locator, err := sm.chain.LatestBlockLocator()
1027                                 if err != nil {
1028                                         log.Errorf("PEER: Failed to get block "+
1029                                                 "locator for the latest block: "+
1030                                                 "%v", err)
1031                                         continue
1032                                 }
1033                                 peer.PushGetBlocksMsg(locator, orphanRoot)
1034                                 continue
1035                         }
1036
1037                         // We already have the final block advertised by this
1038                         // inventory message, so force a request for more.  This
1039                         // should only happen if we're on a really long side
1040                         // chain.
1041                         if i == lastBlock {
1042                                 // Request blocks after this one up to the
1043                                 // final one the remote peer knows about (zero
1044                                 // stop hash).
1045                                 locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1046                                 peer.PushGetBlocksMsg(locator, &zeroHash)
1047                         }
1048                 }
1049         }
1050
1051         // Request as much as possible at once.  Anything that won't fit into
1052         // the request will be requested on the next inv message.
1053         numRequested := 0
1054         gdmsg := wire.NewMsgGetData()
1055         requestQueue := state.requestQueue
1056         for len(requestQueue) != 0 {
1057                 iv := requestQueue[0]
1058                 requestQueue[0] = nil
1059                 requestQueue = requestQueue[1:]
1060
1061                 switch iv.Type {
1062                 case wire.InvTypeWitnessBlock:
1063                         fallthrough
1064                 case wire.InvTypeBlock:
1065                         // Request the block if there is not already a pending
1066                         // request.
1067                         if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1068                                 sm.requestedBlocks[iv.Hash] = struct{}{}
1069                                 sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1070                                 state.requestedBlocks[iv.Hash] = struct{}{}
1071
1072                                 if peer.IsWitnessEnabled() {
1073                                         iv.Type = wire.InvTypeWitnessBlock
1074                                 }
1075
1076                                 gdmsg.AddInvVect(iv)
1077                                 numRequested++
1078                         }
1079
1080                 case wire.InvTypeWitnessTx:
1081                         fallthrough
1082                 case wire.InvTypeTx:
1083                         // Request the transaction if there is not already a
1084                         // pending request.
1085                         if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1086                                 sm.requestedTxns[iv.Hash] = struct{}{}
1087                                 sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1088                                 state.requestedTxns[iv.Hash] = struct{}{}
1089
1090                                 // If the peer is capable, request the txn
1091                                 // including all witness data.
1092                                 if peer.IsWitnessEnabled() {
1093                                         iv.Type = wire.InvTypeWitnessTx
1094                                 }
1095
1096                                 gdmsg.AddInvVect(iv)
1097                                 numRequested++
1098                         }
1099                 }
1100
1101                 if numRequested >= wire.MaxInvPerMsg {
1102                         break
1103                 }
1104         }
1105         state.requestQueue = requestQueue
1106         if len(gdmsg.InvList) > 0 {
1107                 peer.QueueMessage(gdmsg, nil)
1108         }
1109 }
1110
1111 // limitMap is a helper function for maps that require a maximum limit by
1112 // evicting a random transaction if adding a new value would cause it to
1113 // overflow the maximum allowed.
1114 func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1115         if len(m)+1 > limit {
1116                 // Remove a random entry from the map.  For most compilers, Go's
1117                 // range statement iterates starting at a random item although
1118                 // that is not 100% guaranteed by the spec.  The iteration order
1119                 // is not important here because an adversary would have to be
1120                 // able to pull off preimage attacks on the hashing function in
1121                 // order to target eviction of specific entries anyways.
1122                 for txHash := range m {
1123                         delete(m, txHash)
1124                         return
1125                 }
1126         }
1127 }
1128
1129 // blockHandler is the main handler for the sync manager.  It must be run as a
1130 // goroutine.  It processes block and inv messages in a separate goroutine
1131 // from the peer handlers so the block (MsgBlock) messages are handled by a
1132 // single thread without needing to lock memory data structures.  This is
1133 // important because the sync manager controls which blocks are needed and how
1134 // the fetching should proceed.
1135 func (sm *SyncManager) blockHandler() {
1136 out:
1137         for {
1138                 select {
1139                 case m := <-sm.msgChan:
1140                         switch msg := m.(type) {
1141                         case *newPeerMsg:
1142                                 sm.handleNewPeerMsg(msg.peer)
1143
1144                         case *txMsg:
1145                                 sm.handleTxMsg(msg)
1146                                 msg.reply <- struct{}{}
1147
1148                         case *blockMsg:
1149                                 sm.handleBlockMsg(msg)
1150                                 msg.reply <- struct{}{}
1151
1152                         case *invMsg:
1153                                 sm.handleInvMsg(msg)
1154
1155                         case *headersMsg:
1156                                 sm.handleHeadersMsg(msg)
1157
1158                         case *donePeerMsg:
1159                                 sm.handleDonePeerMsg(msg.peer)
1160
1161                         case getSyncPeerMsg:
1162                                 var peerID int32
1163                                 if sm.syncPeer != nil {
1164                                         peerID = sm.syncPeer.ID()
1165                                 }
1166                                 msg.reply <- peerID
1167
1168                         case processBlockMsg:
1169                                 _, isOrphan, err := sm.chain.ProcessBlock(
1170                                         msg.block, msg.flags)
1171                                 if err != nil {
1172                                         msg.reply <- processBlockResponse{
1173                                                 isOrphan: false,
1174                                                 err:      err,
1175                                         }
1176                                 }
1177
1178                                 msg.reply <- processBlockResponse{
1179                                         isOrphan: isOrphan,
1180                                         err:      nil,
1181                                 }
1182
1183                         case isCurrentMsg:
1184                                 msg.reply <- sm.current()
1185
1186                         case pauseMsg:
1187                                 // Wait until the sender unpauses the manager.
1188                                 <-msg.unpause
1189
1190                         default:
1191                                 log.Warnf("Invalid message type in block "+
1192                                         "handler: %T", msg)
1193                         }
1194
1195                 case <-sm.quit:
1196                         break out
1197                 }
1198         }
1199
1200         sm.wg.Done()
1201         log.Trace("Block handler done")
1202 }
1203
1204 // handleBlockchainNotification handles notifications from blockchain.  It does
1205 // things such as request orphan block parents and relay accepted blocks to
1206 // connected peers.
1207 func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
1208         switch notification.Type {
1209         // A block has been accepted into the block chain.  Relay it to other
1210         // peers.
1211         case blockchain.NTBlockAccepted:
1212                 // Don't relay if we are not current. Other peers that are
1213                 // current should already know about it.
1214                 if !sm.current() {
1215                         return
1216                 }
1217
1218                 block, ok := notification.Data.(*btcutil.Block)
1219                 if !ok {
1220                         log.Warnf("Chain accepted notification is not a block.")
1221                         break
1222                 }
1223
1224                 // Generate the inventory vector and relay it.
1225                 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1226                 sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
1227
1228         // A block has been connected to the main block chain.
1229         case blockchain.NTBlockConnected:
1230                 block, ok := notification.Data.(*btcutil.Block)
1231                 if !ok {
1232                         log.Warnf("Chain connected notification is not a block.")
1233                         break
1234                 }
1235
1236                 // Remove all of the transactions (except the coinbase) in the
1237                 // connected block from the transaction pool.  Secondly, remove any
1238                 // transactions which are now double spends as a result of these
1239                 // new transactions.  Finally, remove any transaction that is
1240                 // no longer an orphan. Transactions which depend on a confirmed
1241                 // transaction are NOT removed recursively because they are still
1242                 // valid.
1243                 for _, tx := range block.Transactions()[1:] {
1244                         sm.txMemPool.RemoveTransaction(tx, false)
1245                         sm.txMemPool.RemoveDoubleSpends(tx)
1246                         sm.txMemPool.RemoveOrphan(tx)
1247                         sm.peerNotifier.TransactionConfirmed(tx)
1248                         acceptedTxs := sm.txMemPool.ProcessOrphans(tx)
1249                         sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1250                 }
1251
1252         // A block has been disconnected from the main block chain.
1253         case blockchain.NTBlockDisconnected:
1254                 block, ok := notification.Data.(*btcutil.Block)
1255                 if !ok {
1256                         log.Warnf("Chain disconnected notification is not a block.")
1257                         break
1258                 }
1259
1260                 // Reinsert all of the transactions (except the coinbase) into
1261                 // the transaction pool.
1262                 for _, tx := range block.Transactions()[1:] {
1263                         _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
1264                                 false, false)
1265                         if err != nil {
1266                                 // Remove the transaction and all transactions
1267                                 // that depend on it if it wasn't accepted into
1268                                 // the transaction pool.
1269                                 sm.txMemPool.RemoveTransaction(tx, true)
1270                         }
1271                 }
1272         }
1273 }
1274
1275 // NewPeer informs the sync manager of a newly active peer.
1276 func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
1277         // Ignore if we are shutting down.
1278         if atomic.LoadInt32(&sm.shutdown) != 0 {
1279                 return
1280         }
1281         sm.msgChan <- &newPeerMsg{peer: peer}
1282 }
1283
1284 // QueueTx adds the passed transaction message and peer to the block handling
1285 // queue. Responds to the done channel argument after the tx message is
1286 // processed.
1287 func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
1288         // Don't accept more transactions if we're shutting down.
1289         if atomic.LoadInt32(&sm.shutdown) != 0 {
1290                 done <- struct{}{}
1291                 return
1292         }
1293
1294         sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
1295 }
1296
1297 // QueueBlock adds the passed block message and peer to the block handling
1298 // queue. Responds to the done channel argument after the block message is
1299 // processed.
1300 func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) {
1301         // Don't accept more blocks if we're shutting down.
1302         if atomic.LoadInt32(&sm.shutdown) != 0 {
1303                 done <- struct{}{}
1304                 return
1305         }
1306
1307         sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
1308 }
1309
1310 // QueueInv adds the passed inv message and peer to the block handling queue.
1311 func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
1312         // No channel handling here because peers do not need to block on inv
1313         // messages.
1314         if atomic.LoadInt32(&sm.shutdown) != 0 {
1315                 return
1316         }
1317
1318         sm.msgChan <- &invMsg{inv: inv, peer: peer}
1319 }
1320
1321 // QueueHeaders adds the passed headers message and peer to the block handling
1322 // queue.
1323 func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
1324         // No channel handling here because peers do not need to block on
1325         // headers messages.
1326         if atomic.LoadInt32(&sm.shutdown) != 0 {
1327                 return
1328         }
1329
1330         sm.msgChan <- &headersMsg{headers: headers, peer: peer}
1331 }
1332
1333 // DonePeer informs the blockmanager that a peer has disconnected.
1334 func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
1335         // Ignore if we are shutting down.
1336         if atomic.LoadInt32(&sm.shutdown) != 0 {
1337                 return
1338         }
1339
1340         sm.msgChan <- &donePeerMsg{peer: peer}
1341 }
1342
1343 // Start begins the core block handler which processes block and inv messages.
1344 func (sm *SyncManager) Start() {
1345         // Already started?
1346         if atomic.AddInt32(&sm.started, 1) != 1 {
1347                 return
1348         }
1349
1350         log.Trace("Starting sync manager")
1351         sm.wg.Add(1)
1352         go sm.blockHandler()
1353 }
1354
1355 // Stop gracefully shuts down the sync manager by stopping all asynchronous
1356 // handlers and waiting for them to finish.
1357 func (sm *SyncManager) Stop() error {
1358         if atomic.AddInt32(&sm.shutdown, 1) != 1 {
1359                 log.Warnf("Sync manager is already in the process of " +
1360                         "shutting down")
1361                 return nil
1362         }
1363
1364         log.Infof("Sync manager shutting down")
1365         close(sm.quit)
1366         sm.wg.Wait()
1367         return nil
1368 }
1369
1370 // SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
1371 func (sm *SyncManager) SyncPeerID() int32 {
1372         reply := make(chan int32)
1373         sm.msgChan <- getSyncPeerMsg{reply: reply}
1374         return <-reply
1375 }
1376
1377 // ProcessBlock makes use of ProcessBlock on an internal instance of a block
1378 // chain.
1379 func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
1380         reply := make(chan processBlockResponse, 1)
1381         sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
1382         response := <-reply
1383         return response.isOrphan, response.err
1384 }
1385
1386 // IsCurrent returns whether or not the sync manager believes it is synced with
1387 // the connected peers.
1388 func (sm *SyncManager) IsCurrent() bool {
1389         reply := make(chan bool)
1390         sm.msgChan <- isCurrentMsg{reply: reply}
1391         return <-reply
1392 }
1393
1394 // Pause pauses the sync manager until the returned channel is closed.
1395 //
1396 // Note that while paused, all peer and block processing is halted.  The
1397 // message sender should avoid pausing the sync manager for long durations.
1398 func (sm *SyncManager) Pause() chan<- struct{} {
1399         c := make(chan struct{})
1400         sm.msgChan <- pauseMsg{c}
1401         return c
1402 }
1403
1404 // New constructs a new SyncManager. Use Start to begin processing asynchronous
1405 // block, tx, and inv updates.
1406 func New(config *Config) (*SyncManager, error) {
1407         sm := SyncManager{
1408                 peerNotifier:    config.PeerNotifier,
1409                 chain:           config.Chain,
1410                 txMemPool:       config.TxMemPool,
1411                 chainParams:     config.ChainParams,
1412                 rejectedTxns:    make(map[chainhash.Hash]struct{}),
1413                 requestedTxns:   make(map[chainhash.Hash]struct{}),
1414                 requestedBlocks: make(map[chainhash.Hash]struct{}),
1415                 peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
1416                 progressLogger:  newBlockProgressLogger("Processed", log),
1417                 msgChan:         make(chan interface{}, config.MaxPeers*3),
1418                 headerList:      list.New(),
1419                 quit:            make(chan struct{}),
1420         }
1421
1422         best := sm.chain.BestSnapshot()
1423         if !config.DisableCheckpoints {
1424                 // Initialize the next checkpoint based on the current height.
1425                 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1426                 if sm.nextCheckpoint != nil {
1427                         sm.resetHeaderState(&best.Hash, best.Height)
1428                 }
1429         } else {
1430                 log.Info("Checkpoints are disabled")
1431         }
1432
1433         sm.chain.Subscribe(sm.handleBlockchainNotification)
1434
1435         return &sm, nil
1436 }