From e34f38162f5d016893e53d6f14d47ed06dafe031 Mon Sep 17 00:00:00 2001 From: muscle_boy Date: Wed, 22 May 2019 14:29:17 +0800 Subject: [PATCH] Dpos process block (#69) * add bbft process block * merge v0.1 * remove functional test * opt code * opt code * opt code * bug fix * bug fix * opt code * opt code * bug fix * add update consensus node * opt code --- .travis.yml | 3 - Makefile | 2 +- common/bit_map.go | 53 +++++ database/store.go | 31 ++- database/store_test.go | 4 +- protocol/bbft.go | 240 +++++++++++++++++++++ protocol/bbft/bbft.go | 39 ---- protocol/bbft/consensus_node_manager.go | 170 --------------- ...consensus_node_manager_test.go => bbft_test.go} | 42 ++-- protocol/block.go | 53 ++++- protocol/consensus_node_manager.go | 205 ++++++++++++++++++ protocol/protocol.go | 18 +- protocol/state/blockindex.go | 22 +- protocol/state/blockindex_test.go | 5 +- protocol/store.go | 15 +- protocol/txpool_test.go | 6 +- protocol/validation/tx.go | 2 +- test/utxo_view/utxo_view_test.go | 4 +- 18 files changed, 645 insertions(+), 269 deletions(-) create mode 100644 common/bit_map.go create mode 100644 protocol/bbft.go delete mode 100644 protocol/bbft/bbft.go delete mode 100644 protocol/bbft/consensus_node_manager.go rename protocol/{bbft/consensus_node_manager_test.go => bbft_test.go} (68%) create mode 100644 protocol/consensus_node_manager.go diff --git a/.travis.yml b/.travis.yml index 63925506..02a340a5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,9 +6,6 @@ sudo: false matrix: include: - go: 1.11.4 - - go: tip - allow_failures: - - go: tip branches: only: diff --git a/Makefile b/Makefile index 1345055c..fa8426b1 100644 --- a/Makefile +++ b/Makefile @@ -133,6 +133,6 @@ benchmark: functional-tests: @go test -timeout=5m -tags="functional" ./test -ci: test functional-tests +ci: test .PHONY: all target release-all clean test benchmark diff --git a/common/bit_map.go b/common/bit_map.go new file mode 100644 index 00000000..6dabaebd --- /dev/null +++ b/common/bit_map.go @@ -0,0 +1,53 @@ +package common + +import ( + "errors" +) + +const bitLen = 32 + +var ( + errIndexOutOfBounds = errors.New("index out of bounds error") +) + +type BitMap struct { + size uint32 + arr []int32 +} + +func NewBitMap(size uint32) *BitMap { + obj := &BitMap{size: size} + num := (size + bitLen - 1) / bitLen + arr := make([]int32, num) + obj.arr = arr + return obj +} + +func (b *BitMap) Set(index uint32) error { + if index >= b.size { + return errIndexOutOfBounds + } + + arrIndex, bitIndex := index / bitLen, index % bitLen + b.arr[arrIndex] |= (1 << bitIndex) + return nil +} + +func (b *BitMap) Clean(index uint32) error { + if index >= b.size { + return errIndexOutOfBounds + } + + arrIndex, bitIndex := index / bitLen, index % bitLen + b.arr[arrIndex] &= (^(1 << bitIndex)) + return nil +} + +func (b *BitMap) Test(index uint32) (bool, error) { + if index >= b.size { + return false, errIndexOutOfBounds + } + + arrIndex, bitIndex := index / bitLen, index % bitLen + return b.arr[arrIndex] & (1 << bitIndex) != 0, nil +} diff --git a/database/store.go b/database/store.go index c037b320..d218f0b1 100644 --- a/database/store.go +++ b/database/store.go @@ -136,7 +136,7 @@ func (s *Store) GetStoreStatus() *protocol.BlockStoreState { func (s *Store) GetVoteResult(seq uint64) (*state.VoteResult, error) { data := s.db.Get(calcVoteResultKey(seq)) if data == nil { - return nil, errors.New("can't find the vote result by given sequence") + return nil, protocol.ErrNotFoundVoteResult } vr := &state.VoteResult{} @@ -224,13 +224,22 @@ func (s *Store) SaveBlock(block *types.Block, ts *bc.TransactionStatus) error { } // SaveChainStatus save the core's newest status && delete old status -func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint) error { +func (s *Store) SaveChainStatus(node, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteMap map[uint64]*state.VoteResult) error { batch := s.db.NewBatch() if err := saveUtxoView(batch, view); err != nil { return err } - bytes, err := json.Marshal(protocol.BlockStoreState{Height: node.Height, Hash: &node.Hash}) + if err := saveVoteResult(batch, voteMap); err != nil { + return err + } + + bytes, err := json.Marshal(protocol.BlockStoreState{ + Height: node.Height, + Hash: &node.Hash, + IrreversibleHeight: irreversibleNode.Height, + IrreversibleHash: &irreversibleNode.Hash, + }) if err != nil { return err } @@ -240,13 +249,15 @@ func (s *Store) SaveChainStatus(node *state.BlockNode, view *state.UtxoViewpoint return nil } -// SaveVoteResult update the voting results generated by each irreversible block -func (s *Store) SaveVoteResult(vr *state.VoteResult) error { - bytes, err := json.Marshal(vr) - if err != nil { - return err - } +// saveVoteResult update the voting results generated by each irreversible block +func saveVoteResult(batch dbm.Batch, voteMap map[uint64]*state.VoteResult) error { + for _, vote := range voteMap { + bytes, err := json.Marshal(vote) + if err != nil { + return err + } - s.db.Set(calcVoteResultKey(vr.Seq), bytes) + batch.Set(calcVoteResultKey(vote.Seq), bytes) + } return nil } diff --git a/database/store_test.go b/database/store_test.go index ee8c02cd..deffc051 100644 --- a/database/store_test.go +++ b/database/store_test.go @@ -155,11 +155,11 @@ func TestSaveChainStatus(t *testing.T) { }, } - if err := store.SaveChainStatus(node, view); err != nil { + if err := store.SaveChainStatus(node, node, view, map[uint64]*state.VoteResult{}); err != nil { t.Fatal(err) } - expectStatus := &protocol.BlockStoreState{Height: node.Height, Hash: &node.Hash} + expectStatus := &protocol.BlockStoreState{Height: node.Height, Hash: &node.Hash, IrreversibleHeight: node.Height, IrreversibleHash: &node.Hash} if !testutil.DeepEqual(store.GetStoreStatus(), expectStatus) { t.Errorf("got block status:%v, expect block status:%v", store.GetStoreStatus(), expectStatus) } diff --git a/protocol/bbft.go b/protocol/bbft.go new file mode 100644 index 00000000..9315ca89 --- /dev/null +++ b/protocol/bbft.go @@ -0,0 +1,240 @@ +package protocol + +import ( + "encoding/hex" + "time" + + "github.com/vapor/crypto/ed25519" + "github.com/vapor/crypto/ed25519/chainkd" + "github.com/vapor/errors" + "github.com/vapor/math/checked" + "github.com/vapor/protocol/bc/types" + "github.com/vapor/protocol/state" +) + +var ( + errVotingOperationOverFlow = errors.New("voting operation result overflow") +) + +type bbft struct { + consensusNodeManager *consensusNodeManager +} + +func newBbft(store Store, blockIndex *state.BlockIndex) *bbft { + return &bbft{ + consensusNodeManager: newConsensusNodeManager(store, blockIndex), + } +} + +// IsConsensusPubkey determine whether a public key is a consensus node at a specified height +func (b *bbft) IsConsensusPubkey(height uint64, pubkey []byte) (bool, error) { + node, err := b.consensusNodeManager.getConsensusNode(height, hex.EncodeToString(pubkey)) + if err != nil && err != errNotFoundConsensusNode { + return false, err + } + return node != nil, nil +} + +func (b *bbft) isIrreversible(block *types.Block) bool { + signNum, err := b.validateSign(block) + if err != nil { + return false + } + + return signNum > (numOfConsensusNode * 2 / 3) +} + +// NextLeaderTime returns the start time of the specified public key as the next leader node +func (b *bbft) NextLeaderTime(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (*time.Time, error) { + return b.consensusNodeManager.nextLeaderTime(pubkey, bestBlockTimestamp, bestBlockHeight) +} + +func (b *bbft) ApplyBlock(voteResultMap map[uint64]*state.VoteResult, block *types.Block) (err error) { + voteSeq := block.Height / roundVoteBlockNums + voteResult := voteResultMap[voteSeq] + + if voteResult == nil { + store := b.consensusNodeManager.store + voteResult, err = store.GetVoteResult(voteSeq) + if err != nil && err != ErrNotFoundVoteResult { + return err + } + } + + if voteResult == nil { + voteResult = &state.VoteResult{ + Seq: voteSeq, + NumOfVote: make(map[string]uint64), + LastBlockHeight: block.Height, + } + } + + voteResultMap[voteSeq] = voteResult + + if voteResult.LastBlockHeight+1 != block.Height { + return errors.New("bbft append block error, the block height is not equals last block height plus 1 of vote result") + } + + for _, tx := range block.Transactions { + for _, input := range tx.Inputs { + unVoteInput, ok := input.TypedInput.(*types.UnvoteInput) + if !ok { + continue + } + + pubkey := hex.EncodeToString(unVoteInput.Vote) + voteResult.NumOfVote[pubkey], ok = checked.SubUint64(voteResult.NumOfVote[pubkey], unVoteInput.Amount) + if !ok { + return errVotingOperationOverFlow + } + } + for _, output := range tx.Outputs { + voteOutput, ok := output.TypedOutput.(*types.VoteTxOutput) + if !ok { + continue + } + + pubkey := hex.EncodeToString(voteOutput.Vote) + voteResult.NumOfVote[pubkey], ok = checked.AddUint64(voteResult.NumOfVote[pubkey], voteOutput.Amount) + if !ok { + return errVotingOperationOverFlow + } + } + } + + voteResult.LastBlockHeight++ + voteResult.Finalized = (block.Height+1)%roundVoteBlockNums == 0 + return nil +} + +func (b *bbft) DetachBlock(voteResultMap map[uint64]*state.VoteResult, block *types.Block) error { + voteSeq := block.Height / roundVoteBlockNums + voteResult := voteResultMap[voteSeq] + + if voteResult == nil { + store := b.consensusNodeManager.store + voteResult, err := store.GetVoteResult(voteSeq) + if err != nil { + return err + } + voteResultMap[voteSeq] = voteResult + } + + if voteResult.LastBlockHeight != block.Height { + return errors.New("bbft detach block error, the block height is not equals last block height of vote result") + } + + for _, tx := range block.Transactions { + for _, input := range tx.Inputs { + unVoteInput, ok := input.TypedInput.(*types.UnvoteInput) + if !ok { + continue + } + + pubkey := hex.EncodeToString(unVoteInput.Vote) + voteResult.NumOfVote[pubkey], ok = checked.AddUint64(voteResult.NumOfVote[pubkey], unVoteInput.Amount) + if !ok { + return errVotingOperationOverFlow + } + } + for _, output := range tx.Outputs { + voteOutput, ok := output.TypedOutput.(*types.VoteTxOutput) + if !ok { + continue + } + + pubkey := hex.EncodeToString(voteOutput.Vote) + voteResult.NumOfVote[pubkey], ok = checked.SubUint64(voteResult.NumOfVote[pubkey], voteOutput.Amount) + if !ok { + return errVotingOperationOverFlow + } + } + } + + voteResult.LastBlockHeight-- + voteResult.Finalized = false + return nil +} + +// ValidateBlock verify whether the block is valid +func (b *bbft) ValidateBlock(block *types.Block) error { + signNum, err := b.validateSign(block) + if err != nil { + return err + } + + if signNum == 0 { + return errors.New("no valid signature") + } + return nil +} + +// validateSign verify the signatures of block, and return the number of correct signature +// if some signature is invalid, they will be reset to nil +// if the block has not the signature of blocker, it will return error +func (b *bbft) validateSign(block *types.Block) (uint64, error) { + var correctSignNum uint64 + consensusNodeMap, err := b.consensusNodeManager.getConsensusNodesByVoteResult(block.Height) + if err != nil { + return 0, err + } + + hasBlockerSign := false + for pubkey, node := range consensusNodeMap { + if len(block.Witness) <= int(node.order) { + continue + } + + blocks := b.consensusNodeManager.blockIndex.NodesByHeight(block.Height) + for _, b := range blocks { + if b.Hash == block.Hash() { + continue + } + if ok, err := b.BlockWitness.Test(uint32(node.order)); err != nil && ok { + // Consensus node is signed twice with the same block height, discard the signature + block.Witness[node.order] = nil + break + } + } + + if ed25519.Verify(ed25519.PublicKey(pubkey), block.Hash().Bytes(), block.Witness[node.order]) { + correctSignNum++ + isBlocker, err := b.consensusNodeManager.isBlocker(block.Height, block.Timestamp, pubkey) + if err != nil { + return 0, err + } + if isBlocker { + hasBlockerSign = true + } + } else { + // discard the invalid signature + block.Witness[node.order] = nil + } + } + if !hasBlockerSign { + return 0, errors.New("the block has no signature of the blocker") + } + return correctSignNum, nil +} + +// SignBlock signing the block if current node is consensus node +func (b *bbft) SignBlock(block *types.Block) error { + var xprv chainkd.XPrv + xpub := [64]byte(xprv.XPub()) + node, err := b.consensusNodeManager.getConsensusNode(block.Height, hex.EncodeToString(xpub[:])) + if err != nil && err != errNotFoundConsensusNode { + return err + } + + if node == nil { + return nil + } + + block.Witness[node.order] = xprv.Sign(block.Hash().Bytes()) + return nil +} + +// UpdateConsensusNodes used to update consensus node after each round of voting +func (b *bbft) UpdateConsensusNodes(blockHeight uint64) error { + return b.consensusNodeManager.updateConsensusNodes(blockHeight) +} diff --git a/protocol/bbft/bbft.go b/protocol/bbft/bbft.go deleted file mode 100644 index e8a483f1..00000000 --- a/protocol/bbft/bbft.go +++ /dev/null @@ -1,39 +0,0 @@ -package bbft - -import ( - "time" - - "github.com/vapor/crypto/ed25519" - "github.com/vapor/errors" - "github.com/vapor/protocol/bc" - "github.com/vapor/protocol" - "github.com/vapor/database" -) - -type bbft struct { - consensusNodeManager *consensusNodeManager -} - -func newBbft(store *database.Store, chain *protocol.Chain) *bbft { - return &bbft{ - consensusNodeManager: newConsensusNodeManager(store, chain), - } -} - -// IsConsensusPubkey determine whether a public key is a consensus node at a specified height -func (b *bbft) IsConsensusPubkey(height uint64, pubkey []byte) (bool, error) { - return b.consensusNodeManager.isConsensusPubkey(height, pubkey) -} - -// NextLeaderTime returns the start time of the specified public key as the next leader node -func (b *bbft) NextLeaderTime(pubkey []byte) (*time.Time, error) { - return b.consensusNodeManager.nextLeaderTime(pubkey) -} - -// ValidateSign verify the signature of block id -func (b *bbft) ValidateSign(blockID bc.Hash, pubkey, sign []byte) error { - if ok := ed25519.Verify(ed25519.PublicKey(pubkey), blockID.Bytes(), sign); !ok { - return errors.New("validate block signature fail") - } - return nil -} diff --git a/protocol/bbft/consensus_node_manager.go b/protocol/bbft/consensus_node_manager.go deleted file mode 100644 index 8f758412..00000000 --- a/protocol/bbft/consensus_node_manager.go +++ /dev/null @@ -1,170 +0,0 @@ -package bbft - -import ( - "encoding/hex" - "fmt" - "sort" - "sync" - "time" - - "github.com/vapor/database" - "github.com/vapor/errors" - "github.com/vapor/protocol" -) - -const ( - numOfConsensusNode = 21 - roundVoteBlockNums = 1000 - - // product one block per 500 milliseconds - blockTimeInterval = 500 - blockNumEachNode = 3 -) - -var ( - errHasNoChanceProductBlock = errors.New("the node has no chance to product a block in this round of voting") -) - -type consensusNode struct { - pubkey string - voteNum uint64 - order uint64 -} - -type consensusNodeSlice []*consensusNode - -func (c consensusNodeSlice) Len() int { return len(c) } -func (c consensusNodeSlice) Less(i, j int) bool { return c[i].voteNum > c[j].voteNum } -func (c consensusNodeSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } - -type consensusNodeManager struct { - consensusNodeMap map[string]*consensusNode - effectiveStartHeight uint64 - store *database.Store - chain *protocol.Chain - sync.RWMutex -} - -func newConsensusNodeManager(store *database.Store, chain *protocol.Chain) *consensusNodeManager { - return &consensusNodeManager{ - consensusNodeMap: make(map[string]*consensusNode), - effectiveStartHeight: 1, - chain: chain, - store: store, - } -} - -func (c *consensusNodeManager) isConsensusPubkey(height uint64, pubkey []byte) (bool, error) { - defer c.RUnlock() - c.RLock() - if height >= c.effectiveStartHeight + roundVoteBlockNums { - return false, errors.New("the vote has not been completed for the specified block height ") - } - - var err error - consensusNodeMap := c.consensusNodeMap - // query history vote result - if height < c.effectiveStartHeight { - consensusNodeMap, err = c.getConsensusNodesByVoteResult(height / roundVoteBlockNums) - if err != nil { - return false, err - } - } - - encodePubkey := hex.EncodeToString(pubkey) - _, exist := consensusNodeMap[encodePubkey] - return exist, nil -} - -func (c *consensusNodeManager) nextLeaderTime(pubkey []byte) (*time.Time, error) { - defer c.RLock() - c.RLock() - - encodePubkey := hex.EncodeToString(pubkey) - consensusNode, ok := c.consensusNodeMap[encodePubkey] - if !ok { - return nil, fmt.Errorf("pubkey:%s is not consensus node", encodePubkey) - } - - startBlockHeight := c.effectiveStartHeight - bestBlockHeight := c.chain.BestBlockHeight() - - prevRoundLastBlock, err := c.chain.GetHeaderByHeight(startBlockHeight - 1) - if err != nil { - return nil, err - } - - startTime := prevRoundLastBlock.Timestamp * 1000 + blockTimeInterval - return nextLeaderTimeHelper(startBlockHeight, bestBlockHeight, startTime, consensusNode.order) -} - -func nextLeaderTimeHelper(startBlockHeight, bestBlockHeight, startTime, nodeOrder uint64) (*time.Time, error) { - endBlockHeight := startBlockHeight + roundVoteBlockNums - // exclude genesis block - if startBlockHeight == 1 { - endBlockHeight-- - } - - roundBlockNums := uint64(blockNumEachNode * numOfConsensusNode) - latestRoundBlockHeight := startBlockHeight + (bestBlockHeight - startBlockHeight) / roundBlockNums * roundBlockNums - nextBlockHeight := latestRoundBlockHeight + blockNumEachNode * nodeOrder - - if int64(bestBlockHeight - nextBlockHeight) >= blockNumEachNode { - nextBlockHeight += roundBlockNums - if nextBlockHeight > endBlockHeight { - return nil, errHasNoChanceProductBlock - } - } - - nextLeaderTimestamp := int64(startTime + (nextBlockHeight - startBlockHeight) * blockTimeInterval) - nextLeaderTime := time.Unix(nextLeaderTimestamp / 1000, (nextLeaderTimestamp % 1000) * 1e6) - return &nextLeaderTime, nil -} - -// UpdateConsensusNodes used to update consensus node after each round of voting -func (c *consensusNodeManager) UpdateConsensusNodes(voteSeq uint64) error { - defer c.Unlock() - c.Lock() - if voteSeq <= c.effectiveStartHeight / roundVoteBlockNums { - return nil - } - - consensusNodeMap, err := c.getConsensusNodesByVoteResult(voteSeq) - if err != nil { - return err - } - - c.consensusNodeMap = consensusNodeMap - c.effectiveStartHeight = voteSeq * roundVoteBlockNums - return nil -} - -func (c *consensusNodeManager) getConsensusNodesByVoteResult(voteSeq uint64) (map[string]*consensusNode, error) { - voteResult, err := c.store.GetVoteResult(voteSeq) - if err != nil { - return nil, err - } - - if !voteResult.Finalized { - return nil, errors.New("vote result is not finalized") - } - - var nodes []*consensusNode - for pubkey, voteNum := range voteResult.NumOfVote { - nodes = append(nodes, &consensusNode{ - pubkey: pubkey, - voteNum: voteNum, - }) - } - // In principle, there is no need to sort all voting nodes. - // if there is a performance problem, consider the optimization later. - // TODO not consider the same number of votes - sort.Sort(consensusNodeSlice(nodes)) - - result := make(map[string]*consensusNode) - for i, node := range nodes { - node.order = uint64(i) - result[node.pubkey] = node - } - return result, nil -} diff --git a/protocol/bbft/consensus_node_manager_test.go b/protocol/bbft_test.go similarity index 68% rename from protocol/bbft/consensus_node_manager_test.go rename to protocol/bbft_test.go index bb534032..7af6f5b9 100644 --- a/protocol/bbft/consensus_node_manager_test.go +++ b/protocol/bbft_test.go @@ -1,4 +1,4 @@ -package bbft +package protocol import ( "testing" @@ -7,80 +7,80 @@ import ( func TestNextLeaderTime(t *testing.T) { cases := []struct { desc string - startBlockHeight uint64 - bestBlockHeight uint64 startTime uint64 + endTime uint64 + now uint64 nodeOrder uint64 wantError error wantNextLeaderTime int64 }{ { desc: "normal case", - startBlockHeight: 1000, - bestBlockHeight: 1500, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906534061, nodeOrder: 1, wantError: nil, wantNextLeaderTime: 1557906537561, }, { desc: "best block height equals to start block height", - startBlockHeight: 1000, - bestBlockHeight: 1000, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906284061, nodeOrder: 0, wantError: nil, wantNextLeaderTime: 1557906284061, }, { desc: "best block height equals to start block height", - startBlockHeight: 1000, - bestBlockHeight: 1000, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906284061, nodeOrder: 1, wantError: nil, - wantNextLeaderTime: 1557906284061 + blockNumEachNode * blockTimeInterval, + wantNextLeaderTime: 1557906284061 + blockNumEachNode*blockTimeInterval, }, { desc: "has no chance product block in this round of voting", - startBlockHeight: 1000, - bestBlockHeight: 1995, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906781561, nodeOrder: 1, wantError: errHasNoChanceProductBlock, wantNextLeaderTime: 0, }, { desc: "the node is producting block", - startBlockHeight: 1000, - bestBlockHeight: 1001, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906284561, nodeOrder: 0, wantError: nil, wantNextLeaderTime: 1557906284061, }, { desc: "the node is producting block", - startBlockHeight: 1000, - bestBlockHeight: 1067, startTime: 1557906284061, + endTime: 1557906784061, + now: 1557906317561, nodeOrder: 1, wantError: nil, - wantNextLeaderTime: 1557906284061 + 66 * blockTimeInterval, + wantNextLeaderTime: 1557906284061 + 66*blockTimeInterval, }, { desc: "first round, must exclude genesis block", - startBlockHeight: 1, - bestBlockHeight: 5, startTime: 1557906284061, + endTime: 1557906783561, + now: 1557906286561, nodeOrder: 3, wantError: nil, - wantNextLeaderTime: 1557906284061 + 9 * blockTimeInterval, + wantNextLeaderTime: 1557906284061 + 9*blockTimeInterval, }, } for i, c := range cases { - nextLeaderTime, err := nextLeaderTimeHelper(c.startBlockHeight, c.bestBlockHeight, c.startTime, c.nodeOrder) + nextLeaderTime, err := nextLeaderTimeHelper(c.startTime, c.endTime, c.now, c.nodeOrder) if err != c.wantError { t.Fatalf("case #%d (%s) want error:%v, got error:%v", i, c.desc, c.wantError, err) } diff --git a/protocol/block.go b/protocol/block.go index ad4647b5..983e6cd8 100644 --- a/protocol/block.go +++ b/protocol/block.go @@ -74,6 +74,7 @@ func (c *Chain) calcReorganizeNodes(node *state.BlockNode) ([]*state.BlockNode, } func (c *Chain) connectBlock(block *types.Block) (err error) { + irreversibleNode := c.bestIrreversibleNode bcBlock := types.MapBlock(block) if bcBlock.TransactionStatus, err = c.store.GetTransactionStatus(&bcBlock.ID); err != nil { return err @@ -87,8 +88,17 @@ func (c *Chain) connectBlock(block *types.Block) (err error) { return err } + voteResultMap := make(map[uint64]*state.VoteResult) + if err := c.bbft.ApplyBlock(voteResultMap, block); err != nil { + return err + } + node := c.index.GetNode(&bcBlock.ID) - if err := c.setState(node, utxoView); err != nil { + if c.bbft.isIrreversible(block) && block.Height > irreversibleNode.Height { + irreversibleNode = node + } + + if err := c.setState(node, irreversibleNode, utxoView, voteResultMap); err != nil { return err } @@ -101,13 +111,19 @@ func (c *Chain) connectBlock(block *types.Block) (err error) { func (c *Chain) reorganizeChain(node *state.BlockNode) error { attachNodes, detachNodes := c.calcReorganizeNodes(node) utxoView := state.NewUtxoViewpoint() - + voteResultMap := make(map[uint64]*state.VoteResult) + irreversibleNode := c.bestIrreversibleNode + for _, detachNode := range detachNodes { b, err := c.store.GetBlock(&detachNode.Hash) if err != nil { return err } + if b.Height <= irreversibleNode.Height { + return errors.New("the height of rollback block below the height of irreversible block") + } + detachBlock := types.MapBlock(b) if err := c.store.GetTransactionsUtxo(utxoView, detachBlock.Transactions); err != nil { return err @@ -119,6 +135,10 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error { if err := utxoView.DetachBlock(detachBlock, txStatus); err != nil { return err } + + if err := c.bbft.DetachBlock(voteResultMap, b); err != nil { + return err + } log.WithFields(log.Fields{"module": logModule, "height": node.Height, "hash": node.Hash.String()}).Debug("detach from mainchain") } @@ -141,20 +161,36 @@ func (c *Chain) reorganizeChain(node *state.BlockNode) error { return err } + if err := c.bbft.ApplyBlock(voteResultMap, b); err != nil { + return err + } + + if c.bbft.isIrreversible(b) && b.Height > irreversibleNode.Height { + irreversibleNode = attachNode + } + log.WithFields(log.Fields{"module": logModule, "height": node.Height, "hash": node.Hash.String()}).Debug("attach from mainchain") } - return c.setState(node, utxoView) + return c.setState(node, irreversibleNode, utxoView, voteResultMap) } // SaveBlock will validate and save block into storage func (c *Chain) saveBlock(block *types.Block) error { - bcBlock := types.MapBlock(block) + if err := c.bbft.ValidateBlock(block); err != nil { + return errors.Sub(ErrBadBlock, err) + } + parent := c.index.GetNode(&block.PreviousBlockHash) + if err := validation.ValidateBlock(types.MapBlock(block), parent); err != nil { + return errors.Sub(ErrBadBlock, err) + } - if err := validation.ValidateBlock(bcBlock, parent); err != nil { + if err := c.bbft.SignBlock(block); err != nil { return errors.Sub(ErrBadBlock, err) } + + bcBlock := types.MapBlock(block) if err := c.store.SaveBlock(block, bcBlock.TransactionStatus); err != nil { return err } @@ -222,13 +258,18 @@ func (c *Chain) blockProcesser() { // ProcessBlock is the entry for handle block insert func (c *Chain) processBlock(block *types.Block) (bool, error) { + if block.Height <= c.bestIrreversibleNode.Height { + return false, errors.New("the height of block below the height of irreversible block") + } + blockHash := block.Hash() if c.BlockExist(&blockHash) { log.WithFields(log.Fields{"module": logModule, "hash": blockHash.String(), "height": block.Height}).Info("block has been processed") return c.orphanManage.BlockExist(&blockHash), nil } - if parent := c.index.GetNode(&block.PreviousBlockHash); parent == nil { + parent := c.index.GetNode(&block.PreviousBlockHash) + if parent == nil { c.orphanManage.Add(block) return true, nil } diff --git a/protocol/consensus_node_manager.go b/protocol/consensus_node_manager.go new file mode 100644 index 00000000..d116eb0e --- /dev/null +++ b/protocol/consensus_node_manager.go @@ -0,0 +1,205 @@ +package protocol + +import ( + "encoding/hex" + "fmt" + "sort" + "sync" + "time" + + "github.com/vapor/errors" + "github.com/vapor/protocol/state" +) + +const ( + numOfConsensusNode = 21 + roundVoteBlockNums = 1000 + + // product one block per 500 milliseconds + blockTimeInterval = 500 + blockNumEachNode = 3 +) + +var ( + errHasNoChanceProductBlock = errors.New("the node has no chance to product a block in this round of voting") + errNotFoundConsensusNode = errors.New("can not found consensus node") + errVoteResultIsNotfinalized = errors.New("vote result is not finalized") +) + +type consensusNode struct { + pubkey string + voteNum uint64 + order uint64 +} + +type consensusNodeSlice []*consensusNode + +func (c consensusNodeSlice) Len() int { return len(c) } +func (c consensusNodeSlice) Less(i, j int) bool { return c[i].voteNum > c[j].voteNum } +func (c consensusNodeSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } + +type consensusNodeManager struct { + consensusNodeMap map[string]*consensusNode + effectiveStartHeight uint64 + store Store + blockIndex *state.BlockIndex + sync.RWMutex +} + +func newConsensusNodeManager(store Store, blockIndex *state.BlockIndex) *consensusNodeManager { + return &consensusNodeManager{ + consensusNodeMap: make(map[string]*consensusNode), + effectiveStartHeight: 1, + store: store, + blockIndex: blockIndex, + } +} + +func (c *consensusNodeManager) getConsensusNode(height uint64, pubkey string) (*consensusNode, error) { + defer c.RUnlock() + c.RLock() + if height >= c.effectiveStartHeight+roundVoteBlockNums { + return nil, errors.New("the vote has not been completed for the specified block height ") + } + + var err error + consensusNodeMap := c.consensusNodeMap + // query history vote result + if height < c.effectiveStartHeight { + consensusNodeMap, err = c.getConsensusNodesByVoteResult(height) + if err != nil { + return nil, err + } + } + + node, exist := consensusNodeMap[pubkey] + if !exist { + return node, errNotFoundConsensusNode + } + return node, nil +} + +func (c *consensusNodeManager) isBlocker(height uint64, blockTimestamp uint64, pubkey string) (bool, error) { + prevVoteRoundLastBlock := c.blockIndex.NodeByHeight(height - 1) + startTimestamp := prevVoteRoundLastBlock.Timestamp + blockTimeInterval + + consensusNodeMap, err := c.getConsensusNodesByVoteResult(height) + if err != nil { + return false, err + } + + blockerNode, exist := consensusNodeMap[pubkey] + if !exist { + return false, nil + } + + begin := getLastBlockTimeInTimeRange(startTimestamp, blockTimestamp, blockerNode.order) + end := begin + blockNumEachNode*blockTimeInterval + return blockTimestamp >= begin && blockTimestamp < end, nil +} + +func (c *consensusNodeManager) nextLeaderTime(pubkey []byte, bestBlockTimestamp, bestBlockHeight uint64) (*time.Time, error) { + defer c.RUnlock() + c.RLock() + + startHeight := c.effectiveStartHeight + prevRoundLastBlock := c.blockIndex.NodeByHeight(startHeight - 1) + startTime := prevRoundLastBlock.Timestamp + blockTimeInterval + endTime := bestBlockTimestamp + (roundVoteBlockNums-bestBlockHeight%roundVoteBlockNums)*blockTimeInterval + + consensusNode, exist := c.consensusNodeMap[hex.EncodeToString(pubkey)] + if !exist { + return nil, fmt.Errorf("pubkey:%s is not consensus node", hex.EncodeToString(pubkey)) + } + + nextLeaderTime, err := nextLeaderTimeHelper(startTime, endTime, uint64(time.Now().UnixNano()/1e6), consensusNode.order) + if err != nil { + return nil, err + } + + return nextLeaderTime, nil +} + +func nextLeaderTimeHelper(startTime, endTime, now, nodeOrder uint64) (*time.Time, error) { + nextLeaderTimestamp := getLastBlockTimeInTimeRange(startTime, now, nodeOrder) + roundBlockTime := uint64(blockNumEachNode * numOfConsensusNode * blockTimeInterval) + + if int64(now-nextLeaderTimestamp) >= blockNumEachNode*blockTimeInterval { + nextLeaderTimestamp += roundBlockTime + if nextLeaderTimestamp >= endTime { + return nil, errHasNoChanceProductBlock + } + } + + nextLeaderTime := time.Unix(int64(nextLeaderTimestamp)/1000, (int64(nextLeaderTimestamp)%1000)*1e6) + return &nextLeaderTime, nil +} + +// updateConsensusNodes used to update consensus node after each round of voting +func (c *consensusNodeManager) updateConsensusNodes(bestBlockHeight uint64) error { + defer c.Unlock() + c.Lock() + + consensusNodeMap, err := c.getConsensusNodesByVoteResult(bestBlockHeight) + if err != nil && err != errVoteResultIsNotfinalized { + return err + } + + if err == errVoteResultIsNotfinalized { + return nil + } + + c.consensusNodeMap = consensusNodeMap + c.effectiveStartHeight = bestBlockHeight / roundVoteBlockNums * roundVoteBlockNums + return nil +} + +func getLastBlockTimeInTimeRange(startTimestamp, endTimestamp, order uint64) uint64 { + // One round of product block time for all consensus nodes + roundBlockTime := uint64(blockNumEachNode * numOfConsensusNode * blockTimeInterval) + // The start time of the last round of product block + lastRoundStartTime := startTimestamp + (endTimestamp-startTimestamp)/roundBlockTime*roundBlockTime + // The time of product block of the consensus in last round + return lastRoundStartTime + order*(blockNumEachNode*blockTimeInterval) +} + +func (c *consensusNodeManager) getConsensusNodesByVoteResult(blockHeight uint64) (map[string]*consensusNode, error) { + defer c.RUnlock() + c.RLock() + if blockHeight >= c.effectiveStartHeight+roundVoteBlockNums { + return nil, errors.New("the given block height is greater than current vote start height") + } + + if blockHeight >= c.effectiveStartHeight { + return c.consensusNodeMap, nil + } + + voteResult, err := c.store.GetVoteResult(blockHeight / roundVoteBlockNums) + if err != nil { + return nil, err + } + + if !voteResult.Finalized { + return nil, errVoteResultIsNotfinalized + } + + var nodes []*consensusNode + for pubkey, voteNum := range voteResult.NumOfVote { + nodes = append(nodes, &consensusNode{ + pubkey: pubkey, + voteNum: voteNum, + }) + } + // In principle, there is no need to sort all voting nodes. + // if there is a performance problem, consider the optimization later. + // TODO not consider the same number of votes + sort.Sort(consensusNodeSlice(nodes)) + + result := make(map[string]*consensusNode) + for i := 0; i < numOfConsensusNode; i++ { + node := nodes[i] + node.order = uint64(i) + result[node.pubkey] = node + } + return result, nil +} diff --git a/protocol/protocol.go b/protocol/protocol.go index 9fb2b3d6..3908c72a 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -19,10 +19,12 @@ type Chain struct { orphanManage *OrphanManage txPool *TxPool store Store + bbft *bbft processBlockCh chan *processBlockMsg - cond sync.Cond - bestNode *state.BlockNode + cond sync.Cond + bestNode *state.BlockNode + bestIrreversibleNode *state.BlockNode } // NewChain returns a new Chain using store as the underlying storage. @@ -50,6 +52,7 @@ func NewChain(store Store, txPool *TxPool) (*Chain, error) { c.bestNode = c.index.GetNode(storeStatus.Hash) c.index.SetMainChain(c.bestNode) + c.bbft = newBbft(store, c.index) go c.blockProcesser() return c, nil } @@ -77,7 +80,7 @@ func (c *Chain) initChainStatus() error { if err != nil { return err } - return c.store.SaveChainStatus(node, utxoView) + return c.store.SaveChainStatus(node, node, utxoView, map[uint64]*state.VoteResult{}) } // BestBlockHeight returns the current height of the blockchain. @@ -106,16 +109,21 @@ func (c *Chain) InMainChain(hash bc.Hash) bool { } // This function must be called with mu lock in above level -func (c *Chain) setState(node *state.BlockNode, view *state.UtxoViewpoint) error { - if err := c.store.SaveChainStatus(node, view); err != nil { +func (c *Chain) setState(node *state.BlockNode, irreversibleNode *state.BlockNode, view *state.UtxoViewpoint, voteMap map[uint64]*state.VoteResult) error { + if err := c.store.SaveChainStatus(node, irreversibleNode, view, voteMap); err != nil { return err } c.cond.L.Lock() defer c.cond.L.Unlock() + if err := c.bbft.UpdateConsensusNodes(node.Height); err != nil { + return err + } + c.index.SetMainChain(node) c.bestNode = node + c.bestIrreversibleNode = irreversibleNode log.WithFields(log.Fields{"module": logModule, "height": c.bestNode.Height, "hash": c.bestNode.Hash.String()}).Debug("chain best status has been update") c.cond.Broadcast() diff --git a/protocol/state/blockindex.go b/protocol/state/blockindex.go index 3262d8d8..61b11646 100644 --- a/protocol/state/blockindex.go +++ b/protocol/state/blockindex.go @@ -24,6 +24,7 @@ type BlockNode struct { Version uint64 Height uint64 Timestamp uint64 + BlockWitness *common.BitMap TransactionsMerkleRoot bc.Hash TransactionStatusHash bc.Hash } @@ -42,6 +43,13 @@ func NewBlockNode(bh *types.BlockHeader, parent *BlockNode) (*BlockNode, error) TransactionsMerkleRoot: bh.TransactionsMerkleRoot, TransactionStatusHash: bh.TransactionStatusHash, } + + node.BlockWitness = common.NewBitMap(uint32(len(bh.Witness))) + for i, witness := range bh.Witness { + if len(witness) != 0 { + node.BlockWitness.Set(uint32(i)) + } + } return node, nil } @@ -79,14 +87,16 @@ func (node *BlockNode) CalcPastMedianTime() uint64 { type BlockIndex struct { sync.RWMutex - index map[bc.Hash]*BlockNode - mainChain []*BlockNode + index map[bc.Hash]*BlockNode + heightIndex map[uint64][]*BlockNode + mainChain []*BlockNode } // NewBlockIndex will create a empty BlockIndex func NewBlockIndex() *BlockIndex { return &BlockIndex{ index: make(map[bc.Hash]*BlockNode), + heightIndex: make(map[uint64][]*BlockNode), mainChain: make([]*BlockNode, 0, approxNodesPerDay), } } @@ -95,6 +105,7 @@ func NewBlockIndex() *BlockIndex { func (bi *BlockIndex) AddNode(node *BlockNode) { bi.Lock() bi.index[node.Hash] = node + bi.heightIndex[node.Height] = append(bi.heightIndex[node.Height], node) bi.Unlock() } @@ -145,6 +156,13 @@ func (bi *BlockIndex) NodeByHeight(height uint64) *BlockNode { return bi.nodeByHeight(height) } +// NodesByHeight return all block nodes at the specified height. +func (bi *BlockIndex) NodesByHeight(height uint64) []*BlockNode { + bi.RLock() + defer bi.RUnlock() + return bi.heightIndex[height] +} + // SetMainChain will set the the mainChain array func (bi *BlockIndex) SetMainChain(node *BlockNode) { bi.Lock() diff --git a/protocol/state/blockindex_test.go b/protocol/state/blockindex_test.go index 28e2281a..360c0496 100644 --- a/protocol/state/blockindex_test.go +++ b/protocol/state/blockindex_test.go @@ -121,8 +121,9 @@ func TestSetMainChain(t *testing.T) { // MockBlockIndex will mock a empty BlockIndex func MockBlockIndex() *BlockIndex { return &BlockIndex{ - index: make(map[bc.Hash]*BlockNode), - mainChain: make([]*BlockNode, 0, 2), + index: make(map[bc.Hash]*BlockNode), + heightIndex: make(map[uint64][]*BlockNode), + mainChain: make([]*BlockNode, 0, 2), } } diff --git a/protocol/store.go b/protocol/store.go index c1e2d70b..68544185 100644 --- a/protocol/store.go +++ b/protocol/store.go @@ -1,12 +1,18 @@ package protocol import ( + "errors" + "github.com/vapor/database/storage" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/protocol/state" ) +var ( + ErrNotFoundVoteResult = errors.New("can't find the vote result by given sequence") +) + // Store provides storage interface for blockchain data type Store interface { BlockExist(*bc.Hash) bool @@ -16,14 +22,17 @@ type Store interface { GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error GetUtxo(*bc.Hash) (*storage.UtxoEntry, error) + GetVoteResult(uint64) (*state.VoteResult, error) LoadBlockIndex(uint64) (*state.BlockIndex, error) SaveBlock(*types.Block, *bc.TransactionStatus) error - SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint) error + SaveChainStatus(*state.BlockNode, *state.BlockNode, *state.UtxoViewpoint, map[uint64]*state.VoteResult) error } // BlockStoreState represents the core's db status type BlockStoreState struct { - Height uint64 - Hash *bc.Hash + Height uint64 + Hash *bc.Hash + IrreversibleHeight uint64 + IrreversibleHash *bc.Hash } diff --git a/protocol/txpool_test.go b/protocol/txpool_test.go index 05eff7f4..ba1f672c 100644 --- a/protocol/txpool_test.go +++ b/protocol/txpool_test.go @@ -117,9 +117,10 @@ func (s *mockStore) GetStoreStatus() *BlockStoreState func (s *mockStore) GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error) { return nil, nil } func (s *mockStore) GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error { return nil } func (s *mockStore) GetUtxo(*bc.Hash) (*storage.UtxoEntry, error) { return nil, nil } +func (s *mockStore) GetVoteResult(uint64) (*state.VoteResult, error) { return nil, nil } func (s *mockStore) LoadBlockIndex(uint64) (*state.BlockIndex, error) { return nil, nil } func (s *mockStore) SaveBlock(*types.Block, *bc.TransactionStatus) error { return nil } -func (s *mockStore) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint) error { return nil } +func (s *mockStore) SaveChainStatus(*state.BlockNode, *state.BlockNode, *state.UtxoViewpoint, map[uint64]*state.VoteResult) error { return nil } func TestAddOrphan(t *testing.T) { cases := []struct { @@ -661,9 +662,10 @@ func (s *mockStore1) GetTransactionsUtxo(utxoView *state.UtxoViewpoint, tx []*bc return nil } func (s *mockStore1) GetUtxo(*bc.Hash) (*storage.UtxoEntry, error) { return nil, nil } +func (s *mockStore1) GetVoteResult(uint64) (*state.VoteResult, error) { return nil, nil } func (s *mockStore1) LoadBlockIndex(uint64) (*state.BlockIndex, error) { return nil, nil } func (s *mockStore1) SaveBlock(*types.Block, *bc.TransactionStatus) error { return nil } -func (s *mockStore1) SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint) error { return nil } +func (s *mockStore1) SaveChainStatus(*state.BlockNode, *state.BlockNode, *state.UtxoViewpoint, map[uint64]*state.VoteResult) error { return nil } func TestProcessTransaction(t *testing.T) { txPool := &TxPool{ diff --git a/protocol/validation/tx.go b/protocol/validation/tx.go index 9225eb2f..a7488555 100644 --- a/protocol/validation/tx.go +++ b/protocol/validation/tx.go @@ -228,7 +228,7 @@ func checkValid(vs *validationState, e bc.Entry) (err error) { vs2 := *vs vs2.sourcePos = 0 if err = checkValidSrc(&vs2, e.Source); err != nil { - return errors.Wrap(err, "checking output source") + return errors.Wrap(err, "checking vote output source") } case *bc.Retirement: diff --git a/test/utxo_view/utxo_view_test.go b/test/utxo_view/utxo_view_test.go index 017ade8b..77d7f361 100644 --- a/test/utxo_view/utxo_view_test.go +++ b/test/utxo_view/utxo_view_test.go @@ -414,7 +414,7 @@ func TestAttachOrDetachBlocks(t *testing.T) { for k, v := range c.before { utxoViewpoint.Entries[k] = v } - if err := store.SaveChainStatus(node, utxoViewpoint); err != nil { + if err := store.SaveChainStatus(node, node, utxoViewpoint, map[uint64]*state.VoteResult{}); err != nil { t.Error(err) } @@ -436,7 +436,7 @@ func TestAttachOrDetachBlocks(t *testing.T) { t.Error(err) } } - if err := store.SaveChainStatus(node, utxoViewpoint); err != nil { + if err := store.SaveChainStatus(node, node, utxoViewpoint, map[uint64]*state.VoteResult{}); err != nil { t.Error(err) } -- 2.11.0