OSDN Git Service

add_federation (#1936)
[bytom/bytom.git] / database / store.go
1 package database
2
3 import (
4         "encoding/binary"
5         "encoding/hex"
6         "encoding/json"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/bytom/consensus"
13         dbm "github.com/bytom/bytom/database/leveldb"
14         "github.com/bytom/bytom/database/storage"
15         "github.com/bytom/bytom/errors"
16         "github.com/bytom/bytom/protocol"
17         "github.com/bytom/bytom/protocol/bc"
18         "github.com/bytom/bytom/protocol/bc/types"
19         "github.com/bytom/bytom/protocol/state"
20 )
21
22 const logModule = "leveldb"
23
24 var (
25         // CheckpointPrefix represent the namespace of checkpoints in db
26         CheckpointPrefix = []byte("CP:")
27         // BlockStoreKey block store key
28         BlockStoreKey = []byte("blockStore")
29         // BlockHeaderIndexPrefix  block header index with height
30         BlockHeaderIndexPrefix = []byte("BH:")
31 )
32
33 func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState {
34         bytes := db.Get(BlockStoreKey)
35         if bytes == nil {
36                 return nil
37         }
38         bsj := &protocol.BlockStoreState{}
39         if err := json.Unmarshal(bytes, bsj); err != nil {
40                 common.PanicCrisis(common.Fmt("Could not unmarshal bytes: %X", bytes))
41         }
42         return bsj
43 }
44
45 // A Store encapsulates storage for blockchain validation.
46 // It satisfies the interface protocol.Store, and provides additional
47 // methods for querying current data.
48 type Store struct {
49         db    dbm.DB
50         cache cache
51 }
52
53 // NewStore creates and returns a new Store object.
54 func NewStore(db dbm.DB) *Store {
55         fillBlockHeaderFn := func(hash *bc.Hash) (*types.BlockHeader, error) {
56                 return GetBlockHeader(db, hash)
57         }
58
59         fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) {
60                 return GetBlockTransactions(db, hash)
61         }
62
63         fillBlockHashesFn := func(height uint64) ([]*bc.Hash, error) {
64                 return GetBlockHashesByHeight(db, height)
65         }
66
67         cache := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillBlockHashesFn)
68         return &Store{
69                 db:    db,
70                 cache: cache,
71         }
72 }
73
74 // GetBlockHeader return the BlockHeader by given hash
75 func (s *Store) GetBlockHeader(hash *bc.Hash) (*types.BlockHeader, error) {
76         return s.cache.lookupBlockHeader(hash)
77 }
78
79 // GetUtxo will search the utxo in db
80 func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
81         return getUtxo(s.db, hash)
82 }
83
84 func (s *Store) GetContract(hash [32]byte) ([]byte, error) {
85         return getContract(s.db, hash)
86 }
87
88 // BlockExist check if the block is stored in disk
89 func (s *Store) BlockExist(hash *bc.Hash) bool {
90         _, err := s.cache.lookupBlockHeader(hash)
91         return err == nil
92 }
93
94 // SaveBlockHeader persists a new block header in the protocol.
95 func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
96         binaryBlockHeader, err := blockHeader.MarshalText()
97         if err != nil {
98                 return errors.Wrap(err, "Marshal block header")
99         }
100
101         blockHash := blockHeader.Hash()
102         s.db.Set(CalcBlockHeaderKey(&blockHash), binaryBlockHeader)
103         s.cache.removeBlockHeader(blockHeader)
104         return nil
105 }
106
107 // GetBlockHashesByHeight return the block hash by the specified height
108 func (s *Store) GetBlockHashesByHeight(height uint64) ([]*bc.Hash, error) {
109         return s.cache.lookupBlockHashesByHeight(height)
110 }
111
112 // SaveBlock persists a new block in the protocol.
113 func (s *Store) SaveBlock(block *types.Block) error {
114         startTime := time.Now()
115         binaryBlockHeader, err := block.MarshalTextForBlockHeader()
116         if err != nil {
117                 return errors.Wrap(err, "Marshal block header")
118         }
119
120         binaryBlockTxs, err := block.MarshalTextForTransactions()
121         if err != nil {
122                 return errors.Wrap(err, "Marshal block transactions")
123         }
124
125         blockHashes := []*bc.Hash{}
126         hashes, err := s.GetBlockHashesByHeight(block.Height)
127         if err != nil {
128                 return err
129         }
130
131         blockHashes = append(blockHashes, hashes...)
132         blockHash := block.Hash()
133         blockHashes = append(blockHashes, &blockHash)
134         binaryBlockHashes, err := json.Marshal(blockHashes)
135         if err != nil {
136                 return errors.Wrap(err, "Marshal block hashes")
137         }
138
139         batch := s.db.NewBatch()
140         batch.Set(CalcBlockHashesKey(block.Height), binaryBlockHashes)
141         batch.Set(CalcBlockHeaderKey(&blockHash), binaryBlockHeader)
142         batch.Set(CalcBlockTransactionsKey(&blockHash), binaryBlockTxs)
143         batch.Set(CalcBlockHeaderIndexKey(block.Height, &blockHash), binaryBlockHeader)
144         batch.Write()
145
146         s.cache.removeBlockHashes(block.Height)
147         log.WithFields(log.Fields{
148                 "module":   logModule,
149                 "height":   block.Height,
150                 "hash":     blockHash.String(),
151                 "duration": time.Since(startTime),
152         }).Info("block saved on disk")
153         return nil
154 }
155
156 // GetBlockTransactions return the Block transactions by given hash
157 func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
158         return s.cache.lookupBlockTxs(hash)
159 }
160
161 // GetBlock return the block by given hash
162 func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) {
163         blockHeader, err := s.GetBlockHeader(hash)
164         if err != nil {
165                 return nil, err
166         }
167
168         txs, err := s.GetBlockTransactions(hash)
169         if err != nil {
170                 return nil, err
171         }
172
173         return &types.Block{
174                 BlockHeader:  *blockHeader,
175                 Transactions: txs,
176         }, nil
177 }
178
179 // GetTransactionsUtxo will return all the utxo that related to the input txs
180 func (s *Store) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
181         return getTransactionsUtxo(s.db, view, txs)
182 }
183
184 // GetStoreStatus return the BlockStoreStateJSON
185 func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
186         return loadBlockStoreStateJSON(s.db)
187 }
188
189 // LoadBlockIndex loadblockIndex by bestHeight
190 func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error) {
191         startTime := time.Now()
192         blockIndex := state.NewBlockIndex()
193         bhIter := s.db.IteratorPrefix(BlockHeaderIndexPrefix)
194         defer bhIter.Release()
195
196         var lastNode *state.BlockNode
197         for bhIter.Next() {
198                 bh := &types.BlockHeader{}
199                 if err := bh.UnmarshalText(bhIter.Value()); err != nil {
200                         return nil, err
201                 }
202
203                 // If a block with a height greater than the best height of state is added to the index,
204                 // It may cause a bug that the new block cant not be process properly.
205                 if bh.Height > stateBestHeight {
206                         break
207                 }
208
209                 var parent *state.BlockNode
210                 if lastNode == nil || lastNode.Hash == bh.PreviousBlockHash {
211                         parent = lastNode
212                 } else {
213                         parent = blockIndex.GetNode(&bh.PreviousBlockHash)
214                 }
215
216                 node, err := state.NewBlockNode(bh, parent)
217                 if err != nil {
218                         return nil, err
219                 }
220
221                 blockIndex.AddNode(node)
222                 lastNode = node
223         }
224
225         log.WithFields(log.Fields{
226                 "module":   logModule,
227                 "height":   stateBestHeight,
228                 "duration": time.Since(startTime),
229         }).Debug("initialize load history block index from database")
230         return blockIndex, nil
231 }
232
233 // SaveChainStatus save the core's newest status && delete old status
234 func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint, contractView *state.ContractViewpoint, finalizedHeight uint64, finalizedHash *bc.Hash) error {
235         batch := s.db.NewBatch()
236         if err := saveUtxoView(batch, view); err != nil {
237                 return err
238         }
239
240         if err := deleteContractView(s.db, batch, contractView); err != nil {
241                 return err
242         }
243
244         if err := saveContractView(s.db, batch, contractView); err != nil {
245                 return err
246         }
247
248         bytes, err := json.Marshal(protocol.BlockStoreState{Height: node.Height, Hash: &node.Hash, FinalizedHeight: finalizedHeight, FinalizedHash: finalizedHash})
249         if err != nil {
250                 return err
251         }
252
253         batch.Set(BlockStoreKey, bytes)
254         batch.Write()
255         return nil
256 }
257
258 func calcCheckpointKey(height uint64, hash *bc.Hash) []byte {
259         buf := make([]byte, 8)
260         binary.BigEndian.PutUint64(buf, height)
261         key := append(CheckpointPrefix, buf...)
262         if hash != nil {
263                 key = append(key, hash.Bytes()...)
264         }
265         return key
266 }
267
268 func (s *Store) GetCheckpoint(hash *bc.Hash) (*state.Checkpoint, error) {
269         header, err := s.GetBlockHeader(hash)
270         if err != nil {
271                 return nil, err
272         }
273
274         data := s.db.Get(calcCheckpointKey(header.Height, hash))
275         checkpoint := &state.Checkpoint{}
276         if err := json.Unmarshal(data, checkpoint); err != nil {
277                 return nil, err
278         }
279
280         setSupLinkToCheckpoint(checkpoint, header.SupLinks)
281         return checkpoint, nil
282 }
283
284 // GetCheckpointsByHeight return all checkpoints of specified block height
285 func (s *Store) GetCheckpointsByHeight(height uint64) ([]*state.Checkpoint, error) {
286         iter := s.db.IteratorPrefix(calcCheckpointKey(height, nil))
287         defer iter.Release()
288         return s.loadCheckpointsFromIter(iter)
289 }
290
291 // CheckpointsFromNode return all checkpoints from specified block height and hash
292 func (s *Store) CheckpointsFromNode(height uint64, hash *bc.Hash) ([]*state.Checkpoint, error) {
293         startKey := calcCheckpointKey(height, hash)
294         iter := s.db.IteratorPrefixWithStart(CheckpointPrefix, startKey, false)
295
296         firstCheckpoint := &state.Checkpoint{}
297         if err := json.Unmarshal(iter.Value(), firstCheckpoint); err != nil {
298                 return nil, err
299         }
300
301         checkpoints := []*state.Checkpoint{firstCheckpoint}
302         subs, err := s.loadCheckpointsFromIter(iter)
303         if err != nil {
304                 return nil, err
305         }
306
307         checkpoints = append(checkpoints, subs...)
308         return checkpoints, nil
309 }
310
311 func (s *Store) loadCheckpointsFromIter(iter dbm.Iterator) ([]*state.Checkpoint, error) {
312         var checkpoints []*state.Checkpoint
313         defer iter.Release()
314         for iter.Next() {
315                 checkpoint := &state.Checkpoint{}
316                 if err := json.Unmarshal(iter.Value(), checkpoint); err != nil {
317                         return nil, err
318                 }
319
320                 header, err := s.GetBlockHeader(&checkpoint.Hash)
321                 if err != nil {
322                         return nil, err
323                 }
324
325                 setSupLinkToCheckpoint(checkpoint, header.SupLinks)
326                 checkpoints = append(checkpoints, checkpoint)
327         }
328         return checkpoints, nil
329 }
330
331 // SaveCheckpoints bulk save multiple checkpoint
332 func (s *Store) SaveCheckpoints(checkpoints ...*state.Checkpoint) error {
333         batch := s.db.NewBatch()
334         for _, checkpoint := range checkpoints {
335                 data, err := json.Marshal(checkpoint)
336                 if err != nil {
337                         return err
338                 }
339
340                 if checkpoint.Height % state.BlocksOfEpoch != 1 {
341                         header, err := s.GetBlockHeader(&checkpoint.Hash)
342                         if err != nil {
343                                 return err
344                         }
345
346                         batch.Delete(calcCheckpointKey(header.Height-1, &header.PreviousBlockHash))
347                 }
348
349                 batch.Set(calcCheckpointKey(checkpoint.Height, &checkpoint.Hash), data)
350         }
351         batch.Write()
352         return nil
353 }
354
355 func setSupLinkToCheckpoint(c *state.Checkpoint, supLinks types.SupLinks) {
356         for _, supLink := range supLinks {
357                 var signatures [consensus.MaxNumOfValidators]string
358                 for i, signature := range supLink.Signatures {
359                         signatures[i] = hex.EncodeToString(signature)
360                 }
361
362                 c.SupLinks = append(c.SupLinks, &state.SupLink{
363                         SourceHeight: supLink.SourceHeight,
364                         SourceHash:   supLink.SourceHash,
365                         Signatures:   signatures,
366                 })
367         }
368 }