From: mars Date: Fri, 14 Jun 2019 08:54:18 +0000 (+0800) Subject: split block X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=5e8fb58293815103f117402919045e64836d423a;p=bytom%2Fvapor.git split block --- diff --git a/database/cache.go b/database/cache.go index c963f25f..dc118fb2 100644 --- a/database/cache.go +++ b/database/cache.go @@ -13,7 +13,7 @@ import ( const maxCachedBlocks = 30 -func newBlockCache(fillFn func(hash *bc.Hash) (*types.Block, error)) blockCache { +func newBlockCache(fillFn func(hash *bc.Hash, height uint64) (*types.Block, error)) blockCache { return blockCache{ lru: lru.New(maxCachedBlocks), fillFn: fillFn, @@ -23,17 +23,17 @@ func newBlockCache(fillFn func(hash *bc.Hash) (*types.Block, error)) blockCache type blockCache struct { mu sync.Mutex lru *lru.Cache - fillFn func(hash *bc.Hash) (*types.Block, error) + fillFn func(hash *bc.Hash, height uint64) (*types.Block, error) single singleflight.Group } -func (c *blockCache) lookup(hash *bc.Hash) (*types.Block, error) { +func (c *blockCache) lookup(hash *bc.Hash, height uint64) (*types.Block, error) { if b, ok := c.get(hash); ok { return b, nil } block, err := c.single.Do(hash.String(), func() (interface{}, error) { - b, err := c.fillFn(hash) + b, err := c.fillFn(hash, height) if err != nil { return nil, err } diff --git a/database/store.go b/database/store.go index b7cd2eb4..9ca77773 100644 --- a/database/store.go +++ b/database/store.go @@ -21,11 +21,12 @@ import ( const logModule = "leveldb" var ( - blockStoreKey = []byte("blockStore") - blockPrefix = []byte("B:") - blockHeaderPrefix = []byte("BH:") - txStatusPrefix = []byte("BTS:") - voteResultPrefix = []byte("VR:") + blockStoreKey = []byte("blockStore") + blockPrefix = []byte("B:") + blockHeaderPrefix = []byte("BH:") + blockTransactonsPrefix = []byte("BTXS:") + txStatusPrefix = []byte("BTS:") + voteResultPrefix = []byte("VR:") ) func loadBlockStoreStateJSON(db dbm.DB) *protocol.BlockStoreState { @@ -59,6 +60,10 @@ func calcBlockHeaderKey(height uint64, hash *bc.Hash) []byte { return append(key, hash.Bytes()...) } +func calcBlockTransactionsKey(hash *bc.Hash) []byte { + return append(blockTransactonsPrefix, hash.Bytes()...) +} + func calcTxStatusKey(hash *bc.Hash) []byte { return append(txStatusPrefix, hash.Bytes()...) } @@ -69,8 +74,8 @@ func calcVoteResultKey(seq uint64) []byte { return append(voteResultPrefix, buf[:]...) } -// GetBlock return the block by given hash -func GetBlock(db dbm.DB, hash *bc.Hash) (*types.Block, error) { +// GetBlock return the block by given hash and height +func GetBlock(db dbm.DB, hash *bc.Hash, height uint64) (*types.Block, error) { bytez := db.Get(calcBlockKey(hash)) if bytez == nil { return nil, nil @@ -83,8 +88,8 @@ func GetBlock(db dbm.DB, hash *bc.Hash) (*types.Block, error) { // NewStore creates and returns a new Store object. func NewStore(db dbm.DB) *Store { - cache := newBlockCache(func(hash *bc.Hash) (*types.Block, error) { - return GetBlock(db, hash) + cache := newBlockCache(func(hash *bc.Hash, height uint64) (*types.Block, error) { + return GetBlock(db, hash, height) }) return &Store{ db: db, @@ -98,14 +103,19 @@ func (s *Store) GetUtxo(hash *bc.Hash) (*storage.UtxoEntry, error) { } // BlockExist check if the block is stored in disk -func (s *Store) BlockExist(hash *bc.Hash) bool { - block, err := s.cache.lookup(hash) +func (s *Store) BlockExist(hash *bc.Hash, height uint64) bool { + block, err := s.cache.lookup(hash, height) return err == nil && block != nil } // GetBlock return the block by given hash -func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) { - return s.cache.lookup(hash) +func (s *Store) GetBlock(hash *bc.Hash, height uint64) (*types.Block, error) { + return s.cache.lookup(hash, height) +} + +// GetBlockHeader return the BlockHeader by given hash +func (s *Store) GetBlockHeader(*bc.Hash, uint64) (*types.BlockHeader, error) { + return nil, nil } // GetTransactionsUtxo will return all the utxo that related to the input txs @@ -192,12 +202,19 @@ func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error // SaveBlock persists a new block in the protocol. func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { startTime := time.Now() - binaryBlock, err := block.MarshalText() + /* + binaryBlock, err := block.MarshalText() + if err != nil { + return errors.Wrap(err, "Marshal block meta") + } + */ + + binaryBlockHeader, err := block.BlockHeader.MarshalText() if err != nil { - return errors.Wrap(err, "Marshal block meta") + return errors.Wrap(err, "Marshal block header") } - binaryBlockHeader, err := block.BlockHeader.MarshalText() + binaryBlockTxs, err := block.MarshalTextForTransactions() if err != nil { return errors.Wrap(err, "Marshal block header") } @@ -209,8 +226,9 @@ func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { blockHash := block.Hash() batch := s.db.NewBatch() - batch.Set(calcBlockKey(&blockHash), binaryBlock) + //batch.Set(calcBlockKey(&blockHash), binaryBlock) batch.Set(calcBlockHeaderKey(block.Height, &blockHash), binaryBlockHeader) + batch.Set(calcBlockTransactionsKey(&blockHash), binaryBlockTxs) batch.Set(calcTxStatusKey(&blockHash), binaryTxStatus) batch.Write() @@ -223,6 +241,29 @@ func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { return nil } +// SaveBlockHeader persists a new block header in the protocol. +func (s *Store) SaveBlockHeader(blockHeader *types.BlockHeader) error { + startTime := time.Now() + + binaryBlockHeader, err := blockHeader.MarshalText() + if err != nil { + return errors.Wrap(err, "Marshal block header") + } + + blockHash := blockHeader.Hash() + batch := s.db.NewBatch() + batch.Set(calcBlockHeaderKey(blockHeader.Height, &blockHash), binaryBlockHeader) + batch.Write() + + log.WithFields(log.Fields{ + "module": logModule, + "height": blockHeader.Height, + "hash": blockHash.String(), + "duration": time.Since(startTime), + }).Info("blockHeader saved on disk") + return nil +} + // SaveChainStatus save the core's newest status && delete old status func (s *Store) SaveChainStatus(node, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteResults []*state.VoteResult) error { batch := s.db.NewBatch() diff --git a/protocol/bbft.go b/protocol/bbft.go index c6fc3b70..6c619495 100644 --- a/protocol/bbft.go +++ b/protocol/bbft.go @@ -216,19 +216,14 @@ func (c *Chain) updateBlockSignature(blockNode *state.BlockNode, nodeOrder uint6 return err } - block, err := c.store.GetBlock(&blockNode.Hash) + blockHeader, err := c.store.GetBlockHeader(&blockNode.Hash, blockNode.Height) if err != nil { return err } - block.Set(nodeOrder, signature) + blockHeader.Set(nodeOrder, signature) - txStatus, err := c.store.GetTransactionStatus(&blockNode.Hash) - if err != nil { - return err - } - - if err := c.store.SaveBlock(block, txStatus); err != nil { + if err := c.store.SaveBlockHeader(blockHeader); err != nil { return err } diff --git a/protocol/bc/types/block.go b/protocol/bc/types/block.go index dbb301a6..1eeb00c2 100644 --- a/protocol/bc/types/block.go +++ b/protocol/bc/types/block.go @@ -58,6 +58,67 @@ func (b *Block) UnmarshalText(text []byte) error { return nil } +// MarshalTextForTransactions fulfills the json.Marshaler interface. +func (b *Block) MarshalTextForTransactions() ([]byte, error) { + buf := bufpool.Get() + defer bufpool.Put(buf) + + ew := errors.NewWriter(buf) + ew.Write([]byte{SerBlockTransactions}) + + if _, err := blockchain.WriteVarint31(ew, uint64(len(b.Transactions))); err != nil { + return nil, err + } + + for _, tx := range b.Transactions { + if _, err := tx.WriteTo(ew); err != nil { + return nil, err + } + } + + if err := ew.Err(); err != nil { + return nil, err + } + + enc := make([]byte, hex.EncodedLen(buf.Len())) + hex.Encode(enc, buf.Bytes()) + return enc, nil +} + +// UnmarshalTextForTransactions fulfills the encoding.TextUnmarshaler interface. +func (b *Block) UnmarshalTextForTransactions(text []byte) error { + decoded := make([]byte, hex.DecodedLen(len(text))) + if _, err := hex.Decode(decoded, text); err != nil { + return err + } + + r := blockchain.NewReader(decoded) + var serflags [1]byte + io.ReadFull(r, serflags[:]) + if serflags[0] != SerBlockTransactions { + return fmt.Errorf("unsupported serialization flags 0x%x", serflags) + } + + n, err := blockchain.ReadVarint31(r) + if err != nil { + return errors.Wrap(err, "reading number of transactions") + } + + for ; n > 0; n-- { + data := TxData{} + if err = data.readFrom(r); err != nil { + return errors.Wrapf(err, "reading transaction %d", len(b.Transactions)) + } + + b.Transactions = append(b.Transactions, NewTx(data)) + } + + if trailing := r.Len(); trailing > 0 { + return fmt.Errorf("trailing garbage (%d bytes)", trailing) + } + return nil +} + func (b *Block) readFrom(r *blockchain.Reader) error { serflags, err := b.BlockHeader.readFrom(r) if err != nil { diff --git a/protocol/block.go b/protocol/block.go index 8d878042..d5ef5cf7 100644 --- a/protocol/block.go +++ b/protocol/block.go @@ -28,7 +28,11 @@ func (c *Chain) BlockExist(hash *bc.Hash) bool { // GetBlockByHash return a block by given hash func (c *Chain) GetBlockByHash(hash *bc.Hash) (*types.Block, error) { - return c.store.GetBlock(hash) + node := c.index.GetNode(hash) + if node == nil { + return nil, errors.New("can't find block in given hash") + } + return c.store.GetBlock(hash, node.Height) } // GetBlockByHeight return a block header by given height @@ -37,7 +41,7 @@ func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) { if node == nil { return nil, errors.New("can't find block in given height") } - return c.store.GetBlock(&node.Hash) + return c.store.GetBlock(&node.Hash, height) } // GetHeaderByHash return a block header by given hash @@ -125,7 +129,7 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error { } for _, detachNode := range detachNodes { - b, err := c.store.GetBlock(&detachNode.Hash) + b, err := c.store.GetBlock(&detachNode.Hash, detachNode.Height) if err != nil { return err } @@ -152,7 +156,7 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error { } for _, attachNode := range attachNodes { - b, err := c.store.GetBlock(&attachNode.Hash) + b, err := c.store.GetBlock(&attachNode.Hash, attachNode.Height) if err != nil { return err } diff --git a/protocol/consensus_node_manager.go b/protocol/consensus_node_manager.go index 4fe66385..66d18263 100644 --- a/protocol/consensus_node_manager.go +++ b/protocol/consensus_node_manager.go @@ -66,7 +66,7 @@ func getBlockerOrder(startTimestamp, blockTimestamp, numOfConsensusNode uint64) // The start time of the last round of product block lastRoundStartTime := startTimestamp + (blockTimestamp-startTimestamp)/roundBlockTime*roundBlockTime // Order of blocker - return (blockTimestamp - lastRoundStartTime)/(consensus.BlockNumEachNode*consensus.BlockTimeInterval) + return (blockTimestamp - lastRoundStartTime) / (consensus.BlockNumEachNode * consensus.BlockTimeInterval) } func (c *consensusNodeManager) getPrevRoundLastBlock(prevBlockHash *bc.Hash) (*state.BlockNode, error) { @@ -114,7 +114,7 @@ func (c *consensusNodeManager) getBestVoteResult() (*state.VoteResult, error) { // getVoteResult return the vote result // seq represent the sequence of vote // blockNode represent the chain in which the result of the vote is located -// Voting results need to be adjusted according to the chain +// Voting results need to be adjusted according to the chain func (c *consensusNodeManager) getVoteResult(seq uint64, blockNode *state.BlockNode) (*state.VoteResult, error) { voteResult, err := c.store.GetVoteResult(seq) if err != nil { @@ -136,7 +136,7 @@ func (c *consensusNodeManager) reorganizeVoteResult(voteResult *state.VoteResult var forChainRollback, mainChainRollBack bool if forChainRollback = forkChainNode.Height >= mainChainNode.Height; forChainRollback { attachNodes = append([]*state.BlockNode{forkChainNode}, attachNodes...) - } + } if mainChainRollBack = forkChainNode.Height <= mainChainNode.Height; mainChainRollBack { detachNodes = append(detachNodes, mainChainNode) } @@ -149,7 +149,7 @@ func (c *consensusNodeManager) reorganizeVoteResult(voteResult *state.VoteResult } for _, node := range detachNodes { - block, err := c.store.GetBlock(&node.Hash) + block, err := c.store.GetBlock(&node.Hash, node.Height) if err != nil { return err } @@ -160,7 +160,7 @@ func (c *consensusNodeManager) reorganizeVoteResult(voteResult *state.VoteResult } for _, node := range attachNodes { - block, err := c.store.GetBlock(&node.Hash) + block, err := c.store.GetBlock(&node.Hash, node.Height) if err != nil { return err } diff --git a/protocol/store.go b/protocol/store.go index a0f6b038..b3909ff8 100644 --- a/protocol/store.go +++ b/protocol/store.go @@ -15,9 +15,10 @@ var ( // Store provides storage interface for blockchain data type Store interface { - BlockExist(*bc.Hash) bool + BlockExist(*bc.Hash, uint64) bool - GetBlock(*bc.Hash) (*types.Block, error) + GetBlock(*bc.Hash, uint64) (*types.Block, error) + GetBlockHeader(*bc.Hash, uint64) (*types.BlockHeader, error) GetStoreStatus() *BlockStoreState GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error @@ -26,6 +27,7 @@ type Store interface { LoadBlockIndex(uint64) (*state.BlockIndex, error) SaveBlock(*types.Block, *bc.TransactionStatus) error + SaveBlockHeader(*types.BlockHeader) error SaveChainStatus(*state.BlockNode, *state.BlockNode, *state.UtxoViewpoint, []*state.VoteResult) error } diff --git a/protocol/txpool_test.go b/protocol/txpool_test.go index 59e9d25b..1a7c07ec 100644 --- a/protocol/txpool_test.go +++ b/protocol/txpool_test.go @@ -111,8 +111,8 @@ var testTxs = []*types.Tx{ type mockStore struct{} -func (s *mockStore) BlockExist(hash *bc.Hash) bool { return false } -func (s *mockStore) GetBlock(*bc.Hash) (*types.Block, error) { return nil, nil } +func (s *mockStore) BlockExist(hash *bc.Hash, height uint64) bool { return false } +func (s *mockStore) GetBlock(*bc.Hash, uint64) (*types.Block, error) { return nil, nil } func (s *mockStore) GetStoreStatus() *BlockStoreState { return nil } func (s *mockStore) GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) { return nil, nil } func (s *mockStore) GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error { return nil } @@ -653,8 +653,8 @@ func TestRemoveOrphan(t *testing.T) { type mockStore1 struct{} -func (s *mockStore1) BlockExist(hash *bc.Hash) bool { return false } -func (s *mockStore1) GetBlock(*bc.Hash) (*types.Block, error) { return nil, nil } +func (s *mockStore1) BlockExist(hash *bc.Hash, height uint64) bool { return false } +func (s *mockStore1) GetBlock(*bc.Hash, uint64) (*types.Block, error) { return nil, nil } func (s *mockStore1) GetStoreStatus() *BlockStoreState { return nil } func (s *mockStore1) GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) { return nil, nil } func (s *mockStore1) GetTransactionsUtxo(utxoView *state.UtxoViewpoint, tx []*bc.Tx) error {