edit dup sup link struct (#1988)
[bytom/bytom.git] / database/store.go
index bbe13fc..70b6fed 100644
 package database
 
 import (
-       "github.com/bytom/database/storage"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/protocol/state"
+       "encoding/binary"
+       "encoding/json"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/tendermint/tmlibs/common"
+
+       "github.com/bytom/bytom/consensus"
+       dbm "github.com/bytom/bytom/database/leveldb"
+       "github.com/bytom/bytom/database/storage"
+       "github.com/bytom/bytom/errors"
+       "github.com/bytom/bytom/protocol"
+       "github.com/bytom/bytom/protocol/bc"
+       "github.com/bytom/bytom/protocol/bc/types"
+       "github.com/bytom/bytom/protocol/state"
+)
+
+const logModule = "leveldb"
+
+var (
+       // CheckpointPrefix represents the namespace of checkpoints in the db
+       CheckpointPrefix = []byte("CP:")
+       // BlockStoreKey is the key under which the block store state is saved
+       BlockStoreKey = []byte("blockStore")
+       // BlockHeaderIndexPrefix is the key prefix for indexing block headers by height
+       BlockHeaderIndexPrefix = []byte("BH:")
 )
 
-// Store provides storage interface for blockchain data
-type Store interface {
-       BlockExist(*bc.Hash) bool
+func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState {
+       bytes := db.Get(BlockStoreKey)
+       if bytes == nil {
+               return nil
+       }
+       bsj := &protocol.BlockStoreState{}
+       if err := json.Unmarshal(bytes, bsj); err != nil {
+               common.PanicCrisis(common.Fmt("Could not unmarshal bytes: %X", bytes))
+       }
+       return bsj
+}
+
+// A Store encapsulates storage for blockchain validation.
+// It satisfies the protocol.Store interface and provides additional
+// methods for querying current data.
+type Store struct {
+       db    dbm.DB
+       cache cache
+}
+
+// NewStore creates and returns a new Store object.
+func NewStore(db dbm.DB) *Store {
+       fillBlockHeaderFn := func(hash *bc.Hash) (*types.BlockHeader, error) {
+               return GetBlockHeader(db, hash)
+       }
+
+       fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) {
+               return GetBlockTransactions(db, hash)
+       }
+
+       fillBlockHashesFn := func(height uint64) ([]*bc.Hash, error) {
+               return GetBlockHashesByHeight(db, height)
+       }
+
+       fillMainChainHashFn := func(height uint64) (*bc.Hash, error) {
+               return GetMainChainHash(db, height)
+       }
+
+       cache := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillBlockHashesFn, fillMainChainHashFn)
+       return &Store{
+               db:    db,
+               cache: cache,
+       }
+}
+
+// GetBlockHeader returns the BlockHeader for the given hash
+func (s *Store) GetBlockHeader(hash *bc.Hash) (*types.BlockHeader, error) {
+       return s.cache.lookupBlockHeader(hash)
+}
+
+// GetUtxo looks up the utxo entry in the db by the given hash
+func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
+       return getUtxo(s.db, hash)
+}
+
+// GetContract returns the contract stored under the given 32-byte hash
+func (s *Store) GetContract(hash [32]byte) ([]byte, error) {
+       return getContract(s.db, hash)
+}
+
+// BlockExist checks whether the block is stored on disk
+func (s *Store) BlockExist(hash *bc.Hash) bool {
+       _, err := s.cache.lookupBlockHeader(hash)
+       return err == nil
+}
+
+// SaveBlockHeader persists a block header to the store.
+func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
+       binaryBlockHeader, err := blockHeader.MarshalText()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block header")
+       }
+
+       blockHash := blockHeader.Hash()
+       s.db.Set(CalcBlockHeaderKey(&blockHash), binaryBlockHeader)
+       s.cache.removeBlockHeader(blockHeader)
+       return nil
+}
+
+// GetBlockHashesByHeight returns the hashes of all blocks at the specified height
+func (s *Store) GetBlockHashesByHeight(height uint64) ([]*bc.Hash, error) {
+       return s.cache.lookupBlockHashesByHeight(height)
+}
+
+// GetMainChainHash returns the main-chain block hash at the specified height
+func (s *Store) GetMainChainHash(height uint64) (*bc.Hash, error) {
+       return s.cache.lookupMainChainHash(height)
+}
+
+// SaveBlock persists a new block to the store.
+func (s *Store) SaveBlock(block *types.Block) error {
+       startTime := time.Now()
+       binaryBlockHeader, err := block.MarshalTextForBlockHeader()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block header")
+       }
+
+       binaryBlockTxs, err := block.MarshalTextForTransactions()
+       if err != nil {
+               return errors.Wrap(err, "Marshal block transactions")
+       }
+
+       blockHashes := []*bc.Hash{}
+       hashes, err := s.GetBlockHashesByHeight(block.Height)
+       if err != nil {
+               return err
+       }
+
+       blockHashes = append(blockHashes, hashes...)
+       blockHash := block.Hash()
+       blockHashes = append(blockHashes, &blockHash)
+       binaryBlockHashes, err := json.Marshal(blockHashes)
+       if err != nil {
+               return errors.Wrap(err, "Marshal block hashes")
+       }
+
+       batch := s.db.NewBatch()
+       batch.Set(CalcBlockHashesKey(block.Height), binaryBlockHashes)
+       batch.Set(CalcBlockHeaderKey(&blockHash), binaryBlockHeader)
+       batch.Set(CalcBlockTransactionsKey(&blockHash), binaryBlockTxs)
+       batch.Set(CalcBlockHeaderIndexKey(block.Height, &blockHash), binaryBlockHeader)
+       batch.Write()
+
+       s.cache.removeBlockHashes(block.Height)
+       log.WithFields(log.Fields{
+               "module":   logModule,
+               "height":   block.Height,
+               "hash":     blockHash.String(),
+               "duration": time.Since(startTime),
+       }).Info("block saved on disk")
+       return nil
+}
+
+// GetBlockTransactions returns the block's transactions for the given hash
+func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
+       return s.cache.lookupBlockTxs(hash)
+}
+
+// GetBlock returns the block for the given hash
+func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) {
+       blockHeader, err := s.GetBlockHeader(hash)
+       if err != nil {
+               return nil, err
+       }
+
+       txs, err := s.GetBlockTransactions(hash)
+       if err != nil {
+               return nil, err
+       }
+
+       return &types.Block{
+               BlockHeader:  *blockHeader,
+               Transactions: txs,
+       }, nil
+}
+
+// GetTransactionsUtxo loads all utxo entries related to the input txs into the view
+func (s *Store) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
+       return getTransactionsUtxo(s.db, view, txs)
+}
+
+// GetStoreStatus returns the BlockStoreState
+func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
+       return loadBlockStoreStateJSON(s.db)
+}
+
+// SaveChainStatus saves the core's newest status and deletes the old status
+func (s *Store) SaveChainStatus(blockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, contractView *state.ContractViewpoint, finalizedHeight uint64, finalizedHash *bc.Hash) error {
+       batch := s.db.NewBatch()
+       if err := saveUtxoView(batch, view); err != nil {
+               return err
+       }
+
+       if err := deleteContractView(s.db, batch, contractView); err != nil {
+               return err
+       }
+
+       if err := saveContractView(s.db, batch, contractView); err != nil {
+               return err
+       }
+
+       blockHeaderHash := blockHeader.Hash()
+       bytes, err := json.Marshal(
+               protocol.BlockStoreState{
+                       Height:          blockHeader.Height,
+                       Hash:            &blockHeaderHash,
+                       FinalizedHeight: finalizedHeight,
+                       FinalizedHash:   finalizedHash,
+               })
+       if err != nil {
+               return err
+       }
+
+       batch.Set(BlockStoreKey, bytes)
+
+       var clearCacheFuncs []func()
+       // save main chain blockHeaders
+       for _, blockHeader := range mainBlockHeaders {
+               bh := blockHeader
+               blockHash := bh.Hash()
+               binaryBlockHash, err := blockHash.MarshalText()
+               if err != nil {
+                       return errors.Wrap(err, "Marshal block hash")
+               }
+
+               batch.Set(calcMainChainIndexPrefix(bh.Height), binaryBlockHash)
+               clearCacheFuncs = append(clearCacheFuncs, func() {
+                       s.cache.removeMainChainHash(bh.Height)
+               })
+       }
+       batch.Write()
+       for _, clearCacheFunc := range clearCacheFuncs {
+               clearCacheFunc()
+       }
+
+       return nil
+}
+
+func calcCheckpointKey(height uint64, hash *bc.Hash) []byte {
+       buf := make([]byte, 8)
+       binary.BigEndian.PutUint64(buf, height)
+       key := append(CheckpointPrefix, buf...)
+       if hash != nil {
+               key = append(key, hash.Bytes()...)
+       }
+       return key
+}
+
+// GetCheckpoint returns the checkpoint for the given block hash
+func (s *Store) GetCheckpoint(hash *bc.Hash) (*state.Checkpoint, error) {
+       header, err := s.GetBlockHeader(hash)
+       if err != nil {
+               return nil, err
+       }
+
+       data := s.db.Get(calcCheckpointKey(header.Height, hash))
+       checkpoint := &state.Checkpoint{}
+       if err := json.Unmarshal(data, checkpoint); err != nil {
+               return nil, err
+       }
+
+       checkpoint.SupLinks = append(checkpoint.SupLinks, header.SupLinks...)
+       return checkpoint, nil
+}
+
+// GetCheckpointsByHeight returns all checkpoints at the specified block height
+func (s *Store) GetCheckpointsByHeight(height uint64) ([]*state.Checkpoint, error) {
+       iter := s.db.IteratorPrefix(calcCheckpointKey(height, nil))
+       defer iter.Release()
+       return s.loadCheckpointsFromIter(iter)
+}
+
+// CheckpointsFromNode returns all checkpoints starting from the specified block height and hash
+func (s *Store) CheckpointsFromNode(height uint64, hash *bc.Hash) ([]*state.Checkpoint, error) {
+       startKey := calcCheckpointKey(height, hash)
+       iter := s.db.IteratorPrefixWithStart(CheckpointPrefix, startKey, false)
+
+       firstCheckpoint := &state.Checkpoint{}
+       if err := json.Unmarshal(iter.Value(), firstCheckpoint); err != nil {
+               return nil, err
+       }
 
-       GetBlock(*bc.Hash) (*types.Block, error)
-       GetMainchain(*bc.Hash) (map[uint64]*bc.Hash, error)
-       GetStoreStatus() BlockStoreStateJSON
-       GetSeed(*bc.Hash) (*bc.Hash, error)
-       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
-       GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error
-       GetUtxo(*bc.Hash) (*storage.UtxoEntry, error)
+       checkpoints := []*state.Checkpoint{firstCheckpoint}
+       subs, err := s.loadCheckpointsFromIter(iter)
+       if err != nil {
+               return nil, err
+       }
 
-       SaveBlock(*types.Block, *bc.TransactionStatus, *bc.Hash) error
-       SaveChainStatus(*types.Block, *state.UtxoViewpoint, map[uint64]*bc.Hash) error
+       checkpoints = append(checkpoints, subs...)
+       return checkpoints, nil
 }
 
-// BlockStoreStateJSON represents the core's db status
-type BlockStoreStateJSON struct {
-       Height uint64
-       Hash   *bc.Hash
+func (s *Store) loadCheckpointsFromIter(iter dbm.Iterator) ([]*state.Checkpoint, error) {
+       var checkpoints []*state.Checkpoint
+       defer iter.Release()
+       for iter.Next() {
+               checkpoint := &state.Checkpoint{}
+               if err := json.Unmarshal(iter.Value(), checkpoint); err != nil {
+                       return nil, err
+               }
+
+               header, err := s.GetBlockHeader(&checkpoint.Hash)
+               if err != nil {
+                       return nil, err
+               }
+
+               checkpoint.SupLinks = append(checkpoint.SupLinks, header.SupLinks...)
+               checkpoints = append(checkpoints, checkpoint)
+       }
+       return checkpoints, nil
+}
+
+// SaveCheckpoints saves multiple checkpoints in a single batch
+func (s *Store) SaveCheckpoints(checkpoints []*state.Checkpoint) error {
+       batch := s.db.NewBatch()
+
+       if err := s.saveCheckpoints(batch, checkpoints); err != nil {
+               return err
+       }
+
+       batch.Write()
+       return nil
+}
+
+func (s *Store) saveCheckpoints(batch dbm.Batch, checkpoints []*state.Checkpoint) error {
+       for _, checkpoint := range checkpoints {
+               startTime := time.Now()
+               data, err := json.Marshal(checkpoint)
+               if err != nil {
+                       return err
+               }
+
+               if checkpoint.Height%consensus.ActiveNetParams.BlocksOfEpoch != 1 {
+                       header, err := s.GetBlockHeader(&checkpoint.Hash)
+                       if err != nil {
+                               return err
+                       }
+
+                       batch.Delete(calcCheckpointKey(header.Height-1, &header.PreviousBlockHash))
+               }
+
+               batch.Set(calcCheckpointKey(checkpoint.Height, &checkpoint.Hash), data)
+               log.WithFields(log.Fields{
+                       "module":   logModule,
+                       "height":   checkpoint.Height,
+                       "hash":     checkpoint.Hash.String(),
+                       "status":   checkpoint.Status,
+                       "duration": time.Since(startTime),
+               }).Info("checkpoint saved on disk")
+       }
+       return nil
 }
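
For orientation, a minimal usage sketch of the new Store API follows. It is not part of the change above; it assumes a tmlibs-style in-memory constructor dbm.NewMemDB() in the leveldb wrapper and a caller-supplied block, both purely illustrative.

package example

import (
        "fmt"

        "github.com/bytom/bytom/database"
        dbm "github.com/bytom/bytom/database/leveldb"
        "github.com/bytom/bytom/protocol/bc/types"
)

// exampleStoreRoundTrip saves a block and reads it back through the cache.
// The block argument is assumed to be a fully populated *types.Block.
func exampleStoreRoundTrip(block *types.Block) error {
        db := dbm.NewMemDB() // assumed in-memory backend, for illustration only
        store := database.NewStore(db)

        // SaveBlock writes the header, transactions and the height->hashes
        // index in one batch, then invalidates the cached hash list.
        if err := store.SaveBlock(block); err != nil {
                return err
        }

        hash := block.Hash()
        if !store.BlockExist(&hash) {
                return fmt.Errorf("block %s not found", hash.String())
        }

        // GetBlock reassembles the block from the cached header and txs.
        _, err := store.GetBlock(&hash)
        return err
}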