OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / database / ffldb / reconcile.go
1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package ffldb
6
7 import (
8         "fmt"
9         "hash/crc32"
10
11         "github.com/btcsuite/btcd/database"
12 )
13
14 // The serialized write cursor location format is:
15 //
16 //  [0:4]  Block file (4 bytes)
17 //  [4:8]  File offset (4 bytes)
18 //  [8:12] Castagnoli CRC-32 checksum (4 bytes)
19
20 // serializeWriteRow serialize the current block file and offset where new
21 // will be written into a format suitable for storage into the metadata.
22 func serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
23         var serializedRow [12]byte
24         byteOrder.PutUint32(serializedRow[0:4], curBlockFileNum)
25         byteOrder.PutUint32(serializedRow[4:8], curFileOffset)
26         checksum := crc32.Checksum(serializedRow[:8], castagnoli)
27         byteOrder.PutUint32(serializedRow[8:12], checksum)
28         return serializedRow[:]
29 }
30
31 // deserializeWriteRow deserializes the write cursor location stored in the
32 // metadata.  Returns ErrCorruption if the checksum of the entry doesn't match.
33 func deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
34         // Ensure the checksum matches.  The checksum is at the end.
35         gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
36         wantChecksumBytes := writeRow[8:12]
37         wantChecksum := byteOrder.Uint32(wantChecksumBytes)
38         if gotChecksum != wantChecksum {
39                 str := fmt.Sprintf("metadata for write cursor does not match "+
40                         "the expected checksum - got %d, want %d", gotChecksum,
41                         wantChecksum)
42                 return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
43         }
44
45         fileNum := byteOrder.Uint32(writeRow[0:4])
46         fileOffset := byteOrder.Uint32(writeRow[4:8])
47         return fileNum, fileOffset, nil
48 }
49
50 // reconcileDB reconciles the metadata with the flat block files on disk.  It
51 // will also initialize the underlying database if the create flag is set.
52 func reconcileDB(pdb *db, create bool) (database.DB, error) {
53         // Perform initial internal bucket and value creation during database
54         // creation.
55         if create {
56                 if err := initDB(pdb.cache.ldb); err != nil {
57                         return nil, err
58                 }
59         }
60
61         // Load the current write cursor position from the metadata.
62         var curFileNum, curOffset uint32
63         err := pdb.View(func(tx database.Tx) error {
64                 writeRow := tx.Metadata().Get(writeLocKeyName)
65                 if writeRow == nil {
66                         str := "write cursor does not exist"
67                         return makeDbErr(database.ErrCorruption, str, nil)
68                 }
69
70                 var err error
71                 curFileNum, curOffset, err = deserializeWriteRow(writeRow)
72                 return err
73         })
74         if err != nil {
75                 return nil, err
76         }
77
78         // When the write cursor position found by scanning the block files on
79         // disk is AFTER the position the metadata believes to be true, truncate
80         // the files on disk to match the metadata.  This can be a fairly common
81         // occurrence in unclean shutdown scenarios while the block files are in
82         // the middle of being written.  Since the metadata isn't updated until
83         // after the block data is written, this is effectively just a rollback
84         // to the known good point before the unclean shutdown.
85         wc := pdb.store.writeCursor
86         if wc.curFileNum > curFileNum || (wc.curFileNum == curFileNum &&
87                 wc.curOffset > curOffset) {
88
89                 log.Info("Detected unclean shutdown - Repairing...")
90                 log.Debugf("Metadata claims file %d, offset %d. Block data is "+
91                         "at file %d, offset %d", curFileNum, curOffset,
92                         wc.curFileNum, wc.curOffset)
93                 pdb.store.handleRollback(curFileNum, curOffset)
94                 log.Infof("Database sync complete")
95         }
96
97         // When the write cursor position found by scanning the block files on
98         // disk is BEFORE the position the metadata believes to be true, return
99         // a corruption error.  Since sync is called after each block is written
100         // and before the metadata is updated, this should only happen in the
101         // case of missing, deleted, or truncated block files, which generally
102         // is not an easily recoverable scenario.  In the future, it might be
103         // possible to rescan and rebuild the metadata from the block files,
104         // however, that would need to happen with coordination from a higher
105         // layer since it could invalidate other metadata.
106         if wc.curFileNum < curFileNum || (wc.curFileNum == curFileNum &&
107                 wc.curOffset < curOffset) {
108
109                 str := fmt.Sprintf("metadata claims file %d, offset %d, but "+
110                         "block data is at file %d, offset %d", curFileNum,
111                         curOffset, wc.curFileNum, wc.curOffset)
112                 log.Warnf("***Database corruption detected***: %v", str)
113                 return nil, makeDbErr(database.ErrCorruption, str, nil)
114         }
115
116         return pdb, nil
117 }