OSDN Git Service

batch_save_state_and_checkpoints (#1937)
authorPoseidon <shenao.78@163.com>
Mon, 24 May 2021 08:01:30 +0000 (16:01 +0800)
committerGitHub <noreply@github.com>
Mon, 24 May 2021 08:01:30 +0000 (16:01 +0800)
* batch_save_state_and_checkpoints

* fix ci

Co-authored-by: Paladz <yzhu101@uottawa.ca>
database/store.go
database/store_test.go
protocol/apply_block.go
protocol/auth_verification.go
protocol/block.go
protocol/casper.go
protocol/protocol.go
protocol/store.go
protocol/txpool_test.go
test/utxo_view/utxo_view_test.go

index f0d2d7f..548c27f 100644 (file)
@@ -231,7 +231,7 @@ func (s *Store) LoadBlockIndex(stateBestHeight uint64) (*state.BlockIndex, error
 }
 
 // SaveChainStatus save the core's newest status && delete old status
-func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint, contractView *state.ContractViewpoint, finalizedHeight uint64, finalizedHash *bc.Hash) error {
+func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint, contractView *state.ContractViewpoint, checkpoints []*state.Checkpoint, finalizedHeight uint64, finalizedHash *bc.Hash) error {
        batch := s.db.NewBatch()
        if err := saveUtxoView(batch, view); err != nil {
                return err
@@ -245,6 +245,10 @@ func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint
                return err
        }
 
+       if err := s.saveCheckpoints(batch, checkpoints); err != nil {
+               return err
+       }
+
        bytes, err := json.Marshal(protocol.BlockStoreState{Height: node.Height, Hash: &node.Hash, FinalizedHeight: finalizedHeight, FinalizedHash: finalizedHash})
        if err != nil {
                return err
@@ -331,6 +335,16 @@ func (s *Store) loadCheckpointsFromIter(iter dbm.Iterator) ([]*state.Checkpoint,
 // SaveCheckpoints bulk save multiple checkpoint
 func (s *Store) SaveCheckpoints(checkpoints ...*state.Checkpoint) error {
        batch := s.db.NewBatch()
+
+       if err := s.saveCheckpoints(batch, checkpoints); err != nil {
+               return err
+       }
+
+       batch.Write()
+       return nil
+}
+
+func (s *Store) saveCheckpoints(batch dbm.Batch, checkpoints []*state.Checkpoint) error {
        for _, checkpoint := range checkpoints {
                data, err := json.Marshal(checkpoint)
                if err != nil {
@@ -348,7 +362,6 @@ func (s *Store) SaveCheckpoints(checkpoints ...*state.Checkpoint) error {
 
                batch.Set(calcCheckpointKey(checkpoint.Height, &checkpoint.Hash), data)
        }
-       batch.Write()
        return nil
 }
 
index 1202646..10cf0d1 100644 (file)
@@ -151,7 +151,7 @@ func TestSaveChainStatus(t *testing.T) {
        }
 
        contractView := state.NewContractViewpoint()
-       if err := store.SaveChainStatus(node, view, contractView, 0, &bc.Hash{}); err != nil {
+       if err := store.SaveChainStatus(node, view, contractView, nil,0, &bc.Hash{}); err != nil {
                t.Fatal(err)
        }
 
index bb13486..0b61d47 100644 (file)
@@ -16,38 +16,43 @@ import (
 // the tree of checkpoint will grow with the arrival of new blocks
 // it will return verification when an epoch is reached and the current node is the validator, otherwise return nil
 // the chain module must broadcast the verification
-func (c *Casper) ApplyBlock(block *types.Block) (*Verification, error) {
+func (c *Casper) ApplyBlock(block *types.Block) (*Verification, *state.Checkpoint, error) {
+       if block.Height % state.BlocksOfEpoch == 1 {
+               c.newEpochCh <- block.PreviousBlockHash
+       }
+
        c.mu.Lock()
        defer c.mu.Unlock()
 
        if _, err := c.tree.nodeByHash(block.Hash()); err == nil {
                // already processed
-               return nil, nil
+               return nil, nil, nil
        }
 
        target, err := c.applyBlockToCheckpoint(block)
        if err != nil {
-               return nil, errors.Wrap(err, "apply block to checkpoint")
+               return nil, nil, errors.Wrap(err, "apply block to checkpoint")
        }
 
        if err := c.applyTransactions(target, block.Transactions); err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
        validators, err := c.Validators(&target.Hash)
        if err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
-       if err := c.applySupLinks(target, block.SupLinks, validators); err != nil {
-               return nil, err
+       verification, err := c.applyMyVerification(target, block, validators)
+       if err != nil {
+               return nil, nil, err
        }
 
-       if block.Height%state.BlocksOfEpoch == 0 {
-               c.newEpochCh <- block.Hash()
+       if err := c.applySupLinks(target, block.SupLinks, validators); err != nil {
+               return nil, nil, err
        }
 
-       return c.applyMyVerification(target, validators)
+       return verification, target, nil
 }
 
 func (c *Casper) applyBlockToCheckpoint(block *types.Block) (*state.Checkpoint, error) {
@@ -74,7 +79,7 @@ func (c *Casper) applyBlockToCheckpoint(block *types.Block) (*state.Checkpoint,
        checkpoint.Height = block.Height
        checkpoint.Hash = block.Hash()
        checkpoint.Timestamp = block.Timestamp
-       return checkpoint, c.store.SaveCheckpoints(checkpoint)
+       return checkpoint, nil
 }
 
 func (c *Casper) applyTransactions(target *state.Checkpoint, transactions []*types.Tx) error {
@@ -116,6 +121,7 @@ func (c *Casper) applySupLinks(target *state.Checkpoint, supLinks []*types.SupLi
                return nil
        }
 
+       affectedCheckpoints := make(map[bc.Hash]*state.Checkpoint)
        for _, supLink := range supLinks {
                var validVerifications []*Verification
                for _, v := range supLinkToVerifications(supLink, validators, target.Hash, target.Height) {
@@ -123,25 +129,27 @@ func (c *Casper) applySupLinks(target *state.Checkpoint, supLinks []*types.SupLi
                                validVerifications = append(validVerifications, v)
                        }
                }
-               if err := c.addVerificationToCheckpoint(target, validators, validVerifications...); err != nil {
+
+               checkpoints, err := c.addVerificationToCheckpoint(target, validators, validVerifications...)
+               if err != nil {
                        return err
                }
-       }
-       return nil
-}
 
-func (c *Casper) applyMyVerification(target *state.Checkpoint, validators map[string]*state.Validator) (*Verification, error) {
-       if target.Height%state.BlocksOfEpoch != 0 {
-               return nil, nil
+               for _, c := range checkpoints {
+                       affectedCheckpoints[c.Hash] = c
+               }
        }
 
-       pubKey := config.CommonConfig.PrivateKey().XPub().String()
-       if _, ok := validators[pubKey]; !ok {
-               return nil, nil
+       delete(affectedCheckpoints, target.Hash)
+       var checkpoints []*state.Checkpoint
+       for _, c := range affectedCheckpoints {
+               checkpoints = append(checkpoints, c)
        }
+       return c.store.SaveCheckpoints(checkpoints...)
+}
 
-       validatorOrder := validators[pubKey].Order
-       v, err := c.myVerification(target, validatorOrder)
+func (c *Casper) applyMyVerification(target *state.Checkpoint, block *types.Block, validators map[string]*state.Validator) (*Verification, error) {
+       v, err := c.myVerification(target, validators)
        if err != nil {
                return nil, err
        }
@@ -150,20 +158,31 @@ func (c *Casper) applyMyVerification(target *state.Checkpoint, validators map[st
                return nil, nil
        }
 
-       if err := c.addVerificationToCheckpoint(target, validators, v); err != nil {
+       signature, err := hex.DecodeString(v.Signature)
+       if err != nil {
                return nil, err
        }
 
-       return v, c.saveVerificationToHeader(v, validatorOrder)
+       block.SupLinks.AddSupLink(v.SourceHeight, v.SourceHash, signature, validators[v.PubKey].Order)
+       return v, c.store.SaveBlockHeader(&block.BlockHeader)
 }
 
-func (c *Casper) myVerification(target *state.Checkpoint, validatorOrder int) (*Verification, error) {
+func (c *Casper) myVerification(target *state.Checkpoint, validators map[string]*state.Validator) (*Verification, error) {
+       if target.Height % state.BlocksOfEpoch != 0 {
+               return nil, nil
+       }
+
+       pubKey := config.CommonConfig.PrivateKey().XPub().String()
+       if _, ok := validators[pubKey]; !ok {
+               return nil, nil
+       }
+
+       validatorOrder := validators[pubKey].Order
        source := c.lastJustifiedCheckpointOfBranch(target)
        if target.ContainsVerification(source.Hash, validatorOrder) {
                return nil, nil
        }
 
-       pubKey := config.CommonConfig.PrivateKey().XPub().String()
        if source != nil {
                v := &Verification{
                        SourceHash:   source.Hash,
index c43e891..fb9e3aa 100644 (file)
@@ -57,20 +57,25 @@ func (c *Casper) authVerification(v *Verification, target *state.Checkpoint, val
                return err
        }
 
-       if err := c.addVerificationToCheckpoint(target, validators, v); err != nil {
+       checkpoints, err := c.addVerificationToCheckpoint(target, validators, v)
+       if err != nil {
+               return err
+       }
+
+       if err := c.store.SaveCheckpoints(checkpoints...); err != nil {
                return err
        }
 
        return c.saveVerificationToHeader(v, validator.Order)
 }
 
-func (c *Casper) addVerificationToCheckpoint(target *state.Checkpoint, validators map[string]*state.Validator, verifications ...*Verification) error {
+func (c *Casper) addVerificationToCheckpoint(target *state.Checkpoint, validators map[string]*state.Validator, verifications ...*Verification) ([]*state.Checkpoint, error) {
        _, oldBestHash := c.bestChain()
        var affectedCheckpoints []*state.Checkpoint
        for _, v := range verifications {
                source, err := c.store.GetCheckpoint(&v.SourceHash)
                if err != nil {
-                       return err
+                       return nil, err
                }
 
                supLink := target.AddVerification(v.SourceHash, v.SourceHeight, validators[v.PubKey].Order, v.Signature)
@@ -91,7 +96,7 @@ func (c *Casper) addVerificationToCheckpoint(target *state.Checkpoint, validator
                c.rollbackNotifyCh <- nil
        }
 
-       return c.store.SaveCheckpoints(affectedCheckpoints...)
+       return affectedCheckpoints, nil
 }
 
 func (c *Casper) saveVerificationToHeader(v *Verification, validatorOrder int) error {
index db4f460..eb5abf3 100644 (file)
@@ -86,7 +86,7 @@ func (c *Chain) connectBlock(block *types.Block) (err error) {
                return err
        }
 
-       verification, err := c.casper.ApplyBlock(block)
+       verification, checkpoint, err := c.casper.ApplyBlock(block)
        if err != nil {
                return err
        }
@@ -103,7 +103,7 @@ func (c *Chain) connectBlock(block *types.Block) (err error) {
        }
 
        node := c.index.GetNode(&bcBlock.ID)
-       if err := c.setState(node, utxoView, contractView); err != nil {
+       if err := c.setState(node, utxoView, contractView, checkpoint); err != nil {
                return err
        }
 
@@ -145,6 +145,7 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error {
        }
 
        txsToRemove := map[bc.Hash]*types.Tx{}
+       var affectedCheckpoints []*state.Checkpoint
        for _, attachNode := range attachNodes {
                b, err := c.store.GetBlock(&attachNode.Hash)
                if err != nil {
@@ -160,11 +161,13 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error {
                        return err
                }
 
-               verification, err := c.casper.ApplyBlock(b)
+               verification, checkpoint, err := c.casper.ApplyBlock(b)
                if err != nil {
                        return err
                }
 
+               affectedCheckpoints = append(affectedCheckpoints, checkpoint)
+
                if err := c.broadcastVerification(verification); err != nil {
                        return err
                }
@@ -184,7 +187,7 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error {
                log.WithFields(log.Fields{"module": logModule, "height": node.Height, "hash": node.Hash.String()}).Debug("attach from mainchain")
        }
 
-       if err := c.setState(node, utxoView, contractView); err != nil {
+       if err := c.setState(node, utxoView, contractView, affectedCheckpoints...); err != nil {
                return err
        }
 
index ed265d2..d73ead3 100644 (file)
@@ -172,15 +172,15 @@ func (c *Casper) prevCheckpointHash(blockHash *bc.Hash) (*bc.Hash, error) {
                return nil, err
        }
 
-       c.prevCheckpointCache.Add(blockHash, result)
+       c.prevCheckpointCache.Add(*blockHash, result)
        return result, nil
 }
 
 func (c *Casper) prevCheckpointHashByPrevHash(prevBlockHash *bc.Hash) (*bc.Hash, error) {
        prevHash := prevBlockHash
        for {
-               if data, ok := c.prevCheckpointCache.Get(prevHash); ok {
-                       c.prevCheckpointCache.Add(prevBlockHash, data)
+               if data, ok := c.prevCheckpointCache.Get(*prevHash); ok {
+                       c.prevCheckpointCache.Add(*prevBlockHash, data)
                        return data.(*bc.Hash), nil
                }
 
@@ -190,7 +190,7 @@ func (c *Casper) prevCheckpointHashByPrevHash(prevBlockHash *bc.Hash) (*bc.Hash,
                }
 
                if prevBlock.Height%state.BlocksOfEpoch == 0 {
-                       c.prevCheckpointCache.Add(prevBlockHash, prevHash)
+                       c.prevCheckpointCache.Add(*prevBlockHash, prevHash)
                        return prevHash, nil
                }
 
index aac4bb3..7d96ed1 100644 (file)
@@ -85,9 +85,6 @@ func (c *Chain) initChainStatus() error {
                Timestamp: genesisBlock.Timestamp,
                Status:    state.Justified,
        }
-       if err := c.store.SaveCheckpoints(checkpoint); err != nil {
-               return err
-       }
 
        utxoView := state.NewUtxoViewpoint()
        bcBlock := types.MapBlock(genesisBlock)
@@ -101,7 +98,7 @@ func (c *Chain) initChainStatus() error {
        }
 
        contractView := state.NewContractViewpoint()
-       return c.store.SaveChainStatus(node, utxoView, contractView, 0, &checkpoint.Hash)
+       return c.store.SaveChainStatus(node, utxoView, contractView, []*state.Checkpoint{checkpoint}, 0, &checkpoint.Hash)
 }
 
 func newCasper(store Store, storeStatus *BlockStoreState, rollbackNotifyCh chan interface{}) (*Casper, error) {
@@ -193,9 +190,9 @@ func (c *Chain) SignBlockHeader(blockHeader *types.BlockHeader) {
 }
 
 // This function must be called with mu lock in above level
-func (c *Chain) setState(node *state.BlockNode, view *state.UtxoViewpoint, contractView *state.ContractViewpoint) error {
+func (c *Chain) setState(node *state.BlockNode, view *state.UtxoViewpoint, contractView *state.ContractViewpoint, checkpoints ...*state.Checkpoint) error {
        finalizedHeight, finalizedHash := c.casper.LastFinalized()
-       if err := c.store.SaveChainStatus(node, view, contractView, finalizedHeight, &finalizedHash); err != nil {
+       if err := c.store.SaveChainStatus(node, view, contractView, checkpoints, finalizedHeight, &finalizedHash); err != nil {
                return err
        }
 
index 09dd762..db4dfc5 100644 (file)
@@ -26,7 +26,7 @@ type Store interface {
        LoadBlockIndex(uint64) (*state.BlockIndex, error)
        SaveBlock(*types.Block) error
        SaveBlockHeader(*types.BlockHeader) error
-       SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, uint64, *bc.Hash) error
+       SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, []*state.Checkpoint, uint64, *bc.Hash) error
 }
 
 // BlockStoreState represents the core's db status
index 0e37af7..e6ece43 100644 (file)
@@ -111,7 +111,7 @@ func (s *mockStore) GetContract(hash [32]byte) ([]byte, error)
 func (s *mockStore) LoadBlockIndex(uint64) (*state.BlockIndex, error)             { return nil, nil }
 func (s *mockStore) SaveBlock(*types.Block) error                                 { return nil }
 func (s *mockStore) SaveBlockHeader(*types.BlockHeader) error                     { return nil }
-func (s *mockStore) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, uint64, *bc.Hash) error {
+func (s *mockStore) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, []*state.Checkpoint, uint64, *bc.Hash) error {
        return nil
 }
 
@@ -614,7 +614,7 @@ func (s *mockStore1) GetContract(hash [32]byte) ([]byte, error)           { retu
 func (s *mockStore1) LoadBlockIndex(uint64) (*state.BlockIndex, error)    { return nil, nil }
 func (s *mockStore1) SaveBlock(*types.Block) error                        { return nil }
 func (s *mockStore1) SaveBlockHeader(*types.BlockHeader) error            { return nil }
-func (s *mockStore1) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, uint64, *bc.Hash) error { return nil}
+func (s *mockStore1) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint, *state.ContractViewpoint, []*state.Checkpoint, uint64, *bc.Hash) error { return nil}
 
 func TestProcessTransaction(t *testing.T) {
        txPool := &TxPool{
index b70dd15..53db6ec 100644 (file)
@@ -293,7 +293,7 @@ func TestAttachOrDetachBlocks(t *testing.T) {
                        utxoViewpoint0.Entries[k] = v
                }
                contractView := state.NewContractViewpoint()
-               if err := store.SaveChainStatus(node, utxoViewpoint0, contractView, 0, &bc.Hash{}); err != nil {
+               if err := store.SaveChainStatus(node, utxoViewpoint0, contractView, nil, 0, &bc.Hash{}); err != nil {
                        t.Error(err)
                }
 
@@ -315,7 +315,7 @@ func TestAttachOrDetachBlocks(t *testing.T) {
                                t.Error(err)
                        }
                }
-               if err := store.SaveChainStatus(node, utxoViewpoint, contractView, 0, &bc.Hash{}); err != nil {
+               if err := store.SaveChainStatus(node, utxoViewpoint, contractView, nil,0, &bc.Hash{}); err != nil {
                        t.Error(err)
                }