OSDN Git Service

update vote result cache (#185)
[bytom/vapor.git] / database / store.go
1 package database
2
3 import (
4         "encoding/binary"
5         "encoding/json"
6         "time"
7
8         "github.com/golang/protobuf/proto"
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/common"
11
12         dbm "github.com/vapor/database/leveldb"
13         "github.com/vapor/database/storage"
14         "github.com/vapor/errors"
15         "github.com/vapor/protocol"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18         "github.com/vapor/protocol/state"
19 )
20
21 const logModule = "leveldb"
22
23 var (
24         blockStoreKey          = []byte("blockStore")
25         blockHeaderPrefix      = []byte("BH:")
26         blockTransactonsPrefix = []byte("BTXS:")
27         txStatusPrefix         = []byte("BTS:")
28         voteResultPrefix       = []byte("VR:")
29 )
30
31 func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState {
32         bytes := db.Get(blockStoreKey)
33         if bytes == nil {
34                 return nil
35         }
36         bsj := &protocol.BlockStoreState{}
37         if err := json.Unmarshal(bytes, bsj); err != nil {
38                 common.PanicCrisis(common.Fmt("Could not unmarshal bytes: %X", bytes))
39         }
40         return bsj
41 }
42
43 // A Store encapsulates storage for blockchain validation.
44 // It satisfies the interface protocol.Store, and provides additional
45 // methods for querying current data.
46 type Store struct {
47         db    dbm.DB
48         cache blockCache
49 }
50
51 func calcBlockHeaderKey(height uint64, hash *bc.Hash) []byte {
52         buf := [8]byte{}
53         binary.BigEndian.PutUint64(buf[:], height)
54         key := append(blockHeaderPrefix, buf[:]...)
55         return append(key, hash.Bytes()...)
56 }
57
58 func calcBlockTransactionsKey(hash *bc.Hash) []byte {
59         return append(blockTransactonsPrefix, hash.Bytes()...)
60 }
61
62 func calcTxStatusKey(hash *bc.Hash) []byte {
63         return append(txStatusPrefix, hash.Bytes()...)
64 }
65
66 func calcVoteResultKey(seq uint64) []byte {
67         buf := [8]byte{}
68         binary.BigEndian.PutUint64(buf[:], seq)
69         return append(voteResultPrefix, buf[:]...)
70 }
71
72 // GetBlockHeader return the block header by given hash and height
73 func GetBlockHeader(db dbm.DB, hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
74         block := &types.Block{}
75         binaryBlockHeader := db.Get(calcBlockHeaderKey(height, hash))
76         if binaryBlockHeader == nil {
77                 return nil, nil
78         }
79         if err := block.UnmarshalText(binaryBlockHeader); err != nil {
80                 return nil, err
81         }
82
83         return &block.BlockHeader, nil
84 }
85
86 // GetBlockTransactions return the block transactions by given hash
87 func GetBlockTransactions(db dbm.DB, hash *bc.Hash) ([]*types.Tx, error) {
88         block := &types.Block{}
89         binaryBlockTxs := db.Get(calcBlockTransactionsKey(hash))
90         if binaryBlockTxs == nil {
91                 return nil, errors.New("The transactions in the block is empty")
92         }
93
94         if err := block.UnmarshalText(binaryBlockTxs); err != nil {
95                 return nil, err
96         }
97         return block.Transactions, nil
98 }
99
100 // GetVoteResult return the vote result by given sequence
101 func GetVoteResult(db dbm.DB, seq uint64) (*state.VoteResult, error) {
102         data := db.Get(calcVoteResultKey(seq))
103         if data == nil {
104                 return nil, protocol.ErrNotFoundVoteResult
105         }
106
107         voteResult := new(state.VoteResult)
108         if err := json.Unmarshal(data, voteResult); err != nil {
109                 return nil, errors.Wrap(err, "unmarshaling vote result")
110         }
111         return voteResult, nil
112 }
113
114 // NewStore creates and returns a new Store object.
115 func NewStore(db dbm.DB) *Store {
116         fillBlockHeaderFn := func(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
117                 return GetBlockHeader(db, hash, height)
118         }
119         fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) {
120                 return GetBlockTransactions(db, hash)
121         }
122         fillVoteResultFn := func(seq uint64) (*state.VoteResult, error) {
123                 return GetVoteResult(db, seq)
124         }
125         bc := newBlockCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
126         return &Store{
127                 db:    db,
128                 cache: bc,
129         }
130 }
131
132 // GetUtxo will search the utxo in db
133 func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
134         return getUtxo(s.db, hash)
135 }
136
137 // BlockExist check if the block is stored in disk
138 func (s *Store) BlockExist(hash *bc.Hash, height uint64) bool {
139         blockHeader, err := s.cache.lookupBlockHeader(hash, height)
140         return err == nil && blockHeader != nil
141 }
142
143 // GetBlock return the block by given hash
144 func (s *Store) GetBlock(hash *bc.Hash, height uint64) (*types.Block, error) {
145         blockHeader, err := s.GetBlockHeader(hash, height)
146         if err != nil {
147                 return nil, err
148         }
149
150         txs, err := s.GetBlockTransactions(hash)
151         if err != nil {
152                 return nil, err
153         }
154
155         return &types.Block{
156                 BlockHeader:  *blockHeader,
157                 Transactions: txs,
158         }, nil
159 }
160
161 // GetBlockHeader return the BlockHeader by given hash
162 func (s *Store) GetBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
163         blockHeader, err := s.cache.lookupBlockHeader(hash, height)
164         if err != nil {
165                 return nil, err
166         }
167         return blockHeader, nil
168 }
169
170 // GetBlockTransactions return the Block transactions by given hash
171 func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
172         txs, err := s.cache.lookupBlockTxs(hash)
173         if err != nil {
174                 return nil, err
175         }
176         return txs, nil
177 }
178
179 // GetTransactionsUtxo will return all the utxo that related to the input txs
180 func (s *Store) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
181         return getTransactionsUtxo(s.db, view, txs)
182 }
183
184 // GetTransactionStatus will return the utxo that related to the block hash
185 func (s *Store) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
186         data := s.db.Get(calcTxStatusKey(hash))
187         if data == nil {
188                 return nil, errors.New("can't find the transaction status by given hash")
189         }
190
191         ts := &bc.TransactionStatus{}
192         if err := proto.Unmarshal(data, ts); err != nil {
193                 return nil, errors.Wrap(err, "unmarshaling transaction status")
194         }
195         return ts, nil
196 }
197
198 // GetStoreStatus return the BlockStoreStateJSON
199 func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
200         return loadBlockStoreStateJSON(s.db)
201 }
202
203 // GetVoteResult retrive the voting result in specified vote sequence
204 func (s *Store) GetVoteResult(seq uint64) (*state.VoteResult, error) {
205         return s.cache.lookupVoteResult(seq)
206 }
207
208 func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error) {
209         startTime := time.Now()
210         blockIndex := state.NewBlockIndex()
211         bhIter := s.db.IteratorPrefix(blockHeaderPrefix)
212         defer bhIter.Release()
213
214         var lastNode *state.BlockNode
215         for bhIter.Next() {
216                 bh := &types.BlockHeader{}
217                 if err := bh.UnmarshalText(bhIter.Value()); err != nil {
218                         return nil, err
219                 }
220
221                 // If a block with a height greater than the best height of state is added to the index,
222                 // It may cause a bug that the new block cant not be process properly.
223                 if bh.Height > stateBestHeight {
224                         break
225                 }
226
227                 var parent *state.BlockNode
228                 if lastNode == nil || lastNode.Hash == bh.PreviousBlockHash {
229                         parent = lastNode
230                 } else {
231                         parent = blockIndex.GetNode(&bh.PreviousBlockHash)
232                 }
233
234                 node, err := state.NewBlockNode(bh, parent)
235                 if err != nil {
236                         return nil, err
237                 }
238
239                 blockIndex.AddNode(node)
240                 lastNode = node
241         }
242
243         log.WithFields(log.Fields{
244                 "module":   logModule,
245                 "height":   stateBestHeight,
246                 "duration": time.Since(startTime),
247         }).Debug("initialize load history block index from database")
248         return blockIndex, nil
249 }
250
251 // SaveBlock persists a new block in the protocol.
252 func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error {
253         startTime := time.Now()
254
255         binaryBlockHeader, err := block.MarshalTextForBlockHeader()
256         if err != nil {
257                 return errors.Wrap(err, "Marshal block header")
258         }
259
260         binaryBlockTxs, err := block.MarshalTextForTransactions()
261         if err != nil {
262                 return errors.Wrap(err, "Marshal block transactions")
263         }
264
265         binaryTxStatus, err := proto.Marshal(ts)
266         if err != nil {
267                 return errors.Wrap(err, "marshal block transaction status")
268         }
269
270         blockHash := block.Hash()
271         batch := s.db.NewBatch()
272         batch.Set(calcBlockHeaderKey(block.Height, &blockHash), binaryBlockHeader)
273         batch.Set(calcBlockTransactionsKey(&blockHash), binaryBlockTxs)
274         batch.Set(calcTxStatusKey(&blockHash), binaryTxStatus)
275         batch.Write()
276
277         log.WithFields(log.Fields{
278                 "module":   logModule,
279                 "height":   block.Height,
280                 "hash":     blockHash.String(),
281                 "duration": time.Since(startTime),
282         }).Info("block saved on disk")
283         return nil
284 }
285
286 // SaveBlockHeader persists a new block header in the protocol.
287 func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
288         startTime := time.Now()
289
290         binaryBlockHeader, err := blockHeader.MarshalText()
291         if err != nil {
292                 return errors.Wrap(err, "Marshal block header")
293         }
294
295         blockHash := blockHeader.Hash()
296         s.db.Set(calcBlockHeaderKey(blockHeader.Height, &blockHash), binaryBlockHeader)
297
298         // updata blockheader cache
299         if _, ok := s.cache.getBlockHeader(&blockHash); ok {
300                 s.cache.addBlockHeader(blockHeader)
301         }
302
303         log.WithFields(log.Fields{
304                 "module":   logModule,
305                 "height":   blockHeader.Height,
306                 "hash":     blockHash.String(),
307                 "duration": time.Since(startTime),
308         }).Info("blockHeader saved on disk")
309         return nil
310 }
311
312 // SaveChainStatus save the core's newest status && delete old status
313 func (s *Store) SaveChainStatus(node, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteResults []*state.VoteResult) error {
314         batch := s.db.NewBatch()
315         if err := saveUtxoView(batch, view); err != nil {
316                 return err
317         }
318
319         for _, vote := range voteResults {
320                 bytes, err := json.Marshal(vote)
321                 if err != nil {
322                         return err
323                 }
324
325                 batch.Set(calcVoteResultKey(vote.Seq), bytes)
326                 if _, ok := s.cache.getVoteResult(vote.Seq); ok {
327                         s.cache.addVoteResult(vote)
328                 }
329         }
330
331         bytes, err := json.Marshal(protocol.BlockStoreState{
332                 Height:             node.Height,
333                 Hash:               &node.Hash,
334                 IrreversibleHeight: irreversibleNode.Height,
335                 IrreversibleHash:   &irreversibleNode.Hash,
336         })
337         if err != nil {
338                 return err
339         }
340
341         batch.Set(blockStoreKey, bytes)
342         batch.Write()
343         return nil
344 }