1 // Copyright (c) 2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
11 "github.com/btcsuite/btcd/blockchain"
12 "github.com/btcsuite/btcd/chaincfg/chainhash"
13 "github.com/btcsuite/btcd/database"
14 "github.com/btcsuite/btcd/wire"
15 "github.com/btcsuite/btcutil"
19 // indexTipsBucketName is the name of the db bucket used to house the
20 // current tip of each index.
21 indexTipsBucketName = []byte("idxtips")
24 // -----------------------------------------------------------------------------
25 // The index manager tracks the current tip of each index by using a parent
26 // bucket that contains an entry for index.
28 // The serialized format for an index tip is:
30 // [<block hash><block height>],...
33 // block hash chainhash.Hash chainhash.HashSize
34 // block height uint32 4 bytes
35 // -----------------------------------------------------------------------------
37 // dbPutIndexerTip uses an existing database transaction to update or add the
38 // current tip for the given index to the provided values.
39 func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *chainhash.Hash, height int32) error {
40 serialized := make([]byte, chainhash.HashSize+4)
41 copy(serialized, hash[:])
42 byteOrder.PutUint32(serialized[chainhash.HashSize:], uint32(height))
44 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
45 return indexesBucket.Put(idxKey, serialized)
48 // dbFetchIndexerTip uses an existing database transaction to retrieve the
49 // hash and height of the current tip for the provided index.
50 func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*chainhash.Hash, int32, error) {
51 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
52 serialized := indexesBucket.Get(idxKey)
53 if len(serialized) < chainhash.HashSize+4 {
54 return nil, 0, database.Error{
55 ErrorCode: database.ErrCorruption,
56 Description: fmt.Sprintf("unexpected end of data for "+
57 "index %q tip", string(idxKey)),
61 var hash chainhash.Hash
62 copy(hash[:], serialized[:chainhash.HashSize])
63 height := int32(byteOrder.Uint32(serialized[chainhash.HashSize:]))
64 return &hash, height, nil
67 // dbIndexConnectBlock adds all of the index entries associated with the
68 // given block using the provided indexer and updates the tip of the indexer
69 // accordingly. An error will be returned if the current tip for the indexer is
70 // not the previous block for the passed block.
71 func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
72 // Assert that the block being connected properly connects to the
73 // current tip of the index.
74 idxKey := indexer.Key()
75 curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey)
79 if !curTipHash.IsEqual(&block.MsgBlock().Header.PrevBlock) {
80 return AssertError(fmt.Sprintf("dbIndexConnectBlock must be "+
81 "called with a block that extends the current index "+
82 "tip (%s, tip %s, block %s)", indexer.Name(),
83 curTipHash, block.Hash()))
86 // Notify the indexer with the connected block so it can index it.
87 if err := indexer.ConnectBlock(dbTx, block, view); err != nil {
91 // Update the current index tip.
92 return dbPutIndexerTip(dbTx, idxKey, block.Hash(), block.Height())
95 // dbIndexDisconnectBlock removes all of the index entries associated with the
96 // given block using the provided indexer and updates the tip of the indexer
97 // accordingly. An error will be returned if the current tip for the indexer is
98 // not the passed block.
99 func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
100 // Assert that the block being disconnected is the current tip of the
102 idxKey := indexer.Key()
103 curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey)
107 if !curTipHash.IsEqual(block.Hash()) {
108 return AssertError(fmt.Sprintf("dbIndexDisconnectBlock must "+
109 "be called with the block at the current index tip "+
110 "(%s, tip %s, block %s)", indexer.Name(),
111 curTipHash, block.Hash()))
114 // Notify the indexer with the disconnected block so it can remove all
115 // of the appropriate entries.
116 if err := indexer.DisconnectBlock(dbTx, block, view); err != nil {
120 // Update the current index tip.
121 prevHash := &block.MsgBlock().Header.PrevBlock
122 return dbPutIndexerTip(dbTx, idxKey, prevHash, block.Height()-1)
125 // Manager defines an index manager that manages multiple optional indexes and
126 // implements the blockchain.IndexManager interface so it can be seamlessly
127 // plugged into normal chain processing.
128 type Manager struct {
130 enabledIndexes []Indexer
133 // Ensure the Manager type implements the blockchain.IndexManager interface.
134 var _ blockchain.IndexManager = (*Manager)(nil)
136 // indexDropKey returns the key for an index which indicates it is in the
137 // process of being dropped.
138 func indexDropKey(idxKey []byte) []byte {
139 dropKey := make([]byte, len(idxKey)+1)
141 copy(dropKey[1:], idxKey)
145 // maybeFinishDrops determines if each of the enabled indexes are in the middle
146 // of being dropped and finishes dropping them when the are. This is necessary
147 // because dropping and index has to be done in several atomic steps rather than
148 // one big atomic step due to the massive number of entries.
149 func (m *Manager) maybeFinishDrops() error {
150 indexNeedsDrop := make([]bool, len(m.enabledIndexes))
151 err := m.db.View(func(dbTx database.Tx) error {
152 // None of the indexes needs to be dropped if the index tips
153 // bucket hasn't been created yet.
154 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
155 if indexesBucket == nil {
159 // Make the indexer as requiring a drop if one is already in
161 for i, indexer := range m.enabledIndexes {
162 dropKey := indexDropKey(indexer.Key())
163 if indexesBucket.Get(dropKey) != nil {
164 indexNeedsDrop[i] = true
174 // Finish dropping any of the enabled indexes that are already in the
175 // middle of being dropped.
176 for i, indexer := range m.enabledIndexes {
177 if !indexNeedsDrop[i] {
181 log.Infof("Resuming %s drop", indexer.Name())
182 err := dropIndex(m.db, indexer.Key(), indexer.Name())
191 // maybeCreateIndexes determines if each of the enabled indexes have already
192 // been created and creates them if not.
193 func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error {
194 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
195 for _, indexer := range m.enabledIndexes {
196 // Nothing to do if the index tip already exists.
197 idxKey := indexer.Key()
198 if indexesBucket.Get(idxKey) != nil {
202 // The tip for the index does not exist, so create it and
203 // invoke the create callback for the index so it can perform
204 // any one-time initialization it requires.
205 if err := indexer.Create(dbTx); err != nil {
209 // Set the tip for the index to values which represent an
210 // uninitialized index.
211 err := dbPutIndexerTip(dbTx, idxKey, &chainhash.Hash{}, -1)
220 // Init initializes the enabled indexes. This is called during chain
221 // initialization and primarily consists of catching up all indexes to the
222 // current best chain tip. This is necessary since each index can be disabled
223 // and re-enabled at any time and attempting to catch-up indexes at the same
224 // time new blocks are being downloaded would lead to an overall longer time to
225 // catch up due to the I/O contention.
227 // This is part of the blockchain.IndexManager interface.
228 func (m *Manager) Init(chain *blockchain.BlockChain) error {
229 // Nothing to do when no indexes are enabled.
230 if len(m.enabledIndexes) == 0 {
234 // Finish and drops that were previously interrupted.
235 if err := m.maybeFinishDrops(); err != nil {
239 // Create the initial state for the indexes as needed.
240 err := m.db.Update(func(dbTx database.Tx) error {
241 // Create the bucket for the current tips as needed.
242 meta := dbTx.Metadata()
243 _, err := meta.CreateBucketIfNotExists(indexTipsBucketName)
248 return m.maybeCreateIndexes(dbTx)
254 // Initialize each of the enabled indexes.
255 for _, indexer := range m.enabledIndexes {
256 if err := indexer.Init(); err != nil {
261 // Rollback indexes to the main chain if their tip is an orphaned fork.
262 // This is fairly unlikely, but it can happen if the chain is
263 // reorganized while the index is disabled. This has to be done in
264 // reverse order because later indexes can depend on earlier ones.
265 for i := len(m.enabledIndexes); i > 0; i-- {
266 indexer := m.enabledIndexes[i-1]
268 // Fetch the current tip for the index.
270 var hash *chainhash.Hash
271 err := m.db.View(func(dbTx database.Tx) error {
272 idxKey := indexer.Key()
273 hash, height, err = dbFetchIndexerTip(dbTx, idxKey)
280 // Nothing to do if the index does not have any entries yet.
285 // Loop until the tip is a block that exists in the main chain.
286 initialHeight := height
287 for !chain.MainChainHasBlock(hash) {
288 // At this point the index tip is orphaned, so load the
289 // orphaned block from the database directly and
290 // disconnect it from the index. The block has to be
291 // loaded directly since it is no longer in the main
292 // chain and thus the chain.BlockByHash function would
294 err = m.db.Update(func(dbTx database.Tx) error {
295 blockBytes, err := dbTx.FetchBlock(hash)
299 block, err := btcutil.NewBlockFromBytes(blockBytes)
303 block.SetHeight(height)
305 // When the index requires all of the referenced
306 // txouts they need to be retrieved from the
307 // transaction index.
308 var view *blockchain.UtxoViewpoint
309 if indexNeedsInputs(indexer) {
311 view, err = makeUtxoView(dbTx, block)
317 // Remove all of the index entries associated
318 // with the block and update the indexer tip.
319 err = dbIndexDisconnectBlock(dbTx, indexer,
325 // Update the tip to the previous block.
326 hash = &block.MsgBlock().Header.PrevBlock
336 if initialHeight != height {
337 log.Infof("Removed %d orphaned blocks from %s "+
338 "(heights %d to %d)", initialHeight-height,
339 indexer.Name(), height+1, initialHeight)
343 // Fetch the current tip heights for each index along with tracking the
344 // lowest one so the catchup code only needs to start at the earliest
345 // block and is able to skip connecting the block for the indexes that
347 bestHeight := chain.BestSnapshot().Height
348 lowestHeight := bestHeight
349 indexerHeights := make([]int32, len(m.enabledIndexes))
350 err = m.db.View(func(dbTx database.Tx) error {
351 for i, indexer := range m.enabledIndexes {
352 idxKey := indexer.Key()
353 hash, height, err := dbFetchIndexerTip(dbTx, idxKey)
358 log.Debugf("Current %s tip (height %d, hash %v)",
359 indexer.Name(), height, hash)
360 indexerHeights[i] = height
361 if height < lowestHeight {
362 lowestHeight = height
371 // Nothing to index if all of the indexes are caught up.
372 if lowestHeight == bestHeight {
376 // Create a progress logger for the indexing process below.
377 progressLogger := newBlockProgressLogger("Indexed", log)
379 // At this point, one or more indexes are behind the current best chain
380 // tip and need to be caught up, so log the details and loop through
381 // each block that needs to be indexed.
382 log.Infof("Catching up indexes from height %d to %d", lowestHeight,
384 for height := lowestHeight + 1; height <= bestHeight; height++ {
385 // Load the block for the height since it is required to index
387 block, err := chain.BlockByHeight(height)
392 // Connect the block for all indexes that need it.
393 var view *blockchain.UtxoViewpoint
394 for i, indexer := range m.enabledIndexes {
395 // Skip indexes that don't need to be updated with this
397 if indexerHeights[i] >= height {
401 err := m.db.Update(func(dbTx database.Tx) error {
402 // When the index requires all of the referenced
403 // txouts and they haven't been loaded yet, they
404 // need to be retrieved from the transaction
406 if view == nil && indexNeedsInputs(indexer) {
408 view, err = makeUtxoView(dbTx, block)
413 return dbIndexConnectBlock(dbTx, indexer, block,
419 indexerHeights[i] = height
422 // Log indexing progress.
423 progressLogger.LogBlockHeight(block)
426 log.Infof("Indexes caught up to height %d", bestHeight)
430 // indexNeedsInputs returns whether or not the index needs access to the txouts
431 // referenced by the transaction inputs being indexed.
432 func indexNeedsInputs(index Indexer) bool {
433 if idx, ok := index.(NeedsInputser); ok {
434 return idx.NeedsInputs()
440 // dbFetchTx looks up the passed transaction hash in the transaction index and
441 // loads it from the database.
442 func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) {
443 // Look up the location of the transaction.
444 blockRegion, err := dbFetchTxIndexEntry(dbTx, hash)
448 if blockRegion == nil {
449 return nil, fmt.Errorf("transaction %v not found", hash)
452 // Load the raw transaction bytes from the database.
453 txBytes, err := dbTx.FetchBlockRegion(blockRegion)
458 // Deserialize the transaction.
460 err = msgTx.Deserialize(bytes.NewReader(txBytes))
468 // makeUtxoView creates a mock unspent transaction output view by using the
469 // transaction index in order to look up all inputs referenced by the
470 // transactions in the block. This is sometimes needed when catching indexes up
471 // because many of the txouts could actually already be spent however the
472 // associated scripts are still required to index them.
473 func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) {
474 view := blockchain.NewUtxoViewpoint()
475 for txIdx, tx := range block.Transactions() {
476 // Coinbases do not reference any inputs. Since the block is
477 // required to have already gone through full validation, it has
478 // already been proven on the first transaction in the block is
484 // Use the transaction index to load all of the referenced
485 // inputs and add their outputs to the view.
486 for _, txIn := range tx.MsgTx().TxIn {
487 originOut := &txIn.PreviousOutPoint
488 originTx, err := dbFetchTx(dbTx, &originOut.Hash)
493 view.AddTxOuts(btcutil.NewTx(originTx), 0)
500 // ConnectBlock must be invoked when a block is extending the main chain. It
501 // keeps track of the state of each index it is managing, performs some sanity
502 // checks, and invokes each indexer.
504 // This is part of the blockchain.IndexManager interface.
505 func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
506 // Call each of the currently active optional indexes with the block
507 // being connected so they can update accordingly.
508 for _, index := range m.enabledIndexes {
509 err := dbIndexConnectBlock(dbTx, index, block, view)
517 // DisconnectBlock must be invoked when a block is being disconnected from the
518 // end of the main chain. It keeps track of the state of each index it is
519 // managing, performs some sanity checks, and invokes each indexer to remove
520 // the index entries associated with the block.
522 // This is part of the blockchain.IndexManager interface.
523 func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
524 // Call each of the currently active optional indexes with the block
525 // being disconnected so they can update accordingly.
526 for _, index := range m.enabledIndexes {
527 err := dbIndexDisconnectBlock(dbTx, index, block, view)
535 // NewManager returns a new index manager with the provided indexes enabled.
537 // The manager returned satisfies the blockchain.IndexManager interface and thus
538 // cleanly plugs into the normal blockchain processing path.
539 func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
542 enabledIndexes: enabledIndexes,
546 // dropIndex drops the passed index from the database. Since indexes can be
547 // massive, it deletes the index in multiple database transactions in order to
548 // keep memory usage to reasonable levels. It also marks the drop in progress
549 // so the drop can be resumed if it is stopped before it is done before the
550 // index can be used again.
551 func dropIndex(db database.DB, idxKey []byte, idxName string) error {
552 // Nothing to do if the index doesn't already exist.
554 err := db.View(func(dbTx database.Tx) error {
555 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
556 if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
565 log.Infof("Not dropping %s because it does not exist", idxName)
569 // Mark that the index is in the process of being dropped so that it
570 // can be resumed on the next start if interrupted before the process is
572 log.Infof("Dropping all %s entries. This might take a while...",
574 err = db.Update(func(dbTx database.Tx) error {
575 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
576 return indexesBucket.Put(indexDropKey(idxKey), idxKey)
582 // Since the indexes can be so large, attempting to simply delete
583 // the bucket in a single database transaction would result in massive
584 // memory usage and likely crash many systems due to ulimits. In order
585 // to avoid this, use a cursor to delete a maximum number of entries out
586 // of the bucket at a time.
587 const maxDeletions = 2000000
588 var totalDeleted uint64
589 for numDeleted := maxDeletions; numDeleted == maxDeletions; {
591 err := db.Update(func(dbTx database.Tx) error {
592 bucket := dbTx.Metadata().Bucket(idxKey)
593 cursor := bucket.Cursor()
594 for ok := cursor.First(); ok; ok = cursor.Next() &&
595 numDeleted < maxDeletions {
597 if err := cursor.Delete(); err != nil {
609 totalDeleted += uint64(numDeleted)
610 log.Infof("Deleted %d keys (%d total) from %s",
611 numDeleted, totalDeleted, idxName)
615 // Call extra index specific deinitialization for the transaction index.
616 if idxName == txIndexName {
617 if err := dropBlockIDIndex(db); err != nil {
622 // Remove the index tip, index bucket, and in-progress drop flag now
623 // that all index entries have been removed.
624 err = db.Update(func(dbTx database.Tx) error {
625 meta := dbTx.Metadata()
626 indexesBucket := meta.Bucket(indexTipsBucketName)
627 if err := indexesBucket.Delete(idxKey); err != nil {
631 if err := meta.DeleteBucket(idxKey); err != nil {
635 return indexesBucket.Delete(indexDropKey(idxKey))
641 log.Infof("Dropped %s", idxName)