OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / blockchain / indexers / manager.go
1 // Copyright (c) 2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package indexers
6
7 import (
8         "bytes"
9         "fmt"
10
11         "github.com/btcsuite/btcd/blockchain"
12         "github.com/btcsuite/btcd/chaincfg/chainhash"
13         "github.com/btcsuite/btcd/database"
14         "github.com/btcsuite/btcd/wire"
15         "github.com/btcsuite/btcutil"
16 )
17
18 var (
19         // indexTipsBucketName is the name of the db bucket used to house the
20         // current tip of each index.
21         indexTipsBucketName = []byte("idxtips")
22 )
23
24 // -----------------------------------------------------------------------------
25 // The index manager tracks the current tip of each index by using a parent
26 // bucket that contains an entry for index.
27 //
28 // The serialized format for an index tip is:
29 //
30 //   [<block hash><block height>],...
31 //
32 //   Field           Type             Size
33 //   block hash      chainhash.Hash   chainhash.HashSize
34 //   block height    uint32           4 bytes
35 // -----------------------------------------------------------------------------
36
37 // dbPutIndexerTip uses an existing database transaction to update or add the
38 // current tip for the given index to the provided values.
39 func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *chainhash.Hash, height int32) error {
40         serialized := make([]byte, chainhash.HashSize+4)
41         copy(serialized, hash[:])
42         byteOrder.PutUint32(serialized[chainhash.HashSize:], uint32(height))
43
44         indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
45         return indexesBucket.Put(idxKey, serialized)
46 }
47
48 // dbFetchIndexerTip uses an existing database transaction to retrieve the
49 // hash and height of the current tip for the provided index.
50 func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*chainhash.Hash, int32, error) {
51         indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
52         serialized := indexesBucket.Get(idxKey)
53         if len(serialized) < chainhash.HashSize+4 {
54                 return nil, 0, database.Error{
55                         ErrorCode: database.ErrCorruption,
56                         Description: fmt.Sprintf("unexpected end of data for "+
57                                 "index %q tip", string(idxKey)),
58                 }
59         }
60
61         var hash chainhash.Hash
62         copy(hash[:], serialized[:chainhash.HashSize])
63         height := int32(byteOrder.Uint32(serialized[chainhash.HashSize:]))
64         return &hash, height, nil
65 }
66
67 // dbIndexConnectBlock adds all of the index entries associated with the
68 // given block using the provided indexer and updates the tip of the indexer
69 // accordingly.  An error will be returned if the current tip for the indexer is
70 // not the previous block for the passed block.
71 func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
72         // Assert that the block being connected properly connects to the
73         // current tip of the index.
74         idxKey := indexer.Key()
75         curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey)
76         if err != nil {
77                 return err
78         }
79         if !curTipHash.IsEqual(&block.MsgBlock().Header.PrevBlock) {
80                 return AssertError(fmt.Sprintf("dbIndexConnectBlock must be "+
81                         "called with a block that extends the current index "+
82                         "tip (%s, tip %s, block %s)", indexer.Name(),
83                         curTipHash, block.Hash()))
84         }
85
86         // Notify the indexer with the connected block so it can index it.
87         if err := indexer.ConnectBlock(dbTx, block, view); err != nil {
88                 return err
89         }
90
91         // Update the current index tip.
92         return dbPutIndexerTip(dbTx, idxKey, block.Hash(), block.Height())
93 }
94
95 // dbIndexDisconnectBlock removes all of the index entries associated with the
96 // given block using the provided indexer and updates the tip of the indexer
97 // accordingly.  An error will be returned if the current tip for the indexer is
98 // not the passed block.
99 func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
100         // Assert that the block being disconnected is the current tip of the
101         // index.
102         idxKey := indexer.Key()
103         curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey)
104         if err != nil {
105                 return err
106         }
107         if !curTipHash.IsEqual(block.Hash()) {
108                 return AssertError(fmt.Sprintf("dbIndexDisconnectBlock must "+
109                         "be called with the block at the current index tip "+
110                         "(%s, tip %s, block %s)", indexer.Name(),
111                         curTipHash, block.Hash()))
112         }
113
114         // Notify the indexer with the disconnected block so it can remove all
115         // of the appropriate entries.
116         if err := indexer.DisconnectBlock(dbTx, block, view); err != nil {
117                 return err
118         }
119
120         // Update the current index tip.
121         prevHash := &block.MsgBlock().Header.PrevBlock
122         return dbPutIndexerTip(dbTx, idxKey, prevHash, block.Height()-1)
123 }
124
125 // Manager defines an index manager that manages multiple optional indexes and
126 // implements the blockchain.IndexManager interface so it can be seamlessly
127 // plugged into normal chain processing.
128 type Manager struct {
129         db             database.DB
130         enabledIndexes []Indexer
131 }
132
133 // Ensure the Manager type implements the blockchain.IndexManager interface.
134 var _ blockchain.IndexManager = (*Manager)(nil)
135
136 // indexDropKey returns the key for an index which indicates it is in the
137 // process of being dropped.
138 func indexDropKey(idxKey []byte) []byte {
139         dropKey := make([]byte, len(idxKey)+1)
140         dropKey[0] = 'd'
141         copy(dropKey[1:], idxKey)
142         return dropKey
143 }
144
145 // maybeFinishDrops determines if each of the enabled indexes are in the middle
146 // of being dropped and finishes dropping them when the are.  This is necessary
147 // because dropping and index has to be done in several atomic steps rather than
148 // one big atomic step due to the massive number of entries.
149 func (m *Manager) maybeFinishDrops() error {
150         indexNeedsDrop := make([]bool, len(m.enabledIndexes))
151         err := m.db.View(func(dbTx database.Tx) error {
152                 // None of the indexes needs to be dropped if the index tips
153                 // bucket hasn't been created yet.
154                 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
155                 if indexesBucket == nil {
156                         return nil
157                 }
158
159                 // Make the indexer as requiring a drop if one is already in
160                 // progress.
161                 for i, indexer := range m.enabledIndexes {
162                         dropKey := indexDropKey(indexer.Key())
163                         if indexesBucket.Get(dropKey) != nil {
164                                 indexNeedsDrop[i] = true
165                         }
166                 }
167
168                 return nil
169         })
170         if err != nil {
171                 return err
172         }
173
174         // Finish dropping any of the enabled indexes that are already in the
175         // middle of being dropped.
176         for i, indexer := range m.enabledIndexes {
177                 if !indexNeedsDrop[i] {
178                         continue
179                 }
180
181                 log.Infof("Resuming %s drop", indexer.Name())
182                 err := dropIndex(m.db, indexer.Key(), indexer.Name())
183                 if err != nil {
184                         return err
185                 }
186         }
187
188         return nil
189 }
190
191 // maybeCreateIndexes determines if each of the enabled indexes have already
192 // been created and creates them if not.
193 func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error {
194         indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
195         for _, indexer := range m.enabledIndexes {
196                 // Nothing to do if the index tip already exists.
197                 idxKey := indexer.Key()
198                 if indexesBucket.Get(idxKey) != nil {
199                         continue
200                 }
201
202                 // The tip for the index does not exist, so create it and
203                 // invoke the create callback for the index so it can perform
204                 // any one-time initialization it requires.
205                 if err := indexer.Create(dbTx); err != nil {
206                         return err
207                 }
208
209                 // Set the tip for the index to values which represent an
210                 // uninitialized index.
211                 err := dbPutIndexerTip(dbTx, idxKey, &chainhash.Hash{}, -1)
212                 if err != nil {
213                         return err
214                 }
215         }
216
217         return nil
218 }
219
220 // Init initializes the enabled indexes.  This is called during chain
221 // initialization and primarily consists of catching up all indexes to the
222 // current best chain tip.  This is necessary since each index can be disabled
223 // and re-enabled at any time and attempting to catch-up indexes at the same
224 // time new blocks are being downloaded would lead to an overall longer time to
225 // catch up due to the I/O contention.
226 //
227 // This is part of the blockchain.IndexManager interface.
228 func (m *Manager) Init(chain *blockchain.BlockChain) error {
229         // Nothing to do when no indexes are enabled.
230         if len(m.enabledIndexes) == 0 {
231                 return nil
232         }
233
234         // Finish and drops that were previously interrupted.
235         if err := m.maybeFinishDrops(); err != nil {
236                 return err
237         }
238
239         // Create the initial state for the indexes as needed.
240         err := m.db.Update(func(dbTx database.Tx) error {
241                 // Create the bucket for the current tips as needed.
242                 meta := dbTx.Metadata()
243                 _, err := meta.CreateBucketIfNotExists(indexTipsBucketName)
244                 if err != nil {
245                         return err
246                 }
247
248                 return m.maybeCreateIndexes(dbTx)
249         })
250         if err != nil {
251                 return err
252         }
253
254         // Initialize each of the enabled indexes.
255         for _, indexer := range m.enabledIndexes {
256                 if err := indexer.Init(); err != nil {
257                         return err
258                 }
259         }
260
261         // Rollback indexes to the main chain if their tip is an orphaned fork.
262         // This is fairly unlikely, but it can happen if the chain is
263         // reorganized while the index is disabled.  This has to be done in
264         // reverse order because later indexes can depend on earlier ones.
265         for i := len(m.enabledIndexes); i > 0; i-- {
266                 indexer := m.enabledIndexes[i-1]
267
268                 // Fetch the current tip for the index.
269                 var height int32
270                 var hash *chainhash.Hash
271                 err := m.db.View(func(dbTx database.Tx) error {
272                         idxKey := indexer.Key()
273                         hash, height, err = dbFetchIndexerTip(dbTx, idxKey)
274                         return err
275                 })
276                 if err != nil {
277                         return err
278                 }
279
280                 // Nothing to do if the index does not have any entries yet.
281                 if height == -1 {
282                         continue
283                 }
284
285                 // Loop until the tip is a block that exists in the main chain.
286                 initialHeight := height
287                 for !chain.MainChainHasBlock(hash) {
288                         // At this point the index tip is orphaned, so load the
289                         // orphaned block from the database directly and
290                         // disconnect it from the index.  The block has to be
291                         // loaded directly since it is no longer in the main
292                         // chain and thus the chain.BlockByHash function would
293                         // error.
294                         err = m.db.Update(func(dbTx database.Tx) error {
295                                 blockBytes, err := dbTx.FetchBlock(hash)
296                                 if err != nil {
297                                         return err
298                                 }
299                                 block, err := btcutil.NewBlockFromBytes(blockBytes)
300                                 if err != nil {
301                                         return err
302                                 }
303                                 block.SetHeight(height)
304
305                                 // When the index requires all of the referenced
306                                 // txouts they need to be retrieved from the
307                                 // transaction index.
308                                 var view *blockchain.UtxoViewpoint
309                                 if indexNeedsInputs(indexer) {
310                                         var err error
311                                         view, err = makeUtxoView(dbTx, block)
312                                         if err != nil {
313                                                 return err
314                                         }
315                                 }
316
317                                 // Remove all of the index entries associated
318                                 // with the block and update the indexer tip.
319                                 err = dbIndexDisconnectBlock(dbTx, indexer,
320                                         block, view)
321                                 if err != nil {
322                                         return err
323                                 }
324
325                                 // Update the tip to the previous block.
326                                 hash = &block.MsgBlock().Header.PrevBlock
327                                 height--
328
329                                 return nil
330                         })
331                         if err != nil {
332                                 return err
333                         }
334                 }
335
336                 if initialHeight != height {
337                         log.Infof("Removed %d orphaned blocks from %s "+
338                                 "(heights %d to %d)", initialHeight-height,
339                                 indexer.Name(), height+1, initialHeight)
340                 }
341         }
342
343         // Fetch the current tip heights for each index along with tracking the
344         // lowest one so the catchup code only needs to start at the earliest
345         // block and is able to skip connecting the block for the indexes that
346         // don't need it.
347         bestHeight := chain.BestSnapshot().Height
348         lowestHeight := bestHeight
349         indexerHeights := make([]int32, len(m.enabledIndexes))
350         err = m.db.View(func(dbTx database.Tx) error {
351                 for i, indexer := range m.enabledIndexes {
352                         idxKey := indexer.Key()
353                         hash, height, err := dbFetchIndexerTip(dbTx, idxKey)
354                         if err != nil {
355                                 return err
356                         }
357
358                         log.Debugf("Current %s tip (height %d, hash %v)",
359                                 indexer.Name(), height, hash)
360                         indexerHeights[i] = height
361                         if height < lowestHeight {
362                                 lowestHeight = height
363                         }
364                 }
365                 return nil
366         })
367         if err != nil {
368                 return err
369         }
370
371         // Nothing to index if all of the indexes are caught up.
372         if lowestHeight == bestHeight {
373                 return nil
374         }
375
376         // Create a progress logger for the indexing process below.
377         progressLogger := newBlockProgressLogger("Indexed", log)
378
379         // At this point, one or more indexes are behind the current best chain
380         // tip and need to be caught up, so log the details and loop through
381         // each block that needs to be indexed.
382         log.Infof("Catching up indexes from height %d to %d", lowestHeight,
383                 bestHeight)
384         for height := lowestHeight + 1; height <= bestHeight; height++ {
385                 // Load the block for the height since it is required to index
386                 // it.
387                 block, err := chain.BlockByHeight(height)
388                 if err != nil {
389                         return err
390                 }
391
392                 // Connect the block for all indexes that need it.
393                 var view *blockchain.UtxoViewpoint
394                 for i, indexer := range m.enabledIndexes {
395                         // Skip indexes that don't need to be updated with this
396                         // block.
397                         if indexerHeights[i] >= height {
398                                 continue
399                         }
400
401                         err := m.db.Update(func(dbTx database.Tx) error {
402                                 // When the index requires all of the referenced
403                                 // txouts and they haven't been loaded yet, they
404                                 // need to be retrieved from the transaction
405                                 // index.
406                                 if view == nil && indexNeedsInputs(indexer) {
407                                         var err error
408                                         view, err = makeUtxoView(dbTx, block)
409                                         if err != nil {
410                                                 return err
411                                         }
412                                 }
413                                 return dbIndexConnectBlock(dbTx, indexer, block,
414                                         view)
415                         })
416                         if err != nil {
417                                 return err
418                         }
419                         indexerHeights[i] = height
420                 }
421
422                 // Log indexing progress.
423                 progressLogger.LogBlockHeight(block)
424         }
425
426         log.Infof("Indexes caught up to height %d", bestHeight)
427         return nil
428 }
429
430 // indexNeedsInputs returns whether or not the index needs access to the txouts
431 // referenced by the transaction inputs being indexed.
432 func indexNeedsInputs(index Indexer) bool {
433         if idx, ok := index.(NeedsInputser); ok {
434                 return idx.NeedsInputs()
435         }
436
437         return false
438 }
439
440 // dbFetchTx looks up the passed transaction hash in the transaction index and
441 // loads it from the database.
442 func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) {
443         // Look up the location of the transaction.
444         blockRegion, err := dbFetchTxIndexEntry(dbTx, hash)
445         if err != nil {
446                 return nil, err
447         }
448         if blockRegion == nil {
449                 return nil, fmt.Errorf("transaction %v not found", hash)
450         }
451
452         // Load the raw transaction bytes from the database.
453         txBytes, err := dbTx.FetchBlockRegion(blockRegion)
454         if err != nil {
455                 return nil, err
456         }
457
458         // Deserialize the transaction.
459         var msgTx wire.MsgTx
460         err = msgTx.Deserialize(bytes.NewReader(txBytes))
461         if err != nil {
462                 return nil, err
463         }
464
465         return &msgTx, nil
466 }
467
468 // makeUtxoView creates a mock unspent transaction output view by using the
469 // transaction index in order to look up all inputs referenced by the
470 // transactions in the block.  This is sometimes needed when catching indexes up
471 // because many of the txouts could actually already be spent however the
472 // associated scripts are still required to index them.
473 func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) {
474         view := blockchain.NewUtxoViewpoint()
475         for txIdx, tx := range block.Transactions() {
476                 // Coinbases do not reference any inputs.  Since the block is
477                 // required to have already gone through full validation, it has
478                 // already been proven on the first transaction in the block is
479                 // a coinbase.
480                 if txIdx == 0 {
481                         continue
482                 }
483
484                 // Use the transaction index to load all of the referenced
485                 // inputs and add their outputs to the view.
486                 for _, txIn := range tx.MsgTx().TxIn {
487                         originOut := &txIn.PreviousOutPoint
488                         originTx, err := dbFetchTx(dbTx, &originOut.Hash)
489                         if err != nil {
490                                 return nil, err
491                         }
492
493                         view.AddTxOuts(btcutil.NewTx(originTx), 0)
494                 }
495         }
496
497         return view, nil
498 }
499
500 // ConnectBlock must be invoked when a block is extending the main chain.  It
501 // keeps track of the state of each index it is managing, performs some sanity
502 // checks, and invokes each indexer.
503 //
504 // This is part of the blockchain.IndexManager interface.
505 func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
506         // Call each of the currently active optional indexes with the block
507         // being connected so they can update accordingly.
508         for _, index := range m.enabledIndexes {
509                 err := dbIndexConnectBlock(dbTx, index, block, view)
510                 if err != nil {
511                         return err
512                 }
513         }
514         return nil
515 }
516
517 // DisconnectBlock must be invoked when a block is being disconnected from the
518 // end of the main chain.  It keeps track of the state of each index it is
519 // managing, performs some sanity checks, and invokes each indexer to remove
520 // the index entries associated with the block.
521 //
522 // This is part of the blockchain.IndexManager interface.
523 func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
524         // Call each of the currently active optional indexes with the block
525         // being disconnected so they can update accordingly.
526         for _, index := range m.enabledIndexes {
527                 err := dbIndexDisconnectBlock(dbTx, index, block, view)
528                 if err != nil {
529                         return err
530                 }
531         }
532         return nil
533 }
534
535 // NewManager returns a new index manager with the provided indexes enabled.
536 //
537 // The manager returned satisfies the blockchain.IndexManager interface and thus
538 // cleanly plugs into the normal blockchain processing path.
539 func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
540         return &Manager{
541                 db:             db,
542                 enabledIndexes: enabledIndexes,
543         }
544 }
545
546 // dropIndex drops the passed index from the database.  Since indexes can be
547 // massive, it deletes the index in multiple database transactions in order to
548 // keep memory usage to reasonable levels.  It also marks the drop in progress
549 // so the drop can be resumed if it is stopped before it is done before the
550 // index can be used again.
551 func dropIndex(db database.DB, idxKey []byte, idxName string) error {
552         // Nothing to do if the index doesn't already exist.
553         var needsDelete bool
554         err := db.View(func(dbTx database.Tx) error {
555                 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
556                 if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
557                         needsDelete = true
558                 }
559                 return nil
560         })
561         if err != nil {
562                 return err
563         }
564         if !needsDelete {
565                 log.Infof("Not dropping %s because it does not exist", idxName)
566                 return nil
567         }
568
569         // Mark that the index is in the process of being dropped so that it
570         // can be resumed on the next start if interrupted before the process is
571         // complete.
572         log.Infof("Dropping all %s entries.  This might take a while...",
573                 idxName)
574         err = db.Update(func(dbTx database.Tx) error {
575                 indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
576                 return indexesBucket.Put(indexDropKey(idxKey), idxKey)
577         })
578         if err != nil {
579                 return err
580         }
581
582         // Since the indexes can be so large, attempting to simply delete
583         // the bucket in a single database transaction would result in massive
584         // memory usage and likely crash many systems due to ulimits.  In order
585         // to avoid this, use a cursor to delete a maximum number of entries out
586         // of the bucket at a time.
587         const maxDeletions = 2000000
588         var totalDeleted uint64
589         for numDeleted := maxDeletions; numDeleted == maxDeletions; {
590                 numDeleted = 0
591                 err := db.Update(func(dbTx database.Tx) error {
592                         bucket := dbTx.Metadata().Bucket(idxKey)
593                         cursor := bucket.Cursor()
594                         for ok := cursor.First(); ok; ok = cursor.Next() &&
595                                 numDeleted < maxDeletions {
596
597                                 if err := cursor.Delete(); err != nil {
598                                         return err
599                                 }
600                                 numDeleted++
601                         }
602                         return nil
603                 })
604                 if err != nil {
605                         return err
606                 }
607
608                 if numDeleted > 0 {
609                         totalDeleted += uint64(numDeleted)
610                         log.Infof("Deleted %d keys (%d total) from %s",
611                                 numDeleted, totalDeleted, idxName)
612                 }
613         }
614
615         // Call extra index specific deinitialization for the transaction index.
616         if idxName == txIndexName {
617                 if err := dropBlockIDIndex(db); err != nil {
618                         return err
619                 }
620         }
621
622         // Remove the index tip, index bucket, and in-progress drop flag now
623         // that all index entries have been removed.
624         err = db.Update(func(dbTx database.Tx) error {
625                 meta := dbTx.Metadata()
626                 indexesBucket := meta.Bucket(indexTipsBucketName)
627                 if err := indexesBucket.Delete(idxKey); err != nil {
628                         return err
629                 }
630
631                 if err := meta.DeleteBucket(idxKey); err != nil {
632                         return err
633                 }
634
635                 return indexesBucket.Delete(indexDropKey(idxKey))
636         })
637         if err != nil {
638                 return err
639         }
640
641         log.Infof("Dropped %s", idxName)
642         return nil
643 }