1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
17 "github.com/btcsuite/btcd/chaincfg/chainhash"
18 "github.com/btcsuite/btcd/database"
19 "github.com/btcsuite/btcd/database/internal/treap"
20 "github.com/btcsuite/btcd/wire"
21 "github.com/btcsuite/btcutil"
22 "github.com/btcsuite/goleveldb/leveldb"
23 "github.com/btcsuite/goleveldb/leveldb/comparer"
24 ldberrors "github.com/btcsuite/goleveldb/leveldb/errors"
25 "github.com/btcsuite/goleveldb/leveldb/filter"
26 "github.com/btcsuite/goleveldb/leveldb/iterator"
27 "github.com/btcsuite/goleveldb/leveldb/opt"
28 "github.com/btcsuite/goleveldb/leveldb/util"
32 // metadataDbName is the name used for the metadata database.
33 metadataDbName = "metadata"
35 // blockHdrSize is the size of a block header. This is simply the
36 // constant from wire and is only provided here for convenience since
37 // wire.MaxBlockHeaderPayload is quite long.
38 blockHdrSize = wire.MaxBlockHeaderPayload
40 // blockHdrOffset defines the offsets into a block index row for the
43 // The serialized block index row format is:
44 // <blocklocation><blockheader>
45 blockHdrOffset = blockLocSize
49 // byteOrder is the preferred byte order used through the database and
50 // block files. Sometimes big endian will be used to allow ordered byte
51 // sortable integer values.
52 byteOrder = binary.LittleEndian
54 // bucketIndexPrefix is the prefix used for all entries in the bucket
56 bucketIndexPrefix = []byte("bidx")
58 // curBucketIDKeyName is the name of the key used to keep track of the
59 // current bucket ID counter.
60 curBucketIDKeyName = []byte("bidx-cbid")
62 // metadataBucketID is the ID of the top-level metadata bucket.
63 // It is the value 0 encoded as an unsigned big-endian uint32.
64 metadataBucketID = [4]byte{}
66 // blockIdxBucketID is the ID of the internal block metadata bucket.
67 // It is the value 1 encoded as an unsigned big-endian uint32.
68 blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}
70 // blockIdxBucketName is the bucket used internally to track block
72 blockIdxBucketName = []byte("ffldb-blockidx")
74 // writeLocKeyName is the key used to store the current write file
76 writeLocKeyName = []byte("ffldb-writeloc")
79 // Common error strings.
81 // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen
83 errDbNotOpenStr = "database is not open"
85 // errTxClosedStr is the text to use for the database.ErrTxClosed error
87 errTxClosedStr = "database tx is closed"
90 // bulkFetchData is allows a block location to be specified along with the
91 // index it was requested from. This in turn allows the bulk data loading
92 // functions to sort the data accesses based on the location to improve
93 // performance while keeping track of which result the data is for.
94 type bulkFetchData struct {
99 // bulkFetchDataSorter implements sort.Interface to allow a slice of
100 // bulkFetchData to be sorted. In particular it sorts by file and then
101 // offset so that reads from files are grouped and linear.
102 type bulkFetchDataSorter []bulkFetchData
104 // Len returns the number of items in the slice. It is part of the
105 // sort.Interface implementation.
106 func (s bulkFetchDataSorter) Len() int {
110 // Swap swaps the items at the passed indices. It is part of the
111 // sort.Interface implementation.
112 func (s bulkFetchDataSorter) Swap(i, j int) {
113 s[i], s[j] = s[j], s[i]
116 // Less returns whether the item with index i should sort before the item with
117 // index j. It is part of the sort.Interface implementation.
118 func (s bulkFetchDataSorter) Less(i, j int) bool {
119 if s[i].blockFileNum < s[j].blockFileNum {
122 if s[i].blockFileNum > s[j].blockFileNum {
126 return s[i].fileOffset < s[j].fileOffset
129 // makeDbErr creates a database.Error given a set of arguments.
130 func makeDbErr(c database.ErrorCode, desc string, err error) database.Error {
131 return database.Error{ErrorCode: c, Description: desc, Err: err}
134 // convertErr converts the passed leveldb error into a database error with an
135 // equivalent error code and the passed description. It also sets the passed
136 // error as the underlying error.
137 func convertErr(desc string, ldbErr error) database.Error {
138 // Use the driver-specific error code by default. The code below will
139 // update this with the converted error if it's recognized.
140 var code = database.ErrDriverSpecific
143 // Database corruption errors.
144 case ldberrors.IsCorrupted(ldbErr):
145 code = database.ErrCorruption
147 // Database open/create errors.
148 case ldbErr == leveldb.ErrClosed:
149 code = database.ErrDbNotOpen
151 // Transaction errors.
152 case ldbErr == leveldb.ErrSnapshotReleased:
153 code = database.ErrTxClosed
154 case ldbErr == leveldb.ErrIterReleased:
155 code = database.ErrTxClosed
158 return database.Error{ErrorCode: code, Description: desc, Err: ldbErr}
// copySlice returns a copy of the passed slice.  This is mostly used to copy
// leveldb iterator keys and values since they are only valid until the iterator
// is moved instead of during the entirety of the transaction.
func copySlice(slice []byte) []byte {
	ret := make([]byte, len(slice))
	copy(ret, slice)
	return ret
}
170 // cursor is an internal type used to represent a cursor over key/value pairs
171 // and nested buckets of a bucket and implements the database.Cursor interface.
174 dbIter iterator.Iterator
175 pendingIter iterator.Iterator
176 currentIter iterator.Iterator
179 // Enforce cursor implements the database.Cursor interface.
180 var _ database.Cursor = (*cursor)(nil)
182 // Bucket returns the bucket the cursor was created for.
184 // This function is part of the database.Cursor interface implementation.
185 func (c *cursor) Bucket() database.Bucket {
186 // Ensure transaction state is valid.
187 if err := c.bucket.tx.checkClosed(); err != nil {
194 // Delete removes the current key/value pair the cursor is at without
195 // invalidating the cursor.
197 // Returns the following errors as required by the interface contract:
198 // - ErrIncompatibleValue if attempted when the cursor points to a nested
200 // - ErrTxNotWritable if attempted against a read-only transaction
201 // - ErrTxClosed if the transaction has already been closed
203 // This function is part of the database.Cursor interface implementation.
204 func (c *cursor) Delete() error {
205 // Ensure transaction state is valid.
206 if err := c.bucket.tx.checkClosed(); err != nil {
210 // Error if the cursor is exhausted.
211 if c.currentIter == nil {
212 str := "cursor is exhausted"
213 return makeDbErr(database.ErrIncompatibleValue, str, nil)
216 // Do not allow buckets to be deleted via the cursor.
217 key := c.currentIter.Key()
218 if bytes.HasPrefix(key, bucketIndexPrefix) {
219 str := "buckets may not be deleted from a cursor"
220 return makeDbErr(database.ErrIncompatibleValue, str, nil)
223 c.bucket.tx.deleteKey(copySlice(key), true)
227 // skipPendingUpdates skips any keys at the current database iterator position
228 // that are being updated by the transaction. The forwards flag indicates the
229 // direction the cursor is moving.
230 func (c *cursor) skipPendingUpdates(forwards bool) {
231 for c.dbIter.Valid() {
233 key := c.dbIter.Key()
234 if c.bucket.tx.pendingRemove.Has(key) {
236 } else if c.bucket.tx.pendingKeys.Has(key) {
251 // chooseIterator first skips any entries in the database iterator that are
252 // being updated by the transaction and sets the current iterator to the
253 // appropriate iterator depending on their validity and the order they compare
254 // in while taking into account the direction flag. When the cursor is being
255 // moved forwards and both iterators are valid, the iterator with the smaller
256 // key is chosen and vice versa when the cursor is being moved backwards.
257 func (c *cursor) chooseIterator(forwards bool) bool {
258 // Skip any keys at the current database iterator position that are
259 // being updated by the transaction.
260 c.skipPendingUpdates(forwards)
262 // When both iterators are exhausted, the cursor is exhausted too.
263 if !c.dbIter.Valid() && !c.pendingIter.Valid() {
268 // Choose the database iterator when the pending keys iterator is
270 if !c.pendingIter.Valid() {
271 c.currentIter = c.dbIter
275 // Choose the pending keys iterator when the database iterator is
277 if !c.dbIter.Valid() {
278 c.currentIter = c.pendingIter
282 // Both iterators are valid, so choose the iterator with either the
283 // smaller or larger key depending on the forwards flag.
284 compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
285 if (forwards && compare > 0) || (!forwards && compare < 0) {
286 c.currentIter = c.pendingIter
288 c.currentIter = c.dbIter
293 // First positions the cursor at the first key/value pair and returns whether or
294 // not the pair exists.
296 // This function is part of the database.Cursor interface implementation.
297 func (c *cursor) First() bool {
298 // Ensure transaction state is valid.
299 if err := c.bucket.tx.checkClosed(); err != nil {
303 // Seek to the first key in both the database and pending iterators and
304 // choose the iterator that is both valid and has the smaller key.
306 c.pendingIter.First()
307 return c.chooseIterator(true)
310 // Last positions the cursor at the last key/value pair and returns whether or
311 // not the pair exists.
313 // This function is part of the database.Cursor interface implementation.
314 func (c *cursor) Last() bool {
315 // Ensure transaction state is valid.
316 if err := c.bucket.tx.checkClosed(); err != nil {
320 // Seek to the last key in both the database and pending iterators and
321 // choose the iterator that is both valid and has the larger key.
324 return c.chooseIterator(false)
327 // Next moves the cursor one key/value pair forward and returns whether or not
330 // This function is part of the database.Cursor interface implementation.
331 func (c *cursor) Next() bool {
332 // Ensure transaction state is valid.
333 if err := c.bucket.tx.checkClosed(); err != nil {
337 // Nothing to return if cursor is exhausted.
338 if c.currentIter == nil {
342 // Move the current iterator to the next entry and choose the iterator
343 // that is both valid and has the smaller key.
345 return c.chooseIterator(true)
348 // Prev moves the cursor one key/value pair backward and returns whether or not
351 // This function is part of the database.Cursor interface implementation.
352 func (c *cursor) Prev() bool {
353 // Ensure transaction state is valid.
354 if err := c.bucket.tx.checkClosed(); err != nil {
358 // Nothing to return if cursor is exhausted.
359 if c.currentIter == nil {
363 // Move the current iterator to the previous entry and choose the
364 // iterator that is both valid and has the larger key.
366 return c.chooseIterator(false)
369 // Seek positions the cursor at the first key/value pair that is greater than or
370 // equal to the passed seek key. Returns false if no suitable key was found.
372 // This function is part of the database.Cursor interface implementation.
373 func (c *cursor) Seek(seek []byte) bool {
374 // Ensure transaction state is valid.
375 if err := c.bucket.tx.checkClosed(); err != nil {
379 // Seek to the provided key in both the database and pending iterators
380 // then choose the iterator that is both valid and has the larger key.
381 seekKey := bucketizedKey(c.bucket.id, seek)
382 c.dbIter.Seek(seekKey)
383 c.pendingIter.Seek(seekKey)
384 return c.chooseIterator(true)
387 // rawKey returns the current key the cursor is pointing to without stripping
388 // the current bucket prefix or bucket index prefix.
389 func (c *cursor) rawKey() []byte {
390 // Nothing to return if cursor is exhausted.
391 if c.currentIter == nil {
395 return copySlice(c.currentIter.Key())
398 // Key returns the current key the cursor is pointing to.
400 // This function is part of the database.Cursor interface implementation.
401 func (c *cursor) Key() []byte {
402 // Ensure transaction state is valid.
403 if err := c.bucket.tx.checkClosed(); err != nil {
407 // Nothing to return if cursor is exhausted.
408 if c.currentIter == nil {
412 // Slice out the actual key name and make a copy since it is no longer
413 // valid after iterating to the next item.
415 // The key is after the bucket index prefix and parent ID when the
416 // cursor is pointing to a nested bucket.
417 key := c.currentIter.Key()
418 if bytes.HasPrefix(key, bucketIndexPrefix) {
419 key = key[len(bucketIndexPrefix)+4:]
420 return copySlice(key)
423 // The key is after the bucket ID when the cursor is pointing to a
425 key = key[len(c.bucket.id):]
426 return copySlice(key)
429 // rawValue returns the current value the cursor is pointing to without
430 // stripping without filtering bucket index values.
431 func (c *cursor) rawValue() []byte {
432 // Nothing to return if cursor is exhausted.
433 if c.currentIter == nil {
437 return copySlice(c.currentIter.Value())
440 // Value returns the current value the cursor is pointing to. This will be nil
441 // for nested buckets.
443 // This function is part of the database.Cursor interface implementation.
444 func (c *cursor) Value() []byte {
445 // Ensure transaction state is valid.
446 if err := c.bucket.tx.checkClosed(); err != nil {
450 // Nothing to return if cursor is exhausted.
451 if c.currentIter == nil {
455 // Return nil for the value when the cursor is pointing to a nested
457 if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
461 return copySlice(c.currentIter.Value())
// cursorType defines the type of cursor to create.
type cursorType int

// The following constants define the allowed cursor types.
const (
	// ctKeys iterates through all of the keys in a given bucket.
	ctKeys cursorType = iota

	// ctBuckets iterates through all directly nested buckets in a given
	// bucket.
	ctBuckets

	// ctFull iterates through both the keys and the directly nested buckets
	// in a given bucket.
	ctFull
)
481 // cursorFinalizer is either invoked when a cursor is being garbage collected or
482 // called manually to ensure the underlying cursor iterators are released.
483 func cursorFinalizer(c *cursor) {
485 c.pendingIter.Release()
488 // newCursor returns a new cursor for the given bucket, bucket ID, and cursor
491 // NOTE: The caller is responsible for calling the cursorFinalizer function on
492 // the returned cursor.
493 func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
494 var dbIter, pendingIter iterator.Iterator
497 keyRange := util.BytesPrefix(bucketID)
498 dbIter = b.tx.snapshot.NewIterator(keyRange)
499 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
500 pendingIter = pendingKeyIter
503 // The serialized bucket index key format is:
504 // <bucketindexprefix><parentbucketid><bucketname>
506 // Create an iterator for the both the database and the pending
507 // keys which are prefixed by the bucket index identifier and
508 // the provided bucket ID.
509 prefix := make([]byte, len(bucketIndexPrefix)+4)
510 copy(prefix, bucketIndexPrefix)
511 copy(prefix[len(bucketIndexPrefix):], bucketID)
512 bucketRange := util.BytesPrefix(prefix)
514 dbIter = b.tx.snapshot.NewIterator(bucketRange)
515 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
516 pendingIter = pendingBucketIter
521 // The serialized bucket index key format is:
522 // <bucketindexprefix><parentbucketid><bucketname>
523 prefix := make([]byte, len(bucketIndexPrefix)+4)
524 copy(prefix, bucketIndexPrefix)
525 copy(prefix[len(bucketIndexPrefix):], bucketID)
526 bucketRange := util.BytesPrefix(prefix)
527 keyRange := util.BytesPrefix(bucketID)
529 // Since both keys and buckets are needed from the database,
530 // create an individual iterator for each prefix and then create
531 // a merged iterator from them.
532 dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
533 dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
534 iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
535 dbIter = iterator.NewMergedIterator(iters,
536 comparer.DefaultComparer, true)
538 // Since both keys and buckets are needed from the pending keys,
539 // create an individual iterator for each prefix and then create
540 // a merged iterator from them.
541 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
542 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
543 iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
544 pendingIter = iterator.NewMergedIterator(iters,
545 comparer.DefaultComparer, true)
548 // Create the cursor using the iterators.
549 return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
552 // bucket is an internal type used to represent a collection of key/value pairs
553 // and implements the database.Bucket interface.
559 // Enforce bucket implements the database.Bucket interface.
560 var _ database.Bucket = (*bucket)(nil)
562 // bucketIndexKey returns the actual key to use for storing and retrieving a
563 // child bucket in the bucket index. This is required because additional
564 // information is needed to distinguish nested buckets with the same name.
565 func bucketIndexKey(parentID [4]byte, key []byte) []byte {
566 // The serialized bucket index key format is:
567 // <bucketindexprefix><parentbucketid><bucketname>
568 indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
569 copy(indexKey, bucketIndexPrefix)
570 copy(indexKey[len(bucketIndexPrefix):], parentID[:])
571 copy(indexKey[len(bucketIndexPrefix)+4:], key)
// bucketizedKey returns the actual key to use for storing and retrieving a key
// for the provided bucket ID.  This is required because bucketizing is handled
// through the use of a unique prefix per bucket.
func bucketizedKey(bucketID [4]byte, key []byte) []byte {
	// The serialized block index key format is:
	//   <bucketid><key>
	bKey := make([]byte, 4+len(key))
	copy(bKey, bucketID[:])
	copy(bKey[4:], key)
	return bKey
}
587 // Bucket retrieves a nested bucket with the given key. Returns nil if
588 // the bucket does not exist.
590 // This function is part of the database.Bucket interface implementation.
591 func (b *bucket) Bucket(key []byte) database.Bucket {
592 // Ensure transaction state is valid.
593 if err := b.tx.checkClosed(); err != nil {
597 // Attempt to fetch the ID for the child bucket. The bucket does not
598 // exist if the bucket index entry does not exist.
599 childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
604 childBucket := &bucket{tx: b.tx}
605 copy(childBucket.id[:], childID)
609 // CreateBucket creates and returns a new nested bucket with the given key.
611 // Returns the following errors as required by the interface contract:
612 // - ErrBucketExists if the bucket already exists
613 // - ErrBucketNameRequired if the key is empty
614 // - ErrIncompatibleValue if the key is otherwise invalid for the particular
616 // - ErrTxNotWritable if attempted against a read-only transaction
617 // - ErrTxClosed if the transaction has already been closed
619 // This function is part of the database.Bucket interface implementation.
620 func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
621 // Ensure transaction state is valid.
622 if err := b.tx.checkClosed(); err != nil {
626 // Ensure the transaction is writable.
628 str := "create bucket requires a writable database transaction"
629 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
632 // Ensure a key was provided.
634 str := "create bucket requires a key"
635 return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
638 // Ensure bucket does not already exist.
639 bidxKey := bucketIndexKey(b.id, key)
640 if b.tx.hasKey(bidxKey) {
641 str := "bucket already exists"
642 return nil, makeDbErr(database.ErrBucketExists, str, nil)
645 // Find the appropriate next bucket ID to use for the new bucket. In
646 // the case of the special internal block index, keep the fixed ID.
648 if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
649 childID = blockIdxBucketID
652 childID, err = b.tx.nextBucketID()
658 // Add the new bucket to the bucket index.
659 if err := b.tx.putKey(bidxKey, childID[:]); err != nil {
660 str := fmt.Sprintf("failed to create bucket with key %q", key)
661 return nil, convertErr(str, err)
663 return &bucket{tx: b.tx, id: childID}, nil
666 // CreateBucketIfNotExists creates and returns a new nested bucket with the
667 // given key if it does not already exist.
669 // Returns the following errors as required by the interface contract:
670 // - ErrBucketNameRequired if the key is empty
671 // - ErrIncompatibleValue if the key is otherwise invalid for the particular
673 // - ErrTxNotWritable if attempted against a read-only transaction
674 // - ErrTxClosed if the transaction has already been closed
676 // This function is part of the database.Bucket interface implementation.
677 func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
678 // Ensure transaction state is valid.
679 if err := b.tx.checkClosed(); err != nil {
683 // Ensure the transaction is writable.
685 str := "create bucket requires a writable database transaction"
686 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
689 // Return existing bucket if it already exists, otherwise create it.
690 if bucket := b.Bucket(key); bucket != nil {
693 return b.CreateBucket(key)
696 // DeleteBucket removes a nested bucket with the given key.
698 // Returns the following errors as required by the interface contract:
699 // - ErrBucketNotFound if the specified bucket does not exist
700 // - ErrTxNotWritable if attempted against a read-only transaction
701 // - ErrTxClosed if the transaction has already been closed
703 // This function is part of the database.Bucket interface implementation.
704 func (b *bucket) DeleteBucket(key []byte) error {
705 // Ensure transaction state is valid.
706 if err := b.tx.checkClosed(); err != nil {
710 // Ensure the transaction is writable.
712 str := "delete bucket requires a writable database transaction"
713 return makeDbErr(database.ErrTxNotWritable, str, nil)
716 // Attempt to fetch the ID for the child bucket. The bucket does not
717 // exist if the bucket index entry does not exist. In the case of the
718 // special internal block index, keep the fixed ID.
719 bidxKey := bucketIndexKey(b.id, key)
720 childID := b.tx.fetchKey(bidxKey)
722 str := fmt.Sprintf("bucket %q does not exist", key)
723 return makeDbErr(database.ErrBucketNotFound, str, nil)
726 // Remove all nested buckets and their keys.
727 childIDs := [][]byte{childID}
728 for len(childIDs) > 0 {
729 childID = childIDs[len(childIDs)-1]
730 childIDs = childIDs[:len(childIDs)-1]
732 // Delete all keys in the nested bucket.
733 keyCursor := newCursor(b, childID, ctKeys)
734 for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
735 b.tx.deleteKey(keyCursor.rawKey(), false)
737 cursorFinalizer(keyCursor)
739 // Iterate through all nested buckets.
740 bucketCursor := newCursor(b, childID, ctBuckets)
741 for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
742 // Push the id of the nested bucket onto the stack for
743 // the next iteration.
744 childID := bucketCursor.rawValue()
745 childIDs = append(childIDs, childID)
747 // Remove the nested bucket from the bucket index.
748 b.tx.deleteKey(bucketCursor.rawKey(), false)
750 cursorFinalizer(bucketCursor)
753 // Remove the nested bucket from the bucket index. Any buckets nested
754 // under it were already removed above.
755 b.tx.deleteKey(bidxKey, true)
759 // Cursor returns a new cursor, allowing for iteration over the bucket's
760 // key/value pairs and nested buckets in forward or backward order.
762 // You must seek to a position using the First, Last, or Seek functions before
763 // calling the Next, Prev, Key, or Value functions. Failure to do so will
764 // result in the same return values as an exhausted cursor, which is false for
765 // the Prev and Next functions and nil for Key and Value functions.
767 // This function is part of the database.Bucket interface implementation.
768 func (b *bucket) Cursor() database.Cursor {
769 // Ensure transaction state is valid.
770 if err := b.tx.checkClosed(); err != nil {
771 return &cursor{bucket: b}
774 // Create the cursor and setup a runtime finalizer to ensure the
775 // iterators are released when the cursor is garbage collected.
776 c := newCursor(b, b.id[:], ctFull)
777 runtime.SetFinalizer(c, cursorFinalizer)
781 // ForEach invokes the passed function with every key/value pair in the bucket.
782 // This does not include nested buckets or the key/value pairs within those
785 // WARNING: It is not safe to mutate data while iterating with this method.
786 // Doing so may cause the underlying cursor to be invalidated and return
787 // unexpected keys and/or values.
789 // Returns the following errors as required by the interface contract:
790 // - ErrTxClosed if the transaction has already been closed
792 // NOTE: The values returned by this function are only valid during a
793 // transaction. Attempting to access them after a transaction has ended will
794 // likely result in an access violation.
796 // This function is part of the database.Bucket interface implementation.
797 func (b *bucket) ForEach(fn func(k, v []byte) error) error {
798 // Ensure transaction state is valid.
799 if err := b.tx.checkClosed(); err != nil {
803 // Invoke the callback for each cursor item. Return the error returned
804 // from the callback when it is non-nil.
805 c := newCursor(b, b.id[:], ctKeys)
806 defer cursorFinalizer(c)
807 for ok := c.First(); ok; ok = c.Next() {
808 err := fn(c.Key(), c.Value())
817 // ForEachBucket invokes the passed function with the key of every nested bucket
818 // in the current bucket. This does not include any nested buckets within those
821 // WARNING: It is not safe to mutate data while iterating with this method.
822 // Doing so may cause the underlying cursor to be invalidated and return
825 // Returns the following errors as required by the interface contract:
826 // - ErrTxClosed if the transaction has already been closed
828 // NOTE: The values returned by this function are only valid during a
829 // transaction. Attempting to access them after a transaction has ended will
830 // likely result in an access violation.
832 // This function is part of the database.Bucket interface implementation.
833 func (b *bucket) ForEachBucket(fn func(k []byte) error) error {
834 // Ensure transaction state is valid.
835 if err := b.tx.checkClosed(); err != nil {
839 // Invoke the callback for each cursor item. Return the error returned
840 // from the callback when it is non-nil.
841 c := newCursor(b, b.id[:], ctBuckets)
842 defer cursorFinalizer(c)
843 for ok := c.First(); ok; ok = c.Next() {
853 // Writable returns whether or not the bucket is writable.
855 // This function is part of the database.Bucket interface implementation.
856 func (b *bucket) Writable() bool {
860 // Put saves the specified key/value pair to the bucket. Keys that do not
861 // already exist are added and keys that already exist are overwritten.
863 // Returns the following errors as required by the interface contract:
864 // - ErrKeyRequired if the key is empty
865 // - ErrIncompatibleValue if the key is the same as an existing bucket
866 // - ErrTxNotWritable if attempted against a read-only transaction
867 // - ErrTxClosed if the transaction has already been closed
869 // This function is part of the database.Bucket interface implementation.
870 func (b *bucket) Put(key, value []byte) error {
871 // Ensure transaction state is valid.
872 if err := b.tx.checkClosed(); err != nil {
876 // Ensure the transaction is writable.
878 str := "setting a key requires a writable database transaction"
879 return makeDbErr(database.ErrTxNotWritable, str, nil)
882 // Ensure a key was provided.
884 str := "put requires a key"
885 return makeDbErr(database.ErrKeyRequired, str, nil)
888 return b.tx.putKey(bucketizedKey(b.id, key), value)
891 // Get returns the value for the given key. Returns nil if the key does not
892 // exist in this bucket. An empty slice is returned for keys that exist but
893 // have no value assigned.
895 // NOTE: The value returned by this function is only valid during a transaction.
896 // Attempting to access it after a transaction has ended results in undefined
897 // behavior. Additionally, the value must NOT be modified by the caller.
899 // This function is part of the database.Bucket interface implementation.
900 func (b *bucket) Get(key []byte) []byte {
901 // Ensure transaction state is valid.
902 if err := b.tx.checkClosed(); err != nil {
906 // Nothing to return if there is no key.
911 return b.tx.fetchKey(bucketizedKey(b.id, key))
914 // Delete removes the specified key from the bucket. Deleting a key that does
915 // not exist does not return an error.
917 // Returns the following errors as required by the interface contract:
918 // - ErrKeyRequired if the key is empty
919 // - ErrIncompatibleValue if the key is the same as an existing bucket
920 // - ErrTxNotWritable if attempted against a read-only transaction
921 // - ErrTxClosed if the transaction has already been closed
923 // This function is part of the database.Bucket interface implementation.
924 func (b *bucket) Delete(key []byte) error {
925 // Ensure transaction state is valid.
926 if err := b.tx.checkClosed(); err != nil {
930 // Ensure the transaction is writable.
932 str := "deleting a value requires a writable database transaction"
933 return makeDbErr(database.ErrTxNotWritable, str, nil)
936 // Nothing to do if there is no key.
941 b.tx.deleteKey(bucketizedKey(b.id, key), true)
945 // pendingBlock houses a block that will be written to disk when the database
946 // transaction is committed.
947 type pendingBlock struct {
952 // transaction represents a database transaction. It can either be read-only or
953 // read-write and implements the database.Bucket interface. The transaction
954 // provides a root bucket against which all read and writes occur.
955 type transaction struct {
956 managed bool // Is the transaction managed?
957 closed bool // Is the transaction closed?
958 writable bool // Is the transaction writable?
959 db *db // DB instance the tx was created from.
960 snapshot *dbCacheSnapshot // Underlying snapshot for txns.
961 metaBucket *bucket // The root metadata bucket.
962 blockIdxBucket *bucket // The block index bucket.
964 // Blocks that need to be stored on commit. The pendingBlocks map is
965 // kept to allow quick lookups of pending data by block hash.
966 pendingBlocks map[chainhash.Hash]int
967 pendingBlockData []pendingBlock
969 // Keys that need to be stored or deleted on commit.
970 pendingKeys *treap.Mutable
971 pendingRemove *treap.Mutable
973 // Active iterators that need to be notified when the pending keys have
974 // been updated so the cursors can properly handle updates to the
975 // transaction state.
976 activeIterLock sync.RWMutex
977 activeIters []*treap.Iterator
980 // Enforce transaction implements the database.Tx interface.
981 var _ database.Tx = (*transaction)(nil)
983 // removeActiveIter removes the passed iterator from the list of active
984 // iterators against the pending keys treap.
985 func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
986 // An indexing for loop is intentionally used over a range here as range
987 // does not reevaluate the slice on each iteration nor does it adjust
988 // the index for the modified slice.
989 tx.activeIterLock.Lock()
990 for i := 0; i < len(tx.activeIters); i++ {
991 if tx.activeIters[i] == iter {
992 copy(tx.activeIters[i:], tx.activeIters[i+1:])
993 tx.activeIters[len(tx.activeIters)-1] = nil
994 tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
997 tx.activeIterLock.Unlock()
1000 // addActiveIter adds the passed iterator to the list of active iterators for
1001 // the pending keys treap.
1002 func (tx *transaction) addActiveIter(iter *treap.Iterator) {
1003 tx.activeIterLock.Lock()
1004 tx.activeIters = append(tx.activeIters, iter)
1005 tx.activeIterLock.Unlock()
1008 // notifyActiveIters notifies all of the active iterators for the pending keys
1009 // treap that it has been updated.
1010 func (tx *transaction) notifyActiveIters() {
1011 tx.activeIterLock.RLock()
1012 for _, iter := range tx.activeIters {
1015 tx.activeIterLock.RUnlock()
1018 // checkClosed returns an error if the the database or transaction is closed.
1019 func (tx *transaction) checkClosed() error {
1020 // The transaction is no longer valid if it has been closed.
1022 return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
1028 // hasKey returns whether or not the provided key exists in the database while
1029 // taking into account the current transaction state.
1030 func (tx *transaction) hasKey(key []byte) bool {
1031 // When the transaction is writable, check the pending transaction
1034 if tx.pendingRemove.Has(key) {
1037 if tx.pendingKeys.Has(key) {
1042 // Consult the database cache and underlying database.
1043 return tx.snapshot.Has(key)
1046 // putKey adds the provided key to the list of keys to be updated in the
1047 // database when the transaction is committed.
1049 // NOTE: This function must only be called on a writable transaction. Since it
1050 // is an internal helper function, it does not check.
1051 func (tx *transaction) putKey(key, value []byte) error {
1052 // Prevent the key from being deleted if it was previously scheduled
1053 // to be deleted on transaction commit.
1054 tx.pendingRemove.Delete(key)
1056 // Add the key/value pair to the list to be written on transaction
1058 tx.pendingKeys.Put(key, value)
1059 tx.notifyActiveIters()
1063 // fetchKey attempts to fetch the provided key from the database cache (and
1064 // hence underlying database) while taking into account the current transaction
1065 // state. Returns nil if the key does not exist.
1066 func (tx *transaction) fetchKey(key []byte) []byte {
1067 // When the transaction is writable, check the pending transaction
1070 if tx.pendingRemove.Has(key) {
1073 if value := tx.pendingKeys.Get(key); value != nil {
1078 // Consult the database cache and underlying database.
1079 return tx.snapshot.Get(key)
1082 // deleteKey adds the provided key to the list of keys to be deleted from the
1083 // database when the transaction is committed. The notify iterators flag is
1084 // useful to delay notifying iterators about the changes during bulk deletes.
1086 // NOTE: This function must only be called on a writable transaction. Since it
1087 // is an internal helper function, it does not check.
1088 func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
1089 // Remove the key from the list of pendings keys to be written on
1090 // transaction commit if needed.
1091 tx.pendingKeys.Delete(key)
1093 // Add the key to the list to be deleted on transaction commit.
1094 tx.pendingRemove.Put(key, nil)
1096 // Notify the active iterators about the change if the flag is set.
1097 if notifyIterators {
1098 tx.notifyActiveIters()
1102 // nextBucketID returns the next bucket ID to use for creating a new bucket.
1104 // NOTE: This function must only be called on a writable transaction. Since it
1105 // is an internal helper function, it does not check.
1106 func (tx *transaction) nextBucketID() ([4]byte, error) {
1107 // Load the currently highest used bucket ID.
1108 curIDBytes := tx.fetchKey(curBucketIDKeyName)
1109 curBucketNum := binary.BigEndian.Uint32(curIDBytes)
1111 // Increment and update the current bucket ID and return it.
1112 var nextBucketID [4]byte
1113 binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
1114 if err := tx.putKey(curBucketIDKeyName, nextBucketID[:]); err != nil {
1115 return [4]byte{}, err
1117 return nextBucketID, nil
1120 // Metadata returns the top-most bucket for all metadata storage.
1122 // This function is part of the database.Tx interface implementation.
1123 func (tx *transaction) Metadata() database.Bucket {
1124 return tx.metaBucket
1127 // hasBlock returns whether or not a block with the given hash exists.
1128 func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
1129 // Return true if the block is pending to be written on commit since
1130 // it exists from the viewpoint of this transaction.
1131 if _, exists := tx.pendingBlocks[*hash]; exists {
1135 return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
1138 // StoreBlock stores the provided block into the database. There are no checks
1139 // to ensure the block connects to a previous block, contains double spends, or
1140 // any additional functionality such as transaction indexing. It simply stores
1141 // the block in the database.
1143 // Returns the following errors as required by the interface contract:
1144 // - ErrBlockExists when the block hash already exists
1145 // - ErrTxNotWritable if attempted against a read-only transaction
1146 // - ErrTxClosed if the transaction has already been closed
1148 // This function is part of the database.Tx interface implementation.
1149 func (tx *transaction) StoreBlock(block *btcutil.Block) error {
1150 // Ensure transaction state is valid.
1151 if err := tx.checkClosed(); err != nil {
1155 // Ensure the transaction is writable.
1157 str := "store block requires a writable database transaction"
1158 return makeDbErr(database.ErrTxNotWritable, str, nil)
1161 // Reject the block if it already exists.
1162 blockHash := block.Hash()
1163 if tx.hasBlock(blockHash) {
1164 str := fmt.Sprintf("block %s already exists", blockHash)
1165 return makeDbErr(database.ErrBlockExists, str, nil)
1168 blockBytes, err := block.Bytes()
1170 str := fmt.Sprintf("failed to get serialized bytes for block %s",
1172 return makeDbErr(database.ErrDriverSpecific, str, err)
1175 // Add the block to be stored to the list of pending blocks to store
1176 // when the transaction is committed. Also, add it to pending blocks
1177 // map so it is easy to determine the block is pending based on the
1179 if tx.pendingBlocks == nil {
1180 tx.pendingBlocks = make(map[chainhash.Hash]int)
1182 tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
1183 tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{
1187 log.Tracef("Added block %s to pending blocks", blockHash)
1192 // HasBlock returns whether or not a block with the given hash exists in the
1195 // Returns the following errors as required by the interface contract:
1196 // - ErrTxClosed if the transaction has already been closed
1198 // This function is part of the database.Tx interface implementation.
1199 func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
1200 // Ensure transaction state is valid.
1201 if err := tx.checkClosed(); err != nil {
1205 return tx.hasBlock(hash), nil
1208 // HasBlocks returns whether or not the blocks with the provided hashes
1209 // exist in the database.
1211 // Returns the following errors as required by the interface contract:
1212 // - ErrTxClosed if the transaction has already been closed
1214 // This function is part of the database.Tx interface implementation.
1215 func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
1216 // Ensure transaction state is valid.
1217 if err := tx.checkClosed(); err != nil {
1221 results := make([]bool, len(hashes))
1222 for i := range hashes {
1223 results[i] = tx.hasBlock(&hashes[i])
1229 // fetchBlockRow fetches the metadata stored in the block index for the provided
1230 // hash. It will return ErrBlockNotFound if there is no entry.
1231 func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
1232 blockRow := tx.blockIdxBucket.Get(hash[:])
1233 if blockRow == nil {
1234 str := fmt.Sprintf("block %s does not exist", hash)
1235 return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
1238 return blockRow, nil
1241 // FetchBlockHeader returns the raw serialized bytes for the block header
1242 // identified by the given hash. The raw bytes are in the format returned by
1243 // Serialize on a wire.BlockHeader.
1245 // Returns the following errors as required by the interface contract:
1246 // - ErrBlockNotFound if the requested block hash does not exist
1247 // - ErrTxClosed if the transaction has already been closed
1248 // - ErrCorruption if the database has somehow become corrupted
1250 // NOTE: The data returned by this function is only valid during a
1251 // database transaction. Attempting to access it after a transaction
1252 // has ended results in undefined behavior. This constraint prevents
1253 // additional data copies and allows support for memory-mapped database
1256 // This function is part of the database.Tx interface implementation.
1257 func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
1258 // Ensure transaction state is valid.
1259 if err := tx.checkClosed(); err != nil {
1263 // When the block is pending to be written on commit return the bytes
1265 if idx, exists := tx.pendingBlocks[*hash]; exists {
1266 blockBytes := tx.pendingBlockData[idx].bytes
1267 return blockBytes[0:blockHdrSize:blockHdrSize], nil
1270 // Fetch the block index row and slice off the header. Notice the use
1271 // of the cap on the subslice to prevent the caller from accidentally
1272 // appending into the db data.
1273 blockRow, err := tx.fetchBlockRow(hash)
1277 endOffset := blockLocSize + blockHdrSize
1278 return blockRow[blockLocSize:endOffset:endOffset], nil
1281 // FetchBlockHeaders returns the raw serialized bytes for the block headers
1282 // identified by the given hashes. The raw bytes are in the format returned by
1283 // Serialize on a wire.BlockHeader.
1285 // Returns the following errors as required by the interface contract:
1286 // - ErrBlockNotFound if the any of the requested block hashes do not exist
1287 // - ErrTxClosed if the transaction has already been closed
1288 // - ErrCorruption if the database has somehow become corrupted
1290 // NOTE: The data returned by this function is only valid during a database
1291 // transaction. Attempting to access it after a transaction has ended results
1292 // in undefined behavior. This constraint prevents additional data copies and
1293 // allows support for memory-mapped database implementations.
1295 // This function is part of the database.Tx interface implementation.
1296 func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
1297 // Ensure transaction state is valid.
1298 if err := tx.checkClosed(); err != nil {
1302 // NOTE: This could check for the existence of all blocks before loading
1303 // any of the headers which would be faster in the failure case, however
1304 // callers will not typically be calling this function with invalid
1305 // values, so optimize for the common case.
1307 // Load the headers.
1308 headers := make([][]byte, len(hashes))
1309 for i := range hashes {
1312 // When the block is pending to be written on commit return the
1313 // bytes from there.
1314 if idx, exists := tx.pendingBlocks[*hash]; exists {
1315 blkBytes := tx.pendingBlockData[idx].bytes
1316 headers[i] = blkBytes[0:blockHdrSize:blockHdrSize]
1320 // Fetch the block index row and slice off the header. Notice
1321 // the use of the cap on the subslice to prevent the caller
1322 // from accidentally appending into the db data.
1323 blockRow, err := tx.fetchBlockRow(hash)
1327 endOffset := blockLocSize + blockHdrSize
1328 headers[i] = blockRow[blockLocSize:endOffset:endOffset]
1334 // FetchBlock returns the raw serialized bytes for the block identified by the
1335 // given hash. The raw bytes are in the format returned by Serialize on a
1338 // Returns the following errors as required by the interface contract:
1339 // - ErrBlockNotFound if the requested block hash does not exist
1340 // - ErrTxClosed if the transaction has already been closed
1341 // - ErrCorruption if the database has somehow become corrupted
1343 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1346 // NOTE: The data returned by this function is only valid during a database
1347 // transaction. Attempting to access it after a transaction has ended results
1348 // in undefined behavior. This constraint prevents additional data copies and
1349 // allows support for memory-mapped database implementations.
1351 // This function is part of the database.Tx interface implementation.
1352 func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
1353 // Ensure transaction state is valid.
1354 if err := tx.checkClosed(); err != nil {
1358 // When the block is pending to be written on commit return the bytes
1360 if idx, exists := tx.pendingBlocks[*hash]; exists {
1361 return tx.pendingBlockData[idx].bytes, nil
1364 // Lookup the location of the block in the files from the block index.
1365 blockRow, err := tx.fetchBlockRow(hash)
1369 location := deserializeBlockLoc(blockRow)
1371 // Read the block from the appropriate location. The function also
1372 // performs a checksum over the data to detect data corruption.
1373 blockBytes, err := tx.db.store.readBlock(hash, location)
1378 return blockBytes, nil
1381 // FetchBlocks returns the raw serialized bytes for the blocks identified by the
1382 // given hashes. The raw bytes are in the format returned by Serialize on a
1385 // Returns the following errors as required by the interface contract:
1386 // - ErrBlockNotFound if any of the requested block hashed do not exist
1387 // - ErrTxClosed if the transaction has already been closed
1388 // - ErrCorruption if the database has somehow become corrupted
1390 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1393 // NOTE: The data returned by this function is only valid during a database
1394 // transaction. Attempting to access it after a transaction has ended results
1395 // in undefined behavior. This constraint prevents additional data copies and
1396 // allows support for memory-mapped database implementations.
1398 // This function is part of the database.Tx interface implementation.
1399 func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
1400 // Ensure transaction state is valid.
1401 if err := tx.checkClosed(); err != nil {
1405 // NOTE: This could check for the existence of all blocks before loading
1406 // any of them which would be faster in the failure case, however
1407 // callers will not typically be calling this function with invalid
1408 // values, so optimize for the common case.
1411 blocks := make([][]byte, len(hashes))
1412 for i := range hashes {
1414 blocks[i], err = tx.FetchBlock(&hashes[i])
1423 // fetchPendingRegion attempts to fetch the provided region from any block which
1424 // are pending to be written on commit. It will return nil for the byte slice
1425 // when the region references a block which is not pending. When the region
1426 // does reference a pending block, it is bounds checked and returns
1427 // ErrBlockRegionInvalid if invalid.
1428 func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
1429 // Nothing to do if the block is not pending to be written on commit.
1430 idx, exists := tx.pendingBlocks[*region.Hash]
1435 // Ensure the region is within the bounds of the block.
1436 blockBytes := tx.pendingBlockData[idx].bytes
1437 blockLen := uint32(len(blockBytes))
1438 endOffset := region.Offset + region.Len
1439 if endOffset < region.Offset || endOffset > blockLen {
1440 str := fmt.Sprintf("block %s region offset %d, length %d "+
1441 "exceeds block length of %d", region.Hash,
1442 region.Offset, region.Len, blockLen)
1443 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1446 // Return the bytes from the pending block.
1447 return blockBytes[region.Offset:endOffset:endOffset], nil
1450 // FetchBlockRegion returns the raw serialized bytes for the given block region.
1452 // For example, it is possible to directly extract Bitcoin transactions and/or
1453 // scripts from a block with this function. Depending on the backend
1454 // implementation, this can provide significant savings by avoiding the need to
1455 // load entire blocks.
1457 // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
1458 // the Offset field in the provided BlockRegion is zero-based and relative to
1459 // the start of the block (byte 0).
1461 // Returns the following errors as required by the interface contract:
1462 // - ErrBlockNotFound if the requested block hash does not exist
1463 // - ErrBlockRegionInvalid if the region exceeds the bounds of the associated
1465 // - ErrTxClosed if the transaction has already been closed
1466 // - ErrCorruption if the database has somehow become corrupted
1468 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1471 // NOTE: The data returned by this function is only valid during a database
1472 // transaction. Attempting to access it after a transaction has ended results
1473 // in undefined behavior. This constraint prevents additional data copies and
1474 // allows support for memory-mapped database implementations.
1476 // This function is part of the database.Tx interface implementation.
1477 func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
1478 // Ensure transaction state is valid.
1479 if err := tx.checkClosed(); err != nil {
1483 // When the block is pending to be written on commit return the bytes
1485 if tx.pendingBlocks != nil {
1486 regionBytes, err := tx.fetchPendingRegion(region)
1490 if regionBytes != nil {
1491 return regionBytes, nil
1495 // Lookup the location of the block in the files from the block index.
1496 blockRow, err := tx.fetchBlockRow(region.Hash)
1500 location := deserializeBlockLoc(blockRow)
1502 // Ensure the region is within the bounds of the block.
1503 endOffset := region.Offset + region.Len
1504 if endOffset < region.Offset || endOffset > location.blockLen {
1505 str := fmt.Sprintf("block %s region offset %d, length %d "+
1506 "exceeds block length of %d", region.Hash,
1507 region.Offset, region.Len, location.blockLen)
1508 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1512 // Read the region from the appropriate disk block file.
1513 regionBytes, err := tx.db.store.readBlockRegion(location, region.Offset,
1519 return regionBytes, nil
1522 // FetchBlockRegions returns the raw serialized bytes for the given block
1525 // For example, it is possible to directly extract Bitcoin transactions and/or
1526 // scripts from various blocks with this function. Depending on the backend
1527 // implementation, this can provide significant savings by avoiding the need to
1528 // load entire blocks.
1530 // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
1531 // the Offset fields in the provided BlockRegions are zero-based and relative to
1532 // the start of the block (byte 0).
1534 // Returns the following errors as required by the interface contract:
1535 // - ErrBlockNotFound if any of the request block hashes do not exist
1536 // - ErrBlockRegionInvalid if one or more region exceed the bounds of the
1538 // - ErrTxClosed if the transaction has already been closed
1539 // - ErrCorruption if the database has somehow become corrupted
1541 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1544 // NOTE: The data returned by this function is only valid during a database
1545 // transaction. Attempting to access it after a transaction has ended results
1546 // in undefined behavior. This constraint prevents additional data copies and
1547 // allows support for memory-mapped database implementations.
1549 // This function is part of the database.Tx interface implementation.
1550 func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
1551 // Ensure transaction state is valid.
1552 if err := tx.checkClosed(); err != nil {
1556 // NOTE: This could check for the existence of all blocks before
1557 // deserializing the locations and building up the fetch list which
1558 // would be faster in the failure case, however callers will not
1559 // typically be calling this function with invalid values, so optimize
1560 // for the common case.
1562 // NOTE: A potential optimization here would be to combine adjacent
1563 // regions to reduce the number of reads.
1565 // In order to improve efficiency of loading the bulk data, first grab
1566 // the block location for all of the requested block hashes and sort
1567 // the reads by filenum:offset so that all reads are grouped by file
1568 // and linear within each file. This can result in quite a significant
1569 // performance increase depending on how spread out the requested hashes
1570 // are by reducing the number of file open/closes and random accesses
1571 // needed. The fetchList is intentionally allocated with a cap because
1572 // some of the regions might be fetched from the pending blocks and
1573 // hence there is no need to fetch those from disk.
1574 blockRegions := make([][]byte, len(regions))
1575 fetchList := make([]bulkFetchData, 0, len(regions))
1576 for i := range regions {
1577 region := ®ions[i]
1579 // When the block is pending to be written on commit grab the
1580 // bytes from there.
1581 if tx.pendingBlocks != nil {
1582 regionBytes, err := tx.fetchPendingRegion(region)
1586 if regionBytes != nil {
1587 blockRegions[i] = regionBytes
1592 // Lookup the location of the block in the files from the block
1594 blockRow, err := tx.fetchBlockRow(region.Hash)
1598 location := deserializeBlockLoc(blockRow)
1600 // Ensure the region is within the bounds of the block.
1601 endOffset := region.Offset + region.Len
1602 if endOffset < region.Offset || endOffset > location.blockLen {
1603 str := fmt.Sprintf("block %s region offset %d, length "+
1604 "%d exceeds block length of %d", region.Hash,
1605 region.Offset, region.Len, location.blockLen)
1606 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1609 fetchList = append(fetchList, bulkFetchData{&location, i})
1611 sort.Sort(bulkFetchDataSorter(fetchList))
1613 // Read all of the regions in the fetch list and set the results.
1614 for i := range fetchList {
1615 fetchData := &fetchList[i]
1616 ri := fetchData.replyIndex
1617 region := ®ions[ri]
1618 location := fetchData.blockLocation
1619 regionBytes, err := tx.db.store.readBlockRegion(*location,
1620 region.Offset, region.Len)
1624 blockRegions[ri] = regionBytes
1627 return blockRegions, nil
1630 // close marks the transaction closed then releases any pending data, the
1631 // underlying snapshot, the transaction read lock, and the write lock when the
1632 // transaction is writable.
1633 func (tx *transaction) close() {
1636 // Clear pending blocks that would have been written on commit.
1637 tx.pendingBlocks = nil
1638 tx.pendingBlockData = nil
1640 // Clear pending keys that would have been written or deleted on commit.
1641 tx.pendingKeys = nil
1642 tx.pendingRemove = nil
1644 // Release the snapshot.
1645 if tx.snapshot != nil {
1646 tx.snapshot.Release()
1650 tx.db.closeLock.RUnlock()
1652 // Release the writer lock for writable transactions to unblock any
1653 // other write transaction which are possibly waiting.
1655 tx.db.writeLock.Unlock()
1659 // serializeBlockRow serializes a block row into a format suitable for storage
1660 // into the block index.
1661 func serializeBlockRow(blockLoc blockLocation, blockHdr []byte) []byte {
1662 // The serialized block index row format is:
1664 // [0:blockLocSize] Block location
1665 // [blockLocSize:blockLocSize+blockHdrSize] Block header
1666 serializedRow := make([]byte, blockLocSize+blockHdrSize)
1667 copy(serializedRow, serializeBlockLoc(blockLoc))
1668 copy(serializedRow[blockHdrOffset:], blockHdr)
1669 return serializedRow
// writePendingAndCommit writes pending block data to the flat block files,
// updates the metadata with their locations as well as the new current write
// location, and commits the metadata to the memory database cache.  It also
// properly handles rollback in the case of failures.
//
// This function MUST only be called when there is pending data to be written.
func (tx *transaction) writePendingAndCommit() error {
	// Save the current block store write position for potential rollback.
	// These variables are only updated here in this function and there can
	// only be one write transaction active at a time, so it's safe to store
	// them for potential rollback.
	wc := tx.db.store.writeCursor
	oldBlkFileNum := wc.curFileNum
	oldBlkOffset := wc.curOffset

	// rollback is a closure that is used to rollback all writes to the
	// block files so the flat-file state is restored to the saved cursor
	// position on any failure below.
	rollback := func() {
		// Rollback any modifications made to the block files if needed.
		tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)

	// Loop through all of the pending blocks to store and write them.
	for _, blockData := range tx.pendingBlockData {
		log.Tracef("Storing block %s", blockData.hash)
		location, err := tx.db.store.writeBlock(blockData.bytes)

		// Add a record in the block index for the block.  The record
		// includes the location information needed to locate the block
		// on the filesystem as well as the block header since they are
		// so commonly needed.
		blockHdr := blockData.bytes[0:blockHdrSize]
		blockRow := serializeBlockRow(location, blockHdr)
		err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)

	// Update the metadata for the current write file and offset.
	writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
	if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil {
		// NOTE(review): rollback of the flat-file writes is presumably
		// invoked on this failure path as well — confirm against the
		// error-handling arms not shown in this listing.
		return convertErr("failed to store write cursor", err)

	// Atomically update the database cache.  The cache automatically
	// handles flushing to the underlying persistent storage database.
	return tx.db.cache.commitTx(tx)
1730 // Commit commits all changes that have been made to the root metadata bucket
1731 // and all of its sub-buckets to the database cache which is periodically synced
1732 // to persistent storage. In addition, it commits all new blocks directly to
1733 // persistent storage bypassing the db cache. Blocks can be rather large, so
1734 // this help increase the amount of cache available for the metadata updates and
1735 // is safe since blocks are immutable.
1737 // This function is part of the database.Tx interface implementation.
1738 func (tx *transaction) Commit() error {
1739 // Prevent commits on managed transactions.
1742 panic("managed transaction commit not allowed")
1745 // Ensure transaction state is valid.
1746 if err := tx.checkClosed(); err != nil {
1750 // Regardless of whether the commit succeeds, the transaction is closed
1754 // Ensure the transaction is writable.
1756 str := "Commit requires a writable database transaction"
1757 return makeDbErr(database.ErrTxNotWritable, str, nil)
1760 // Write pending data. The function will rollback if any errors occur.
1761 return tx.writePendingAndCommit()
1764 // Rollback undoes all changes that have been made to the root bucket and all of
1767 // This function is part of the database.Tx interface implementation.
1768 func (tx *transaction) Rollback() error {
1769 // Prevent rollbacks on managed transactions.
1772 panic("managed transaction rollback not allowed")
1775 // Ensure transaction state is valid.
1776 if err := tx.checkClosed(); err != nil {
1784 // db represents a collection of namespaces which are persisted and implements
1785 // the database.DB interface. All database access is performed through
1786 // transactions which are obtained through the specific Namespace.
1788 writeLock sync.Mutex // Limit to one write transaction at a time.
1789 closeLock sync.RWMutex // Make database close block while txns active.
1790 closed bool // Is the database closed?
1791 store *blockStore // Handles read/writing blocks to flat files.
1792 cache *dbCache // Cache layer which wraps underlying leveldb DB.
// Compile-time assertion to enforce that db implements the database.DB
// interface.
var _ database.DB = (*db)(nil)
1798 // Type returns the database driver type the current database instance was
1801 // This function is part of the database.DB interface implementation.
1802 func (db *db) Type() string {
1806 // begin is the implementation function for the Begin database method. See its
1807 // documentation for more details.
1809 // This function is only separate because it returns the internal transaction
1810 // which is used by the managed transaction code while the database method
1811 // returns the interface.
1812 func (db *db) begin(writable bool) (*transaction, error) {
1813 // Whenever a new writable transaction is started, grab the write lock
1814 // to ensure only a single write transaction can be active at the same
1815 // time. This lock will not be released until the transaction is
1816 // closed (via Rollback or Commit).
1821 // Whenever a new transaction is started, grab a read lock against the
1822 // database to ensure Close will wait for the transaction to finish.
1823 // This lock will not be released until the transaction is closed (via
1824 // Rollback or Commit).
1825 db.closeLock.RLock()
1827 db.closeLock.RUnlock()
1829 db.writeLock.Unlock()
1831 return nil, makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr,
1835 // Grab a snapshot of the database cache (which in turn also handles the
1836 // underlying database).
1837 snapshot, err := db.cache.Snapshot()
1839 db.closeLock.RUnlock()
1841 db.writeLock.Unlock()
1847 // The metadata and block index buckets are internal-only buckets, so
1848 // they have defined IDs.
1853 pendingKeys: treap.NewMutable(),
1854 pendingRemove: treap.NewMutable(),
1856 tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
1857 tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
1861 // Begin starts a transaction which is either read-only or read-write depending
1862 // on the specified flag. Multiple read-only transactions can be started
1863 // simultaneously while only a single read-write transaction can be started at a
1864 // time. The call will block when starting a read-write transaction when one is
1867 // NOTE: The transaction must be closed by calling Rollback or Commit on it when
1868 // it is no longer needed. Failure to do so will result in unclaimed memory.
1870 // This function is part of the database.DB interface implementation.
1871 func (db *db) Begin(writable bool) (database.Tx, error) {
1872 return db.begin(writable)
1875 // rollbackOnPanic rolls the passed transaction back if the code in the calling
1876 // function panics. This is needed since the mutex on a transaction must be
1877 // released and a panic in called code would prevent that from happening.
1879 // NOTE: This can only be handled manually for managed transactions since they
1880 // control the life-cycle of the transaction. As the documentation on Begin
1881 // calls out, callers opting to use manual transactions will have to ensure the
1882 // transaction is rolled back on panic if it desires that functionality as well
1883 // or the database will fail to close since the read-lock will never be
1885 func rollbackOnPanic(tx *transaction) {
1886 if err := recover(); err != nil {
1893 // View invokes the passed function in the context of a managed read-only
1894 // transaction with the root bucket for the namespace. Any errors returned from
1895 // the user-supplied function are returned from this function.
1897 // This function is part of the database.DB interface implementation.
1898 func (db *db) View(fn func(database.Tx) error) error {
1899 // Start a read-only transaction.
1900 tx, err := db.begin(false)
1905 // Since the user-provided function might panic, ensure the transaction
1906 // releases all mutexes and resources. There is no guarantee the caller
1907 // won't use recover and keep going. Thus, the database must still be
1908 // in a usable state on panics due to caller issues.
1909 defer rollbackOnPanic(tx)
1915 // The error is ignored here because nothing was written yet
1916 // and regardless of a rollback failure, the tx is closed now
1922 return tx.Rollback()
1925 // Update invokes the passed function in the context of a managed read-write
1926 // transaction with the root bucket for the namespace. Any errors returned from
1927 // the user-supplied function will cause the transaction to be rolled back and
1928 // are returned from this function. Otherwise, the transaction is committed
1929 // when the user-supplied function returns a nil error.
1931 // This function is part of the database.DB interface implementation.
1932 func (db *db) Update(fn func(database.Tx) error) error {
1933 // Start a read-write transaction.
1934 tx, err := db.begin(true)
1939 // Since the user-provided function might panic, ensure the transaction
1940 // releases all mutexes and resources. There is no guarantee the caller
1941 // won't use recover and keep going. Thus, the database must still be
1942 // in a usable state on panics due to caller issues.
1943 defer rollbackOnPanic(tx)
1949 // The error is ignored here because nothing was written yet
1950 // and regardless of a rollback failure, the tx is closed now
1959 // Close cleanly shuts down the database and syncs all data. It will block
1960 // until all database transactions have been finalized (rolled back or
1963 // This function is part of the database.DB interface implementation.
1964 func (db *db) Close() error {
1965 // Since all transactions have a read lock on this mutex, this will
1966 // cause Close to wait for all readers to complete.
1968 defer db.closeLock.Unlock()
1971 return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
1975 // NOTE: Since the above lock waits for all transactions to finish and
1976 // prevents any new ones from being started, it is safe to flush the
1977 // cache and clear all state without the individual locks.
1979 // Close the database cache which will flush any existing entries to
1980 // disk and close the underlying leveldb database. Any error is saved
1981 // and returned at the end after the remaining cleanup since the
1982 // database will be marked closed even if this fails given there is no
1983 // good way for the caller to recover from a failure here anyways.
1984 closeErr := db.cache.Close()
1986 // Close any open flat files that house the blocks.
1987 wc := db.store.writeCursor
1988 if wc.curFile.file != nil {
1989 _ = wc.curFile.file.Close()
1990 wc.curFile.file = nil
1992 for _, blockFile := range db.store.openBlockFiles {
1993 _ = blockFile.file.Close()
1995 db.store.openBlockFiles = nil
1996 db.store.openBlocksLRU.Init()
1997 db.store.fileNumToLRUElem = nil
2002 // filesExists reports whether the named file or directory exists.
2003 func fileExists(name string) bool {
2004 if _, err := os.Stat(name); err != nil {
2005 if os.IsNotExist(err) {
2012 // initDB creates the initial buckets and values used by the package. This is
2013 // mainly in a separate function for testing purposes.
2014 func initDB(ldb *leveldb.DB) error {
2015 // The starting block file write cursor location is file num 0, offset
2017 batch := new(leveldb.Batch)
2018 batch.Put(bucketizedKey(metadataBucketID, writeLocKeyName),
2019 serializeWriteRow(0, 0))
2021 // Create block index bucket and set the current bucket id.
2023 // NOTE: Since buckets are virtualized through the use of prefixes,
2024 // there is no need to store the bucket index data for the metadata
2025 // bucket in the database. However, the first bucket ID to use does
2026 // need to account for it to ensure there are no key collisions.
2027 batch.Put(bucketIndexKey(metadataBucketID, blockIdxBucketName),
2028 blockIdxBucketID[:])
2029 batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
2031 // Write everything as a single batch.
2032 if err := ldb.Write(batch, nil); err != nil {
2033 str := fmt.Sprintf("failed to initialize metadata database: %v",
2035 return convertErr(str, err)
2041 // openDB opens the database at the provided path. database.ErrDbDoesNotExist
2042 // is returned if the database doesn't exist and the create flag is not set.
2043 func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
2044 // Error if the database doesn't exist and the create flag is not set.
2045 metadataDbPath := filepath.Join(dbPath, metadataDbName)
2046 dbExists := fileExists(metadataDbPath)
2047 if !create && !dbExists {
2048 str := fmt.Sprintf("database %q does not exist", metadataDbPath)
2049 return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
2052 // Ensure the full path to the database exists.
2054 // The error can be ignored here since the call to
2055 // leveldb.OpenFile will fail if the directory couldn't be
2057 _ = os.MkdirAll(dbPath, 0700)
2060 // Open the metadata database (will create it if needed).
2061 opts := opt.Options{
2062 ErrorIfExist: create,
2063 Strict: opt.DefaultStrict,
2064 Compression: opt.NoCompression,
2065 Filter: filter.NewBloomFilter(10),
2067 ldb, err := leveldb.OpenFile(metadataDbPath, &opts)
2069 return nil, convertErr(err.Error(), err)
2072 // Create the block store which includes scanning the existing flat
2073 // block files to find what the current write cursor position is
2074 // according to the data that is actually on disk. Also create the
2075 // database cache which wraps the underlying leveldb database to provide
2077 store := newBlockStore(dbPath, network)
2078 cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
2079 pdb := &db{store: store, cache: cache}
2081 // Perform any reconciliation needed between the block and metadata as
2082 // well as database initialization, if needed.
2083 return reconcileDB(pdb, create)