)
const (
- defaultNumWorkers = 1
- logModule = "blockproposer"
+ logModule = "blockproposer"
)
// BlockProposer propose several block in specified time range
type BlockProposer struct {
sync.Mutex
- chain *protocol.Chain
- accountManager *account.Manager
- txPool *protocol.TxPool
- numWorkers uint64
- started bool
- discreteMining bool
- workerWg sync.WaitGroup
- updateNumWorkers chan struct{}
- quit chan struct{}
- eventDispatcher *event.Dispatcher
+ chain *protocol.Chain
+ accountManager *account.Manager
+ txPool *protocol.TxPool
+ started bool
+ quit chan struct{}
+ eventDispatcher *event.Dispatcher
}
// generateBlocks is a worker that is controlled by the proposeWorkerController.
// is submitted.
//
// It must be run as a goroutine.
-func (b *BlockProposer) generateBlocks(quit chan struct{}) {
- ticker := time.NewTicker(time.Millisecond * 100)
+func (b *BlockProposer) generateBlocks() {
+ xpub := config.CommonConfig.PrivateKey().XPub()
+ xpubStr := hex.EncodeToString(xpub[:])
+ ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
-out:
+
for {
select {
- case <-quit:
- break out
+ case <-b.quit:
+ return
case <-ticker.C:
}
bestBlockHeader := b.chain.BestBlockHeader()
bestBlockHash := bestBlockHeader.Hash()
- pubKey := config.CommonConfig.PrivateKey().XPub()
- timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey[:], &bestBlockHash)
- if err != nil {
- log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey[:])}).Debug("fail on get next leader time range")
- continue
+ nextBlockTime := uint64(time.Now().UnixNano() / 1e6)
+ if minNextBlockTime := bestBlockHeader.Timestamp + uint64(500*time.Millisecond); nextBlockTime < minNextBlockTime {
+ nextBlockTime = minNextBlockTime
}
- now := uint64(time.Now().UnixNano() / 1e6)
- if timeStart < now {
- timeStart = now
+ if isBlocker, err := b.chain.GetBBFT().IsBlocker(&bestBlockHash, xpubStr, nextBlockTime); !isBlocker {
+ log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": xpubStr}).Debug("fail on check is next blocker")
+ continue
}
- time.Sleep(time.Millisecond * time.Duration(timeStart-now))
-
- count := 0
- for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
- block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, now)
- if err != nil {
- log.Errorf("failed on create NewBlockTemplate: %v", err)
- } else {
- if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
- log.WithFields(log.Fields{
- "module": logModule,
- "height": block.BlockHeader.Height,
- "isOrphan": isOrphan,
- "tx": len(block.Transactions),
- }).Info("Proposer processed block")
-
- // Broadcast the block and announce chain insertion event
- if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
- log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
- }
- count++
- } else {
- log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
- }
- }
+ block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, nextBlockTime)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
+ continue
}
- }
-
- b.workerWg.Done()
-}
-// proposeWorkerController launches the worker goroutines that are used to
-// generate block templates. It also provides the ability to
-// dynamically adjust the number of running worker goroutines.
-//
-// It must be run as a goroutine.
-func (b *BlockProposer) proposeWorkerController() {
- // launchWorkers groups common code to launch a specified number of
- // workers for generating blocks.
- var runningWorkers []chan struct{}
- launchWorkers := func(numWorkers uint64) {
- for i := uint64(0); i < numWorkers; i++ {
- quit := make(chan struct{})
- runningWorkers = append(runningWorkers, quit)
-
- b.workerWg.Add(1)
- go b.generateBlocks(quit)
+ isOrphan, err := b.chain.ProcessBlock(block)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on ProcessBlock")
+ continue
}
- }
-
- // Launch the current number of workers by default.
- runningWorkers = make([]chan struct{}, 0, b.numWorkers)
- launchWorkers(b.numWorkers)
-
-out:
- for {
- select {
- // Update the number of running workers.
- case <-b.updateNumWorkers:
- // No change.
- numRunning := uint64(len(runningWorkers))
- if b.numWorkers == numRunning {
- continue
- }
-
- // Add new workers.
- if b.numWorkers > numRunning {
- launchWorkers(b.numWorkers - numRunning)
- continue
- }
- // Signal the most recently created goroutines to exit.
- for i := numRunning - 1; i >= b.numWorkers; i-- {
- close(runningWorkers[i])
- runningWorkers[i] = nil
- runningWorkers = runningWorkers[:i]
- }
-
- case <-b.quit:
- for _, quit := range runningWorkers {
- close(quit)
- }
- break out
+ log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "isOrphan": isOrphan, "tx": len(block.Transactions)}).Info("proposer processed block")
+ // Broadcast the block and announce chain insertion event
+ if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on post block")
}
}
-
- b.workerWg.Wait()
}
// Start begins the block propose process as well as the speed monitor used to
}
b.quit = make(chan struct{})
- go b.proposeWorkerController()
+ go b.generateBlocks()
b.started = true
log.Infof("block proposer started")
return b.started
}
-// SetNumWorkers sets the number of workers to create which solve blocks. Any
-// negative values will cause a default number of workers to be used which is
-// based on the number of processor cores in the system. A value of 0 will
-// cause all block proposer to be stopped.
-//
-// This function is safe for concurrent access.
-func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
- if numWorkers == 0 {
- b.Stop()
- }
-
- // Don't lock until after the first check since Stop does its own
- // locking.
- b.Lock()
- defer b.Unlock()
-
- // Use default if provided value is negative.
- if numWorkers < 0 {
- b.numWorkers = defaultNumWorkers
- } else {
- b.numWorkers = uint64(numWorkers)
- }
-
- // When the proposer is already running, notify the controller about the
- // the change.
- if b.started {
- b.updateNumWorkers <- struct{}{}
- }
-}
-
-// NumWorkers returns the number of workers which are running to solve blocks.
-//
-// This function is safe for concurrent access.
-func (b *BlockProposer) NumWorkers() int32 {
- b.Lock()
- defer b.Unlock()
-
- return int32(b.numWorkers)
-}
-
// NewBlockProposer returns a new instance of a block proposer for the provided configuration.
// Use Start to begin the proposal process. See the documentation for BlockProposer
// type for more details.
func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
return &BlockProposer{
- chain: c,
- accountManager: accountManager,
- txPool: txPool,
- numWorkers: defaultNumWorkers,
- updateNumWorkers: make(chan struct{}),
- eventDispatcher: dispatcher,
+ chain: c,
+ accountManager: accountManager,
+ txPool: txPool,
+ eventDispatcher: dispatcher,
}
}
+++ /dev/null
-package protocol
-
-import (
- "testing"
-)
-
-func TestNextLeaderTime(t *testing.T) {
- cases := []struct {
- desc string
- startTime uint64
- now uint64
- nodeOrder uint64
- wantError error
- wantNextLeaderTime uint64
- }{
- {
- desc: "normal case",
- startTime: 1557906284061,
- now: 1557906534061,
- nodeOrder: 1,
- wantError: nil,
- wantNextLeaderTime: 1557906537561,
- },
- {
- desc: "best block height equals to start block height",
- startTime: 1557906284061,
- now: 1557906284061,
- nodeOrder: 0,
- wantError: nil,
- wantNextLeaderTime: 1557906284061,
- },
- {
- desc: "best block height equals to start block height",
- startTime: 1557906284061,
- now: 1557906284061,
- nodeOrder: 1,
- wantError: nil,
- wantNextLeaderTime: 1557906284061 + BlockNumEachNode*BlockTimeInterval,
- },
- {
- desc: "the node is producting block",
- startTime: 1557906284061,
- now: 1557906284561,
- nodeOrder: 0,
- wantError: nil,
- wantNextLeaderTime: 1557906315561,
- },
- {
- desc: "the node is producting block",
- startTime: 1557906284061,
- now: 1557906317561,
- nodeOrder: 1,
- wantError: nil,
- wantNextLeaderTime: 1557906348561,
- },
- {
- desc: "first round, must exclude genesis block",
- startTime: 1557906284061,
- now: 1557906286561,
- nodeOrder: 3,
- wantError: nil,
- wantNextLeaderTime: 1557906284061 + 9*BlockTimeInterval,
- },
- }
-
- for i, c := range cases {
- nextLeaderTimestamp, err := nextLeaderTimeHelper(c.startTime, c.now, c.nodeOrder)
- if err != c.wantError {
- t.Fatalf("case #%d (%s) want error:%v, got error:%v", i, c.desc, c.wantError, err)
- }
-
- if err != nil {
- continue
- }
- if nextLeaderTimestamp != c.wantNextLeaderTime {
- t.Errorf("case #%d (%s) want next leader time:%d, got next leader time:%d", i, c.desc, c.wantNextLeaderTime, nextLeaderTimestamp)
- }
- }
-}
import (
"encoding/hex"
"sort"
- "time"
"github.com/vapor/config"
"github.com/vapor/errors"
return node, nil
}
-func (c *consensusNodeManager) isBlocker(block *types.Block, pubKey string) (bool, error) {
- consensusNode, err := c.getConsensusNode(&block.PreviousBlockHash, pubKey)
+func (c *consensusNodeManager) isBlocker(prevBlockHash *bc.Hash, pubKey string, timeStamp uint64) (bool, error) {
+ consensusNode, err := c.getConsensusNode(prevBlockHash, pubKey)
if err != nil && err != errNotFoundConsensusNode {
return false, err
}
return false, nil
}
- prevVoteRoundLastBlock, err := c.getPrevRoundVoteLastBlock(&block.PreviousBlockHash)
+ prevVoteRoundLastBlock, err := c.getPrevRoundVoteLastBlock(prevBlockHash)
if err != nil {
return false, err
}
startTimestamp := prevVoteRoundLastBlock.Timestamp + BlockTimeInterval
-
- begin := getLastBlockTimeInTimeRange(startTimestamp, block.Timestamp, consensusNode.order)
+ begin := getLastBlockTimeInTimeRange(startTimestamp, timeStamp, consensusNode.order)
end := begin + BlockNumEachNode*BlockTimeInterval
- return block.Timestamp >= begin && block.Timestamp < end, nil
-}
-
-func (c *consensusNodeManager) nextLeaderTimeRange(pubkey []byte, prevBlockHash *bc.Hash) (uint64, uint64, error) {
- consensusNode, err := c.getConsensusNode(prevBlockHash, hex.EncodeToString(pubkey))
- if err != nil {
- return 0, 0, err
- }
-
- prevRoundLastBlock, err := c.getPrevRoundVoteLastBlock(prevBlockHash)
- if err != nil {
- return 0, 0, err
- }
-
- startTime := prevRoundLastBlock.Timestamp + BlockTimeInterval
-
- nextLeaderTime, err := nextLeaderTimeHelper(startTime, uint64(time.Now().UnixNano()/1e6), consensusNode.order)
- if err != nil {
- return 0, 0, err
- }
-
- return nextLeaderTime, nextLeaderTime + BlockNumEachNode*BlockTimeInterval, nil
-}
-
-func nextLeaderTimeHelper(startTime, now, nodeOrder uint64) (uint64, error) {
- nextLeaderTimestamp := getLastBlockTimeInTimeRange(startTime, now, nodeOrder)
- roundBlockTime := uint64(BlockNumEachNode * NumOfConsensusNode * BlockTimeInterval)
-
- if now > nextLeaderTimestamp {
- nextLeaderTimestamp += roundBlockTime
- }
-
- return nextLeaderTimestamp, nil
+ return timeStamp >= begin && timeStamp < end, nil
}
func getLastBlockTimeInTimeRange(startTimestamp, endTimestamp, order uint64) uint64 {