OSDN Git Service

versoin1.1.9 (#594)
[bytom/vapor.git] / database / store.go
1 package database
2
3 import (
4         "encoding/binary"
5         "encoding/json"
6         "fmt"
7         "time"
8
9         "github.com/golang/protobuf/proto"
10         log "github.com/sirupsen/logrus"
11
12         dbm "github.com/bytom/vapor/database/leveldb"
13         "github.com/bytom/vapor/database/storage"
14         "github.com/bytom/vapor/errors"
15         "github.com/bytom/vapor/protocol"
16         "github.com/bytom/vapor/protocol/bc"
17         "github.com/bytom/vapor/protocol/bc/types"
18         "github.com/bytom/vapor/protocol/state"
19 )
20
21 const (
22         // log module
23         logModule = "leveldb"
24         // the byte of colon(:)
25         colon = byte(0x3a)
26 )
27
28 const (
29         blockStore byte = iota
30         blockHashes
31         blockHeader
32         blockTransactons
33         mainChainIndex
34         txStatus
35         consensusResult
36 )
37
38 func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState {
39         bytes := db.Get([]byte{blockStore})
40         if bytes == nil {
41                 return nil
42         }
43
44         bsj := &protocol.BlockStoreState{}
45         if err := json.Unmarshal(bytes, bsj); err != nil {
46                 log.WithField("err", err).Panic("fail on unmarshal BlockStoreStateJSON")
47         }
48         return bsj
49 }
50
51 // A Store encapsulates storage for blockchain validation.
52 // It satisfies the interface protocol.Store, and provides additional
53 // methods for querying current data.
54 type Store struct {
55         db    dbm.DB
56         cache *cache
57 }
58
59 func calcMainChainIndexPrefix(height uint64) []byte {
60         buf := [8]byte{}
61         binary.BigEndian.PutUint64(buf[:], height)
62         return append([]byte{mainChainIndex, colon}, buf[:]...)
63 }
64
65 func calcBlockHashesPrefix(height uint64) []byte {
66         buf := [8]byte{}
67         binary.BigEndian.PutUint64(buf[:], height)
68         return append([]byte{blockHashes, colon}, buf[:]...)
69 }
70
71 func calcBlockHeaderKey(hash *bc.Hash) []byte {
72         return append([]byte{blockHeader, colon}, hash.Bytes()...)
73 }
74
75 func calcBlockTransactionsKey(hash *bc.Hash) []byte {
76         return append([]byte{blockTransactons, colon}, hash.Bytes()...)
77 }
78
79 func calcTxStatusKey(hash *bc.Hash) []byte {
80         return append([]byte{txStatus, colon}, hash.Bytes()...)
81 }
82
83 func calcConsensusResultKey(seq uint64) []byte {
84         buf := [8]byte{}
85         binary.BigEndian.PutUint64(buf[:], seq)
86         return append([]byte{consensusResult, colon}, buf[:]...)
87 }
88
89 // GetBlockHeader return the block header by given hash
90 func GetBlockHeader(db dbm.DB, hash *bc.Hash) (*types.BlockHeader, error) {
91         binaryBlockHeader := db.Get(calcBlockHeaderKey(hash))
92         if binaryBlockHeader == nil {
93                 return nil, fmt.Errorf("There are no blockHeader with given hash %s", hash.String())
94         }
95
96         blockHeader := &types.BlockHeader{}
97         if err := blockHeader.UnmarshalText(binaryBlockHeader); err != nil {
98                 return nil, err
99         }
100         return blockHeader, nil
101 }
102
103 // GetBlockTransactions return the block transactions by given hash
104 func GetBlockTransactions(db dbm.DB, hash *bc.Hash) ([]*types.Tx, error) {
105         binaryBlockTxs := db.Get(calcBlockTransactionsKey(hash))
106         if binaryBlockTxs == nil {
107                 return nil, fmt.Errorf("There are no block transactions with given hash %s", hash.String())
108         }
109
110         block := &types.Block{}
111         if err := block.UnmarshalText(binaryBlockTxs); err != nil {
112                 return nil, err
113         }
114         return block.Transactions, nil
115 }
116
117 // GetBlockHashesByHeight return block hashes by given height
118 func GetBlockHashesByHeight(db dbm.DB, height uint64) ([]*bc.Hash, error) {
119         binaryHashes := db.Get(calcBlockHashesPrefix(height))
120         if binaryHashes == nil {
121                 return []*bc.Hash{}, nil
122         }
123
124         hashes := []*bc.Hash{}
125         if err := json.Unmarshal(binaryHashes, &hashes); err != nil {
126                 return nil, err
127         }
128         return hashes, nil
129 }
130
131 // GetMainChainHash return BlockHash by given height
132 func GetMainChainHash(db dbm.DB, height uint64) (*bc.Hash, error) {
133         binaryHash := db.Get(calcMainChainIndexPrefix(height))
134         if binaryHash == nil {
135                 return nil, fmt.Errorf("There are no BlockHash with given height %d", height)
136         }
137
138         hash := &bc.Hash{}
139         if err := hash.UnmarshalText(binaryHash); err != nil {
140                 return nil, err
141         }
142         return hash, nil
143 }
144
145 // GetConsensusResult return the vote result by given sequence
146 func GetConsensusResult(db dbm.DB, seq uint64) (*state.ConsensusResult, error) {
147         data := db.Get(calcConsensusResultKey(seq))
148         if data == nil {
149                 return nil, protocol.ErrNotFoundConsensusResult
150         }
151
152         consensusResult := new(state.ConsensusResult)
153         if err := json.Unmarshal(data, consensusResult); err != nil {
154                 return nil, errors.Wrap(err, "unmarshaling vote result")
155         }
156         return consensusResult, nil
157 }
158
159 // DeleteConsensusResult delete a consensusResult from cache and database
160 func (s *Store) DeleteConsensusResult(seq uint64) error {
161         consensusResult, err := GetConsensusResult(s.db, seq)
162         if err != nil {
163                 return err
164         }
165
166         s.db.Delete(calcConsensusResultKey(seq))
167         s.cache.removeConsensusResult(consensusResult)
168         return nil
169 }
170
171 // DeleteBlock delete a new block in the protocol.
172 func (s *Store) DeleteBlock(block *types.Block) error {
173         blockHash := block.Hash()
174         blockHashes, err := s.GetBlockHashesByHeight(block.Height)
175         if err != nil {
176                 return err
177         }
178
179         for i := 0; i < len(blockHashes); i++ {
180                 if blockHashes[i].String() == blockHash.String() {
181                         blockHashes = append(blockHashes[0:i], blockHashes[i+1:len(blockHashes)]...)
182                         break
183                 }
184         }
185
186         batch := s.db.NewBatch()
187         if len(blockHashes) == 0 {
188                 batch.Delete(calcBlockHashesPrefix(block.Height))
189         } else {
190                 binaryBlockHashes, err := json.Marshal(blockHashes)
191                 if err != nil {
192                         return errors.Wrap(err, "Marshal block hashes")
193                 }
194
195                 batch.Set(calcBlockHashesPrefix(block.Height), binaryBlockHashes)
196         }
197
198         batch.Delete(calcBlockHeaderKey(&blockHash))
199         batch.Delete(calcBlockTransactionsKey(&blockHash))
200         batch.Delete(calcTxStatusKey(&blockHash))
201         batch.Write()
202
203         s.cache.removeBlockHashes(block.Height)
204         s.cache.removeBlockHeader(&block.BlockHeader)
205
206         return nil
207 }
208
209 // NewStore creates and returns a new Store object.
210 func NewStore(db dbm.DB) *Store {
211         fillBlockHeaderFn := func(hash *bc.Hash) (*types.BlockHeader, error) {
212                 return GetBlockHeader(db, hash)
213         }
214         fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) {
215                 return GetBlockTransactions(db, hash)
216         }
217
218         fillBlockHashesFn := func(height uint64) ([]*bc.Hash, error) {
219                 return GetBlockHashesByHeight(db, height)
220         }
221
222         fillMainChainHashFn := func(height uint64) (*bc.Hash, error) {
223                 return GetMainChainHash(db, height)
224         }
225
226         fillConsensusResultFn := func(seq uint64) (*state.ConsensusResult, error) {
227                 return GetConsensusResult(db, seq)
228         }
229
230         cache := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillBlockHashesFn, fillMainChainHashFn, fillConsensusResultFn)
231         return &Store{
232                 db:    db,
233                 cache: cache,
234         }
235 }
236
237 // BlockExist check if the block is stored in disk
238 func (s *Store) BlockExist(hash *bc.Hash) bool {
239         _, err := s.cache.lookupBlockHeader(hash)
240         return err == nil
241 }
242
243 // GetBlock return the block by given hash
244 func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) {
245         blockHeader, err := s.GetBlockHeader(hash)
246         if err != nil {
247                 return nil, err
248         }
249
250         txs, err := s.GetBlockTransactions(hash)
251         if err != nil {
252                 return nil, err
253         }
254
255         return &types.Block{
256                 BlockHeader:  *blockHeader,
257                 Transactions: txs,
258         }, nil
259 }
260
261 // GetBlockHeader return the BlockHeader by given hash
262 func (s *Store) GetBlockHeader(hash *bc.Hash) (*types.BlockHeader, error) {
263         return s.cache.lookupBlockHeader(hash)
264 }
265
266 // GetBlockTransactions return the Block transactions by given hash
267 func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
268         return s.cache.lookupBlockTxs(hash)
269 }
270
271 // GetBlockHashesByHeight return the block hash by the specified height
272 func (s *Store) GetBlockHashesByHeight(height uint64) ([]*bc.Hash, error) {
273         return s.cache.lookupBlockHashesByHeight(height)
274 }
275
276 // GetMainChainHash return the block hash by the specified height
277 func (s *Store) GetMainChainHash(height uint64) (*bc.Hash, error) {
278         return s.cache.lookupMainChainHash(height)
279 }
280
281 // GetStoreStatus return the BlockStoreStateJSON
282 func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
283         return loadBlockStoreStateJSON(s.db)
284 }
285
286 // GetTransactionsUtxo will return all the utxo that related to the input txs
287 func (s *Store) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
288         return getTransactionsUtxo(s.db, view, txs)
289 }
290
291 // GetTransactionStatus will return the utxo that related to the block hash
292 func (s *Store) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
293         data := s.db.Get(calcTxStatusKey(hash))
294         if data == nil {
295                 return nil, errors.New("can't find the transaction status by given hash")
296         }
297
298         ts := &bc.TransactionStatus{}
299         if err := proto.Unmarshal(data, ts); err != nil {
300                 return nil, errors.Wrap(err, "unmarshaling transaction status")
301         }
302         return ts, nil
303 }
304
305 // GetUtxo will search the utxo in db
306 func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
307         return getUtxo(s.db, hash)
308 }
309
310 // GetConsensusResult retrive the voting result in specified vote sequence
311 func (s *Store) GetConsensusResult(seq uint64) (*state.ConsensusResult, error) {
312         return s.cache.lookupConsensusResult(seq)
313 }
314
315 // SaveBlock persists a new block in the protocol.
316 func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error {
317         startTime := time.Now()
318         binaryBlockHeader, err := block.MarshalTextForBlockHeader()
319         if err != nil {
320                 return errors.Wrap(err, "Marshal block header")
321         }
322
323         binaryBlockTxs, err := block.MarshalTextForTransactions()
324         if err != nil {
325                 return errors.Wrap(err, "Marshal block transactions")
326         }
327
328         binaryTxStatus, err := proto.Marshal(ts)
329         if err != nil {
330                 return errors.Wrap(err, "Marshal block transaction status")
331         }
332
333         blockHashes := []*bc.Hash{}
334         hashes, err := s.GetBlockHashesByHeight(block.Height)
335         if err != nil {
336                 return err
337         }
338         blockHashes = append(blockHashes, hashes...)
339         blockHash := block.Hash()
340         blockHashes = append(blockHashes, &blockHash)
341         binaryBlockHashes, err := json.Marshal(blockHashes)
342         if err != nil {
343                 return errors.Wrap(err, "Marshal block hashes")
344         }
345
346         batch := s.db.NewBatch()
347         batch.Set(calcBlockHashesPrefix(block.Height), binaryBlockHashes)
348         batch.Set(calcBlockHeaderKey(&blockHash), binaryBlockHeader)
349         batch.Set(calcBlockTransactionsKey(&blockHash), binaryBlockTxs)
350         batch.Set(calcTxStatusKey(&blockHash), binaryTxStatus)
351         batch.Write()
352
353         s.cache.removeBlockHashes(block.Height)
354         log.WithFields(log.Fields{
355                 "module":   logModule,
356                 "height":   block.Height,
357                 "hash":     blockHash.String(),
358                 "duration": time.Since(startTime),
359         }).Info("block saved on disk")
360         return nil
361 }
362
363 // SaveBlockHeader persists a new block header in the protocol.
364 func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
365         binaryBlockHeader, err := blockHeader.MarshalText()
366         if err != nil {
367                 return errors.Wrap(err, "Marshal block header")
368         }
369
370         blockHash := blockHeader.Hash()
371         s.db.Set(calcBlockHeaderKey(&blockHash), binaryBlockHeader)
372         s.cache.removeBlockHeader(blockHeader)
373         return nil
374 }
375
376 // SaveChainStatus save the core's newest status && delete old status
377 func (s *Store) SaveChainStatus(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error {
378         currentStatus := loadBlockStoreStateJSON(s.db)
379         batch := s.db.NewBatch()
380         if err := saveUtxoView(batch, view); err != nil {
381                 return err
382         }
383
384         var clearCacheFuncs []func()
385         for _, consensusResult := range consensusResults {
386                 result := consensusResult
387                 bytes, err := json.Marshal(result)
388                 if err != nil {
389                         return err
390                 }
391
392                 batch.Set(calcConsensusResultKey(result.Seq), bytes)
393                 clearCacheFuncs = append(clearCacheFuncs, func() {
394                         s.cache.removeConsensusResult(result)
395                 })
396         }
397
398         blockHash := blockHeader.Hash()
399         irrBlockHash := irrBlockHeader.Hash()
400         bytes, err := json.Marshal(protocol.BlockStoreState{
401                 Height:             blockHeader.Height,
402                 Hash:               &blockHash,
403                 IrreversibleHeight: irrBlockHeader.Height,
404                 IrreversibleHash:   &irrBlockHash,
405         })
406         if err != nil {
407                 return err
408         }
409         batch.Set([]byte{blockStore}, bytes)
410
411         // save main chain blockHeaders
412         for _, blockHeader := range mainBlockHeaders {
413                 bh := blockHeader
414                 blockHash := bh.Hash()
415                 binaryBlockHash, err := blockHash.MarshalText()
416                 if err != nil {
417                         return errors.Wrap(err, "Marshal block hash")
418                 }
419
420                 batch.Set(calcMainChainIndexPrefix(bh.Height), binaryBlockHash)
421                 clearCacheFuncs = append(clearCacheFuncs, func() {
422                         s.cache.removeMainChainHash(bh.Height)
423                 })
424         }
425
426         if currentStatus != nil {
427                 for i := blockHeader.Height + 1; i <= currentStatus.Height; i++ {
428                         index := i
429                         batch.Delete(calcMainChainIndexPrefix(index))
430                         clearCacheFuncs = append(clearCacheFuncs, func() {
431                                 s.cache.removeMainChainHash(index)
432                         })
433                 }
434         }
435         batch.Write()
436
437         for _, clearCacheFunc := range clearCacheFuncs {
438                 clearCacheFunc()
439         }
440         return nil
441 }