package database
import (
- "fmt"
"strconv"
"github.com/golang/groupcache/singleflight"
)
const (
- maxCachedBlockHeaders = 1000
- maxCachedBlockTransactions = 1000
- maxCachedVoteResults = 144 // int(60 * 60 * 24 * 1000 / consensus.BlockTimeInterval / consensus.RoundVoteBlockNums)
+ maxCachedBlockHeaders = 4096
+ maxCachedBlockTransactions = 1024
+ maxCachedVoteResults = 128
)
type fillBlockHeaderFn func(hash *bc.Hash, height uint64) (*types.BlockHeader, error)
type fillBlockTransactionsFn func(hash *bc.Hash) ([]*types.Tx, error)
type fillVoteResultFn func(seq uint64) (*state.VoteResult, error)
-func newBlockCache(fillBlockHeader fillBlockHeaderFn, fillBlockTxs fillBlockTransactionsFn, fillVoteResult fillVoteResultFn) blockCache {
- return blockCache{
+func newCache(fillBlockHeader fillBlockHeaderFn, fillBlockTxs fillBlockTransactionsFn, fillVoteResult fillVoteResultFn) cache {
+ return cache{
lruBlockHeaders: common.NewCache(maxCachedBlockHeaders),
lruBlockTxs: common.NewCache(maxCachedBlockTransactions),
lruVoteResults: common.NewCache(maxCachedVoteResults),
}
}
-type blockCache struct {
+type cache struct {
lruBlockHeaders *common.Cache
lruBlockTxs *common.Cache
lruVoteResults *common.Cache
fillBlockTransactionFn func(hash *bc.Hash) ([]*types.Tx, error)
fillVoteResultFn func(seq uint64) (*state.VoteResult, error)
- singleBlockHeader singleflight.Group
- singleBlockTxs singleflight.Group
- singleVoteResult singleflight.Group
+ sf singleflight.Group
}
-func (c *blockCache) lookupBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
- if bH, ok := c.getBlockHeader(hash); ok {
- return bH, nil
+func (c *cache) lookupBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
+ if data, ok := c.lruBlockHeaders.Get(*hash); ok {
+ return data.(*types.BlockHeader), nil
}
- blockHeader, err := c.singleBlockHeader.Do(hash.String(), func() (interface{}, error) {
- bH, err := c.fillBlockHeaderFn(hash, height)
+ blockHeader, err := c.sf.Do("BlockHeader:"+hash.String(), func() (interface{}, error) {
+ blockHeader, err := c.fillBlockHeaderFn(hash, height)
if err != nil {
return nil, err
}
- if bH == nil {
- return nil, fmt.Errorf("There are no blockHeader with given hash %s", hash.String())
- }
-
- c.addBlockHeader(bH)
- return bH, nil
+ c.lruBlockHeaders.Add(blockHeader.Hash(), blockHeader)
+ return blockHeader, nil
})
if err != nil {
return nil, err
return blockHeader.(*types.BlockHeader), nil
}
-func (c *blockCache) lookupBlockTxs(hash *bc.Hash) ([]*types.Tx, error) {
- if bTxs, ok := c.getBlockTransactions(hash); ok {
- return bTxs, nil
+func (c *cache) lookupBlockTxs(hash *bc.Hash) ([]*types.Tx, error) {
+ if data, ok := c.lruBlockTxs.Get(*hash); ok {
+ return data.([]*types.Tx), nil
}
- blockTransactions, err := c.singleBlockTxs.Do(hash.String(), func() (interface{}, error) {
- bTxs, err := c.fillBlockTransactionFn(hash)
+ blockTxs, err := c.sf.Do("BlockTxs:"+hash.String(), func() (interface{}, error) {
+ blockTxs, err := c.fillBlockTransactionFn(hash)
if err != nil {
return nil, err
}
- if bTxs == nil {
- return nil, fmt.Errorf("There are no block transactions with given hash %s", hash.String())
- }
-
- c.addBlockTxs(*hash, bTxs)
- return bTxs, nil
+ c.lruBlockTxs.Add(hash, blockTxs)
+ return blockTxs, nil
})
if err != nil {
return nil, err
}
- return blockTransactions.([]*types.Tx), nil
+ return blockTxs.([]*types.Tx), nil
}
-func (c *blockCache) lookupVoteResult(seq uint64) (*state.VoteResult, error) {
- if vr, ok := c.getVoteResult(seq); ok {
- return vr.Fork(), nil
+func (c *cache) lookupVoteResult(seq uint64) (*state.VoteResult, error) {
+ if data, ok := c.lruVoteResults.Get(seq); ok {
+ return data.(*state.VoteResult).Fork(), nil
}
seqStr := strconv.FormatUint(seq, 10)
- voteResult, err := c.singleVoteResult.Do(seqStr, func() (interface{}, error) {
- v, err := c.fillVoteResultFn(seq)
+ voteResult, err := c.sf.Do("VoteResult:"+seqStr, func() (interface{}, error) {
+ voteResult, err := c.fillVoteResultFn(seq)
if err != nil {
return nil, err
}
- if v == nil {
- return nil, fmt.Errorf("There are no vote result with given seq %s", seqStr)
- }
-
- c.addVoteResult(v)
- return v, nil
+ c.lruVoteResults.Add(voteResult.Seq, voteResult)
+ return voteResult, nil
})
if err != nil {
return nil, err
return voteResult.(*state.VoteResult).Fork(), nil
}
-func (c *blockCache) getBlockHeader(hash *bc.Hash) (*types.BlockHeader, bool) {
- blockHeader, ok := c.lruBlockHeaders.Get(*hash)
- if blockHeader == nil {
- return nil, ok
- }
- return blockHeader.(*types.BlockHeader), ok
-}
-
-func (c *blockCache) getBlockTransactions(hash *bc.Hash) ([]*types.Tx, bool) {
- txs, ok := c.lruBlockTxs.Get(*hash)
- if txs == nil {
- return nil, ok
- }
- return txs.([]*types.Tx), ok
-}
-
-func (c *blockCache) getVoteResult(seq uint64) (*state.VoteResult, bool) {
- voteResult, ok := c.lruVoteResults.Get(seq)
- if voteResult == nil {
- return nil, ok
- }
- return voteResult.(*state.VoteResult), ok
-}
-
-func (c *blockCache) addBlockHeader(blockHeader *types.BlockHeader) {
- c.lruBlockHeaders.Add(blockHeader.Hash(), blockHeader)
-}
-
-func (c *blockCache) addBlockTxs(hash bc.Hash, txs []*types.Tx) {
- c.lruBlockTxs.Add(hash, txs)
+func (c *cache) removeBlockHeader(blockHeader *types.BlockHeader) {
+ c.lruBlockHeaders.Remove(blockHeader.Hash())
}
-func (c *blockCache) addVoteResult(voteResult *state.VoteResult) {
- c.lruVoteResults.Add(voteResult.Seq, voteResult)
+func (c *cache) removeVoteResult(voteResult *state.VoteResult) {
+ c.lruVoteResults.Remove(voteResult.Seq)
}
return voteResults[seq], nil
}
- cache := newBlockCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
+ cache := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
for i := 0; i < maxCachedBlockHeaders+10; i++ {
block := newBlock(uint64(i))
for i := 0; i < 10; i++ {
block := newBlock(uint64(i))
hash := block.Hash()
- if b, _ := cache.getBlockHeader(&hash); b != nil {
+ if _, ok := cache.lruBlockHeaders.Get(hash); ok {
t.Fatalf("find old block")
}
}
for i := 10; i < maxCachedBlockHeaders+10; i++ {
block := newBlock(uint64(i))
hash := block.Hash()
- if b, _ := cache.getBlockHeader(&hash); b == nil {
+ if _, ok := cache.lruBlockHeaders.Get(hash); !ok {
t.Fatalf("can't find new block")
}
}
for i := 0; i < 10; i++ {
voteResult := newVoteResult(uint64(i))
- if v, _ := cache.getVoteResult(voteResult.Seq); v != nil {
+ if _, ok := cache.lruVoteResults.Get(voteResult.Seq); ok {
t.Fatalf("find old vote result")
}
}
for i := 10; i < maxCachedVoteResults+10; i++ {
voteResult := newVoteResult(uint64(i))
- if v, _ := cache.getVoteResult(voteResult.Seq); v == nil {
+ if _, ok := cache.lruVoteResults.Get(voteResult.Seq); !ok {
t.Fatalf("can't find new vote result")
}
}
import (
"encoding/binary"
"encoding/json"
+ "fmt"
"time"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/tmlibs/common"
dbm "github.com/vapor/database/leveldb"
"github.com/vapor/database/storage"
if bytes == nil {
return nil
}
+
bsj := &protocol.BlockStoreState{}
if err := json.Unmarshal(bytes, bsj); err != nil {
- common.PanicCrisis(common.Fmt("Could not unmarshal bytes: %X", bytes))
+ log.WithField("err", err).Panic("fail on unmarshal BlockStoreStateJSON")
}
return bsj
}
// methods for querying current data.
type Store struct {
db dbm.DB
- cache blockCache
+ cache cache
}
func calcBlockHeaderKey(height uint64, hash *bc.Hash) []byte {
// GetBlockHeader return the block header by given hash and height
func GetBlockHeader(db dbm.DB, hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
- block := &types.Block{}
binaryBlockHeader := db.Get(calcBlockHeaderKey(height, hash))
if binaryBlockHeader == nil {
- return nil, nil
+ return nil, fmt.Errorf("There are no blockHeader with given hash %s", hash.String())
}
+
+ block := &types.Block{}
if err := block.UnmarshalText(binaryBlockHeader); err != nil {
return nil, err
}
-
return &block.BlockHeader, nil
}
// GetBlockTransactions return the block transactions by given hash
func GetBlockTransactions(db dbm.DB, hash *bc.Hash) ([]*types.Tx, error) {
- block := &types.Block{}
binaryBlockTxs := db.Get(calcBlockTransactionsKey(hash))
if binaryBlockTxs == nil {
- return nil, errors.New("The transactions in the block is empty")
+ return nil, fmt.Errorf("There are no block transactions with given hash %s", hash.String())
}
+ block := &types.Block{}
if err := block.UnmarshalText(binaryBlockTxs); err != nil {
return nil, err
}
fillVoteResultFn := func(seq uint64) (*state.VoteResult, error) {
return GetVoteResult(db, seq)
}
- bc := newBlockCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
+ bc := newCache(fillBlockHeaderFn, fillBlockTxsFn, fillVoteResultFn)
return &Store{
db: db,
cache: bc,
}
}
-// GetUtxo will search the utxo in db
-func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
- return getUtxo(s.db, hash)
-}
-
// BlockExist check if the block is stored in disk
func (s *Store) BlockExist(hash *bc.Hash, height uint64) bool {
- blockHeader, err := s.cache.lookupBlockHeader(hash, height)
- return err == nil && blockHeader != nil
+ _, err := s.cache.lookupBlockHeader(hash, height)
+ return err == nil
}
// GetBlock return the block by given hash
// GetBlockHeader return the BlockHeader by given hash
func (s *Store) GetBlockHeader(hash *bc.Hash, height uint64) (*types.BlockHeader, error) {
- blockHeader, err := s.cache.lookupBlockHeader(hash, height)
- if err != nil {
- return nil, err
- }
- return blockHeader, nil
+ return s.cache.lookupBlockHeader(hash, height)
}
// GetBlockTransactions return the Block transactions by given hash
func (s *Store) GetBlockTransactions(hash *bc.Hash) ([]*types.Tx, error) {
- txs, err := s.cache.lookupBlockTxs(hash)
- if err != nil {
- return nil, err
- }
- return txs, nil
+ return s.cache.lookupBlockTxs(hash)
+}
+
+// GetStoreStatus return the BlockStoreStateJSON
+func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
+ return loadBlockStoreStateJSON(s.db)
}
// GetTransactionsUtxo will return all the utxo that related to the input txs
return ts, nil
}
-// GetStoreStatus return the BlockStoreStateJSON
-func (s *Store) GetStoreStatus() *protocol.BlockStoreState {
- return loadBlockStoreStateJSON(s.db)
+// GetUtxo will search the utxo in db
+func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) {
+ return getUtxo(s.db, hash)
}
// GetVoteResult retrive the voting result in specified vote sequence
// SaveBlock persists a new block in the protocol.
func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error {
startTime := time.Now()
-
binaryBlockHeader, err := block.MarshalTextForBlockHeader()
if err != nil {
return errors.Wrap(err, "Marshal block header")
// SaveBlockHeader persists a new block header in the protocol.
func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error {
- startTime := time.Now()
-
binaryBlockHeader, err := blockHeader.MarshalText()
if err != nil {
return errors.Wrap(err, "Marshal block header")
blockHash := blockHeader.Hash()
s.db.Set(calcBlockHeaderKey(blockHeader.Height, &blockHash), binaryBlockHeader)
-
- // updata blockheader cache
- if _, ok := s.cache.getBlockHeader(&blockHash); ok {
- s.cache.addBlockHeader(blockHeader)
- }
-
- log.WithFields(log.Fields{
- "module": logModule,
- "height": blockHeader.Height,
- "hash": blockHash.String(),
- "duration": time.Since(startTime),
- }).Info("blockHeader saved on disk")
+ s.cache.removeBlockHeader(blockHeader)
return nil
}
}
batch.Set(calcVoteResultKey(vote.Seq), bytes)
- if _, ok := s.cache.getVoteResult(vote.Seq); ok {
- s.cache.addVoteResult(vote)
- }
+ s.cache.removeVoteResult(vote)
}
bytes, err := json.Marshal(protocol.BlockStoreState{
Vote: resOut.Vote,
}
default:
- log.WithFields(log.Fields{"module": logModule, "err": errors.Wrapf(bc.ErrEntryType, "entry %x has unexpected type %T", inpID.Bytes(), e)}).Error("txInToUtxos fail on get resOut")
continue
}
utxos = append(utxos, utxo)