OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / database / store.go
diff --git a/database/store.go b/database/store.go
new file mode 100644 (file)
index 0000000..6de274b
--- /dev/null
@@ -0,0 +1,321 @@
+package database
+
+import (
+       "encoding/binary"
+       "encoding/json"
+       "fmt"
+       "time"
+
+       "github.com/golang/protobuf/proto"
+       log "github.com/sirupsen/logrus"
+
+       dbm "github.com/vapor/database/leveldb"
+       "github.com/vapor/database/storage"
+       "github.com/vapor/errors"
+       "github.com/vapor/protocol"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+       "github.com/vapor/protocol/state"
+)
+
+const logModule = "leveldb"
+
+var (
+       blockStoreKey          = []byte("blockStore")
+       blockHeaderPrefix      = []byte("BH:")
+       blockTransactonsPrefix = []byte("BTXS:")
+       txStatusPrefix         = []byte("BTS:")
+       voteResultPrefix       = []byte("VR:")
+)
+
+func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState {
+       bytes := db.Get(blockStoreKey)
+       if bytes == nil {
+               return nil
+       }
+
+       bsj := &protocol.BlockStoreState{}
+       if err := json.Unmarshal(bytes, bsj); err != nil {
+               log.WithField("err", err).Panic("fail on unmarshal BlockStoreStateJSON")
+       }
+       return bsj
+}
+
+// A Store encapsulates storage for blockchain validation.
+// It satisfies the interface protocol.Store, and provides additional
+// methods for querying current data.
+type Store struct {
+       db    dbm.DB
+       cache cache
+}
+
+func calcBlockHeaderKey(height uint64, hash *bc.Hash) []byte {
+       buf := [8]byte{}
+       binary.BigEndian.PutUint64(buf[:], height)
+       key := append(blockHeaderPrefix, buf[:]...)
+       return append(key, hash.Bytes()...)
+}
+
+func calcBlockTransactionsKey(hash *bc.Hash) []byte {
+       return append(blockTransactonsPrefix, hash.Bytes()...)
+}
+
+func calcTxStatusKey(hash *bc.Hash) []byte {
+       return append(txStatusPrefix, hash.Bytes()...)
+}
+
+func calcVoteResultKey(seq uint64) []byte {
+       buf := [8]byte{}
+       binary.BigEndian.PutUint64(buf[:], seq)
+       return append(voteResultPrefix, buf[:]...)
+}
+
+// GetBlockHeader return the block header by given hash and height
+func GetBlockHeader(db dbm.DB, hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
+       binaryBlockHeader := db.Get(calcBlockHeaderKey(height, hash))
+       if binaryBlockHeader == nil {
+               return nil, fmt.Errorf("There are no blockHeader with given hash %s", hash.String())
+       }
+
+       block := &types.Block{}
+       if err := block.UnmarshalText(binaryBlockHeader); err != nil {
+               return nil, err
+       }
+       return &block.BlockHeader, nil
+}
+
+// GetBlockTransactions return the block transactions by given hash
+func GetBlockTransactions(db dbm.DB, hash *bc.Hash) ([]*types.Tx, error) {
+       binaryBlockTxs := db.Get(calcBlockTransactionsKey(hash))
+       if binaryBlockTxs == nil {
+               return nil, fmt.Errorf("There are no block transactions with given hash %s", hash.String())
+       }
+
+       block := &types.Block{}
+       if err := block.UnmarshalText(binaryBlockTxs); err != nil {
+               return nil, err
+       }
+       return block.Transactions, nil
+}
+
+// GetVoteResult return the vote result by given sequence
+func GetVoteResult(db dbm.DB, seq uint64) (*state.VoteResult, error) {
+       data := db.Get(calcVoteResultKey(seq))
+       if data == nil {
+               return nil, protocol.ErrNotFoundVoteResult
+       }
+
+       voteResult := new(state.VoteResult)
+       if err := json.Unmarshal(data, voteResult); err != nil {
+               return nil, errors.Wrap(err, "unmarshaling vote result")
+       }
+       return voteResult, nil
+}
+
+// NewStore creates and returns a new Store object.
+func NewStore(db dbm.DB) *Store {
+       fillBlockHeaderFn := func(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
+               return GetBlockHeader(db, hash, height)
+       }
+       fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) {
+               return GetBlockTransactions(db, hash)
+       }
+       fillVoteResultFn := func(seq uint64) (*state.VoteResult, error) {
+               return GetVoteResult(db, seq)
+       }
+       bc := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
+       return &Store{
+               db:    db,
+               cache: bc,
+       }
+}
+
+// BlockExist check if the block is stored in disk
+func (s *Store) BlockExist(hash *bc.Hash, height uint64) bool {
+       _, err := s.cache.lookupBlockHeader(hash, height)
+       return err == nil
+}
+
+// GetBlock return the block by given hash
+func (s *Store) GetBlock(hash *bc.Hash, height uint64) (*types.Block, error) {
+       blockHeader, err := s.GetBlockHeader(hash, height)
+       if err != nil {
+               return nil, err
+       }
+
+       txs, err := s.GetBlockTransactions(hash)
+       if err != nil {
+               return nil, err
+       }
+
+       return &types.Block{
+               BlockHeader:  *blockHeader,
+               Transactions: txs,
+       }, nil
+}
+
+// GetBlockHeader return the BlockHeader by given hash
+func (s *Store) GetBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
+       return s.cache.lookupBlockHeader(hash, height)
+}
+
+// GetBlockTransactions return the Block transactions by given hash
+func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
+       return s.cache.lookupBlockTxs(hash)
+}
+
+// GetStoreStatus return the BlockStoreStateJSON
+func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
+       return loadBlockStoreStateJSON(s.db)
+}
+
+// GetTransactionsUtxo will return all the utxo that related to the input txs
+func (s *Store) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
+       return getTransactionsUtxo(s.db, view, txs)
+}
+
+// GetTransactionStatus will return the utxo that related to the block hash
+func (s *Store) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
+       data := s.db.Get(calcTxStatusKey(hash))
+       if data == nil {
+               return nil, errors.New("can't find the transaction status by given hash")
+       }
+
+       ts := &bc.TransactionStatus{}
+       if err := proto.Unmarshal(data, ts); err != nil {
+               return nil, errors.Wrap(err, "unmarshaling transaction status")
+       }
+       return ts, nil
+}
+
+// GetUtxo will search the utxo in db
+func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
+       return getUtxo(s.db, hash)
+}
+
+// GetVoteResult retrive the voting result in specified vote sequence
+func (s *Store) GetVoteResult(seq uint64) (*state.VoteResult, error) {
+       return s.cache.lookupVoteResult(seq)
+}
+
+func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error) {
+       startTime := time.Now()
+       blockIndex := state.NewBlockIndex()
+       bhIter := s.db.IteratorPrefix(blockHeaderPrefix)
+       defer bhIter.Release()
+
+       var lastNode *state.BlockNode
+       for bhIter.Next() {
+               bh := &types.BlockHeader{}
+               if err := bh.UnmarshalText(bhIter.Value()); err != nil {
+                       return nil, err
+               }
+
+               // If a block with a height greater than the best height of state is added to the index,
+               // It may cause a bug that the new block cant not be process properly.
+               if bh.Height > stateBestHeight {
+                       break
+               }
+
+               var parent *state.BlockNode
+               if lastNode == nil || lastNode.Hash == bh.PreviousBlockHash {
+                       parent = lastNode
+               } else {
+                       parent = blockIndex.GetNode(&bh.PreviousBlockHash)
+               }
+
+               node, err := state.NewBlockNode(bh, parent)
+               if err != nil {
+                       return nil, err
+               }
+
+               blockIndex.AddNode(node)
+               lastNode = node
+       }
+
+       log.WithFields(log.Fields{
+               "module":   logModule,
+               "height":   stateBestHeight,
+               "duration": time.Since(startTime),
+       }).Debug("initialize load history block index from database")
+       return blockIndex, nil
+}
+
+// SaveBlock persists a new block in the protocol.
+func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error {
+       startTime := time.Now()
+       binaryBlockHeader, err := block.MarshalTextForBlockHeader()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block header")
+       }
+
+       binaryBlockTxs, err := block.MarshalTextForTransactions()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block transactions")
+       }
+
+       binaryTxStatus, err := proto.Marshal(ts)
+       if err != nil {
+               return errors.Wrap(err, "marshal block transaction status")
+       }
+
+       blockHash := block.Hash()
+       batch := s.db.NewBatch()
+       batch.Set(calcBlockHeaderKey(block.Height, &blockHash), binaryBlockHeader)
+       batch.Set(calcBlockTransactionsKey(&blockHash), binaryBlockTxs)
+       batch.Set(calcTxStatusKey(&blockHash), binaryTxStatus)
+       batch.Write()
+
+       log.WithFields(log.Fields{
+               "module":   logModule,
+               "height":   block.Height,
+               "hash":     blockHash.String(),
+               "duration": time.Since(startTime),
+       }).Info("block saved on disk")
+       return nil
+}
+
+// SaveBlockHeader persists a new block header in the protocol.
+func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
+       binaryBlockHeader, err := blockHeader.MarshalText()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block header")
+       }
+
+       blockHash := blockHeader.Hash()
+       s.db.Set(calcBlockHeaderKey(blockHeader.Height, &blockHash), binaryBlockHeader)
+       s.cache.removeBlockHeader(blockHeader)
+       return nil
+}
+
+// SaveChainStatus save the core's newest status && delete old status
+func (s *Store) SaveChainStatus(node, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteResults []*state.VoteResult) error {
+       batch := s.db.NewBatch()
+       if err := saveUtxoView(batch, view); err != nil {
+               return err
+       }
+
+       for _, vote := range voteResults {
+               bytes, err := json.Marshal(vote)
+               if err != nil {
+                       return err
+               }
+
+               batch.Set(calcVoteResultKey(vote.Seq), bytes)
+               s.cache.removeVoteResult(vote)
+       }
+
+       bytes, err := json.Marshal(protocol.BlockStoreState{
+               Height:             node.Height,
+               Hash:               &node.Hash,
+               IrreversibleHeight: irreversibleNode.Height,
+               IrreversibleHash:   &irreversibleNode.Hash,
+       })
+       if err != nil {
+               return err
+       }
+
+       batch.Set(blockStoreKey, bytes)
+       batch.Write()
+       return nil
+}