OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / database / ffldb / db.go
1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package ffldb
6
7 import (
8         "bytes"
9         "encoding/binary"
10         "fmt"
11         "os"
12         "path/filepath"
13         "runtime"
14         "sort"
15         "sync"
16
17         "github.com/btcsuite/btcd/chaincfg/chainhash"
18         "github.com/btcsuite/btcd/database"
19         "github.com/btcsuite/btcd/database/internal/treap"
20         "github.com/btcsuite/btcd/wire"
21         "github.com/btcsuite/btcutil"
22         "github.com/btcsuite/goleveldb/leveldb"
23         "github.com/btcsuite/goleveldb/leveldb/comparer"
24         ldberrors "github.com/btcsuite/goleveldb/leveldb/errors"
25         "github.com/btcsuite/goleveldb/leveldb/filter"
26         "github.com/btcsuite/goleveldb/leveldb/iterator"
27         "github.com/btcsuite/goleveldb/leveldb/opt"
28         "github.com/btcsuite/goleveldb/leveldb/util"
29 )
30
31 const (
32         // metadataDbName is the name used for the metadata database.
33         metadataDbName = "metadata"
34
35         // blockHdrSize is the size of a block header.  This is simply the
36         // constant from wire and is only provided here for convenience since
37         // wire.MaxBlockHeaderPayload is quite long.
38         blockHdrSize = wire.MaxBlockHeaderPayload
39
40         // blockHdrOffset defines the offsets into a block index row for the
41         // block header.
42         //
43         // The serialized block index row format is:
44         //   <blocklocation><blockheader>
45         blockHdrOffset = blockLocSize
46 )
47
48 var (
49         // byteOrder is the preferred byte order used through the database and
50         // block files.  Sometimes big endian will be used to allow ordered byte
51         // sortable integer values.
52         byteOrder = binary.LittleEndian
53
54         // bucketIndexPrefix is the prefix used for all entries in the bucket
55         // index.
56         bucketIndexPrefix = []byte("bidx")
57
58         // curBucketIDKeyName is the name of the key used to keep track of the
59         // current bucket ID counter.
60         curBucketIDKeyName = []byte("bidx-cbid")
61
62         // metadataBucketID is the ID of the top-level metadata bucket.
63         // It is the value 0 encoded as an unsigned big-endian uint32.
64         metadataBucketID = [4]byte{}
65
66         // blockIdxBucketID is the ID of the internal block metadata bucket.
67         // It is the value 1 encoded as an unsigned big-endian uint32.
68         blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}
69
70         // blockIdxBucketName is the bucket used internally to track block
71         // metadata.
72         blockIdxBucketName = []byte("ffldb-blockidx")
73
74         // writeLocKeyName is the key used to store the current write file
75         // location.
76         writeLocKeyName = []byte("ffldb-writeloc")
77 )
78
79 // Common error strings.
80 const (
81         // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen
82         // error code.
83         errDbNotOpenStr = "database is not open"
84
85         // errTxClosedStr is the text to use for the database.ErrTxClosed error
86         // code.
87         errTxClosedStr = "database tx is closed"
88 )
89
90 // bulkFetchData is allows a block location to be specified along with the
91 // index it was requested from.  This in turn allows the bulk data loading
92 // functions to sort the data accesses based on the location to improve
93 // performance while keeping track of which result the data is for.
94 type bulkFetchData struct {
95         *blockLocation
96         replyIndex int
97 }
98
99 // bulkFetchDataSorter implements sort.Interface to allow a slice of
100 // bulkFetchData to be sorted.  In particular it sorts by file and then
101 // offset so that reads from files are grouped and linear.
102 type bulkFetchDataSorter []bulkFetchData
103
104 // Len returns the number of items in the slice.  It is part of the
105 // sort.Interface implementation.
106 func (s bulkFetchDataSorter) Len() int {
107         return len(s)
108 }
109
110 // Swap swaps the items at the passed indices.  It is part of the
111 // sort.Interface implementation.
112 func (s bulkFetchDataSorter) Swap(i, j int) {
113         s[i], s[j] = s[j], s[i]
114 }
115
116 // Less returns whether the item with index i should sort before the item with
117 // index j.  It is part of the sort.Interface implementation.
118 func (s bulkFetchDataSorter) Less(i, j int) bool {
119         if s[i].blockFileNum < s[j].blockFileNum {
120                 return true
121         }
122         if s[i].blockFileNum > s[j].blockFileNum {
123                 return false
124         }
125
126         return s[i].fileOffset < s[j].fileOffset
127 }
128
129 // makeDbErr creates a database.Error given a set of arguments.
130 func makeDbErr(c database.ErrorCode, desc string, err error) database.Error {
131         return database.Error{ErrorCode: c, Description: desc, Err: err}
132 }
133
134 // convertErr converts the passed leveldb error into a database error with an
135 // equivalent error code  and the passed description.  It also sets the passed
136 // error as the underlying error.
137 func convertErr(desc string, ldbErr error) database.Error {
138         // Use the driver-specific error code by default.  The code below will
139         // update this with the converted error if it's recognized.
140         var code = database.ErrDriverSpecific
141
142         switch {
143         // Database corruption errors.
144         case ldberrors.IsCorrupted(ldbErr):
145                 code = database.ErrCorruption
146
147         // Database open/create errors.
148         case ldbErr == leveldb.ErrClosed:
149                 code = database.ErrDbNotOpen
150
151         // Transaction errors.
152         case ldbErr == leveldb.ErrSnapshotReleased:
153                 code = database.ErrTxClosed
154         case ldbErr == leveldb.ErrIterReleased:
155                 code = database.ErrTxClosed
156         }
157
158         return database.Error{ErrorCode: code, Description: desc, Err: ldbErr}
159 }
160
161 // copySlice returns a copy of the passed slice.  This is mostly used to copy
162 // leveldb iterator keys and values since they are only valid until the iterator
163 // is moved instead of during the entirety of the transaction.
164 func copySlice(slice []byte) []byte {
165         ret := make([]byte, len(slice))
166         copy(ret, slice)
167         return ret
168 }
169
170 // cursor is an internal type used to represent a cursor over key/value pairs
171 // and nested buckets of a bucket and implements the database.Cursor interface.
172 type cursor struct {
173         bucket      *bucket
174         dbIter      iterator.Iterator
175         pendingIter iterator.Iterator
176         currentIter iterator.Iterator
177 }
178
179 // Enforce cursor implements the database.Cursor interface.
180 var _ database.Cursor = (*cursor)(nil)
181
182 // Bucket returns the bucket the cursor was created for.
183 //
184 // This function is part of the database.Cursor interface implementation.
185 func (c *cursor) Bucket() database.Bucket {
186         // Ensure transaction state is valid.
187         if err := c.bucket.tx.checkClosed(); err != nil {
188                 return nil
189         }
190
191         return c.bucket
192 }
193
194 // Delete removes the current key/value pair the cursor is at without
195 // invalidating the cursor.
196 //
197 // Returns the following errors as required by the interface contract:
198 //   - ErrIncompatibleValue if attempted when the cursor points to a nested
199 //     bucket
200 //   - ErrTxNotWritable if attempted against a read-only transaction
201 //   - ErrTxClosed if the transaction has already been closed
202 //
203 // This function is part of the database.Cursor interface implementation.
204 func (c *cursor) Delete() error {
205         // Ensure transaction state is valid.
206         if err := c.bucket.tx.checkClosed(); err != nil {
207                 return err
208         }
209
210         // Error if the cursor is exhausted.
211         if c.currentIter == nil {
212                 str := "cursor is exhausted"
213                 return makeDbErr(database.ErrIncompatibleValue, str, nil)
214         }
215
216         // Do not allow buckets to be deleted via the cursor.
217         key := c.currentIter.Key()
218         if bytes.HasPrefix(key, bucketIndexPrefix) {
219                 str := "buckets may not be deleted from a cursor"
220                 return makeDbErr(database.ErrIncompatibleValue, str, nil)
221         }
222
223         c.bucket.tx.deleteKey(copySlice(key), true)
224         return nil
225 }
226
227 // skipPendingUpdates skips any keys at the current database iterator position
228 // that are being updated by the transaction.  The forwards flag indicates the
229 // direction the cursor is moving.
230 func (c *cursor) skipPendingUpdates(forwards bool) {
231         for c.dbIter.Valid() {
232                 var skip bool
233                 key := c.dbIter.Key()
234                 if c.bucket.tx.pendingRemove.Has(key) {
235                         skip = true
236                 } else if c.bucket.tx.pendingKeys.Has(key) {
237                         skip = true
238                 }
239                 if !skip {
240                         break
241                 }
242
243                 if forwards {
244                         c.dbIter.Next()
245                 } else {
246                         c.dbIter.Prev()
247                 }
248         }
249 }
250
251 // chooseIterator first skips any entries in the database iterator that are
252 // being updated by the transaction and sets the current iterator to the
253 // appropriate iterator depending on their validity and the order they compare
254 // in while taking into account the direction flag.  When the cursor is being
255 // moved forwards and both iterators are valid, the iterator with the smaller
256 // key is chosen and vice versa when the cursor is being moved backwards.
257 func (c *cursor) chooseIterator(forwards bool) bool {
258         // Skip any keys at the current database iterator position that are
259         // being updated by the transaction.
260         c.skipPendingUpdates(forwards)
261
262         // When both iterators are exhausted, the cursor is exhausted too.
263         if !c.dbIter.Valid() && !c.pendingIter.Valid() {
264                 c.currentIter = nil
265                 return false
266         }
267
268         // Choose the database iterator when the pending keys iterator is
269         // exhausted.
270         if !c.pendingIter.Valid() {
271                 c.currentIter = c.dbIter
272                 return true
273         }
274
275         // Choose the pending keys iterator when the database iterator is
276         // exhausted.
277         if !c.dbIter.Valid() {
278                 c.currentIter = c.pendingIter
279                 return true
280         }
281
282         // Both iterators are valid, so choose the iterator with either the
283         // smaller or larger key depending on the forwards flag.
284         compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
285         if (forwards && compare > 0) || (!forwards && compare < 0) {
286                 c.currentIter = c.pendingIter
287         } else {
288                 c.currentIter = c.dbIter
289         }
290         return true
291 }
292
293 // First positions the cursor at the first key/value pair and returns whether or
294 // not the pair exists.
295 //
296 // This function is part of the database.Cursor interface implementation.
297 func (c *cursor) First() bool {
298         // Ensure transaction state is valid.
299         if err := c.bucket.tx.checkClosed(); err != nil {
300                 return false
301         }
302
303         // Seek to the first key in both the database and pending iterators and
304         // choose the iterator that is both valid and has the smaller key.
305         c.dbIter.First()
306         c.pendingIter.First()
307         return c.chooseIterator(true)
308 }
309
310 // Last positions the cursor at the last key/value pair and returns whether or
311 // not the pair exists.
312 //
313 // This function is part of the database.Cursor interface implementation.
314 func (c *cursor) Last() bool {
315         // Ensure transaction state is valid.
316         if err := c.bucket.tx.checkClosed(); err != nil {
317                 return false
318         }
319
320         // Seek to the last key in both the database and pending iterators and
321         // choose the iterator that is both valid and has the larger key.
322         c.dbIter.Last()
323         c.pendingIter.Last()
324         return c.chooseIterator(false)
325 }
326
327 // Next moves the cursor one key/value pair forward and returns whether or not
328 // the pair exists.
329 //
330 // This function is part of the database.Cursor interface implementation.
331 func (c *cursor) Next() bool {
332         // Ensure transaction state is valid.
333         if err := c.bucket.tx.checkClosed(); err != nil {
334                 return false
335         }
336
337         // Nothing to return if cursor is exhausted.
338         if c.currentIter == nil {
339                 return false
340         }
341
342         // Move the current iterator to the next entry and choose the iterator
343         // that is both valid and has the smaller key.
344         c.currentIter.Next()
345         return c.chooseIterator(true)
346 }
347
348 // Prev moves the cursor one key/value pair backward and returns whether or not
349 // the pair exists.
350 //
351 // This function is part of the database.Cursor interface implementation.
352 func (c *cursor) Prev() bool {
353         // Ensure transaction state is valid.
354         if err := c.bucket.tx.checkClosed(); err != nil {
355                 return false
356         }
357
358         // Nothing to return if cursor is exhausted.
359         if c.currentIter == nil {
360                 return false
361         }
362
363         // Move the current iterator to the previous entry and choose the
364         // iterator that is both valid and has the larger key.
365         c.currentIter.Prev()
366         return c.chooseIterator(false)
367 }
368
369 // Seek positions the cursor at the first key/value pair that is greater than or
370 // equal to the passed seek key.  Returns false if no suitable key was found.
371 //
372 // This function is part of the database.Cursor interface implementation.
373 func (c *cursor) Seek(seek []byte) bool {
374         // Ensure transaction state is valid.
375         if err := c.bucket.tx.checkClosed(); err != nil {
376                 return false
377         }
378
379         // Seek to the provided key in both the database and pending iterators
380         // then choose the iterator that is both valid and has the larger key.
381         seekKey := bucketizedKey(c.bucket.id, seek)
382         c.dbIter.Seek(seekKey)
383         c.pendingIter.Seek(seekKey)
384         return c.chooseIterator(true)
385 }
386
387 // rawKey returns the current key the cursor is pointing to without stripping
388 // the current bucket prefix or bucket index prefix.
389 func (c *cursor) rawKey() []byte {
390         // Nothing to return if cursor is exhausted.
391         if c.currentIter == nil {
392                 return nil
393         }
394
395         return copySlice(c.currentIter.Key())
396 }
397
398 // Key returns the current key the cursor is pointing to.
399 //
400 // This function is part of the database.Cursor interface implementation.
401 func (c *cursor) Key() []byte {
402         // Ensure transaction state is valid.
403         if err := c.bucket.tx.checkClosed(); err != nil {
404                 return nil
405         }
406
407         // Nothing to return if cursor is exhausted.
408         if c.currentIter == nil {
409                 return nil
410         }
411
412         // Slice out the actual key name and make a copy since it is no longer
413         // valid after iterating to the next item.
414         //
415         // The key is after the bucket index prefix and parent ID when the
416         // cursor is pointing to a nested bucket.
417         key := c.currentIter.Key()
418         if bytes.HasPrefix(key, bucketIndexPrefix) {
419                 key = key[len(bucketIndexPrefix)+4:]
420                 return copySlice(key)
421         }
422
423         // The key is after the bucket ID when the cursor is pointing to a
424         // normal entry.
425         key = key[len(c.bucket.id):]
426         return copySlice(key)
427 }
428
429 // rawValue returns the current value the cursor is pointing to without
430 // stripping without filtering bucket index values.
431 func (c *cursor) rawValue() []byte {
432         // Nothing to return if cursor is exhausted.
433         if c.currentIter == nil {
434                 return nil
435         }
436
437         return copySlice(c.currentIter.Value())
438 }
439
440 // Value returns the current value the cursor is pointing to.  This will be nil
441 // for nested buckets.
442 //
443 // This function is part of the database.Cursor interface implementation.
444 func (c *cursor) Value() []byte {
445         // Ensure transaction state is valid.
446         if err := c.bucket.tx.checkClosed(); err != nil {
447                 return nil
448         }
449
450         // Nothing to return if cursor is exhausted.
451         if c.currentIter == nil {
452                 return nil
453         }
454
455         // Return nil for the value when the cursor is pointing to a nested
456         // bucket.
457         if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
458                 return nil
459         }
460
461         return copySlice(c.currentIter.Value())
462 }
463
464 // cursorType defines the type of cursor to create.
465 type cursorType int
466
467 // The following constants define the allowed cursor types.
468 const (
469         // ctKeys iterates through all of the keys in a given bucket.
470         ctKeys cursorType = iota
471
472         // ctBuckets iterates through all directly nested buckets in a given
473         // bucket.
474         ctBuckets
475
476         // ctFull iterates through both the keys and the directly nested buckets
477         // in a given bucket.
478         ctFull
479 )
480
481 // cursorFinalizer is either invoked when a cursor is being garbage collected or
482 // called manually to ensure the underlying cursor iterators are released.
483 func cursorFinalizer(c *cursor) {
484         c.dbIter.Release()
485         c.pendingIter.Release()
486 }
487
488 // newCursor returns a new cursor for the given bucket, bucket ID, and cursor
489 // type.
490 //
491 // NOTE: The caller is responsible for calling the cursorFinalizer function on
492 // the returned cursor.
493 func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
494         var dbIter, pendingIter iterator.Iterator
495         switch cursorTyp {
496         case ctKeys:
497                 keyRange := util.BytesPrefix(bucketID)
498                 dbIter = b.tx.snapshot.NewIterator(keyRange)
499                 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
500                 pendingIter = pendingKeyIter
501
502         case ctBuckets:
503                 // The serialized bucket index key format is:
504                 //   <bucketindexprefix><parentbucketid><bucketname>
505
506                 // Create an iterator for the both the database and the pending
507                 // keys which are prefixed by the bucket index identifier and
508                 // the provided bucket ID.
509                 prefix := make([]byte, len(bucketIndexPrefix)+4)
510                 copy(prefix, bucketIndexPrefix)
511                 copy(prefix[len(bucketIndexPrefix):], bucketID)
512                 bucketRange := util.BytesPrefix(prefix)
513
514                 dbIter = b.tx.snapshot.NewIterator(bucketRange)
515                 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
516                 pendingIter = pendingBucketIter
517
518         case ctFull:
519                 fallthrough
520         default:
521                 // The serialized bucket index key format is:
522                 //   <bucketindexprefix><parentbucketid><bucketname>
523                 prefix := make([]byte, len(bucketIndexPrefix)+4)
524                 copy(prefix, bucketIndexPrefix)
525                 copy(prefix[len(bucketIndexPrefix):], bucketID)
526                 bucketRange := util.BytesPrefix(prefix)
527                 keyRange := util.BytesPrefix(bucketID)
528
529                 // Since both keys and buckets are needed from the database,
530                 // create an individual iterator for each prefix and then create
531                 // a merged iterator from them.
532                 dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
533                 dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
534                 iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
535                 dbIter = iterator.NewMergedIterator(iters,
536                         comparer.DefaultComparer, true)
537
538                 // Since both keys and buckets are needed from the pending keys,
539                 // create an individual iterator for each prefix and then create
540                 // a merged iterator from them.
541                 pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
542                 pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
543                 iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
544                 pendingIter = iterator.NewMergedIterator(iters,
545                         comparer.DefaultComparer, true)
546         }
547
548         // Create the cursor using the iterators.
549         return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
550 }
551
552 // bucket is an internal type used to represent a collection of key/value pairs
553 // and implements the database.Bucket interface.
554 type bucket struct {
555         tx *transaction
556         id [4]byte
557 }
558
559 // Enforce bucket implements the database.Bucket interface.
560 var _ database.Bucket = (*bucket)(nil)
561
562 // bucketIndexKey returns the actual key to use for storing and retrieving a
563 // child bucket in the bucket index.  This is required because additional
564 // information is needed to distinguish nested buckets with the same name.
565 func bucketIndexKey(parentID [4]byte, key []byte) []byte {
566         // The serialized bucket index key format is:
567         //   <bucketindexprefix><parentbucketid><bucketname>
568         indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
569         copy(indexKey, bucketIndexPrefix)
570         copy(indexKey[len(bucketIndexPrefix):], parentID[:])
571         copy(indexKey[len(bucketIndexPrefix)+4:], key)
572         return indexKey
573 }
574
575 // bucketizedKey returns the actual key to use for storing and retrieving a key
576 // for the provided bucket ID.  This is required because bucketizing is handled
577 // through the use of a unique prefix per bucket.
578 func bucketizedKey(bucketID [4]byte, key []byte) []byte {
579         // The serialized block index key format is:
580         //   <bucketid><key>
581         bKey := make([]byte, 4+len(key))
582         copy(bKey, bucketID[:])
583         copy(bKey[4:], key)
584         return bKey
585 }
586
587 // Bucket retrieves a nested bucket with the given key.  Returns nil if
588 // the bucket does not exist.
589 //
590 // This function is part of the database.Bucket interface implementation.
591 func (b *bucket) Bucket(key []byte) database.Bucket {
592         // Ensure transaction state is valid.
593         if err := b.tx.checkClosed(); err != nil {
594                 return nil
595         }
596
597         // Attempt to fetch the ID for the child bucket.  The bucket does not
598         // exist if the bucket index entry does not exist.
599         childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
600         if childID == nil {
601                 return nil
602         }
603
604         childBucket := &bucket{tx: b.tx}
605         copy(childBucket.id[:], childID)
606         return childBucket
607 }
608
609 // CreateBucket creates and returns a new nested bucket with the given key.
610 //
611 // Returns the following errors as required by the interface contract:
612 //   - ErrBucketExists if the bucket already exists
613 //   - ErrBucketNameRequired if the key is empty
614 //   - ErrIncompatibleValue if the key is otherwise invalid for the particular
615 //     implementation
616 //   - ErrTxNotWritable if attempted against a read-only transaction
617 //   - ErrTxClosed if the transaction has already been closed
618 //
619 // This function is part of the database.Bucket interface implementation.
620 func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
621         // Ensure transaction state is valid.
622         if err := b.tx.checkClosed(); err != nil {
623                 return nil, err
624         }
625
626         // Ensure the transaction is writable.
627         if !b.tx.writable {
628                 str := "create bucket requires a writable database transaction"
629                 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
630         }
631
632         // Ensure a key was provided.
633         if len(key) == 0 {
634                 str := "create bucket requires a key"
635                 return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
636         }
637
638         // Ensure bucket does not already exist.
639         bidxKey := bucketIndexKey(b.id, key)
640         if b.tx.hasKey(bidxKey) {
641                 str := "bucket already exists"
642                 return nil, makeDbErr(database.ErrBucketExists, str, nil)
643         }
644
645         // Find the appropriate next bucket ID to use for the new bucket.  In
646         // the case of the special internal block index, keep the fixed ID.
647         var childID [4]byte
648         if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
649                 childID = blockIdxBucketID
650         } else {
651                 var err error
652                 childID, err = b.tx.nextBucketID()
653                 if err != nil {
654                         return nil, err
655                 }
656         }
657
658         // Add the new bucket to the bucket index.
659         if err := b.tx.putKey(bidxKey, childID[:]); err != nil {
660                 str := fmt.Sprintf("failed to create bucket with key %q", key)
661                 return nil, convertErr(str, err)
662         }
663         return &bucket{tx: b.tx, id: childID}, nil
664 }
665
666 // CreateBucketIfNotExists creates and returns a new nested bucket with the
667 // given key if it does not already exist.
668 //
669 // Returns the following errors as required by the interface contract:
670 //   - ErrBucketNameRequired if the key is empty
671 //   - ErrIncompatibleValue if the key is otherwise invalid for the particular
672 //     implementation
673 //   - ErrTxNotWritable if attempted against a read-only transaction
674 //   - ErrTxClosed if the transaction has already been closed
675 //
676 // This function is part of the database.Bucket interface implementation.
677 func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
678         // Ensure transaction state is valid.
679         if err := b.tx.checkClosed(); err != nil {
680                 return nil, err
681         }
682
683         // Ensure the transaction is writable.
684         if !b.tx.writable {
685                 str := "create bucket requires a writable database transaction"
686                 return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
687         }
688
689         // Return existing bucket if it already exists, otherwise create it.
690         if bucket := b.Bucket(key); bucket != nil {
691                 return bucket, nil
692         }
693         return b.CreateBucket(key)
694 }
695
696 // DeleteBucket removes a nested bucket with the given key.
697 //
698 // Returns the following errors as required by the interface contract:
699 //   - ErrBucketNotFound if the specified bucket does not exist
700 //   - ErrTxNotWritable if attempted against a read-only transaction
701 //   - ErrTxClosed if the transaction has already been closed
702 //
703 // This function is part of the database.Bucket interface implementation.
704 func (b *bucket) DeleteBucket(key []byte) error {
705         // Ensure transaction state is valid.
706         if err := b.tx.checkClosed(); err != nil {
707                 return err
708         }
709
710         // Ensure the transaction is writable.
711         if !b.tx.writable {
712                 str := "delete bucket requires a writable database transaction"
713                 return makeDbErr(database.ErrTxNotWritable, str, nil)
714         }
715
716         // Attempt to fetch the ID for the child bucket.  The bucket does not
717         // exist if the bucket index entry does not exist.  In the case of the
718         // special internal block index, keep the fixed ID.
719         bidxKey := bucketIndexKey(b.id, key)
720         childID := b.tx.fetchKey(bidxKey)
721         if childID == nil {
722                 str := fmt.Sprintf("bucket %q does not exist", key)
723                 return makeDbErr(database.ErrBucketNotFound, str, nil)
724         }
725
726         // Remove all nested buckets and their keys.
727         childIDs := [][]byte{childID}
728         for len(childIDs) > 0 {
729                 childID = childIDs[len(childIDs)-1]
730                 childIDs = childIDs[:len(childIDs)-1]
731
732                 // Delete all keys in the nested bucket.
733                 keyCursor := newCursor(b, childID, ctKeys)
734                 for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
735                         b.tx.deleteKey(keyCursor.rawKey(), false)
736                 }
737                 cursorFinalizer(keyCursor)
738
739                 // Iterate through all nested buckets.
740                 bucketCursor := newCursor(b, childID, ctBuckets)
741                 for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
742                         // Push the id of the nested bucket onto the stack for
743                         // the next iteration.
744                         childID := bucketCursor.rawValue()
745                         childIDs = append(childIDs, childID)
746
747                         // Remove the nested bucket from the bucket index.
748                         b.tx.deleteKey(bucketCursor.rawKey(), false)
749                 }
750                 cursorFinalizer(bucketCursor)
751         }
752
753         // Remove the nested bucket from the bucket index.  Any buckets nested
754         // under it were already removed above.
755         b.tx.deleteKey(bidxKey, true)
756         return nil
757 }
758
759 // Cursor returns a new cursor, allowing for iteration over the bucket's
760 // key/value pairs and nested buckets in forward or backward order.
761 //
762 // You must seek to a position using the First, Last, or Seek functions before
763 // calling the Next, Prev, Key, or Value functions.  Failure to do so will
764 // result in the same return values as an exhausted cursor, which is false for
765 // the Prev and Next functions and nil for Key and Value functions.
766 //
767 // This function is part of the database.Bucket interface implementation.
768 func (b *bucket) Cursor() database.Cursor {
769         // Ensure transaction state is valid.
770         if err := b.tx.checkClosed(); err != nil {
771                 return &cursor{bucket: b}
772         }
773
774         // Create the cursor and setup a runtime finalizer to ensure the
775         // iterators are released when the cursor is garbage collected.
776         c := newCursor(b, b.id[:], ctFull)
777         runtime.SetFinalizer(c, cursorFinalizer)
778         return c
779 }
780
781 // ForEach invokes the passed function with every key/value pair in the bucket.
782 // This does not include nested buckets or the key/value pairs within those
783 // nested buckets.
784 //
785 // WARNING: It is not safe to mutate data while iterating with this method.
786 // Doing so may cause the underlying cursor to be invalidated and return
787 // unexpected keys and/or values.
788 //
789 // Returns the following errors as required by the interface contract:
790 //   - ErrTxClosed if the transaction has already been closed
791 //
792 // NOTE: The values returned by this function are only valid during a
793 // transaction.  Attempting to access them after a transaction has ended will
794 // likely result in an access violation.
795 //
796 // This function is part of the database.Bucket interface implementation.
797 func (b *bucket) ForEach(fn func(k, v []byte) error) error {
798         // Ensure transaction state is valid.
799         if err := b.tx.checkClosed(); err != nil {
800                 return err
801         }
802
803         // Invoke the callback for each cursor item.  Return the error returned
804         // from the callback when it is non-nil.
805         c := newCursor(b, b.id[:], ctKeys)
806         defer cursorFinalizer(c)
807         for ok := c.First(); ok; ok = c.Next() {
808                 err := fn(c.Key(), c.Value())
809                 if err != nil {
810                         return err
811                 }
812         }
813
814         return nil
815 }
816
817 // ForEachBucket invokes the passed function with the key of every nested bucket
818 // in the current bucket.  This does not include any nested buckets within those
819 // nested buckets.
820 //
821 // WARNING: It is not safe to mutate data while iterating with this method.
822 // Doing so may cause the underlying cursor to be invalidated and return
823 // unexpected keys.
824 //
825 // Returns the following errors as required by the interface contract:
826 //   - ErrTxClosed if the transaction has already been closed
827 //
828 // NOTE: The values returned by this function are only valid during a
829 // transaction.  Attempting to access them after a transaction has ended will
830 // likely result in an access violation.
831 //
832 // This function is part of the database.Bucket interface implementation.
833 func (b *bucket) ForEachBucket(fn func(k []byte) error) error {
834         // Ensure transaction state is valid.
835         if err := b.tx.checkClosed(); err != nil {
836                 return err
837         }
838
839         // Invoke the callback for each cursor item.  Return the error returned
840         // from the callback when it is non-nil.
841         c := newCursor(b, b.id[:], ctBuckets)
842         defer cursorFinalizer(c)
843         for ok := c.First(); ok; ok = c.Next() {
844                 err := fn(c.Key())
845                 if err != nil {
846                         return err
847                 }
848         }
849
850         return nil
851 }
852
853 // Writable returns whether or not the bucket is writable.
854 //
855 // This function is part of the database.Bucket interface implementation.
856 func (b *bucket) Writable() bool {
857         return b.tx.writable
858 }
859
860 // Put saves the specified key/value pair to the bucket.  Keys that do not
861 // already exist are added and keys that already exist are overwritten.
862 //
863 // Returns the following errors as required by the interface contract:
864 //   - ErrKeyRequired if the key is empty
865 //   - ErrIncompatibleValue if the key is the same as an existing bucket
866 //   - ErrTxNotWritable if attempted against a read-only transaction
867 //   - ErrTxClosed if the transaction has already been closed
868 //
869 // This function is part of the database.Bucket interface implementation.
870 func (b *bucket) Put(key, value []byte) error {
871         // Ensure transaction state is valid.
872         if err := b.tx.checkClosed(); err != nil {
873                 return err
874         }
875
876         // Ensure the transaction is writable.
877         if !b.tx.writable {
878                 str := "setting a key requires a writable database transaction"
879                 return makeDbErr(database.ErrTxNotWritable, str, nil)
880         }
881
882         // Ensure a key was provided.
883         if len(key) == 0 {
884                 str := "put requires a key"
885                 return makeDbErr(database.ErrKeyRequired, str, nil)
886         }
887
888         return b.tx.putKey(bucketizedKey(b.id, key), value)
889 }
890
891 // Get returns the value for the given key.  Returns nil if the key does not
892 // exist in this bucket.  An empty slice is returned for keys that exist but
893 // have no value assigned.
894 //
895 // NOTE: The value returned by this function is only valid during a transaction.
896 // Attempting to access it after a transaction has ended results in undefined
897 // behavior.  Additionally, the value must NOT be modified by the caller.
898 //
899 // This function is part of the database.Bucket interface implementation.
900 func (b *bucket) Get(key []byte) []byte {
901         // Ensure transaction state is valid.
902         if err := b.tx.checkClosed(); err != nil {
903                 return nil
904         }
905
906         // Nothing to return if there is no key.
907         if len(key) == 0 {
908                 return nil
909         }
910
911         return b.tx.fetchKey(bucketizedKey(b.id, key))
912 }
913
914 // Delete removes the specified key from the bucket.  Deleting a key that does
915 // not exist does not return an error.
916 //
917 // Returns the following errors as required by the interface contract:
918 //   - ErrKeyRequired if the key is empty
919 //   - ErrIncompatibleValue if the key is the same as an existing bucket
920 //   - ErrTxNotWritable if attempted against a read-only transaction
921 //   - ErrTxClosed if the transaction has already been closed
922 //
923 // This function is part of the database.Bucket interface implementation.
924 func (b *bucket) Delete(key []byte) error {
925         // Ensure transaction state is valid.
926         if err := b.tx.checkClosed(); err != nil {
927                 return err
928         }
929
930         // Ensure the transaction is writable.
931         if !b.tx.writable {
932                 str := "deleting a value requires a writable database transaction"
933                 return makeDbErr(database.ErrTxNotWritable, str, nil)
934         }
935
936         // Nothing to do if there is no key.
937         if len(key) == 0 {
938                 return nil
939         }
940
941         b.tx.deleteKey(bucketizedKey(b.id, key), true)
942         return nil
943 }
944
945 // pendingBlock houses a block that will be written to disk when the database
946 // transaction is committed.
947 type pendingBlock struct {
948         hash  *chainhash.Hash
949         bytes []byte
950 }
951
952 // transaction represents a database transaction.  It can either be read-only or
953 // read-write and implements the database.Bucket interface.  The transaction
954 // provides a root bucket against which all read and writes occur.
955 type transaction struct {
956         managed        bool             // Is the transaction managed?
957         closed         bool             // Is the transaction closed?
958         writable       bool             // Is the transaction writable?
959         db             *db              // DB instance the tx was created from.
960         snapshot       *dbCacheSnapshot // Underlying snapshot for txns.
961         metaBucket     *bucket          // The root metadata bucket.
962         blockIdxBucket *bucket          // The block index bucket.
963
964         // Blocks that need to be stored on commit.  The pendingBlocks map is
965         // kept to allow quick lookups of pending data by block hash.
966         pendingBlocks    map[chainhash.Hash]int
967         pendingBlockData []pendingBlock
968
969         // Keys that need to be stored or deleted on commit.
970         pendingKeys   *treap.Mutable
971         pendingRemove *treap.Mutable
972
973         // Active iterators that need to be notified when the pending keys have
974         // been updated so the cursors can properly handle updates to the
975         // transaction state.
976         activeIterLock sync.RWMutex
977         activeIters    []*treap.Iterator
978 }
979
980 // Enforce transaction implements the database.Tx interface.
981 var _ database.Tx = (*transaction)(nil)
982
983 // removeActiveIter removes the passed iterator from the list of active
984 // iterators against the pending keys treap.
985 func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
986         // An indexing for loop is intentionally used over a range here as range
987         // does not reevaluate the slice on each iteration nor does it adjust
988         // the index for the modified slice.
989         tx.activeIterLock.Lock()
990         for i := 0; i < len(tx.activeIters); i++ {
991                 if tx.activeIters[i] == iter {
992                         copy(tx.activeIters[i:], tx.activeIters[i+1:])
993                         tx.activeIters[len(tx.activeIters)-1] = nil
994                         tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
995                 }
996         }
997         tx.activeIterLock.Unlock()
998 }
999
1000 // addActiveIter adds the passed iterator to the list of active iterators for
1001 // the pending keys treap.
1002 func (tx *transaction) addActiveIter(iter *treap.Iterator) {
1003         tx.activeIterLock.Lock()
1004         tx.activeIters = append(tx.activeIters, iter)
1005         tx.activeIterLock.Unlock()
1006 }
1007
1008 // notifyActiveIters notifies all of the active iterators for the pending keys
1009 // treap that it has been updated.
1010 func (tx *transaction) notifyActiveIters() {
1011         tx.activeIterLock.RLock()
1012         for _, iter := range tx.activeIters {
1013                 iter.ForceReseek()
1014         }
1015         tx.activeIterLock.RUnlock()
1016 }
1017
1018 // checkClosed returns an error if the the database or transaction is closed.
1019 func (tx *transaction) checkClosed() error {
1020         // The transaction is no longer valid if it has been closed.
1021         if tx.closed {
1022                 return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
1023         }
1024
1025         return nil
1026 }
1027
1028 // hasKey returns whether or not the provided key exists in the database while
1029 // taking into account the current transaction state.
1030 func (tx *transaction) hasKey(key []byte) bool {
1031         // When the transaction is writable, check the pending transaction
1032         // state first.
1033         if tx.writable {
1034                 if tx.pendingRemove.Has(key) {
1035                         return false
1036                 }
1037                 if tx.pendingKeys.Has(key) {
1038                         return true
1039                 }
1040         }
1041
1042         // Consult the database cache and underlying database.
1043         return tx.snapshot.Has(key)
1044 }
1045
1046 // putKey adds the provided key to the list of keys to be updated in the
1047 // database when the transaction is committed.
1048 //
1049 // NOTE: This function must only be called on a writable transaction.  Since it
1050 // is an internal helper function, it does not check.
1051 func (tx *transaction) putKey(key, value []byte) error {
1052         // Prevent the key from being deleted if it was previously scheduled
1053         // to be deleted on transaction commit.
1054         tx.pendingRemove.Delete(key)
1055
1056         // Add the key/value pair to the list to be written on transaction
1057         // commit.
1058         tx.pendingKeys.Put(key, value)
1059         tx.notifyActiveIters()
1060         return nil
1061 }
1062
1063 // fetchKey attempts to fetch the provided key from the database cache (and
1064 // hence underlying database) while taking into account the current transaction
1065 // state.  Returns nil if the key does not exist.
1066 func (tx *transaction) fetchKey(key []byte) []byte {
1067         // When the transaction is writable, check the pending transaction
1068         // state first.
1069         if tx.writable {
1070                 if tx.pendingRemove.Has(key) {
1071                         return nil
1072                 }
1073                 if value := tx.pendingKeys.Get(key); value != nil {
1074                         return value
1075                 }
1076         }
1077
1078         // Consult the database cache and underlying database.
1079         return tx.snapshot.Get(key)
1080 }
1081
1082 // deleteKey adds the provided key to the list of keys to be deleted from the
1083 // database when the transaction is committed.  The notify iterators flag is
1084 // useful to delay notifying iterators about the changes during bulk deletes.
1085 //
1086 // NOTE: This function must only be called on a writable transaction.  Since it
1087 // is an internal helper function, it does not check.
1088 func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
1089         // Remove the key from the list of pendings keys to be written on
1090         // transaction commit if needed.
1091         tx.pendingKeys.Delete(key)
1092
1093         // Add the key to the list to be deleted on transaction commit.
1094         tx.pendingRemove.Put(key, nil)
1095
1096         // Notify the active iterators about the change if the flag is set.
1097         if notifyIterators {
1098                 tx.notifyActiveIters()
1099         }
1100 }
1101
1102 // nextBucketID returns the next bucket ID to use for creating a new bucket.
1103 //
1104 // NOTE: This function must only be called on a writable transaction.  Since it
1105 // is an internal helper function, it does not check.
1106 func (tx *transaction) nextBucketID() ([4]byte, error) {
1107         // Load the currently highest used bucket ID.
1108         curIDBytes := tx.fetchKey(curBucketIDKeyName)
1109         curBucketNum := binary.BigEndian.Uint32(curIDBytes)
1110
1111         // Increment and update the current bucket ID and return it.
1112         var nextBucketID [4]byte
1113         binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
1114         if err := tx.putKey(curBucketIDKeyName, nextBucketID[:]); err != nil {
1115                 return [4]byte{}, err
1116         }
1117         return nextBucketID, nil
1118 }
1119
1120 // Metadata returns the top-most bucket for all metadata storage.
1121 //
1122 // This function is part of the database.Tx interface implementation.
1123 func (tx *transaction) Metadata() database.Bucket {
1124         return tx.metaBucket
1125 }
1126
1127 // hasBlock returns whether or not a block with the given hash exists.
1128 func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
1129         // Return true if the block is pending to be written on commit since
1130         // it exists from the viewpoint of this transaction.
1131         if _, exists := tx.pendingBlocks[*hash]; exists {
1132                 return true
1133         }
1134
1135         return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
1136 }
1137
1138 // StoreBlock stores the provided block into the database.  There are no checks
1139 // to ensure the block connects to a previous block, contains double spends, or
1140 // any additional functionality such as transaction indexing.  It simply stores
1141 // the block in the database.
1142 //
1143 // Returns the following errors as required by the interface contract:
1144 //   - ErrBlockExists when the block hash already exists
1145 //   - ErrTxNotWritable if attempted against a read-only transaction
1146 //   - ErrTxClosed if the transaction has already been closed
1147 //
1148 // This function is part of the database.Tx interface implementation.
1149 func (tx *transaction) StoreBlock(block *btcutil.Block) error {
1150         // Ensure transaction state is valid.
1151         if err := tx.checkClosed(); err != nil {
1152                 return err
1153         }
1154
1155         // Ensure the transaction is writable.
1156         if !tx.writable {
1157                 str := "store block requires a writable database transaction"
1158                 return makeDbErr(database.ErrTxNotWritable, str, nil)
1159         }
1160
1161         // Reject the block if it already exists.
1162         blockHash := block.Hash()
1163         if tx.hasBlock(blockHash) {
1164                 str := fmt.Sprintf("block %s already exists", blockHash)
1165                 return makeDbErr(database.ErrBlockExists, str, nil)
1166         }
1167
1168         blockBytes, err := block.Bytes()
1169         if err != nil {
1170                 str := fmt.Sprintf("failed to get serialized bytes for block %s",
1171                         blockHash)
1172                 return makeDbErr(database.ErrDriverSpecific, str, err)
1173         }
1174
1175         // Add the block to be stored to the list of pending blocks to store
1176         // when the transaction is committed.  Also, add it to pending blocks
1177         // map so it is easy to determine the block is pending based on the
1178         // block hash.
1179         if tx.pendingBlocks == nil {
1180                 tx.pendingBlocks = make(map[chainhash.Hash]int)
1181         }
1182         tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
1183         tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{
1184                 hash:  blockHash,
1185                 bytes: blockBytes,
1186         })
1187         log.Tracef("Added block %s to pending blocks", blockHash)
1188
1189         return nil
1190 }
1191
1192 // HasBlock returns whether or not a block with the given hash exists in the
1193 // database.
1194 //
1195 // Returns the following errors as required by the interface contract:
1196 //   - ErrTxClosed if the transaction has already been closed
1197 //
1198 // This function is part of the database.Tx interface implementation.
1199 func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
1200         // Ensure transaction state is valid.
1201         if err := tx.checkClosed(); err != nil {
1202                 return false, err
1203         }
1204
1205         return tx.hasBlock(hash), nil
1206 }
1207
1208 // HasBlocks returns whether or not the blocks with the provided hashes
1209 // exist in the database.
1210 //
1211 // Returns the following errors as required by the interface contract:
1212 //   - ErrTxClosed if the transaction has already been closed
1213 //
1214 // This function is part of the database.Tx interface implementation.
1215 func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
1216         // Ensure transaction state is valid.
1217         if err := tx.checkClosed(); err != nil {
1218                 return nil, err
1219         }
1220
1221         results := make([]bool, len(hashes))
1222         for i := range hashes {
1223                 results[i] = tx.hasBlock(&hashes[i])
1224         }
1225
1226         return results, nil
1227 }
1228
1229 // fetchBlockRow fetches the metadata stored in the block index for the provided
1230 // hash.  It will return ErrBlockNotFound if there is no entry.
1231 func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
1232         blockRow := tx.blockIdxBucket.Get(hash[:])
1233         if blockRow == nil {
1234                 str := fmt.Sprintf("block %s does not exist", hash)
1235                 return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
1236         }
1237
1238         return blockRow, nil
1239 }
1240
1241 // FetchBlockHeader returns the raw serialized bytes for the block header
1242 // identified by the given hash.  The raw bytes are in the format returned by
1243 // Serialize on a wire.BlockHeader.
1244 //
1245 // Returns the following errors as required by the interface contract:
1246 //   - ErrBlockNotFound if the requested block hash does not exist
1247 //   - ErrTxClosed if the transaction has already been closed
1248 //   - ErrCorruption if the database has somehow become corrupted
1249 //
1250 // NOTE: The data returned by this function is only valid during a
1251 // database transaction.  Attempting to access it after a transaction
1252 // has ended results in undefined behavior.  This constraint prevents
1253 // additional data copies and allows support for memory-mapped database
1254 // implementations.
1255 //
1256 // This function is part of the database.Tx interface implementation.
1257 func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
1258         // Ensure transaction state is valid.
1259         if err := tx.checkClosed(); err != nil {
1260                 return nil, err
1261         }
1262
1263         // When the block is pending to be written on commit return the bytes
1264         // from there.
1265         if idx, exists := tx.pendingBlocks[*hash]; exists {
1266                 blockBytes := tx.pendingBlockData[idx].bytes
1267                 return blockBytes[0:blockHdrSize:blockHdrSize], nil
1268         }
1269
1270         // Fetch the block index row and slice off the header.  Notice the use
1271         // of the cap on the subslice to prevent the caller from accidentally
1272         // appending into the db data.
1273         blockRow, err := tx.fetchBlockRow(hash)
1274         if err != nil {
1275                 return nil, err
1276         }
1277         endOffset := blockLocSize + blockHdrSize
1278         return blockRow[blockLocSize:endOffset:endOffset], nil
1279 }
1280
1281 // FetchBlockHeaders returns the raw serialized bytes for the block headers
1282 // identified by the given hashes.  The raw bytes are in the format returned by
1283 // Serialize on a wire.BlockHeader.
1284 //
1285 // Returns the following errors as required by the interface contract:
1286 //   - ErrBlockNotFound if the any of the requested block hashes do not exist
1287 //   - ErrTxClosed if the transaction has already been closed
1288 //   - ErrCorruption if the database has somehow become corrupted
1289 //
1290 // NOTE: The data returned by this function is only valid during a database
1291 // transaction.  Attempting to access it after a transaction has ended results
1292 // in undefined behavior.  This constraint prevents additional data copies and
1293 // allows support for memory-mapped database implementations.
1294 //
1295 // This function is part of the database.Tx interface implementation.
1296 func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
1297         // Ensure transaction state is valid.
1298         if err := tx.checkClosed(); err != nil {
1299                 return nil, err
1300         }
1301
1302         // NOTE: This could check for the existence of all blocks before loading
1303         // any of the headers which would be faster in the failure case, however
1304         // callers will not typically be calling this function with invalid
1305         // values, so optimize for the common case.
1306
1307         // Load the headers.
1308         headers := make([][]byte, len(hashes))
1309         for i := range hashes {
1310                 hash := &hashes[i]
1311
1312                 // When the block is pending to be written on commit return the
1313                 // bytes from there.
1314                 if idx, exists := tx.pendingBlocks[*hash]; exists {
1315                         blkBytes := tx.pendingBlockData[idx].bytes
1316                         headers[i] = blkBytes[0:blockHdrSize:blockHdrSize]
1317                         continue
1318                 }
1319
1320                 // Fetch the block index row and slice off the header.  Notice
1321                 // the use of the cap on the subslice to prevent the caller
1322                 // from accidentally appending into the db data.
1323                 blockRow, err := tx.fetchBlockRow(hash)
1324                 if err != nil {
1325                         return nil, err
1326                 }
1327                 endOffset := blockLocSize + blockHdrSize
1328                 headers[i] = blockRow[blockLocSize:endOffset:endOffset]
1329         }
1330
1331         return headers, nil
1332 }
1333
1334 // FetchBlock returns the raw serialized bytes for the block identified by the
1335 // given hash.  The raw bytes are in the format returned by Serialize on a
1336 // wire.MsgBlock.
1337 //
1338 // Returns the following errors as required by the interface contract:
1339 //   - ErrBlockNotFound if the requested block hash does not exist
1340 //   - ErrTxClosed if the transaction has already been closed
1341 //   - ErrCorruption if the database has somehow become corrupted
1342 //
1343 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1344 // block files.
1345 //
1346 // NOTE: The data returned by this function is only valid during a database
1347 // transaction.  Attempting to access it after a transaction has ended results
1348 // in undefined behavior.  This constraint prevents additional data copies and
1349 // allows support for memory-mapped database implementations.
1350 //
1351 // This function is part of the database.Tx interface implementation.
1352 func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
1353         // Ensure transaction state is valid.
1354         if err := tx.checkClosed(); err != nil {
1355                 return nil, err
1356         }
1357
1358         // When the block is pending to be written on commit return the bytes
1359         // from there.
1360         if idx, exists := tx.pendingBlocks[*hash]; exists {
1361                 return tx.pendingBlockData[idx].bytes, nil
1362         }
1363
1364         // Lookup the location of the block in the files from the block index.
1365         blockRow, err := tx.fetchBlockRow(hash)
1366         if err != nil {
1367                 return nil, err
1368         }
1369         location := deserializeBlockLoc(blockRow)
1370
1371         // Read the block from the appropriate location.  The function also
1372         // performs a checksum over the data to detect data corruption.
1373         blockBytes, err := tx.db.store.readBlock(hash, location)
1374         if err != nil {
1375                 return nil, err
1376         }
1377
1378         return blockBytes, nil
1379 }
1380
1381 // FetchBlocks returns the raw serialized bytes for the blocks identified by the
1382 // given hashes.  The raw bytes are in the format returned by Serialize on a
1383 // wire.MsgBlock.
1384 //
1385 // Returns the following errors as required by the interface contract:
1386 //   - ErrBlockNotFound if any of the requested block hashed do not exist
1387 //   - ErrTxClosed if the transaction has already been closed
1388 //   - ErrCorruption if the database has somehow become corrupted
1389 //
1390 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1391 // block files.
1392 //
1393 // NOTE: The data returned by this function is only valid during a database
1394 // transaction.  Attempting to access it after a transaction has ended results
1395 // in undefined behavior.  This constraint prevents additional data copies and
1396 // allows support for memory-mapped database implementations.
1397 //
1398 // This function is part of the database.Tx interface implementation.
1399 func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
1400         // Ensure transaction state is valid.
1401         if err := tx.checkClosed(); err != nil {
1402                 return nil, err
1403         }
1404
1405         // NOTE: This could check for the existence of all blocks before loading
1406         // any of them which would be faster in the failure case, however
1407         // callers will not typically be calling this function with invalid
1408         // values, so optimize for the common case.
1409
1410         // Load the blocks.
1411         blocks := make([][]byte, len(hashes))
1412         for i := range hashes {
1413                 var err error
1414                 blocks[i], err = tx.FetchBlock(&hashes[i])
1415                 if err != nil {
1416                         return nil, err
1417                 }
1418         }
1419
1420         return blocks, nil
1421 }
1422
1423 // fetchPendingRegion attempts to fetch the provided region from any block which
1424 // are pending to be written on commit.  It will return nil for the byte slice
1425 // when the region references a block which is not pending.  When the region
1426 // does reference a pending block, it is bounds checked and returns
1427 // ErrBlockRegionInvalid if invalid.
1428 func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
1429         // Nothing to do if the block is not pending to be written on commit.
1430         idx, exists := tx.pendingBlocks[*region.Hash]
1431         if !exists {
1432                 return nil, nil
1433         }
1434
1435         // Ensure the region is within the bounds of the block.
1436         blockBytes := tx.pendingBlockData[idx].bytes
1437         blockLen := uint32(len(blockBytes))
1438         endOffset := region.Offset + region.Len
1439         if endOffset < region.Offset || endOffset > blockLen {
1440                 str := fmt.Sprintf("block %s region offset %d, length %d "+
1441                         "exceeds block length of %d", region.Hash,
1442                         region.Offset, region.Len, blockLen)
1443                 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1444         }
1445
1446         // Return the bytes from the pending block.
1447         return blockBytes[region.Offset:endOffset:endOffset], nil
1448 }
1449
1450 // FetchBlockRegion returns the raw serialized bytes for the given block region.
1451 //
1452 // For example, it is possible to directly extract Bitcoin transactions and/or
1453 // scripts from a block with this function.  Depending on the backend
1454 // implementation, this can provide significant savings by avoiding the need to
1455 // load entire blocks.
1456 //
1457 // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
1458 // the Offset field in the provided BlockRegion is zero-based and relative to
1459 // the start of the block (byte 0).
1460 //
1461 // Returns the following errors as required by the interface contract:
1462 //   - ErrBlockNotFound if the requested block hash does not exist
1463 //   - ErrBlockRegionInvalid if the region exceeds the bounds of the associated
1464 //     block
1465 //   - ErrTxClosed if the transaction has already been closed
1466 //   - ErrCorruption if the database has somehow become corrupted
1467 //
1468 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1469 // block files.
1470 //
1471 // NOTE: The data returned by this function is only valid during a database
1472 // transaction.  Attempting to access it after a transaction has ended results
1473 // in undefined behavior.  This constraint prevents additional data copies and
1474 // allows support for memory-mapped database implementations.
1475 //
1476 // This function is part of the database.Tx interface implementation.
1477 func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
1478         // Ensure transaction state is valid.
1479         if err := tx.checkClosed(); err != nil {
1480                 return nil, err
1481         }
1482
1483         // When the block is pending to be written on commit return the bytes
1484         // from there.
1485         if tx.pendingBlocks != nil {
1486                 regionBytes, err := tx.fetchPendingRegion(region)
1487                 if err != nil {
1488                         return nil, err
1489                 }
1490                 if regionBytes != nil {
1491                         return regionBytes, nil
1492                 }
1493         }
1494
1495         // Lookup the location of the block in the files from the block index.
1496         blockRow, err := tx.fetchBlockRow(region.Hash)
1497         if err != nil {
1498                 return nil, err
1499         }
1500         location := deserializeBlockLoc(blockRow)
1501
1502         // Ensure the region is within the bounds of the block.
1503         endOffset := region.Offset + region.Len
1504         if endOffset < region.Offset || endOffset > location.blockLen {
1505                 str := fmt.Sprintf("block %s region offset %d, length %d "+
1506                         "exceeds block length of %d", region.Hash,
1507                         region.Offset, region.Len, location.blockLen)
1508                 return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1509
1510         }
1511
1512         // Read the region from the appropriate disk block file.
1513         regionBytes, err := tx.db.store.readBlockRegion(location, region.Offset,
1514                 region.Len)
1515         if err != nil {
1516                 return nil, err
1517         }
1518
1519         return regionBytes, nil
1520 }
1521
1522 // FetchBlockRegions returns the raw serialized bytes for the given block
1523 // regions.
1524 //
1525 // For example, it is possible to directly extract Bitcoin transactions and/or
1526 // scripts from various blocks with this function.  Depending on the backend
1527 // implementation, this can provide significant savings by avoiding the need to
1528 // load entire blocks.
1529 //
1530 // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
1531 // the Offset fields in the provided BlockRegions are zero-based and relative to
1532 // the start of the block (byte 0).
1533 //
1534 // Returns the following errors as required by the interface contract:
1535 //   - ErrBlockNotFound if any of the request block hashes do not exist
1536 //   - ErrBlockRegionInvalid if one or more region exceed the bounds of the
1537 //     associated block
1538 //   - ErrTxClosed if the transaction has already been closed
1539 //   - ErrCorruption if the database has somehow become corrupted
1540 //
1541 // In addition, returns ErrDriverSpecific if any failures occur when reading the
1542 // block files.
1543 //
1544 // NOTE: The data returned by this function is only valid during a database
1545 // transaction.  Attempting to access it after a transaction has ended results
1546 // in undefined behavior.  This constraint prevents additional data copies and
1547 // allows support for memory-mapped database implementations.
1548 //
1549 // This function is part of the database.Tx interface implementation.
1550 func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
1551         // Ensure transaction state is valid.
1552         if err := tx.checkClosed(); err != nil {
1553                 return nil, err
1554         }
1555
1556         // NOTE: This could check for the existence of all blocks before
1557         // deserializing the locations and building up the fetch list which
1558         // would be faster in the failure case, however callers will not
1559         // typically be calling this function with invalid values, so optimize
1560         // for the common case.
1561
1562         // NOTE: A potential optimization here would be to combine adjacent
1563         // regions to reduce the number of reads.
1564
1565         // In order to improve efficiency of loading the bulk data, first grab
1566         // the block location for all of the requested block hashes and sort
1567         // the reads by filenum:offset so that all reads are grouped by file
1568         // and linear within each file.  This can result in quite a significant
1569         // performance increase depending on how spread out the requested hashes
1570         // are by reducing the number of file open/closes and random accesses
1571         // needed.  The fetchList is intentionally allocated with a cap because
1572         // some of the regions might be fetched from the pending blocks and
1573         // hence there is no need to fetch those from disk.
1574         blockRegions := make([][]byte, len(regions))
1575         fetchList := make([]bulkFetchData, 0, len(regions))
1576         for i := range regions {
1577                 region := &regions[i]
1578
1579                 // When the block is pending to be written on commit grab the
1580                 // bytes from there.
1581                 if tx.pendingBlocks != nil {
1582                         regionBytes, err := tx.fetchPendingRegion(region)
1583                         if err != nil {
1584                                 return nil, err
1585                         }
1586                         if regionBytes != nil {
1587                                 blockRegions[i] = regionBytes
1588                                 continue
1589                         }
1590                 }
1591
1592                 // Lookup the location of the block in the files from the block
1593                 // index.
1594                 blockRow, err := tx.fetchBlockRow(region.Hash)
1595                 if err != nil {
1596                         return nil, err
1597                 }
1598                 location := deserializeBlockLoc(blockRow)
1599
1600                 // Ensure the region is within the bounds of the block.
1601                 endOffset := region.Offset + region.Len
1602                 if endOffset < region.Offset || endOffset > location.blockLen {
1603                         str := fmt.Sprintf("block %s region offset %d, length "+
1604                                 "%d exceeds block length of %d", region.Hash,
1605                                 region.Offset, region.Len, location.blockLen)
1606                         return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
1607                 }
1608
1609                 fetchList = append(fetchList, bulkFetchData{&location, i})
1610         }
1611         sort.Sort(bulkFetchDataSorter(fetchList))
1612
1613         // Read all of the regions in the fetch list and set the results.
1614         for i := range fetchList {
1615                 fetchData := &fetchList[i]
1616                 ri := fetchData.replyIndex
1617                 region := &regions[ri]
1618                 location := fetchData.blockLocation
1619                 regionBytes, err := tx.db.store.readBlockRegion(*location,
1620                         region.Offset, region.Len)
1621                 if err != nil {
1622                         return nil, err
1623                 }
1624                 blockRegions[ri] = regionBytes
1625         }
1626
1627         return blockRegions, nil
1628 }
1629
1630 // close marks the transaction closed then releases any pending data, the
1631 // underlying snapshot, the transaction read lock, and the write lock when the
1632 // transaction is writable.
1633 func (tx *transaction) close() {
1634         tx.closed = true
1635
1636         // Clear pending blocks that would have been written on commit.
1637         tx.pendingBlocks = nil
1638         tx.pendingBlockData = nil
1639
1640         // Clear pending keys that would have been written or deleted on commit.
1641         tx.pendingKeys = nil
1642         tx.pendingRemove = nil
1643
1644         // Release the snapshot.
1645         if tx.snapshot != nil {
1646                 tx.snapshot.Release()
1647                 tx.snapshot = nil
1648         }
1649
1650         tx.db.closeLock.RUnlock()
1651
1652         // Release the writer lock for writable transactions to unblock any
1653         // other write transaction which are possibly waiting.
1654         if tx.writable {
1655                 tx.db.writeLock.Unlock()
1656         }
1657 }
1658
1659 // serializeBlockRow serializes a block row into a format suitable for storage
1660 // into the block index.
1661 func serializeBlockRow(blockLoc blockLocation, blockHdr []byte) []byte {
1662         // The serialized block index row format is:
1663         //
1664         //  [0:blockLocSize]                          Block location
1665         //  [blockLocSize:blockLocSize+blockHdrSize]  Block header
1666         serializedRow := make([]byte, blockLocSize+blockHdrSize)
1667         copy(serializedRow, serializeBlockLoc(blockLoc))
1668         copy(serializedRow[blockHdrOffset:], blockHdr)
1669         return serializedRow
1670 }
1671
1672 // writePendingAndCommit writes pending block data to the flat block files,
1673 // updates the metadata with their locations as well as the new current write
1674 // location, and commits the metadata to the memory database cache.  It also
1675 // properly handles rollback in the case of failures.
1676 //
1677 // This function MUST only be called when there is pending data to be written.
1678 func (tx *transaction) writePendingAndCommit() error {
1679         // Save the current block store write position for potential rollback.
1680         // These variables are only updated here in this function and there can
1681         // only be one write transaction active at a time, so it's safe to store
1682         // them for potential rollback.
1683         wc := tx.db.store.writeCursor
1684         wc.RLock()
1685         oldBlkFileNum := wc.curFileNum
1686         oldBlkOffset := wc.curOffset
1687         wc.RUnlock()
1688
1689         // rollback is a closure that is used to rollback all writes to the
1690         // block files.
1691         rollback := func() {
1692                 // Rollback any modifications made to the block files if needed.
1693                 tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
1694         }
1695
1696         // Loop through all of the pending blocks to store and write them.
1697         for _, blockData := range tx.pendingBlockData {
1698                 log.Tracef("Storing block %s", blockData.hash)
1699                 location, err := tx.db.store.writeBlock(blockData.bytes)
1700                 if err != nil {
1701                         rollback()
1702                         return err
1703                 }
1704
1705                 // Add a record in the block index for the block.  The record
1706                 // includes the location information needed to locate the block
1707                 // on the filesystem as well as the block header since they are
1708                 // so commonly needed.
1709                 blockHdr := blockData.bytes[0:blockHdrSize]
1710                 blockRow := serializeBlockRow(location, blockHdr)
1711                 err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
1712                 if err != nil {
1713                         rollback()
1714                         return err
1715                 }
1716         }
1717
1718         // Update the metadata for the current write file and offset.
1719         writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
1720         if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil {
1721                 rollback()
1722                 return convertErr("failed to store write cursor", err)
1723         }
1724
1725         // Atomically update the database cache.  The cache automatically
1726         // handles flushing to the underlying persistent storage database.
1727         return tx.db.cache.commitTx(tx)
1728 }
1729
1730 // Commit commits all changes that have been made to the root metadata bucket
1731 // and all of its sub-buckets to the database cache which is periodically synced
1732 // to persistent storage.  In addition, it commits all new blocks directly to
1733 // persistent storage bypassing the db cache.  Blocks can be rather large, so
1734 // this help increase the amount of cache available for the metadata updates and
1735 // is safe since blocks are immutable.
1736 //
1737 // This function is part of the database.Tx interface implementation.
1738 func (tx *transaction) Commit() error {
1739         // Prevent commits on managed transactions.
1740         if tx.managed {
1741                 tx.close()
1742                 panic("managed transaction commit not allowed")
1743         }
1744
1745         // Ensure transaction state is valid.
1746         if err := tx.checkClosed(); err != nil {
1747                 return err
1748         }
1749
1750         // Regardless of whether the commit succeeds, the transaction is closed
1751         // on return.
1752         defer tx.close()
1753
1754         // Ensure the transaction is writable.
1755         if !tx.writable {
1756                 str := "Commit requires a writable database transaction"
1757                 return makeDbErr(database.ErrTxNotWritable, str, nil)
1758         }
1759
1760         // Write pending data.  The function will rollback if any errors occur.
1761         return tx.writePendingAndCommit()
1762 }
1763
1764 // Rollback undoes all changes that have been made to the root bucket and all of
1765 // its sub-buckets.
1766 //
1767 // This function is part of the database.Tx interface implementation.
1768 func (tx *transaction) Rollback() error {
1769         // Prevent rollbacks on managed transactions.
1770         if tx.managed {
1771                 tx.close()
1772                 panic("managed transaction rollback not allowed")
1773         }
1774
1775         // Ensure transaction state is valid.
1776         if err := tx.checkClosed(); err != nil {
1777                 return err
1778         }
1779
1780         tx.close()
1781         return nil
1782 }
1783
1784 // db represents a collection of namespaces which are persisted and implements
1785 // the database.DB interface.  All database access is performed through
1786 // transactions which are obtained through the specific Namespace.
1787 type db struct {
1788         writeLock sync.Mutex   // Limit to one write transaction at a time.
1789         closeLock sync.RWMutex // Make database close block while txns active.
1790         closed    bool         // Is the database closed?
1791         store     *blockStore  // Handles read/writing blocks to flat files.
1792         cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
1793 }
1794
1795 // Enforce db implements the database.DB interface.
1796 var _ database.DB = (*db)(nil)
1797
1798 // Type returns the database driver type the current database instance was
1799 // created with.
1800 //
1801 // This function is part of the database.DB interface implementation.
1802 func (db *db) Type() string {
1803         return dbType
1804 }
1805
1806 // begin is the implementation function for the Begin database method.  See its
1807 // documentation for more details.
1808 //
1809 // This function is only separate because it returns the internal transaction
1810 // which is used by the managed transaction code while the database method
1811 // returns the interface.
1812 func (db *db) begin(writable bool) (*transaction, error) {
1813         // Whenever a new writable transaction is started, grab the write lock
1814         // to ensure only a single write transaction can be active at the same
1815         // time.  This lock will not be released until the transaction is
1816         // closed (via Rollback or Commit).
1817         if writable {
1818                 db.writeLock.Lock()
1819         }
1820
1821         // Whenever a new transaction is started, grab a read lock against the
1822         // database to ensure Close will wait for the transaction to finish.
1823         // This lock will not be released until the transaction is closed (via
1824         // Rollback or Commit).
1825         db.closeLock.RLock()
1826         if db.closed {
1827                 db.closeLock.RUnlock()
1828                 if writable {
1829                         db.writeLock.Unlock()
1830                 }
1831                 return nil, makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr,
1832                         nil)
1833         }
1834
1835         // Grab a snapshot of the database cache (which in turn also handles the
1836         // underlying database).
1837         snapshot, err := db.cache.Snapshot()
1838         if err != nil {
1839                 db.closeLock.RUnlock()
1840                 if writable {
1841                         db.writeLock.Unlock()
1842                 }
1843
1844                 return nil, err
1845         }
1846
1847         // The metadata and block index buckets are internal-only buckets, so
1848         // they have defined IDs.
1849         tx := &transaction{
1850                 writable:      writable,
1851                 db:            db,
1852                 snapshot:      snapshot,
1853                 pendingKeys:   treap.NewMutable(),
1854                 pendingRemove: treap.NewMutable(),
1855         }
1856         tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
1857         tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
1858         return tx, nil
1859 }
1860
1861 // Begin starts a transaction which is either read-only or read-write depending
1862 // on the specified flag.  Multiple read-only transactions can be started
1863 // simultaneously while only a single read-write transaction can be started at a
1864 // time.  The call will block when starting a read-write transaction when one is
1865 // already open.
1866 //
1867 // NOTE: The transaction must be closed by calling Rollback or Commit on it when
1868 // it is no longer needed.  Failure to do so will result in unclaimed memory.
1869 //
1870 // This function is part of the database.DB interface implementation.
1871 func (db *db) Begin(writable bool) (database.Tx, error) {
1872         return db.begin(writable)
1873 }
1874
1875 // rollbackOnPanic rolls the passed transaction back if the code in the calling
1876 // function panics.  This is needed since the mutex on a transaction must be
1877 // released and a panic in called code would prevent that from happening.
1878 //
1879 // NOTE: This can only be handled manually for managed transactions since they
1880 // control the life-cycle of the transaction.  As the documentation on Begin
1881 // calls out, callers opting to use manual transactions will have to ensure the
1882 // transaction is rolled back on panic if it desires that functionality as well
1883 // or the database will fail to close since the read-lock will never be
1884 // released.
1885 func rollbackOnPanic(tx *transaction) {
1886         if err := recover(); err != nil {
1887                 tx.managed = false
1888                 _ = tx.Rollback()
1889                 panic(err)
1890         }
1891 }
1892
1893 // View invokes the passed function in the context of a managed read-only
1894 // transaction with the root bucket for the namespace.  Any errors returned from
1895 // the user-supplied function are returned from this function.
1896 //
1897 // This function is part of the database.DB interface implementation.
1898 func (db *db) View(fn func(database.Tx) error) error {
1899         // Start a read-only transaction.
1900         tx, err := db.begin(false)
1901         if err != nil {
1902                 return err
1903         }
1904
1905         // Since the user-provided function might panic, ensure the transaction
1906         // releases all mutexes and resources.  There is no guarantee the caller
1907         // won't use recover and keep going.  Thus, the database must still be
1908         // in a usable state on panics due to caller issues.
1909         defer rollbackOnPanic(tx)
1910
1911         tx.managed = true
1912         err = fn(tx)
1913         tx.managed = false
1914         if err != nil {
1915                 // The error is ignored here because nothing was written yet
1916                 // and regardless of a rollback failure, the tx is closed now
1917                 // anyways.
1918                 _ = tx.Rollback()
1919                 return err
1920         }
1921
1922         return tx.Rollback()
1923 }
1924
1925 // Update invokes the passed function in the context of a managed read-write
1926 // transaction with the root bucket for the namespace.  Any errors returned from
1927 // the user-supplied function will cause the transaction to be rolled back and
1928 // are returned from this function.  Otherwise, the transaction is committed
1929 // when the user-supplied function returns a nil error.
1930 //
1931 // This function is part of the database.DB interface implementation.
1932 func (db *db) Update(fn func(database.Tx) error) error {
1933         // Start a read-write transaction.
1934         tx, err := db.begin(true)
1935         if err != nil {
1936                 return err
1937         }
1938
1939         // Since the user-provided function might panic, ensure the transaction
1940         // releases all mutexes and resources.  There is no guarantee the caller
1941         // won't use recover and keep going.  Thus, the database must still be
1942         // in a usable state on panics due to caller issues.
1943         defer rollbackOnPanic(tx)
1944
1945         tx.managed = true
1946         err = fn(tx)
1947         tx.managed = false
1948         if err != nil {
1949                 // The error is ignored here because nothing was written yet
1950                 // and regardless of a rollback failure, the tx is closed now
1951                 // anyways.
1952                 _ = tx.Rollback()
1953                 return err
1954         }
1955
1956         return tx.Commit()
1957 }
1958
1959 // Close cleanly shuts down the database and syncs all data.  It will block
1960 // until all database transactions have been finalized (rolled back or
1961 // committed).
1962 //
1963 // This function is part of the database.DB interface implementation.
1964 func (db *db) Close() error {
1965         // Since all transactions have a read lock on this mutex, this will
1966         // cause Close to wait for all readers to complete.
1967         db.closeLock.Lock()
1968         defer db.closeLock.Unlock()
1969
1970         if db.closed {
1971                 return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
1972         }
1973         db.closed = true
1974
1975         // NOTE: Since the above lock waits for all transactions to finish and
1976         // prevents any new ones from being started, it is safe to flush the
1977         // cache and clear all state without the individual locks.
1978
1979         // Close the database cache which will flush any existing entries to
1980         // disk and close the underlying leveldb database.  Any error is saved
1981         // and returned at the end after the remaining cleanup since the
1982         // database will be marked closed even if this fails given there is no
1983         // good way for the caller to recover from a failure here anyways.
1984         closeErr := db.cache.Close()
1985
1986         // Close any open flat files that house the blocks.
1987         wc := db.store.writeCursor
1988         if wc.curFile.file != nil {
1989                 _ = wc.curFile.file.Close()
1990                 wc.curFile.file = nil
1991         }
1992         for _, blockFile := range db.store.openBlockFiles {
1993                 _ = blockFile.file.Close()
1994         }
1995         db.store.openBlockFiles = nil
1996         db.store.openBlocksLRU.Init()
1997         db.store.fileNumToLRUElem = nil
1998
1999         return closeErr
2000 }
2001
2002 // filesExists reports whether the named file or directory exists.
2003 func fileExists(name string) bool {
2004         if _, err := os.Stat(name); err != nil {
2005                 if os.IsNotExist(err) {
2006                         return false
2007                 }
2008         }
2009         return true
2010 }
2011
2012 // initDB creates the initial buckets and values used by the package.  This is
2013 // mainly in a separate function for testing purposes.
2014 func initDB(ldb *leveldb.DB) error {
2015         // The starting block file write cursor location is file num 0, offset
2016         // 0.
2017         batch := new(leveldb.Batch)
2018         batch.Put(bucketizedKey(metadataBucketID, writeLocKeyName),
2019                 serializeWriteRow(0, 0))
2020
2021         // Create block index bucket and set the current bucket id.
2022         //
2023         // NOTE: Since buckets are virtualized through the use of prefixes,
2024         // there is no need to store the bucket index data for the metadata
2025         // bucket in the database.  However, the first bucket ID to use does
2026         // need to account for it to ensure there are no key collisions.
2027         batch.Put(bucketIndexKey(metadataBucketID, blockIdxBucketName),
2028                 blockIdxBucketID[:])
2029         batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
2030
2031         // Write everything as a single batch.
2032         if err := ldb.Write(batch, nil); err != nil {
2033                 str := fmt.Sprintf("failed to initialize metadata database: %v",
2034                         err)
2035                 return convertErr(str, err)
2036         }
2037
2038         return nil
2039 }
2040
2041 // openDB opens the database at the provided path.  database.ErrDbDoesNotExist
2042 // is returned if the database doesn't exist and the create flag is not set.
2043 func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
2044         // Error if the database doesn't exist and the create flag is not set.
2045         metadataDbPath := filepath.Join(dbPath, metadataDbName)
2046         dbExists := fileExists(metadataDbPath)
2047         if !create && !dbExists {
2048                 str := fmt.Sprintf("database %q does not exist", metadataDbPath)
2049                 return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
2050         }
2051
2052         // Ensure the full path to the database exists.
2053         if !dbExists {
2054                 // The error can be ignored here since the call to
2055                 // leveldb.OpenFile will fail if the directory couldn't be
2056                 // created.
2057                 _ = os.MkdirAll(dbPath, 0700)
2058         }
2059
2060         // Open the metadata database (will create it if needed).
2061         opts := opt.Options{
2062                 ErrorIfExist: create,
2063                 Strict:       opt.DefaultStrict,
2064                 Compression:  opt.NoCompression,
2065                 Filter:       filter.NewBloomFilter(10),
2066         }
2067         ldb, err := leveldb.OpenFile(metadataDbPath, &opts)
2068         if err != nil {
2069                 return nil, convertErr(err.Error(), err)
2070         }
2071
2072         // Create the block store which includes scanning the existing flat
2073         // block files to find what the current write cursor position is
2074         // according to the data that is actually on disk.  Also create the
2075         // database cache which wraps the underlying leveldb database to provide
2076         // write caching.
2077         store := newBlockStore(dbPath, network)
2078         cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
2079         pdb := &db{store: store, cache: cache}
2080
2081         // Perform any reconciliation needed between the block and metadata as
2082         // well as database initialization, if needed.
2083         return reconcileDB(pdb, create)
2084 }