From d01a1668d0ba642accd74d1bda7a4e280c3e202e Mon Sep 17 00:00:00 2001 From: paladz <453256728@qq.com> Date: Thu, 21 Nov 2019 15:56:46 +0800 Subject: [PATCH] set proposal have timeout --- application/mov/mov_core.go | 6 +- proposal/blockproposer/blockproposer.go | 6 +- proposal/proposal.go | 303 +++++++++++++++++--------------- protocol/protocol.go | 4 +- protocol/validation/tx.go | 6 +- test/bench_blockchain_test.go | 2 +- test/performance/mining_test.go | 2 +- 7 files changed, 179 insertions(+), 150 deletions(-) diff --git a/application/mov/mov_core.go b/application/mov/mov_core.go index 3a4061b8..51236622 100644 --- a/application/mov/mov_core.go +++ b/application/mov/mov_core.go @@ -76,7 +76,7 @@ func (m *MovCore) ApplyBlock(block *types.Block) error { become an infinite loop and DDoS attacks the whole network? */ // BeforeProposalBlock return all transactions than can be matched, and the number of transactions cannot exceed the given capacity. -func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error) { +func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, int64, error) { if blockHeight <= m.startBlockHeight { return nil, 0, nil } @@ -91,7 +91,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block tradePairIterator := database.NewTradePairIterator(m.movStore) var packagedTxs []*types.Tx - for gasLeft > 0 && tradePairIterator.HasNext() { + for gasLeft > 0 && !isTimeout() && tradePairIterator.HasNext() { tradePair := tradePairIterator.Next() if tradePairMap[tradePair.Key()] { continue @@ -99,7 +99,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block tradePairMap[tradePair.Key()] = true tradePairMap[tradePair.Reverse().Key()] = true - for gasLeft > 0 && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) { + for gasLeft > 0 && !isTimeout() && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) { matchedTx, err := matchEngine.NextMatchedTx(tradePair, tradePair.Reverse()) if err != nil { return nil, 0, err diff --git a/proposal/blockproposer/blockproposer.go b/proposal/blockproposer/blockproposer.go index 7cfe24eb..7b7c0c46 100644 --- a/proposal/blockproposer/blockproposer.go +++ b/proposal/blockproposer/blockproposer.go @@ -16,7 +16,8 @@ import ( ) const ( - logModule = "blockproposer" + logModule = "blockproposer" + timeProportionDenominator = 3 ) // BlockProposer propose several block in specified time range @@ -74,7 +75,8 @@ func (b *BlockProposer) generateBlocks() { continue } - block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime) + timeoutDuration := time.Duration(consensus.ActiveNetParams.BlockTimeInterval/timeProportionDenominator) * time.Millisecond + block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime, timeoutDuration) if err != nil { log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate") continue diff --git a/proposal/proposal.go b/proposal/proposal.go index b620465a..c2569fe0 100644 --- a/proposal/proposal.go +++ b/proposal/proposal.go @@ -20,25 +20,176 @@ import ( ) const ( - logModule = "mining" + logModule = "mining" + batchApplyNum = 64 ) +type blockBuilder struct { + chain *protocol.Chain + accountManager *account.Manager + + block *types.Block + txStatus *bc.TransactionStatus + utxoView *state.UtxoViewpoint + + timeoutCh <-chan time.Time + gasLeft int64 + timeoutFlag bool +} + +func newBlockBuilder(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) *blockBuilder { + preBlockHeader := chain.BestBlockHeader() + block := &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 1, + Height: preBlockHeader.Height + 1, + PreviousBlockHash: preBlockHeader.Hash(), + Timestamp: timestamp, + BlockCommitment: types.BlockCommitment{}, + BlockWitness: types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)}, + }, + } + + builder := &blockBuilder{ + chain: chain, + accountManager: accountManager, + block: block, + txStatus: bc.NewTransactionStatus(), + utxoView: state.NewUtxoViewpoint(), + timeoutCh: time.After(timeoutDuration), + gasLeft: int64(consensus.ActiveNetParams.MaxBlockGas), + } + return builder +} + +func (b *blockBuilder) applyCoinbaseTransaction() error { + coinbaseTx, err := b.createCoinbaseTx() + if err != nil { + return errors.Wrap(err, "fail on create coinbase tx") + } + + gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: b.block.Height}, Transactions: []*bc.Tx{coinbaseTx.Tx}}) + if err != nil { + return err + } + + b.block.Transactions = append(b.block.Transactions, coinbaseTx) + if err := b.txStatus.SetStatus(0, false); err != nil { + return err + } + + b.gasLeft -= gasState.GasUsed + return nil +} +func (b *blockBuilder) applyTransactions(txs []*types.Tx) error { + tempTxs := []*types.Tx{} + for i := 0; i < len(txs); i++ { + if tempTxs = append(tempTxs, txs[i]); len(tempTxs) < batchApplyNum && i != len(txs)-1 { + continue + } + + results, gasLeft := preValidateTxs(tempTxs, b.chain, b.utxoView, b.gasLeft) + for _, result := range results { + if result.err != nil && !result.gasOnly { + blkGenSkipTxForErr(b.chain.GetTxPool(), &result.tx.ID, result.err) + continue + } + + if err := b.txStatus.SetStatus(len(b.block.Transactions), result.gasOnly); err != nil { + return err + } + + b.block.Transactions = append(b.block.Transactions, result.tx) + } + + b.gasLeft = gasLeft + tempTxs = []*types.Tx{} + if b.isTimeout() { + break + } + } + return nil +} + +func (b *blockBuilder) applyTransactionFromPool() error { + txDescList := b.chain.GetTxPool().GetTransactions() + sort.Sort(byTime(txDescList)) + + poolTxs := make([]*types.Tx, len(txDescList)) + for i, txDesc := range txDescList { + poolTxs[i] = txDesc.Tx + } + + return b.applyTransactions(poolTxs) +} + +func (b *blockBuilder) applyTransactionFromSubProtocol() error { + cp, err := b.accountManager.GetCoinbaseControlProgram() + if err != nil { + return err + } + + for i, p := range b.chain.SubProtocols() { + if b.gasLeft <= 0 || b.isTimeout() { + break + } + + subTxs, _, err := p.BeforeProposalBlock(b.block.Transactions, cp, b.block.Height, b.gasLeft, b.isTimeout) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package") + continue + } + + if err := b.applyTransactions(subTxs); err != nil { + return err + } + } + return nil +} + +func (b *blockBuilder) calcBlockCommitment() (err error) { + var txEntries []*bc.Tx + for _, tx := range b.block.Transactions { + txEntries = append(txEntries, tx.Tx) + } + + b.block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries) + if err != nil { + return err + } + + b.block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(b.txStatus.VerifyStatus) + return err +} + // createCoinbaseTx returns a coinbase transaction paying an appropriate subsidy // based on the passed block height to the provided address. When the address // is nil, the coinbase transaction will instead be redeemable by anyone. -func createCoinbaseTx(accountManager *account.Manager, chain *protocol.Chain, preBlockHeader *types.BlockHeader) (tx *types.Tx, err error) { - preBlockHash := preBlockHeader.Hash() - consensusResult, err := chain.GetConsensusResultByHash(&preBlockHash) +func (b *blockBuilder) createCoinbaseTx() (*types.Tx, error) { + consensusResult, err := b.chain.GetConsensusResultByHash(&b.block.PreviousBlockHash) if err != nil { return nil, err } - rewards, err := consensusResult.GetCoinbaseRewards(preBlockHeader.Height) + rewards, err := consensusResult.GetCoinbaseRewards(b.block.Height - 1) if err != nil { return nil, err } - return createCoinbaseTxByReward(accountManager, preBlockHeader.Height + 1, rewards) + return createCoinbaseTxByReward(b.accountManager, b.block.Height, rewards) +} + +func (b *blockBuilder) isTimeout() bool { + if b.timeoutFlag { + return true + } + + select { + case <-b.timeoutCh: + b.timeoutFlag = true + default: + } + return b.timeoutFlag } func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint64, rewards []state.CoinbaseReward) (tx *types.Tx, err error) { @@ -91,113 +242,26 @@ func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint6 } // NewBlockTemplate returns a new block template that is ready to be solved -func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64) (*types.Block, error) { - block := createBasicBlock(chain, timestamp) - - view := state.NewUtxoViewpoint() - txStatus := bc.NewTransactionStatus() - - gasLeft, err := applyCoinbaseTransaction(chain, block, txStatus, accountManager, int64(consensus.ActiveNetParams.MaxBlockGas)) - if err != nil { +func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) (*types.Block, error) { + builder := newBlockBuilder(chain, accountManager, timestamp, timeoutDuration) + if err := builder.applyCoinbaseTransaction(); err != nil { return nil, err } - gasLeft, err = applyTransactionFromPool(chain, view, block, txStatus, gasLeft) - if err != nil { - return nil, err - } - - if err := applyTransactionFromSubProtocol(chain, view, block, txStatus, accountManager, gasLeft); err != nil { + if err := builder.applyTransactionFromPool(); err != nil { return nil, err } - var txEntries []*bc.Tx - for _, tx := range block.Transactions { - txEntries = append(txEntries, tx.Tx) - } - - block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries) - if err != nil { + if err := builder.applyTransactionFromSubProtocol(); err != nil { return nil, err } - block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(txStatus.VerifyStatus) - - _, err = chain.SignBlock(block) - return block, err -} - -func createBasicBlock(chain *protocol.Chain, timestamp uint64) *types.Block { - preBlockHeader := chain.BestBlockHeader() - return &types.Block{ - BlockHeader: types.BlockHeader{ - Version: 1, - Height: preBlockHeader.Height + 1, - PreviousBlockHash: preBlockHeader.Hash(), - Timestamp: timestamp, - BlockCommitment: types.BlockCommitment{}, - BlockWitness: types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)}, - }, - } -} - -func applyCoinbaseTransaction(chain *protocol.Chain, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) (int64, error) { - coinbaseTx, err := createCoinbaseTx(accountManager, chain, chain.BestBlockHeader()) - if err != nil { - return 0, errors.Wrap(err, "fail on create coinbase tx") - } - - gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: chain.BestBlockHeight() + 1}, Transactions: []*bc.Tx{coinbaseTx.Tx}}) - if err != nil { - return 0, err - } - - block.Transactions = append(block.Transactions, coinbaseTx) - if err := txStatus.SetStatus(0, false); err != nil { - return 0, err - } - - return gasLeft - gasState.GasUsed, nil -} - - -func applyTransactionFromPool(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, gasLeft int64) (int64, error) { - poolTxs := getAllTxsFromPool(chain.GetTxPool()) - results, gasLeft := preValidateTxs(poolTxs, chain, view, gasLeft) - for _, result := range results { - if result.err != nil && !result.gasOnly { - blkGenSkipTxForErr(chain.GetTxPool(), &result.tx.ID, result.err) - continue - } - - if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil { - return 0, err - } - - block.Transactions = append(block.Transactions, result.tx) - } - return gasLeft, nil -} - -func applyTransactionFromSubProtocol(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) error { - txs, err := getTxsFromSubProtocols(chain, accountManager, block.Transactions, gasLeft) - if err != nil { - return err + if err := builder.calcBlockCommitment(); err != nil { + return nil, err } - results, gasLeft := preValidateTxs(txs, chain, view, gasLeft) - for _, result := range results { - if result.err != nil { - return err - } - - if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil { - return err - } - - block.Transactions = append(block.Transactions, result.tx) - } - return nil + _, err := builder.chain.SignBlock(builder.block) + return builder.block, err } type validateTxResult struct { @@ -262,41 +326,6 @@ func validateBySubProtocols(tx *types.Tx, statusFail bool, subProtocols []protoc return nil } -func getAllTxsFromPool(txPool *protocol.TxPool) []*types.Tx { - txDescList := txPool.GetTransactions() - sort.Sort(byTime(txDescList)) - - poolTxs := make([]*types.Tx, len(txDescList)) - for i, txDesc := range txDescList { - poolTxs[i] = txDesc.Tx - } - return poolTxs -} - -func getTxsFromSubProtocols(chain *protocol.Chain, accountManager *account.Manager, poolTxs []*types.Tx, gasLeft int64) ([]*types.Tx, error) { - cp, err := accountManager.GetCoinbaseControlProgram() - if err != nil { - return nil, err - } - - var result []*types.Tx - var subTxs []*types.Tx - for i, p := range chain.SubProtocols() { - if gasLeft <= 0 { - break - } - - subTxs, gasLeft, err = p.BeforeProposalBlock(poolTxs, cp, chain.BestBlockHeight() + 1, gasLeft) - if err != nil { - log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package") - continue - } - - result = append(result, subTxs...) - } - return result, nil -} - func blkGenSkipTxForErr(txPool *protocol.TxPool, txHash *bc.Hash, err error) { log.WithFields(log.Fields{"module": logModule, "error": err}).Error("mining block generation: skip tx due to") txPool.RemoveTransaction(txHash) diff --git a/protocol/protocol.go b/protocol/protocol.go index 095a588a..f4b6706b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -22,7 +22,7 @@ const ( type Protocoler interface { Name() string StartHeight() uint64 - BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error) + BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, int64, error) ChainStatus() (uint64, *bc.Hash, error) ValidateBlock(block *types.Block, verifyResults []*bc.TxVerifyResult) error ValidateTxs(txs []*types.Tx, verifyResults []*bc.TxVerifyResult) error @@ -238,7 +238,7 @@ func (c *Chain) syncProtocolStatus(subProtocol Protocoler) error { return errors.Wrap(err, subProtocol.Name(), "sub protocol detach block err") } - protocolHeight, protocolHash = block.Height -1, &block.PreviousBlockHash + protocolHeight, protocolHash = block.Height-1, &block.PreviousBlockHash } for height := protocolHeight + 1; height <= c.BestBlockHeight(); height++ { diff --git a/protocol/validation/tx.go b/protocol/validation/tx.go index 392c46e4..af58cba8 100644 --- a/protocol/validation/tx.go +++ b/protocol/validation/tx.go @@ -3,6 +3,7 @@ package validation import ( "fmt" "math" + "runtime" "sync" "github.com/vapor/common" @@ -14,10 +15,6 @@ import ( "github.com/vapor/protocol/vm" ) -const ( - validateWorkerNum = 32 -) - // validate transaction error var ( ErrTxVersion = errors.New("invalid transaction version") @@ -665,6 +662,7 @@ func validateTxWorker(workCh chan *validateTxWork, resultCh chan *ValidateTxResu // ValidateTxs validates txs in async mode func ValidateTxs(txs []*bc.Tx, block *bc.Block) []*ValidateTxResult { txSize := len(txs) + validateWorkerNum := runtime.NumCPU() //init the goroutine validate worker var wg sync.WaitGroup workCh := make(chan *validateTxWork, txSize) diff --git a/test/bench_blockchain_test.go b/test/bench_blockchain_test.go index 40365d0b..2d24dc7a 100644 --- a/test/bench_blockchain_test.go +++ b/test/bench_blockchain_test.go @@ -159,7 +159,7 @@ func InsertChain(chain *protocol.Chain, txPool *protocol.TxPool, txs []*types.Tx } } - block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6)) + block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6), time.Minute) if err != nil { return err } diff --git a/test/performance/mining_test.go b/test/performance/mining_test.go index b8cbfefa..72cb004e 100644 --- a/test/performance/mining_test.go +++ b/test/performance/mining_test.go @@ -26,6 +26,6 @@ func BenchmarkNewBlockTpl(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6)) + proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6), time.Minute) } } -- 2.11.0