+++ /dev/null
-// Copyright (c) 2015-2016 The btcsuite developers
-// Use of this source code is governed by an ISC
-// license that can be found in the LICENSE file.
-
-package ffldb
-
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "os"
- "path/filepath"
- "runtime"
- "sort"
- "sync"
-
- "github.com/btcsuite/btcd/chaincfg/chainhash"
- "github.com/btcsuite/btcd/database"
- "github.com/btcsuite/btcd/database/internal/treap"
- "github.com/btcsuite/btcd/wire"
- "github.com/btcsuite/btcutil"
- "github.com/btcsuite/goleveldb/leveldb"
- "github.com/btcsuite/goleveldb/leveldb/comparer"
- ldberrors "github.com/btcsuite/goleveldb/leveldb/errors"
- "github.com/btcsuite/goleveldb/leveldb/filter"
- "github.com/btcsuite/goleveldb/leveldb/iterator"
- "github.com/btcsuite/goleveldb/leveldb/opt"
- "github.com/btcsuite/goleveldb/leveldb/util"
-)
-
-const (
- // metadataDbName is the name used for the metadata database.
- metadataDbName = "metadata"
-
- // blockHdrSize is the size of a block header. This is simply the
- // constant from wire and is only provided here for convenience since
- // wire.MaxBlockHeaderPayload is quite long.
- blockHdrSize = wire.MaxBlockHeaderPayload
-
- // blockHdrOffset defines the offsets into a block index row for the
- // block header.
- //
- // The serialized block index row format is:
- // <blocklocation><blockheader>
- blockHdrOffset = blockLocSize
-)
-
-var (
- // byteOrder is the preferred byte order used through the database and
- // block files. Sometimes big endian will be used to allow ordered byte
- // sortable integer values.
- byteOrder = binary.LittleEndian
-
- // bucketIndexPrefix is the prefix used for all entries in the bucket
- // index.
- bucketIndexPrefix = []byte("bidx")
-
- // curBucketIDKeyName is the name of the key used to keep track of the
- // current bucket ID counter.
- curBucketIDKeyName = []byte("bidx-cbid")
-
- // metadataBucketID is the ID of the top-level metadata bucket.
- // It is the value 0 encoded as an unsigned big-endian uint32.
- metadataBucketID = [4]byte{}
-
- // blockIdxBucketID is the ID of the internal block metadata bucket.
- // It is the value 1 encoded as an unsigned big-endian uint32.
- blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}
-
- // blockIdxBucketName is the bucket used internally to track block
- // metadata.
- blockIdxBucketName = []byte("ffldb-blockidx")
-
- // writeLocKeyName is the key used to store the current write file
- // location.
- writeLocKeyName = []byte("ffldb-writeloc")
-)
-
-// Common error strings.
-const (
- // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen
- // error code.
- errDbNotOpenStr = "database is not open"
-
- // errTxClosedStr is the text to use for the database.ErrTxClosed error
- // code.
- errTxClosedStr = "database tx is closed"
-)
-
-// bulkFetchData is allows a block location to be specified along with the
-// index it was requested from. This in turn allows the bulk data loading
-// functions to sort the data accesses based on the location to improve
-// performance while keeping track of which result the data is for.
-type bulkFetchData struct {
- *blockLocation
- replyIndex int
-}
-
-// bulkFetchDataSorter implements sort.Interface to allow a slice of
-// bulkFetchData to be sorted. In particular it sorts by file and then
-// offset so that reads from files are grouped and linear.
-type bulkFetchDataSorter []bulkFetchData
-
-// Len returns the number of items in the slice. It is part of the
-// sort.Interface implementation.
-func (s bulkFetchDataSorter) Len() int {
- return len(s)
-}
-
-// Swap swaps the items at the passed indices. It is part of the
-// sort.Interface implementation.
-func (s bulkFetchDataSorter) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
-}
-
-// Less returns whether the item with index i should sort before the item with
-// index j. It is part of the sort.Interface implementation.
-func (s bulkFetchDataSorter) Less(i, j int) bool {
- if s[i].blockFileNum < s[j].blockFileNum {
- return true
- }
- if s[i].blockFileNum > s[j].blockFileNum {
- return false
- }
-
- return s[i].fileOffset < s[j].fileOffset
-}
-
-// makeDbErr creates a database.Error given a set of arguments.
-func makeDbErr(c database.ErrorCode, desc string, err error) database.Error {
- return database.Error{ErrorCode: c, Description: desc, Err: err}
-}
-
-// convertErr converts the passed leveldb error into a database error with an
-// equivalent error code and the passed description. It also sets the passed
-// error as the underlying error.
-func convertErr(desc string, ldbErr error) database.Error {
- // Use the driver-specific error code by default. The code below will
- // update this with the converted error if it's recognized.
- var code = database.ErrDriverSpecific
-
- switch {
- // Database corruption errors.
- case ldberrors.IsCorrupted(ldbErr):
- code = database.ErrCorruption
-
- // Database open/create errors.
- case ldbErr == leveldb.ErrClosed:
- code = database.ErrDbNotOpen
-
- // Transaction errors.
- case ldbErr == leveldb.ErrSnapshotReleased:
- code = database.ErrTxClosed
- case ldbErr == leveldb.ErrIterReleased:
- code = database.ErrTxClosed
- }
-
- return database.Error{ErrorCode: code, Description: desc, Err: ldbErr}
-}
-
-// copySlice returns a copy of the passed slice. This is mostly used to copy
-// leveldb iterator keys and values since they are only valid until the iterator
-// is moved instead of during the entirety of the transaction.
-func copySlice(slice []byte) []byte {
- ret := make([]byte, len(slice))
- copy(ret, slice)
- return ret
-}
-
-// cursor is an internal type used to represent a cursor over key/value pairs
-// and nested buckets of a bucket and implements the database.Cursor interface.
-type cursor struct {
- bucket *bucket
- dbIter iterator.Iterator
- pendingIter iterator.Iterator
- currentIter iterator.Iterator
-}
-
-// Enforce cursor implements the database.Cursor interface.
-var _ database.Cursor = (*cursor)(nil)
-
-// Bucket returns the bucket the cursor was created for.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Bucket() database.Bucket {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return nil
- }
-
- return c.bucket
-}
-
-// Delete removes the current key/value pair the cursor is at without
-// invalidating the cursor.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrIncompatibleValue if attempted when the cursor points to a nested
-// bucket
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Delete() error {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return err
- }
-
- // Error if the cursor is exhausted.
- if c.currentIter == nil {
- str := "cursor is exhausted"
- return makeDbErr(database.ErrIncompatibleValue, str, nil)
- }
-
- // Do not allow buckets to be deleted via the cursor.
- key := c.currentIter.Key()
- if bytes.HasPrefix(key, bucketIndexPrefix) {
- str := "buckets may not be deleted from a cursor"
- return makeDbErr(database.ErrIncompatibleValue, str, nil)
- }
-
- c.bucket.tx.deleteKey(copySlice(key), true)
- return nil
-}
-
-// skipPendingUpdates skips any keys at the current database iterator position
-// that are being updated by the transaction. The forwards flag indicates the
-// direction the cursor is moving.
-func (c *cursor) skipPendingUpdates(forwards bool) {
- for c.dbIter.Valid() {
- var skip bool
- key := c.dbIter.Key()
- if c.bucket.tx.pendingRemove.Has(key) {
- skip = true
- } else if c.bucket.tx.pendingKeys.Has(key) {
- skip = true
- }
- if !skip {
- break
- }
-
- if forwards {
- c.dbIter.Next()
- } else {
- c.dbIter.Prev()
- }
- }
-}
-
-// chooseIterator first skips any entries in the database iterator that are
-// being updated by the transaction and sets the current iterator to the
-// appropriate iterator depending on their validity and the order they compare
-// in while taking into account the direction flag. When the cursor is being
-// moved forwards and both iterators are valid, the iterator with the smaller
-// key is chosen and vice versa when the cursor is being moved backwards.
-func (c *cursor) chooseIterator(forwards bool) bool {
- // Skip any keys at the current database iterator position that are
- // being updated by the transaction.
- c.skipPendingUpdates(forwards)
-
- // When both iterators are exhausted, the cursor is exhausted too.
- if !c.dbIter.Valid() && !c.pendingIter.Valid() {
- c.currentIter = nil
- return false
- }
-
- // Choose the database iterator when the pending keys iterator is
- // exhausted.
- if !c.pendingIter.Valid() {
- c.currentIter = c.dbIter
- return true
- }
-
- // Choose the pending keys iterator when the database iterator is
- // exhausted.
- if !c.dbIter.Valid() {
- c.currentIter = c.pendingIter
- return true
- }
-
- // Both iterators are valid, so choose the iterator with either the
- // smaller or larger key depending on the forwards flag.
- compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
- if (forwards && compare > 0) || (!forwards && compare < 0) {
- c.currentIter = c.pendingIter
- } else {
- c.currentIter = c.dbIter
- }
- return true
-}
-
-// First positions the cursor at the first key/value pair and returns whether or
-// not the pair exists.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) First() bool {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return false
- }
-
- // Seek to the first key in both the database and pending iterators and
- // choose the iterator that is both valid and has the smaller key.
- c.dbIter.First()
- c.pendingIter.First()
- return c.chooseIterator(true)
-}
-
-// Last positions the cursor at the last key/value pair and returns whether or
-// not the pair exists.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Last() bool {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return false
- }
-
- // Seek to the last key in both the database and pending iterators and
- // choose the iterator that is both valid and has the larger key.
- c.dbIter.Last()
- c.pendingIter.Last()
- return c.chooseIterator(false)
-}
-
-// Next moves the cursor one key/value pair forward and returns whether or not
-// the pair exists.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Next() bool {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return false
- }
-
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return false
- }
-
- // Move the current iterator to the next entry and choose the iterator
- // that is both valid and has the smaller key.
- c.currentIter.Next()
- return c.chooseIterator(true)
-}
-
-// Prev moves the cursor one key/value pair backward and returns whether or not
-// the pair exists.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Prev() bool {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return false
- }
-
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return false
- }
-
- // Move the current iterator to the previous entry and choose the
- // iterator that is both valid and has the larger key.
- c.currentIter.Prev()
- return c.chooseIterator(false)
-}
-
-// Seek positions the cursor at the first key/value pair that is greater than or
-// equal to the passed seek key. Returns false if no suitable key was found.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Seek(seek []byte) bool {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return false
- }
-
- // Seek to the provided key in both the database and pending iterators
- // then choose the iterator that is both valid and has the larger key.
- seekKey := bucketizedKey(c.bucket.id, seek)
- c.dbIter.Seek(seekKey)
- c.pendingIter.Seek(seekKey)
- return c.chooseIterator(true)
-}
-
-// rawKey returns the current key the cursor is pointing to without stripping
-// the current bucket prefix or bucket index prefix.
-func (c *cursor) rawKey() []byte {
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return nil
- }
-
- return copySlice(c.currentIter.Key())
-}
-
-// Key returns the current key the cursor is pointing to.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Key() []byte {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return nil
- }
-
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return nil
- }
-
- // Slice out the actual key name and make a copy since it is no longer
- // valid after iterating to the next item.
- //
- // The key is after the bucket index prefix and parent ID when the
- // cursor is pointing to a nested bucket.
- key := c.currentIter.Key()
- if bytes.HasPrefix(key, bucketIndexPrefix) {
- key = key[len(bucketIndexPrefix)+4:]
- return copySlice(key)
- }
-
- // The key is after the bucket ID when the cursor is pointing to a
- // normal entry.
- key = key[len(c.bucket.id):]
- return copySlice(key)
-}
-
-// rawValue returns the current value the cursor is pointing to without
-// stripping without filtering bucket index values.
-func (c *cursor) rawValue() []byte {
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return nil
- }
-
- return copySlice(c.currentIter.Value())
-}
-
-// Value returns the current value the cursor is pointing to. This will be nil
-// for nested buckets.
-//
-// This function is part of the database.Cursor interface implementation.
-func (c *cursor) Value() []byte {
- // Ensure transaction state is valid.
- if err := c.bucket.tx.checkClosed(); err != nil {
- return nil
- }
-
- // Nothing to return if cursor is exhausted.
- if c.currentIter == nil {
- return nil
- }
-
- // Return nil for the value when the cursor is pointing to a nested
- // bucket.
- if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
- return nil
- }
-
- return copySlice(c.currentIter.Value())
-}
-
-// cursorType defines the type of cursor to create.
-type cursorType int
-
-// The following constants define the allowed cursor types.
-const (
- // ctKeys iterates through all of the keys in a given bucket.
- ctKeys cursorType = iota
-
- // ctBuckets iterates through all directly nested buckets in a given
- // bucket.
- ctBuckets
-
- // ctFull iterates through both the keys and the directly nested buckets
- // in a given bucket.
- ctFull
-)
-
-// cursorFinalizer is either invoked when a cursor is being garbage collected or
-// called manually to ensure the underlying cursor iterators are released.
-func cursorFinalizer(c *cursor) {
- c.dbIter.Release()
- c.pendingIter.Release()
-}
-
-// newCursor returns a new cursor for the given bucket, bucket ID, and cursor
-// type.
-//
-// NOTE: The caller is responsible for calling the cursorFinalizer function on
-// the returned cursor.
-func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
- var dbIter, pendingIter iterator.Iterator
- switch cursorTyp {
- case ctKeys:
- keyRange := util.BytesPrefix(bucketID)
- dbIter = b.tx.snapshot.NewIterator(keyRange)
- pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
- pendingIter = pendingKeyIter
-
- case ctBuckets:
- // The serialized bucket index key format is:
- // <bucketindexprefix><parentbucketid><bucketname>
-
- // Create an iterator for the both the database and the pending
- // keys which are prefixed by the bucket index identifier and
- // the provided bucket ID.
- prefix := make([]byte, len(bucketIndexPrefix)+4)
- copy(prefix, bucketIndexPrefix)
- copy(prefix[len(bucketIndexPrefix):], bucketID)
- bucketRange := util.BytesPrefix(prefix)
-
- dbIter = b.tx.snapshot.NewIterator(bucketRange)
- pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
- pendingIter = pendingBucketIter
-
- case ctFull:
- fallthrough
- default:
- // The serialized bucket index key format is:
- // <bucketindexprefix><parentbucketid><bucketname>
- prefix := make([]byte, len(bucketIndexPrefix)+4)
- copy(prefix, bucketIndexPrefix)
- copy(prefix[len(bucketIndexPrefix):], bucketID)
- bucketRange := util.BytesPrefix(prefix)
- keyRange := util.BytesPrefix(bucketID)
-
- // Since both keys and buckets are needed from the database,
- // create an individual iterator for each prefix and then create
- // a merged iterator from them.
- dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
- dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
- iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
- dbIter = iterator.NewMergedIterator(iters,
- comparer.DefaultComparer, true)
-
- // Since both keys and buckets are needed from the pending keys,
- // create an individual iterator for each prefix and then create
- // a merged iterator from them.
- pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
- pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
- iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
- pendingIter = iterator.NewMergedIterator(iters,
- comparer.DefaultComparer, true)
- }
-
- // Create the cursor using the iterators.
- return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
-}
-
-// bucket is an internal type used to represent a collection of key/value pairs
-// and implements the database.Bucket interface.
-type bucket struct {
- tx *transaction
- id [4]byte
-}
-
-// Enforce bucket implements the database.Bucket interface.
-var _ database.Bucket = (*bucket)(nil)
-
-// bucketIndexKey returns the actual key to use for storing and retrieving a
-// child bucket in the bucket index. This is required because additional
-// information is needed to distinguish nested buckets with the same name.
-func bucketIndexKey(parentID [4]byte, key []byte) []byte {
- // The serialized bucket index key format is:
- // <bucketindexprefix><parentbucketid><bucketname>
- indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
- copy(indexKey, bucketIndexPrefix)
- copy(indexKey[len(bucketIndexPrefix):], parentID[:])
- copy(indexKey[len(bucketIndexPrefix)+4:], key)
- return indexKey
-}
-
-// bucketizedKey returns the actual key to use for storing and retrieving a key
-// for the provided bucket ID. This is required because bucketizing is handled
-// through the use of a unique prefix per bucket.
-func bucketizedKey(bucketID [4]byte, key []byte) []byte {
- // The serialized block index key format is:
- // <bucketid><key>
- bKey := make([]byte, 4+len(key))
- copy(bKey, bucketID[:])
- copy(bKey[4:], key)
- return bKey
-}
-
-// Bucket retrieves a nested bucket with the given key. Returns nil if
-// the bucket does not exist.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Bucket(key []byte) database.Bucket {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return nil
- }
-
- // Attempt to fetch the ID for the child bucket. The bucket does not
- // exist if the bucket index entry does not exist.
- childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
- if childID == nil {
- return nil
- }
-
- childBucket := &bucket{tx: b.tx}
- copy(childBucket.id[:], childID)
- return childBucket
-}
-
-// CreateBucket creates and returns a new nested bucket with the given key.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBucketExists if the bucket already exists
-// - ErrBucketNameRequired if the key is empty
-// - ErrIncompatibleValue if the key is otherwise invalid for the particular
-// implementation
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // Ensure the transaction is writable.
- if !b.tx.writable {
- str := "create bucket requires a writable database transaction"
- return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Ensure a key was provided.
- if len(key) == 0 {
- str := "create bucket requires a key"
- return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
- }
-
- // Ensure bucket does not already exist.
- bidxKey := bucketIndexKey(b.id, key)
- if b.tx.hasKey(bidxKey) {
- str := "bucket already exists"
- return nil, makeDbErr(database.ErrBucketExists, str, nil)
- }
-
- // Find the appropriate next bucket ID to use for the new bucket. In
- // the case of the special internal block index, keep the fixed ID.
- var childID [4]byte
- if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
- childID = blockIdxBucketID
- } else {
- var err error
- childID, err = b.tx.nextBucketID()
- if err != nil {
- return nil, err
- }
- }
-
- // Add the new bucket to the bucket index.
- if err := b.tx.putKey(bidxKey, childID[:]); err != nil {
- str := fmt.Sprintf("failed to create bucket with key %q", key)
- return nil, convertErr(str, err)
- }
- return &bucket{tx: b.tx, id: childID}, nil
-}
-
-// CreateBucketIfNotExists creates and returns a new nested bucket with the
-// given key if it does not already exist.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBucketNameRequired if the key is empty
-// - ErrIncompatibleValue if the key is otherwise invalid for the particular
-// implementation
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // Ensure the transaction is writable.
- if !b.tx.writable {
- str := "create bucket requires a writable database transaction"
- return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Return existing bucket if it already exists, otherwise create it.
- if bucket := b.Bucket(key); bucket != nil {
- return bucket, nil
- }
- return b.CreateBucket(key)
-}
-
-// DeleteBucket removes a nested bucket with the given key.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBucketNotFound if the specified bucket does not exist
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) DeleteBucket(key []byte) error {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return err
- }
-
- // Ensure the transaction is writable.
- if !b.tx.writable {
- str := "delete bucket requires a writable database transaction"
- return makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Attempt to fetch the ID for the child bucket. The bucket does not
- // exist if the bucket index entry does not exist. In the case of the
- // special internal block index, keep the fixed ID.
- bidxKey := bucketIndexKey(b.id, key)
- childID := b.tx.fetchKey(bidxKey)
- if childID == nil {
- str := fmt.Sprintf("bucket %q does not exist", key)
- return makeDbErr(database.ErrBucketNotFound, str, nil)
- }
-
- // Remove all nested buckets and their keys.
- childIDs := [][]byte{childID}
- for len(childIDs) > 0 {
- childID = childIDs[len(childIDs)-1]
- childIDs = childIDs[:len(childIDs)-1]
-
- // Delete all keys in the nested bucket.
- keyCursor := newCursor(b, childID, ctKeys)
- for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
- b.tx.deleteKey(keyCursor.rawKey(), false)
- }
- cursorFinalizer(keyCursor)
-
- // Iterate through all nested buckets.
- bucketCursor := newCursor(b, childID, ctBuckets)
- for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
- // Push the id of the nested bucket onto the stack for
- // the next iteration.
- childID := bucketCursor.rawValue()
- childIDs = append(childIDs, childID)
-
- // Remove the nested bucket from the bucket index.
- b.tx.deleteKey(bucketCursor.rawKey(), false)
- }
- cursorFinalizer(bucketCursor)
- }
-
- // Remove the nested bucket from the bucket index. Any buckets nested
- // under it were already removed above.
- b.tx.deleteKey(bidxKey, true)
- return nil
-}
-
-// Cursor returns a new cursor, allowing for iteration over the bucket's
-// key/value pairs and nested buckets in forward or backward order.
-//
-// You must seek to a position using the First, Last, or Seek functions before
-// calling the Next, Prev, Key, or Value functions. Failure to do so will
-// result in the same return values as an exhausted cursor, which is false for
-// the Prev and Next functions and nil for Key and Value functions.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Cursor() database.Cursor {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return &cursor{bucket: b}
- }
-
- // Create the cursor and setup a runtime finalizer to ensure the
- // iterators are released when the cursor is garbage collected.
- c := newCursor(b, b.id[:], ctFull)
- runtime.SetFinalizer(c, cursorFinalizer)
- return c
-}
-
-// ForEach invokes the passed function with every key/value pair in the bucket.
-// This does not include nested buckets or the key/value pairs within those
-// nested buckets.
-//
-// WARNING: It is not safe to mutate data while iterating with this method.
-// Doing so may cause the underlying cursor to be invalidated and return
-// unexpected keys and/or values.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrTxClosed if the transaction has already been closed
-//
-// NOTE: The values returned by this function are only valid during a
-// transaction. Attempting to access them after a transaction has ended will
-// likely result in an access violation.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) ForEach(fn func(k, v []byte) error) error {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return err
- }
-
- // Invoke the callback for each cursor item. Return the error returned
- // from the callback when it is non-nil.
- c := newCursor(b, b.id[:], ctKeys)
- defer cursorFinalizer(c)
- for ok := c.First(); ok; ok = c.Next() {
- err := fn(c.Key(), c.Value())
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// ForEachBucket invokes the passed function with the key of every nested bucket
-// in the current bucket. This does not include any nested buckets within those
-// nested buckets.
-//
-// WARNING: It is not safe to mutate data while iterating with this method.
-// Doing so may cause the underlying cursor to be invalidated and return
-// unexpected keys.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrTxClosed if the transaction has already been closed
-//
-// NOTE: The values returned by this function are only valid during a
-// transaction. Attempting to access them after a transaction has ended will
-// likely result in an access violation.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) ForEachBucket(fn func(k []byte) error) error {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return err
- }
-
- // Invoke the callback for each cursor item. Return the error returned
- // from the callback when it is non-nil.
- c := newCursor(b, b.id[:], ctBuckets)
- defer cursorFinalizer(c)
- for ok := c.First(); ok; ok = c.Next() {
- err := fn(c.Key())
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// Writable returns whether or not the bucket is writable.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Writable() bool {
- return b.tx.writable
-}
-
-// Put saves the specified key/value pair to the bucket. Keys that do not
-// already exist are added and keys that already exist are overwritten.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrKeyRequired if the key is empty
-// - ErrIncompatibleValue if the key is the same as an existing bucket
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Put(key, value []byte) error {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return err
- }
-
- // Ensure the transaction is writable.
- if !b.tx.writable {
- str := "setting a key requires a writable database transaction"
- return makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Ensure a key was provided.
- if len(key) == 0 {
- str := "put requires a key"
- return makeDbErr(database.ErrKeyRequired, str, nil)
- }
-
- return b.tx.putKey(bucketizedKey(b.id, key), value)
-}
-
-// Get returns the value for the given key. Returns nil if the key does not
-// exist in this bucket. An empty slice is returned for keys that exist but
-// have no value assigned.
-//
-// NOTE: The value returned by this function is only valid during a transaction.
-// Attempting to access it after a transaction has ended results in undefined
-// behavior. Additionally, the value must NOT be modified by the caller.
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Get(key []byte) []byte {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return nil
- }
-
- // Nothing to return if there is no key.
- if len(key) == 0 {
- return nil
- }
-
- return b.tx.fetchKey(bucketizedKey(b.id, key))
-}
-
-// Delete removes the specified key from the bucket. Deleting a key that does
-// not exist does not return an error.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrKeyRequired if the key is empty
-// - ErrIncompatibleValue if the key is the same as an existing bucket
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Bucket interface implementation.
-func (b *bucket) Delete(key []byte) error {
- // Ensure transaction state is valid.
- if err := b.tx.checkClosed(); err != nil {
- return err
- }
-
- // Ensure the transaction is writable.
- if !b.tx.writable {
- str := "deleting a value requires a writable database transaction"
- return makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Nothing to do if there is no key.
- if len(key) == 0 {
- return nil
- }
-
- b.tx.deleteKey(bucketizedKey(b.id, key), true)
- return nil
-}
-
-// pendingBlock houses a block that will be written to disk when the database
-// transaction is committed.
-type pendingBlock struct {
- hash *chainhash.Hash
- bytes []byte
-}
-
-// transaction represents a database transaction. It can either be read-only or
-// read-write and implements the database.Bucket interface. The transaction
-// provides a root bucket against which all read and writes occur.
-type transaction struct {
- managed bool // Is the transaction managed?
- closed bool // Is the transaction closed?
- writable bool // Is the transaction writable?
- db *db // DB instance the tx was created from.
- snapshot *dbCacheSnapshot // Underlying snapshot for txns.
- metaBucket *bucket // The root metadata bucket.
- blockIdxBucket *bucket // The block index bucket.
-
- // Blocks that need to be stored on commit. The pendingBlocks map is
- // kept to allow quick lookups of pending data by block hash.
- pendingBlocks map[chainhash.Hash]int
- pendingBlockData []pendingBlock
-
- // Keys that need to be stored or deleted on commit.
- pendingKeys *treap.Mutable
- pendingRemove *treap.Mutable
-
- // Active iterators that need to be notified when the pending keys have
- // been updated so the cursors can properly handle updates to the
- // transaction state.
- activeIterLock sync.RWMutex
- activeIters []*treap.Iterator
-}
-
-// Enforce transaction implements the database.Tx interface.
-var _ database.Tx = (*transaction)(nil)
-
-// removeActiveIter removes the passed iterator from the list of active
-// iterators against the pending keys treap.
-func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
- // An indexing for loop is intentionally used over a range here as range
- // does not reevaluate the slice on each iteration nor does it adjust
- // the index for the modified slice.
- tx.activeIterLock.Lock()
- for i := 0; i < len(tx.activeIters); i++ {
- if tx.activeIters[i] == iter {
- copy(tx.activeIters[i:], tx.activeIters[i+1:])
- tx.activeIters[len(tx.activeIters)-1] = nil
- tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
- }
- }
- tx.activeIterLock.Unlock()
-}
-
-// addActiveIter adds the passed iterator to the list of active iterators for
-// the pending keys treap.
-func (tx *transaction) addActiveIter(iter *treap.Iterator) {
- tx.activeIterLock.Lock()
- tx.activeIters = append(tx.activeIters, iter)
- tx.activeIterLock.Unlock()
-}
-
-// notifyActiveIters notifies all of the active iterators for the pending keys
-// treap that it has been updated.
-func (tx *transaction) notifyActiveIters() {
- tx.activeIterLock.RLock()
- for _, iter := range tx.activeIters {
- iter.ForceReseek()
- }
- tx.activeIterLock.RUnlock()
-}
-
-// checkClosed returns an error if the the database or transaction is closed.
-func (tx *transaction) checkClosed() error {
- // The transaction is no longer valid if it has been closed.
- if tx.closed {
- return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
- }
-
- return nil
-}
-
-// hasKey returns whether or not the provided key exists in the database while
-// taking into account the current transaction state.
-func (tx *transaction) hasKey(key []byte) bool {
- // When the transaction is writable, check the pending transaction
- // state first.
- if tx.writable {
- if tx.pendingRemove.Has(key) {
- return false
- }
- if tx.pendingKeys.Has(key) {
- return true
- }
- }
-
- // Consult the database cache and underlying database.
- return tx.snapshot.Has(key)
-}
-
-// putKey adds the provided key to the list of keys to be updated in the
-// database when the transaction is committed.
-//
-// NOTE: This function must only be called on a writable transaction. Since it
-// is an internal helper function, it does not check.
-func (tx *transaction) putKey(key, value []byte) error {
- // Prevent the key from being deleted if it was previously scheduled
- // to be deleted on transaction commit.
- tx.pendingRemove.Delete(key)
-
- // Add the key/value pair to the list to be written on transaction
- // commit.
- tx.pendingKeys.Put(key, value)
- tx.notifyActiveIters()
- return nil
-}
-
-// fetchKey attempts to fetch the provided key from the database cache (and
-// hence underlying database) while taking into account the current transaction
-// state. Returns nil if the key does not exist.
-func (tx *transaction) fetchKey(key []byte) []byte {
- // When the transaction is writable, check the pending transaction
- // state first.
- if tx.writable {
- if tx.pendingRemove.Has(key) {
- return nil
- }
- if value := tx.pendingKeys.Get(key); value != nil {
- return value
- }
- }
-
- // Consult the database cache and underlying database.
- return tx.snapshot.Get(key)
-}
-
-// deleteKey adds the provided key to the list of keys to be deleted from the
-// database when the transaction is committed. The notify iterators flag is
-// useful to delay notifying iterators about the changes during bulk deletes.
-//
-// NOTE: This function must only be called on a writable transaction. Since it
-// is an internal helper function, it does not check.
-func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
- // Remove the key from the list of pendings keys to be written on
- // transaction commit if needed.
- tx.pendingKeys.Delete(key)
-
- // Add the key to the list to be deleted on transaction commit.
- tx.pendingRemove.Put(key, nil)
-
- // Notify the active iterators about the change if the flag is set.
- if notifyIterators {
- tx.notifyActiveIters()
- }
-}
-
-// nextBucketID returns the next bucket ID to use for creating a new bucket.
-//
-// NOTE: This function must only be called on a writable transaction. Since it
-// is an internal helper function, it does not check.
-func (tx *transaction) nextBucketID() ([4]byte, error) {
- // Load the currently highest used bucket ID.
- curIDBytes := tx.fetchKey(curBucketIDKeyName)
- curBucketNum := binary.BigEndian.Uint32(curIDBytes)
-
- // Increment and update the current bucket ID and return it.
- var nextBucketID [4]byte
- binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
- if err := tx.putKey(curBucketIDKeyName, nextBucketID[:]); err != nil {
- return [4]byte{}, err
- }
- return nextBucketID, nil
-}
-
-// Metadata returns the top-most bucket for all metadata storage.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) Metadata() database.Bucket {
- return tx.metaBucket
-}
-
-// hasBlock returns whether or not a block with the given hash exists.
-func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
- // Return true if the block is pending to be written on commit since
- // it exists from the viewpoint of this transaction.
- if _, exists := tx.pendingBlocks[*hash]; exists {
- return true
- }
-
- return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
-}
-
-// StoreBlock stores the provided block into the database. There are no checks
-// to ensure the block connects to a previous block, contains double spends, or
-// any additional functionality such as transaction indexing. It simply stores
-// the block in the database.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockExists when the block hash already exists
-// - ErrTxNotWritable if attempted against a read-only transaction
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) StoreBlock(block *btcutil.Block) error {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return err
- }
-
- // Ensure the transaction is writable.
- if !tx.writable {
- str := "store block requires a writable database transaction"
- return makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Reject the block if it already exists.
- blockHash := block.Hash()
- if tx.hasBlock(blockHash) {
- str := fmt.Sprintf("block %s already exists", blockHash)
- return makeDbErr(database.ErrBlockExists, str, nil)
- }
-
- blockBytes, err := block.Bytes()
- if err != nil {
- str := fmt.Sprintf("failed to get serialized bytes for block %s",
- blockHash)
- return makeDbErr(database.ErrDriverSpecific, str, err)
- }
-
- // Add the block to be stored to the list of pending blocks to store
- // when the transaction is committed. Also, add it to pending blocks
- // map so it is easy to determine the block is pending based on the
- // block hash.
- if tx.pendingBlocks == nil {
- tx.pendingBlocks = make(map[chainhash.Hash]int)
- }
- tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
- tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{
- hash: blockHash,
- bytes: blockBytes,
- })
- log.Tracef("Added block %s to pending blocks", blockHash)
-
- return nil
-}
-
-// HasBlock returns whether or not a block with the given hash exists in the
-// database.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return false, err
- }
-
- return tx.hasBlock(hash), nil
-}
-
-// HasBlocks returns whether or not the blocks with the provided hashes
-// exist in the database.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrTxClosed if the transaction has already been closed
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- results := make([]bool, len(hashes))
- for i := range hashes {
- results[i] = tx.hasBlock(&hashes[i])
- }
-
- return results, nil
-}
-
-// fetchBlockRow fetches the metadata stored in the block index for the provided
-// hash. It will return ErrBlockNotFound if there is no entry.
-func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
- blockRow := tx.blockIdxBucket.Get(hash[:])
- if blockRow == nil {
- str := fmt.Sprintf("block %s does not exist", hash)
- return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
- }
-
- return blockRow, nil
-}
-
-// FetchBlockHeader returns the raw serialized bytes for the block header
-// identified by the given hash. The raw bytes are in the format returned by
-// Serialize on a wire.BlockHeader.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if the requested block hash does not exist
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// NOTE: The data returned by this function is only valid during a
-// database transaction. Attempting to access it after a transaction
-// has ended results in undefined behavior. This constraint prevents
-// additional data copies and allows support for memory-mapped database
-// implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // When the block is pending to be written on commit return the bytes
- // from there.
- if idx, exists := tx.pendingBlocks[*hash]; exists {
- blockBytes := tx.pendingBlockData[idx].bytes
- return blockBytes[0:blockHdrSize:blockHdrSize], nil
- }
-
- // Fetch the block index row and slice off the header. Notice the use
- // of the cap on the subslice to prevent the caller from accidentally
- // appending into the db data.
- blockRow, err := tx.fetchBlockRow(hash)
- if err != nil {
- return nil, err
- }
- endOffset := blockLocSize + blockHdrSize
- return blockRow[blockLocSize:endOffset:endOffset], nil
-}
-
-// FetchBlockHeaders returns the raw serialized bytes for the block headers
-// identified by the given hashes. The raw bytes are in the format returned by
-// Serialize on a wire.BlockHeader.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if the any of the requested block hashes do not exist
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// NOTE: The data returned by this function is only valid during a database
-// transaction. Attempting to access it after a transaction has ended results
-// in undefined behavior. This constraint prevents additional data copies and
-// allows support for memory-mapped database implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // NOTE: This could check for the existence of all blocks before loading
- // any of the headers which would be faster in the failure case, however
- // callers will not typically be calling this function with invalid
- // values, so optimize for the common case.
-
- // Load the headers.
- headers := make([][]byte, len(hashes))
- for i := range hashes {
- hash := &hashes[i]
-
- // When the block is pending to be written on commit return the
- // bytes from there.
- if idx, exists := tx.pendingBlocks[*hash]; exists {
- blkBytes := tx.pendingBlockData[idx].bytes
- headers[i] = blkBytes[0:blockHdrSize:blockHdrSize]
- continue
- }
-
- // Fetch the block index row and slice off the header. Notice
- // the use of the cap on the subslice to prevent the caller
- // from accidentally appending into the db data.
- blockRow, err := tx.fetchBlockRow(hash)
- if err != nil {
- return nil, err
- }
- endOffset := blockLocSize + blockHdrSize
- headers[i] = blockRow[blockLocSize:endOffset:endOffset]
- }
-
- return headers, nil
-}
-
-// FetchBlock returns the raw serialized bytes for the block identified by the
-// given hash. The raw bytes are in the format returned by Serialize on a
-// wire.MsgBlock.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if the requested block hash does not exist
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// In addition, returns ErrDriverSpecific if any failures occur when reading the
-// block files.
-//
-// NOTE: The data returned by this function is only valid during a database
-// transaction. Attempting to access it after a transaction has ended results
-// in undefined behavior. This constraint prevents additional data copies and
-// allows support for memory-mapped database implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // When the block is pending to be written on commit return the bytes
- // from there.
- if idx, exists := tx.pendingBlocks[*hash]; exists {
- return tx.pendingBlockData[idx].bytes, nil
- }
-
- // Lookup the location of the block in the files from the block index.
- blockRow, err := tx.fetchBlockRow(hash)
- if err != nil {
- return nil, err
- }
- location := deserializeBlockLoc(blockRow)
-
- // Read the block from the appropriate location. The function also
- // performs a checksum over the data to detect data corruption.
- blockBytes, err := tx.db.store.readBlock(hash, location)
- if err != nil {
- return nil, err
- }
-
- return blockBytes, nil
-}
-
-// FetchBlocks returns the raw serialized bytes for the blocks identified by the
-// given hashes. The raw bytes are in the format returned by Serialize on a
-// wire.MsgBlock.
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if any of the requested block hashed do not exist
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// In addition, returns ErrDriverSpecific if any failures occur when reading the
-// block files.
-//
-// NOTE: The data returned by this function is only valid during a database
-// transaction. Attempting to access it after a transaction has ended results
-// in undefined behavior. This constraint prevents additional data copies and
-// allows support for memory-mapped database implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // NOTE: This could check for the existence of all blocks before loading
- // any of them which would be faster in the failure case, however
- // callers will not typically be calling this function with invalid
- // values, so optimize for the common case.
-
- // Load the blocks.
- blocks := make([][]byte, len(hashes))
- for i := range hashes {
- var err error
- blocks[i], err = tx.FetchBlock(&hashes[i])
- if err != nil {
- return nil, err
- }
- }
-
- return blocks, nil
-}
-
-// fetchPendingRegion attempts to fetch the provided region from any block which
-// are pending to be written on commit. It will return nil for the byte slice
-// when the region references a block which is not pending. When the region
-// does reference a pending block, it is bounds checked and returns
-// ErrBlockRegionInvalid if invalid.
-func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
- // Nothing to do if the block is not pending to be written on commit.
- idx, exists := tx.pendingBlocks[*region.Hash]
- if !exists {
- return nil, nil
- }
-
- // Ensure the region is within the bounds of the block.
- blockBytes := tx.pendingBlockData[idx].bytes
- blockLen := uint32(len(blockBytes))
- endOffset := region.Offset + region.Len
- if endOffset < region.Offset || endOffset > blockLen {
- str := fmt.Sprintf("block %s region offset %d, length %d "+
- "exceeds block length of %d", region.Hash,
- region.Offset, region.Len, blockLen)
- return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
- }
-
- // Return the bytes from the pending block.
- return blockBytes[region.Offset:endOffset:endOffset], nil
-}
-
-// FetchBlockRegion returns the raw serialized bytes for the given block region.
-//
-// For example, it is possible to directly extract Bitcoin transactions and/or
-// scripts from a block with this function. Depending on the backend
-// implementation, this can provide significant savings by avoiding the need to
-// load entire blocks.
-//
-// The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
-// the Offset field in the provided BlockRegion is zero-based and relative to
-// the start of the block (byte 0).
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if the requested block hash does not exist
-// - ErrBlockRegionInvalid if the region exceeds the bounds of the associated
-// block
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// In addition, returns ErrDriverSpecific if any failures occur when reading the
-// block files.
-//
-// NOTE: The data returned by this function is only valid during a database
-// transaction. Attempting to access it after a transaction has ended results
-// in undefined behavior. This constraint prevents additional data copies and
-// allows support for memory-mapped database implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // When the block is pending to be written on commit return the bytes
- // from there.
- if tx.pendingBlocks != nil {
- regionBytes, err := tx.fetchPendingRegion(region)
- if err != nil {
- return nil, err
- }
- if regionBytes != nil {
- return regionBytes, nil
- }
- }
-
- // Lookup the location of the block in the files from the block index.
- blockRow, err := tx.fetchBlockRow(region.Hash)
- if err != nil {
- return nil, err
- }
- location := deserializeBlockLoc(blockRow)
-
- // Ensure the region is within the bounds of the block.
- endOffset := region.Offset + region.Len
- if endOffset < region.Offset || endOffset > location.blockLen {
- str := fmt.Sprintf("block %s region offset %d, length %d "+
- "exceeds block length of %d", region.Hash,
- region.Offset, region.Len, location.blockLen)
- return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
-
- }
-
- // Read the region from the appropriate disk block file.
- regionBytes, err := tx.db.store.readBlockRegion(location, region.Offset,
- region.Len)
- if err != nil {
- return nil, err
- }
-
- return regionBytes, nil
-}
-
-// FetchBlockRegions returns the raw serialized bytes for the given block
-// regions.
-//
-// For example, it is possible to directly extract Bitcoin transactions and/or
-// scripts from various blocks with this function. Depending on the backend
-// implementation, this can provide significant savings by avoiding the need to
-// load entire blocks.
-//
-// The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
-// the Offset fields in the provided BlockRegions are zero-based and relative to
-// the start of the block (byte 0).
-//
-// Returns the following errors as required by the interface contract:
-// - ErrBlockNotFound if any of the request block hashes do not exist
-// - ErrBlockRegionInvalid if one or more region exceed the bounds of the
-// associated block
-// - ErrTxClosed if the transaction has already been closed
-// - ErrCorruption if the database has somehow become corrupted
-//
-// In addition, returns ErrDriverSpecific if any failures occur when reading the
-// block files.
-//
-// NOTE: The data returned by this function is only valid during a database
-// transaction. Attempting to access it after a transaction has ended results
-// in undefined behavior. This constraint prevents additional data copies and
-// allows support for memory-mapped database implementations.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return nil, err
- }
-
- // NOTE: This could check for the existence of all blocks before
- // deserializing the locations and building up the fetch list which
- // would be faster in the failure case, however callers will not
- // typically be calling this function with invalid values, so optimize
- // for the common case.
-
- // NOTE: A potential optimization here would be to combine adjacent
- // regions to reduce the number of reads.
-
- // In order to improve efficiency of loading the bulk data, first grab
- // the block location for all of the requested block hashes and sort
- // the reads by filenum:offset so that all reads are grouped by file
- // and linear within each file. This can result in quite a significant
- // performance increase depending on how spread out the requested hashes
- // are by reducing the number of file open/closes and random accesses
- // needed. The fetchList is intentionally allocated with a cap because
- // some of the regions might be fetched from the pending blocks and
- // hence there is no need to fetch those from disk.
- blockRegions := make([][]byte, len(regions))
- fetchList := make([]bulkFetchData, 0, len(regions))
- for i := range regions {
- region := ®ions[i]
-
- // When the block is pending to be written on commit grab the
- // bytes from there.
- if tx.pendingBlocks != nil {
- regionBytes, err := tx.fetchPendingRegion(region)
- if err != nil {
- return nil, err
- }
- if regionBytes != nil {
- blockRegions[i] = regionBytes
- continue
- }
- }
-
- // Lookup the location of the block in the files from the block
- // index.
- blockRow, err := tx.fetchBlockRow(region.Hash)
- if err != nil {
- return nil, err
- }
- location := deserializeBlockLoc(blockRow)
-
- // Ensure the region is within the bounds of the block.
- endOffset := region.Offset + region.Len
- if endOffset < region.Offset || endOffset > location.blockLen {
- str := fmt.Sprintf("block %s region offset %d, length "+
- "%d exceeds block length of %d", region.Hash,
- region.Offset, region.Len, location.blockLen)
- return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
- }
-
- fetchList = append(fetchList, bulkFetchData{&location, i})
- }
- sort.Sort(bulkFetchDataSorter(fetchList))
-
- // Read all of the regions in the fetch list and set the results.
- for i := range fetchList {
- fetchData := &fetchList[i]
- ri := fetchData.replyIndex
- region := ®ions[ri]
- location := fetchData.blockLocation
- regionBytes, err := tx.db.store.readBlockRegion(*location,
- region.Offset, region.Len)
- if err != nil {
- return nil, err
- }
- blockRegions[ri] = regionBytes
- }
-
- return blockRegions, nil
-}
-
-// close marks the transaction closed then releases any pending data, the
-// underlying snapshot, the transaction read lock, and the write lock when the
-// transaction is writable.
-func (tx *transaction) close() {
- tx.closed = true
-
- // Clear pending blocks that would have been written on commit.
- tx.pendingBlocks = nil
- tx.pendingBlockData = nil
-
- // Clear pending keys that would have been written or deleted on commit.
- tx.pendingKeys = nil
- tx.pendingRemove = nil
-
- // Release the snapshot.
- if tx.snapshot != nil {
- tx.snapshot.Release()
- tx.snapshot = nil
- }
-
- tx.db.closeLock.RUnlock()
-
- // Release the writer lock for writable transactions to unblock any
- // other write transaction which are possibly waiting.
- if tx.writable {
- tx.db.writeLock.Unlock()
- }
-}
-
-// serializeBlockRow serializes a block row into a format suitable for storage
-// into the block index.
-func serializeBlockRow(blockLoc blockLocation, blockHdr []byte) []byte {
- // The serialized block index row format is:
- //
- // [0:blockLocSize] Block location
- // [blockLocSize:blockLocSize+blockHdrSize] Block header
- serializedRow := make([]byte, blockLocSize+blockHdrSize)
- copy(serializedRow, serializeBlockLoc(blockLoc))
- copy(serializedRow[blockHdrOffset:], blockHdr)
- return serializedRow
-}
-
-// writePendingAndCommit writes pending block data to the flat block files,
-// updates the metadata with their locations as well as the new current write
-// location, and commits the metadata to the memory database cache. It also
-// properly handles rollback in the case of failures.
-//
-// This function MUST only be called when there is pending data to be written.
-func (tx *transaction) writePendingAndCommit() error {
- // Save the current block store write position for potential rollback.
- // These variables are only updated here in this function and there can
- // only be one write transaction active at a time, so it's safe to store
- // them for potential rollback.
- wc := tx.db.store.writeCursor
- wc.RLock()
- oldBlkFileNum := wc.curFileNum
- oldBlkOffset := wc.curOffset
- wc.RUnlock()
-
- // rollback is a closure that is used to rollback all writes to the
- // block files.
- rollback := func() {
- // Rollback any modifications made to the block files if needed.
- tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
- }
-
- // Loop through all of the pending blocks to store and write them.
- for _, blockData := range tx.pendingBlockData {
- log.Tracef("Storing block %s", blockData.hash)
- location, err := tx.db.store.writeBlock(blockData.bytes)
- if err != nil {
- rollback()
- return err
- }
-
- // Add a record in the block index for the block. The record
- // includes the location information needed to locate the block
- // on the filesystem as well as the block header since they are
- // so commonly needed.
- blockHdr := blockData.bytes[0:blockHdrSize]
- blockRow := serializeBlockRow(location, blockHdr)
- err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
- if err != nil {
- rollback()
- return err
- }
- }
-
- // Update the metadata for the current write file and offset.
- writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
- if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil {
- rollback()
- return convertErr("failed to store write cursor", err)
- }
-
- // Atomically update the database cache. The cache automatically
- // handles flushing to the underlying persistent storage database.
- return tx.db.cache.commitTx(tx)
-}
-
-// Commit commits all changes that have been made to the root metadata bucket
-// and all of its sub-buckets to the database cache which is periodically synced
-// to persistent storage. In addition, it commits all new blocks directly to
-// persistent storage bypassing the db cache. Blocks can be rather large, so
-// this help increase the amount of cache available for the metadata updates and
-// is safe since blocks are immutable.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) Commit() error {
- // Prevent commits on managed transactions.
- if tx.managed {
- tx.close()
- panic("managed transaction commit not allowed")
- }
-
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return err
- }
-
- // Regardless of whether the commit succeeds, the transaction is closed
- // on return.
- defer tx.close()
-
- // Ensure the transaction is writable.
- if !tx.writable {
- str := "Commit requires a writable database transaction"
- return makeDbErr(database.ErrTxNotWritable, str, nil)
- }
-
- // Write pending data. The function will rollback if any errors occur.
- return tx.writePendingAndCommit()
-}
-
-// Rollback undoes all changes that have been made to the root bucket and all of
-// its sub-buckets.
-//
-// This function is part of the database.Tx interface implementation.
-func (tx *transaction) Rollback() error {
- // Prevent rollbacks on managed transactions.
- if tx.managed {
- tx.close()
- panic("managed transaction rollback not allowed")
- }
-
- // Ensure transaction state is valid.
- if err := tx.checkClosed(); err != nil {
- return err
- }
-
- tx.close()
- return nil
-}
-
-// db represents a collection of namespaces which are persisted and implements
-// the database.DB interface. All database access is performed through
-// transactions which are obtained through the specific Namespace.
-type db struct {
- writeLock sync.Mutex // Limit to one write transaction at a time.
- closeLock sync.RWMutex // Make database close block while txns active.
- closed bool // Is the database closed?
- store *blockStore // Handles read/writing blocks to flat files.
- cache *dbCache // Cache layer which wraps underlying leveldb DB.
-}
-
-// Enforce db implements the database.DB interface.
-var _ database.DB = (*db)(nil)
-
-// Type returns the database driver type the current database instance was
-// created with.
-//
-// This function is part of the database.DB interface implementation.
-func (db *db) Type() string {
- return dbType
-}
-
-// begin is the implementation function for the Begin database method. See its
-// documentation for more details.
-//
-// This function is only separate because it returns the internal transaction
-// which is used by the managed transaction code while the database method
-// returns the interface.
-func (db *db) begin(writable bool) (*transaction, error) {
- // Whenever a new writable transaction is started, grab the write lock
- // to ensure only a single write transaction can be active at the same
- // time. This lock will not be released until the transaction is
- // closed (via Rollback or Commit).
- if writable {
- db.writeLock.Lock()
- }
-
- // Whenever a new transaction is started, grab a read lock against the
- // database to ensure Close will wait for the transaction to finish.
- // This lock will not be released until the transaction is closed (via
- // Rollback or Commit).
- db.closeLock.RLock()
- if db.closed {
- db.closeLock.RUnlock()
- if writable {
- db.writeLock.Unlock()
- }
- return nil, makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr,
- nil)
- }
-
- // Grab a snapshot of the database cache (which in turn also handles the
- // underlying database).
- snapshot, err := db.cache.Snapshot()
- if err != nil {
- db.closeLock.RUnlock()
- if writable {
- db.writeLock.Unlock()
- }
-
- return nil, err
- }
-
- // The metadata and block index buckets are internal-only buckets, so
- // they have defined IDs.
- tx := &transaction{
- writable: writable,
- db: db,
- snapshot: snapshot,
- pendingKeys: treap.NewMutable(),
- pendingRemove: treap.NewMutable(),
- }
- tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
- tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
- return tx, nil
-}
-
-// Begin starts a transaction which is either read-only or read-write depending
-// on the specified flag. Multiple read-only transactions can be started
-// simultaneously while only a single read-write transaction can be started at a
-// time. The call will block when starting a read-write transaction when one is
-// already open.
-//
-// NOTE: The transaction must be closed by calling Rollback or Commit on it when
-// it is no longer needed. Failure to do so will result in unclaimed memory.
-//
-// This function is part of the database.DB interface implementation.
-func (db *db) Begin(writable bool) (database.Tx, error) {
- return db.begin(writable)
-}
-
-// rollbackOnPanic rolls the passed transaction back if the code in the calling
-// function panics. This is needed since the mutex on a transaction must be
-// released and a panic in called code would prevent that from happening.
-//
-// NOTE: This can only be handled manually for managed transactions since they
-// control the life-cycle of the transaction. As the documentation on Begin
-// calls out, callers opting to use manual transactions will have to ensure the
-// transaction is rolled back on panic if it desires that functionality as well
-// or the database will fail to close since the read-lock will never be
-// released.
-func rollbackOnPanic(tx *transaction) {
- if err := recover(); err != nil {
- tx.managed = false
- _ = tx.Rollback()
- panic(err)
- }
-}
-
-// View invokes the passed function in the context of a managed read-only
-// transaction with the root bucket for the namespace. Any errors returned from
-// the user-supplied function are returned from this function.
-//
-// This function is part of the database.DB interface implementation.
-func (db *db) View(fn func(database.Tx) error) error {
- // Start a read-only transaction.
- tx, err := db.begin(false)
- if err != nil {
- return err
- }
-
- // Since the user-provided function might panic, ensure the transaction
- // releases all mutexes and resources. There is no guarantee the caller
- // won't use recover and keep going. Thus, the database must still be
- // in a usable state on panics due to caller issues.
- defer rollbackOnPanic(tx)
-
- tx.managed = true
- err = fn(tx)
- tx.managed = false
- if err != nil {
- // The error is ignored here because nothing was written yet
- // and regardless of a rollback failure, the tx is closed now
- // anyways.
- _ = tx.Rollback()
- return err
- }
-
- return tx.Rollback()
-}
-
-// Update invokes the passed function in the context of a managed read-write
-// transaction with the root bucket for the namespace. Any errors returned from
-// the user-supplied function will cause the transaction to be rolled back and
-// are returned from this function. Otherwise, the transaction is committed
-// when the user-supplied function returns a nil error.
-//
-// This function is part of the database.DB interface implementation.
-func (db *db) Update(fn func(database.Tx) error) error {
- // Start a read-write transaction.
- tx, err := db.begin(true)
- if err != nil {
- return err
- }
-
- // Since the user-provided function might panic, ensure the transaction
- // releases all mutexes and resources. There is no guarantee the caller
- // won't use recover and keep going. Thus, the database must still be
- // in a usable state on panics due to caller issues.
- defer rollbackOnPanic(tx)
-
- tx.managed = true
- err = fn(tx)
- tx.managed = false
- if err != nil {
- // The error is ignored here because nothing was written yet
- // and regardless of a rollback failure, the tx is closed now
- // anyways.
- _ = tx.Rollback()
- return err
- }
-
- return tx.Commit()
-}
-
-// Close cleanly shuts down the database and syncs all data. It will block
-// until all database transactions have been finalized (rolled back or
-// committed).
-//
-// This function is part of the database.DB interface implementation.
-func (db *db) Close() error {
- // Since all transactions have a read lock on this mutex, this will
- // cause Close to wait for all readers to complete.
- db.closeLock.Lock()
- defer db.closeLock.Unlock()
-
- if db.closed {
- return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
- }
- db.closed = true
-
- // NOTE: Since the above lock waits for all transactions to finish and
- // prevents any new ones from being started, it is safe to flush the
- // cache and clear all state without the individual locks.
-
- // Close the database cache which will flush any existing entries to
- // disk and close the underlying leveldb database. Any error is saved
- // and returned at the end after the remaining cleanup since the
- // database will be marked closed even if this fails given there is no
- // good way for the caller to recover from a failure here anyways.
- closeErr := db.cache.Close()
-
- // Close any open flat files that house the blocks.
- wc := db.store.writeCursor
- if wc.curFile.file != nil {
- _ = wc.curFile.file.Close()
- wc.curFile.file = nil
- }
- for _, blockFile := range db.store.openBlockFiles {
- _ = blockFile.file.Close()
- }
- db.store.openBlockFiles = nil
- db.store.openBlocksLRU.Init()
- db.store.fileNumToLRUElem = nil
-
- return closeErr
-}
-
-// filesExists reports whether the named file or directory exists.
-func fileExists(name string) bool {
- if _, err := os.Stat(name); err != nil {
- if os.IsNotExist(err) {
- return false
- }
- }
- return true
-}
-
-// initDB creates the initial buckets and values used by the package. This is
-// mainly in a separate function for testing purposes.
-func initDB(ldb *leveldb.DB) error {
- // The starting block file write cursor location is file num 0, offset
- // 0.
- batch := new(leveldb.Batch)
- batch.Put(bucketizedKey(metadataBucketID, writeLocKeyName),
- serializeWriteRow(0, 0))
-
- // Create block index bucket and set the current bucket id.
- //
- // NOTE: Since buckets are virtualized through the use of prefixes,
- // there is no need to store the bucket index data for the metadata
- // bucket in the database. However, the first bucket ID to use does
- // need to account for it to ensure there are no key collisions.
- batch.Put(bucketIndexKey(metadataBucketID, blockIdxBucketName),
- blockIdxBucketID[:])
- batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
-
- // Write everything as a single batch.
- if err := ldb.Write(batch, nil); err != nil {
- str := fmt.Sprintf("failed to initialize metadata database: %v",
- err)
- return convertErr(str, err)
- }
-
- return nil
-}
-
-// openDB opens the database at the provided path. database.ErrDbDoesNotExist
-// is returned if the database doesn't exist and the create flag is not set.
-func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
- // Error if the database doesn't exist and the create flag is not set.
- metadataDbPath := filepath.Join(dbPath, metadataDbName)
- dbExists := fileExists(metadataDbPath)
- if !create && !dbExists {
- str := fmt.Sprintf("database %q does not exist", metadataDbPath)
- return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
- }
-
- // Ensure the full path to the database exists.
- if !dbExists {
- // The error can be ignored here since the call to
- // leveldb.OpenFile will fail if the directory couldn't be
- // created.
- _ = os.MkdirAll(dbPath, 0700)
- }
-
- // Open the metadata database (will create it if needed).
- opts := opt.Options{
- ErrorIfExist: create,
- Strict: opt.DefaultStrict,
- Compression: opt.NoCompression,
- Filter: filter.NewBloomFilter(10),
- }
- ldb, err := leveldb.OpenFile(metadataDbPath, &opts)
- if err != nil {
- return nil, convertErr(err.Error(), err)
- }
-
- // Create the block store which includes scanning the existing flat
- // block files to find what the current write cursor position is
- // according to the data that is actually on disk. Also create the
- // database cache which wraps the underlying leveldb database to provide
- // write caching.
- store := newBlockStore(dbPath, network)
- cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
- pdb := &db{store: store, cache: cache}
-
- // Perform any reconciliation needed between the block and metadata as
- // well as database initialization, if needed.
- return reconcileDB(pdb, create)
-}