X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=database%2Fstore.go;h=1970543ba6c7bee49e32c8e401c5610df5f16cce;hp=b3e75e8fe4fa8513910bbec34b8319f4db18bba3;hb=8ca571fcd4ad57b3c7a520654b5c77fca8a89735;hpb=418514c6c8d60e61c57c171178a3d84596c6d4fe diff --git a/database/store.go b/database/store.go index b3e75e8f..1970543b 100644 --- a/database/store.go +++ b/database/store.go @@ -3,39 +3,47 @@ package database import ( "encoding/binary" "encoding/json" + "fmt" "time" "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" - "github.com/tendermint/tmlibs/common" - - dbm "github.com/vapor/database/leveldb" - "github.com/vapor/database/storage" - "github.com/vapor/errors" - "github.com/vapor/protocol" - "github.com/vapor/protocol/bc" - "github.com/vapor/protocol/bc/types" - "github.com/vapor/protocol/state" + + dbm "github.com/bytom/vapor/database/leveldb" + "github.com/bytom/vapor/database/storage" + "github.com/bytom/vapor/errors" + "github.com/bytom/vapor/protocol" + "github.com/bytom/vapor/protocol/bc" + "github.com/bytom/vapor/protocol/bc/types" + "github.com/bytom/vapor/protocol/state" ) -const logModule = "leveldb" +const ( + // log module + logModule = "leveldb" + // the byte of colon(:) + colon = byte(0x3a) +) -var ( - blockStoreKey = []byte("blockStore") - blockHeaderPrefix = []byte("BH:") - blockTransactonsPrefix = []byte("BTXS:") - txStatusPrefix = []byte("BTS:") - voteResultPrefix = []byte("VR:") +const ( + blockStore byte = iota + blockHashes + blockHeader + blockTransactons + mainChainIndex + txStatus + consensusResult ) func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState { - bytes := db.Get(blockStoreKey) + bytes := db.Get([]byte{blockStore}) if bytes == nil { return nil } + bsj := &protocol.BlockStoreState{} if err := json.Unmarshal(bytes, bsj); err != nil { - common.PanicCrisis(common.Fmt("Could not unmarshal bytes: %X", bytes)) + log.WithField("err", err).Panic("fail on unmarshal BlockStoreStateJSON") } return bsj } @@ -45,104 +53,146 @@ func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState { // methods for querying current data. type Store struct { db dbm.DB - cache blockCache + cache *cache +} + +func calcMainChainIndexPrefix(height uint64) []byte { + buf := [8]byte{} + binary.BigEndian.PutUint64(buf[:], height) + return append([]byte{mainChainIndex, colon}, buf[:]...) } -func calcBlockHeaderKey(height uint64, hash *bc.Hash) []byte { +func calcBlockHashesPrefix(height uint64) []byte { buf := [8]byte{} binary.BigEndian.PutUint64(buf[:], height) - key := append(blockHeaderPrefix, buf[:]...) - return append(key, hash.Bytes()...) + return append([]byte{blockHashes, colon}, buf[:]...) +} + +func calcBlockHeaderKey(hash *bc.Hash) []byte { + return append([]byte{blockHeader, colon}, hash.Bytes()...) } func calcBlockTransactionsKey(hash *bc.Hash) []byte { - return append(blockTransactonsPrefix, hash.Bytes()...) + return append([]byte{blockTransactons, colon}, hash.Bytes()...) } func calcTxStatusKey(hash *bc.Hash) []byte { - return append(txStatusPrefix, hash.Bytes()...) + return append([]byte{txStatus, colon}, hash.Bytes()...) } -func calcVoteResultKey(seq uint64) []byte { +func calcConsensusResultKey(seq uint64) []byte { buf := [8]byte{} binary.BigEndian.PutUint64(buf[:], seq) - return append(voteResultPrefix, buf[:]...) + return append([]byte{consensusResult, colon}, buf[:]...) } -// GetBlockHeader return the block header by given hash and height -func GetBlockHeader(db dbm.DB, hash *bc.Hash, height uint64) (*types.BlockHeader, error) { - block := &types.Block{} - binaryBlockHeader := db.Get(calcBlockHeaderKey(height, hash)) +// GetBlockHeader return the block header by given hash +func GetBlockHeader(db dbm.DB, hash *bc.Hash) (*types.BlockHeader, error) { + binaryBlockHeader := db.Get(calcBlockHeaderKey(hash)) if binaryBlockHeader == nil { - return nil, nil + return nil, fmt.Errorf("There are no blockHeader with given hash %s", hash.String()) } - if err := block.UnmarshalText(binaryBlockHeader); err != nil { + + blockHeader := &types.BlockHeader{} + if err := blockHeader.UnmarshalText(binaryBlockHeader); err != nil { return nil, err } - - return &block.BlockHeader, nil + return blockHeader, nil } // GetBlockTransactions return the block transactions by given hash func GetBlockTransactions(db dbm.DB, hash *bc.Hash) ([]*types.Tx, error) { - block := &types.Block{} binaryBlockTxs := db.Get(calcBlockTransactionsKey(hash)) if binaryBlockTxs == nil { - return nil, errors.New("The transactions in the block is empty") + return nil, fmt.Errorf("There are no block transactions with given hash %s", hash.String()) } + block := &types.Block{} if err := block.UnmarshalText(binaryBlockTxs); err != nil { return nil, err } return block.Transactions, nil } -// GetVoteResult return the vote result by given sequence -func GetVoteResult(db dbm.DB, seq uint64) (*state.VoteResult, error) { - data := db.Get(calcVoteResultKey(seq)) +// GetBlockHashesByHeight return block hashes by given height +func GetBlockHashesByHeight(db dbm.DB, height uint64) ([]*bc.Hash, error) { + binaryHashes := db.Get(calcBlockHashesPrefix(height)) + if binaryHashes == nil { + return []*bc.Hash{}, nil + } + + hashes := []*bc.Hash{} + if err := json.Unmarshal(binaryHashes, &hashes); err != nil { + return nil, err + } + return hashes, nil +} + +// GetMainChainHash return BlockHash by given height +func GetMainChainHash(db dbm.DB, height uint64) (*bc.Hash, error) { + binaryHash := db.Get(calcMainChainIndexPrefix(height)) + if binaryHash == nil { + return nil, fmt.Errorf("There are no BlockHash with given height %d", height) + } + + hash := &bc.Hash{} + if err := hash.UnmarshalText(binaryHash); err != nil { + return nil, err + } + return hash, nil +} + +// GetConsensusResult return the vote result by given sequence +func GetConsensusResult(db dbm.DB, seq uint64) (*state.ConsensusResult, error) { + data := db.Get(calcConsensusResultKey(seq)) if data == nil { - return nil, protocol.ErrNotFoundVoteResult + return nil, protocol.ErrNotFoundConsensusResult } - voteResult := new(state.VoteResult) - if err := json.Unmarshal(data, voteResult); err != nil { + consensusResult := new(state.ConsensusResult) + if err := json.Unmarshal(data, consensusResult); err != nil { return nil, errors.Wrap(err, "unmarshaling vote result") } - return voteResult, nil + return consensusResult, nil } // NewStore creates and returns a new Store object. func NewStore(db dbm.DB) *Store { - fillBlockHeaderFn := func(hash *bc.Hash, height uint64) (*types.BlockHeader, error) { - return GetBlockHeader(db, hash, height) + fillBlockHeaderFn := func(hash *bc.Hash) (*types.BlockHeader, error) { + return GetBlockHeader(db, hash) } fillBlockTxsFn := func(hash *bc.Hash) ([]*types.Tx, error) { return GetBlockTransactions(db, hash) } - fillVoteResultFn := func(seq uint64) (*state.VoteResult, error) { - return GetVoteResult(db, seq) + + fillBlockHashesFn := func(height uint64) ([]*bc.Hash, error) { + return GetBlockHashesByHeight(db, height) + } + + fillMainChainHashFn := func(height uint64) (*bc.Hash, error) { + return GetMainChainHash(db, height) + } + + fillConsensusResultFn := func(seq uint64) (*state.ConsensusResult, error) { + return GetConsensusResult(db, seq) } - bc := newBlockCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn) + + cache := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillBlockHashesFn, fillMainChainHashFn, fillConsensusResultFn) return &Store{ db: db, - cache: bc, + cache: cache, } } -// GetUtxo will search the utxo in db -func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) { - return getUtxo(s.db, hash) -} - // BlockExist check if the block is stored in disk -func (s *Store) BlockExist(hash *bc.Hash, height uint64) bool { - blockHeader, err := s.cache.lookupBlockHeader(hash, height) - return err == nil && blockHeader != nil +func (s *Store) BlockExist(hash *bc.Hash) bool { + _, err := s.cache.lookupBlockHeader(hash) + return err == nil } // GetBlock return the block by given hash -func (s *Store) GetBlock(hash *bc.Hash, height uint64) (*types.Block, error) { - blockHeader, err := s.GetBlockHeader(hash, height) +func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) { + blockHeader, err := s.GetBlockHeader(hash) if err != nil { return nil, err } @@ -159,21 +209,28 @@ func (s *Store) GetBlock(hash *bc.Hash, height uint64) (*types.Block, error) { } // GetBlockHeader return the BlockHeader by given hash -func (s *Store) GetBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) { - blockHeader, err := s.cache.lookupBlockHeader(hash, height) - if err != nil { - return nil, err - } - return blockHeader, nil +func (s *Store) GetBlockHeader(hash *bc.Hash) (*types.BlockHeader, error) { + return s.cache.lookupBlockHeader(hash) } // GetBlockTransactions return the Block transactions by given hash func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) { - txs, err := s.cache.lookupBlockTxs(hash) - if err != nil { - return nil, err - } - return txs, nil + return s.cache.lookupBlockTxs(hash) +} + +// GetBlockHashesByHeight return the block hash by the specified height +func (s *Store) GetBlockHashesByHeight(height uint64) ([]*bc.Hash, error) { + return s.cache.lookupBlockHashesByHeight(height) +} + +// GetMainChainHash return the block hash by the specified height +func (s *Store) GetMainChainHash(height uint64) (*bc.Hash, error) { + return s.cache.lookupMainChainHash(height) +} + +// GetStoreStatus return the BlockStoreStateJSON +func (s *Store) GetStoreStatus() *protocol.BlockStoreState { + return loadBlockStoreStateJSON(s.db) } // GetTransactionsUtxo will return all the utxo that related to the input txs @@ -195,63 +252,19 @@ func (s *Store) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, erro return ts, nil } -// GetStoreStatus return the BlockStoreStateJSON -func (s *Store) GetStoreStatus() *protocol.BlockStoreState { - return loadBlockStoreStateJSON(s.db) -} - -// GetVoteResult retrive the voting result in specified vote sequence -func (s *Store) GetVoteResult(seq uint64) (*state.VoteResult, error) { - return s.cache.lookupVoteResult(seq) +// GetUtxo will search the utxo in db +func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) { + return getUtxo(s.db, hash) } -func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error) { - startTime := time.Now() - blockIndex := state.NewBlockIndex() - bhIter := s.db.IteratorPrefix(blockHeaderPrefix) - defer bhIter.Release() - - var lastNode *state.BlockNode - for bhIter.Next() { - bh := &types.BlockHeader{} - if err := bh.UnmarshalText(bhIter.Value()); err != nil { - return nil, err - } - - // If a block with a height greater than the best height of state is added to the index, - // It may cause a bug that the new block cant not be process properly. - if bh.Height > stateBestHeight { - break - } - - var parent *state.BlockNode - if lastNode == nil || lastNode.Hash == bh.PreviousBlockHash { - parent = lastNode - } else { - parent = blockIndex.GetNode(&bh.PreviousBlockHash) - } - - node, err := state.NewBlockNode(bh, parent) - if err != nil { - return nil, err - } - - blockIndex.AddNode(node) - lastNode = node - } - - log.WithFields(log.Fields{ - "module": logModule, - "height": stateBestHeight, - "duration": time.Since(startTime), - }).Debug("initialize load history block index from database") - return blockIndex, nil +// GetConsensusResult retrive the voting result in specified vote sequence +func (s *Store) GetConsensusResult(seq uint64) (*state.ConsensusResult, error) { + return s.cache.lookupConsensusResult(seq) } // SaveBlock persists a new block in the protocol. func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { startTime := time.Now() - binaryBlockHeader, err := block.MarshalTextForBlockHeader() if err != nil { return errors.Wrap(err, "Marshal block header") @@ -264,16 +277,30 @@ func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { binaryTxStatus, err := proto.Marshal(ts) if err != nil { - return errors.Wrap(err, "marshal block transaction status") + return errors.Wrap(err, "Marshal block transaction status") } + blockHashes := []*bc.Hash{} + hashes, err := s.GetBlockHashesByHeight(block.Height) + if err != nil { + return err + } + blockHashes = append(blockHashes, hashes...) blockHash := block.Hash() + blockHashes = append(blockHashes, &blockHash) + binaryBlockHashes, err := json.Marshal(blockHashes) + if err != nil { + return errors.Wrap(err, "Marshal block hashes") + } + batch := s.db.NewBatch() - batch.Set(calcBlockHeaderKey(block.Height, &blockHash), binaryBlockHeader) + batch.Set(calcBlockHashesPrefix(block.Height), binaryBlockHashes) + batch.Set(calcBlockHeaderKey(&blockHash), binaryBlockHeader) batch.Set(calcBlockTransactionsKey(&blockHash), binaryBlockTxs) batch.Set(calcTxStatusKey(&blockHash), binaryTxStatus) batch.Write() + s.cache.removeBlockHashes(block.Height) log.WithFields(log.Fields{ "module": logModule, "height": block.Height, @@ -285,54 +312,80 @@ func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { // SaveBlockHeader persists a new block header in the protocol. func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error { - startTime := time.Now() - binaryBlockHeader, err := blockHeader.MarshalText() if err != nil { return errors.Wrap(err, "Marshal block header") } blockHash := blockHeader.Hash() - s.db.Set(calcBlockHeaderKey(blockHeader.Height, &blockHash), binaryBlockHeader) - - // updata blockheader cache - if _, ok := s.cache.getBlockHeader(&blockHash); ok { - s.cache.addBlockHeader(blockHeader) - } - + s.db.Set(calcBlockHeaderKey(&blockHash), binaryBlockHeader) + s.cache.removeBlockHeader(blockHeader) return nil } // SaveChainStatus save the core's newest status && delete old status -func (s *Store) SaveChainStatus(node, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteResults []*state.VoteResult) error { +func (s *Store) SaveChainStatus(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error { + currentStatus := loadBlockStoreStateJSON(s.db) batch := s.db.NewBatch() if err := saveUtxoView(batch, view); err != nil { return err } - for _, vote := range voteResults { - bytes, err := json.Marshal(vote) + var clearCacheFuncs []func() + for _, consensusResult := range consensusResults { + result := consensusResult + bytes, err := json.Marshal(result) if err != nil { return err } - batch.Set(calcVoteResultKey(vote.Seq), bytes) - if _, ok := s.cache.getVoteResult(vote.Seq); ok { - s.cache.addVoteResult(vote) - } + batch.Set(calcConsensusResultKey(result.Seq), bytes) + clearCacheFuncs = append(clearCacheFuncs, func() { + s.cache.removeConsensusResult(result) + }) } + blockHash := blockHeader.Hash() + irrBlockHash := irrBlockHeader.Hash() bytes, err := json.Marshal(protocol.BlockStoreState{ - Height: node.Height, - Hash: &node.Hash, - IrreversibleHeight: irreversibleNode.Height, - IrreversibleHash: &irreversibleNode.Hash, + Height: blockHeader.Height, + Hash: &blockHash, + IrreversibleHeight: irrBlockHeader.Height, + IrreversibleHash: &irrBlockHash, }) if err != nil { return err } + batch.Set([]byte{blockStore}, bytes) + + // save main chain blockHeaders + for _, blockHeader := range mainBlockHeaders { + bh := blockHeader + blockHash := bh.Hash() + binaryBlockHash, err := blockHash.MarshalText() + if err != nil { + return errors.Wrap(err, "Marshal block hash") + } + + batch.Set(calcMainChainIndexPrefix(bh.Height), binaryBlockHash) + clearCacheFuncs = append(clearCacheFuncs, func() { + s.cache.removeMainChainHash(bh.Height) + }) + } - batch.Set(blockStoreKey, bytes) + if currentStatus != nil { + for i := blockHeader.Height + 1; i <= currentStatus.Height; i++ { + index := i + batch.Delete(calcMainChainIndexPrefix(index)) + clearCacheFuncs = append(clearCacheFuncs, func() { + s.cache.removeMainChainHash(index) + }) + } + } batch.Write() + + for _, clearCacheFunc := range clearCacheFuncs { + clearCacheFunc() + } return nil }