From: Paladz Date: Mon, 3 Jun 2019 01:46:56 +0000 (+0800) Subject: Fix mining (#113) X-Git-Tag: v1.0.5~208^2~79 X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=e0bb6e8ab045e0449a652a9b924b815f59c34e63 Fix mining (#113) * delete the unused stuff * edit the mining part * fix small bug --- diff --git a/proposal/blockproposer/blockproposer.go b/proposal/blockproposer/blockproposer.go index 3ef5d213..70c93ecb 100644 --- a/proposal/blockproposer/blockproposer.go +++ b/proposal/blockproposer/blockproposer.go @@ -15,23 +15,18 @@ import ( ) const ( - defaultNumWorkers = 1 - logModule = "blockproposer" + logModule = "blockproposer" ) // BlockProposer propose several block in specified time range type BlockProposer struct { sync.Mutex - chain *protocol.Chain - accountManager *account.Manager - txPool *protocol.TxPool - numWorkers uint64 - started bool - discreteMining bool - workerWg sync.WaitGroup - updateNumWorkers chan struct{} - quit chan struct{} - eventDispatcher *event.Dispatcher + chain *protocol.Chain + accountManager *account.Manager + txPool *protocol.TxPool + started bool + quit chan struct{} + eventDispatcher *event.Dispatcher } // generateBlocks is a worker that is controlled by the proposeWorkerController. @@ -41,118 +36,49 @@ type BlockProposer struct { // is submitted. // // It must be run as a goroutine. -func (b *BlockProposer) generateBlocks(quit chan struct{}) { - ticker := time.NewTicker(time.Millisecond * 100) +func (b *BlockProposer) generateBlocks() { + xpub := config.CommonConfig.PrivateKey().XPub() + xpubStr := hex.EncodeToString(xpub[:]) + ticker := time.NewTicker(time.Millisecond * 500) defer ticker.Stop() -out: + for { select { - case <-quit: - break out + case <-b.quit: + return case <-ticker.C: } bestBlockHeader := b.chain.BestBlockHeader() bestBlockHash := bestBlockHeader.Hash() - pubKey := config.CommonConfig.PrivateKey().XPub() - timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey[:], &bestBlockHash) - if err != nil { - log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey[:])}).Debug("fail on get next leader time range") - continue + nextBlockTime := uint64(time.Now().UnixNano() / 1e6) + if minNextBlockTime := bestBlockHeader.Timestamp + uint64(500*time.Millisecond); nextBlockTime < minNextBlockTime { + nextBlockTime = minNextBlockTime } - now := uint64(time.Now().UnixNano() / 1e6) - if timeStart < now { - timeStart = now + if isBlocker, err := b.chain.GetBBFT().IsBlocker(&bestBlockHash, xpubStr, nextBlockTime); !isBlocker { + log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": xpubStr}).Debug("fail on check is next blocker") + continue } - time.Sleep(time.Millisecond * time.Duration(timeStart-now)) - - count := 0 - for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) { - block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, now) - if err != nil { - log.Errorf("failed on create NewBlockTemplate: %v", err) - } else { - if isOrphan, err := b.chain.ProcessBlock(block); err == nil { - log.WithFields(log.Fields{ - "module": logModule, - "height": block.BlockHeader.Height, - "isOrphan": isOrphan, - "tx": len(block.Transactions), - }).Info("Proposer processed block") - - // Broadcast the block and announce chain insertion event - if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil { - log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block") - } - count++ - } else { - log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock") - } - } + block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, nextBlockTime) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate") + continue } - } - - b.workerWg.Done() -} -// proposeWorkerController launches the worker goroutines that are used to -// generate block templates. It also provides the ability to -// dynamically adjust the number of running worker goroutines. -// -// It must be run as a goroutine. -func (b *BlockProposer) proposeWorkerController() { - // launchWorkers groups common code to launch a specified number of - // workers for generating blocks. - var runningWorkers []chan struct{} - launchWorkers := func(numWorkers uint64) { - for i := uint64(0); i < numWorkers; i++ { - quit := make(chan struct{}) - runningWorkers = append(runningWorkers, quit) - - b.workerWg.Add(1) - go b.generateBlocks(quit) + isOrphan, err := b.chain.ProcessBlock(block) + if err != nil { + log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on ProcessBlock") + continue } - } - - // Launch the current number of workers by default. - runningWorkers = make([]chan struct{}, 0, b.numWorkers) - launchWorkers(b.numWorkers) - -out: - for { - select { - // Update the number of running workers. - case <-b.updateNumWorkers: - // No change. - numRunning := uint64(len(runningWorkers)) - if b.numWorkers == numRunning { - continue - } - - // Add new workers. - if b.numWorkers > numRunning { - launchWorkers(b.numWorkers - numRunning) - continue - } - // Signal the most recently created goroutines to exit. - for i := numRunning - 1; i >= b.numWorkers; i-- { - close(runningWorkers[i]) - runningWorkers[i] = nil - runningWorkers = runningWorkers[:i] - } - - case <-b.quit: - for _, quit := range runningWorkers { - close(quit) - } - break out + log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "isOrphan": isOrphan, "tx": len(block.Transactions)}).Info("proposer processed block") + // Broadcast the block and announce chain insertion event + if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil { + log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on post block") } } - - b.workerWg.Wait() } // Start begins the block propose process as well as the speed monitor used to @@ -170,7 +96,7 @@ func (b *BlockProposer) Start() { } b.quit = make(chan struct{}) - go b.proposeWorkerController() + go b.generateBlocks() b.started = true log.Infof("block proposer started") @@ -205,56 +131,14 @@ func (b *BlockProposer) IsProposing() bool { return b.started } -// SetNumWorkers sets the number of workers to create which solve blocks. Any -// negative values will cause a default number of workers to be used which is -// based on the number of processor cores in the system. A value of 0 will -// cause all block proposer to be stopped. -// -// This function is safe for concurrent access. -func (b *BlockProposer) SetNumWorkers(numWorkers int32) { - if numWorkers == 0 { - b.Stop() - } - - // Don't lock until after the first check since Stop does its own - // locking. - b.Lock() - defer b.Unlock() - - // Use default if provided value is negative. - if numWorkers < 0 { - b.numWorkers = defaultNumWorkers - } else { - b.numWorkers = uint64(numWorkers) - } - - // When the proposer is already running, notify the controller about the - // the change. - if b.started { - b.updateNumWorkers <- struct{}{} - } -} - -// NumWorkers returns the number of workers which are running to solve blocks. -// -// This function is safe for concurrent access. -func (b *BlockProposer) NumWorkers() int32 { - b.Lock() - defer b.Unlock() - - return int32(b.numWorkers) -} - // NewBlockProposer returns a new instance of a block proposer for the provided configuration. // Use Start to begin the proposal process. See the documentation for BlockProposer // type for more details. func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer { return &BlockProposer{ - chain: c, - accountManager: accountManager, - txPool: txPool, - numWorkers: defaultNumWorkers, - updateNumWorkers: make(chan struct{}), - eventDispatcher: dispatcher, + chain: c, + accountManager: accountManager, + txPool: txPool, + eventDispatcher: dispatcher, } } diff --git a/protocol/bbft.go b/protocol/bbft.go index d026e959..d0049b5b 100644 --- a/protocol/bbft.go +++ b/protocol/bbft.go @@ -58,8 +58,8 @@ func (b *bbft) isIrreversible(block *types.Block) bool { } // NextLeaderTime returns the start time of the specified public key as the next leader node -func (b *bbft) NextLeaderTimeRange(pubkey []byte, prevBlockHash *bc.Hash) (uint64, uint64, error) { - return b.consensusNodeManager.nextLeaderTimeRange(pubkey, prevBlockHash) +func (b *bbft) IsBlocker(prevBlockHash *bc.Hash, pubkey string, timeStamp uint64) (bool, error) { + return b.consensusNodeManager.isBlocker(prevBlockHash, pubkey, timeStamp) } func (b *bbft) ApplyBlock(voteResultMap map[uint64]*state.VoteResult, block *types.Block) (err error) { @@ -168,7 +168,7 @@ func (b *bbft) validateSign(block *types.Block) (uint64, error) { block.Witness[node.order] = nil } else { correctSignNum++ - isBlocker, err := b.consensusNodeManager.isBlocker(block, pubKey) + isBlocker, err := b.consensusNodeManager.isBlocker(&block.PreviousBlockHash, pubKey, block.Timestamp) if err != nil { return 0, err } diff --git a/protocol/bbft_test.go b/protocol/bbft_test.go deleted file mode 100644 index fb6d0d38..00000000 --- a/protocol/bbft_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package protocol - -import ( - "testing" -) - -func TestNextLeaderTime(t *testing.T) { - cases := []struct { - desc string - startTime uint64 - now uint64 - nodeOrder uint64 - wantError error - wantNextLeaderTime uint64 - }{ - { - desc: "normal case", - startTime: 1557906284061, - now: 1557906534061, - nodeOrder: 1, - wantError: nil, - wantNextLeaderTime: 1557906537561, - }, - { - desc: "best block height equals to start block height", - startTime: 1557906284061, - now: 1557906284061, - nodeOrder: 0, - wantError: nil, - wantNextLeaderTime: 1557906284061, - }, - { - desc: "best block height equals to start block height", - startTime: 1557906284061, - now: 1557906284061, - nodeOrder: 1, - wantError: nil, - wantNextLeaderTime: 1557906284061 + BlockNumEachNode*BlockTimeInterval, - }, - { - desc: "the node is producting block", - startTime: 1557906284061, - now: 1557906284561, - nodeOrder: 0, - wantError: nil, - wantNextLeaderTime: 1557906315561, - }, - { - desc: "the node is producting block", - startTime: 1557906284061, - now: 1557906317561, - nodeOrder: 1, - wantError: nil, - wantNextLeaderTime: 1557906348561, - }, - { - desc: "first round, must exclude genesis block", - startTime: 1557906284061, - now: 1557906286561, - nodeOrder: 3, - wantError: nil, - wantNextLeaderTime: 1557906284061 + 9*BlockTimeInterval, - }, - } - - for i, c := range cases { - nextLeaderTimestamp, err := nextLeaderTimeHelper(c.startTime, c.now, c.nodeOrder) - if err != c.wantError { - t.Fatalf("case #%d (%s) want error:%v, got error:%v", i, c.desc, c.wantError, err) - } - - if err != nil { - continue - } - if nextLeaderTimestamp != c.wantNextLeaderTime { - t.Errorf("case #%d (%s) want next leader time:%d, got next leader time:%d", i, c.desc, c.wantNextLeaderTime, nextLeaderTimestamp) - } - } -} diff --git a/protocol/consensus_node_manager.go b/protocol/consensus_node_manager.go index 18fb23ba..675b6804 100644 --- a/protocol/consensus_node_manager.go +++ b/protocol/consensus_node_manager.go @@ -3,7 +3,6 @@ package protocol import ( "encoding/hex" "sort" - "time" "github.com/vapor/config" "github.com/vapor/errors" @@ -66,8 +65,8 @@ func (c *consensusNodeManager) getConsensusNode(prevBlockHash *bc.Hash, pubkey s return node, nil } -func (c *consensusNodeManager) isBlocker(block *types.Block, pubKey string) (bool, error) { - consensusNode, err := c.getConsensusNode(&block.PreviousBlockHash, pubKey) +func (c *consensusNodeManager) isBlocker(prevBlockHash *bc.Hash, pubKey string, timeStamp uint64) (bool, error) { + consensusNode, err := c.getConsensusNode(prevBlockHash, pubKey) if err != nil && err != errNotFoundConsensusNode { return false, err } @@ -76,48 +75,15 @@ func (c *consensusNodeManager) isBlocker(block *types.Block, pubKey string) (boo return false, nil } - prevVoteRoundLastBlock, err := c.getPrevRoundVoteLastBlock(&block.PreviousBlockHash) + prevVoteRoundLastBlock, err := c.getPrevRoundVoteLastBlock(prevBlockHash) if err != nil { return false, err } startTimestamp := prevVoteRoundLastBlock.Timestamp + BlockTimeInterval - - begin := getLastBlockTimeInTimeRange(startTimestamp, block.Timestamp, consensusNode.order) + begin := getLastBlockTimeInTimeRange(startTimestamp, timeStamp, consensusNode.order) end := begin + BlockNumEachNode*BlockTimeInterval - return block.Timestamp >= begin && block.Timestamp < end, nil -} - -func (c *consensusNodeManager) nextLeaderTimeRange(pubkey []byte, prevBlockHash *bc.Hash) (uint64, uint64, error) { - consensusNode, err := c.getConsensusNode(prevBlockHash, hex.EncodeToString(pubkey)) - if err != nil { - return 0, 0, err - } - - prevRoundLastBlock, err := c.getPrevRoundVoteLastBlock(prevBlockHash) - if err != nil { - return 0, 0, err - } - - startTime := prevRoundLastBlock.Timestamp + BlockTimeInterval - - nextLeaderTime, err := nextLeaderTimeHelper(startTime, uint64(time.Now().UnixNano()/1e6), consensusNode.order) - if err != nil { - return 0, 0, err - } - - return nextLeaderTime, nextLeaderTime + BlockNumEachNode*BlockTimeInterval, nil -} - -func nextLeaderTimeHelper(startTime, now, nodeOrder uint64) (uint64, error) { - nextLeaderTimestamp := getLastBlockTimeInTimeRange(startTime, now, nodeOrder) - roundBlockTime := uint64(BlockNumEachNode * NumOfConsensusNode * BlockTimeInterval) - - if now > nextLeaderTimestamp { - nextLeaderTimestamp += roundBlockTime - } - - return nextLeaderTimestamp, nil + return timeStamp >= begin && timeStamp < end, nil } func getLastBlockTimeInTimeRange(startTimestamp, endTimestamp, order uint64) uint64 {