8 log "github.com/sirupsen/logrus"
10 "github.com/vapor/account"
11 "github.com/vapor/event"
12 "github.com/vapor/proposal"
13 "github.com/vapor/protocol"
18 logModule = "blockproposer"
21 // BlockProposer proposes several blocks in the specified time range
22 type BlockProposer struct {
25 accountManager *account.Manager
26 txPool *protocol.TxPool
30 workerWg sync.WaitGroup
31 updateNumWorkers chan struct{}
33 eventDispatcher *event.Dispatcher
36 // generateBlocks is a worker that is controlled by the proposeWorkerController.
37 // It is self contained in that it creates block templates and attempts to solve
38 // them while detecting when it is performing stale work and reacting
39 // accordingly by generating a new block template. When a block is verified, it
42 // It must be run as a goroutine.
43 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
44 ticker := time.NewTicker(time.Millisecond * 100)
54 bestBlockHeader := b.chain.BestBlockHeader()
55 bestBlockHash := bestBlockHeader.Hash()
57 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey, &bestBlockHash)
59 log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey)}).Debug("fail on get next leader time range")
63 now := uint64(time.Now().UnixNano() / 1e6)
68 time.Sleep(time.Millisecond * time.Duration(timeStart - now))
71 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
72 block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager)
74 log.Errorf("failed on create NewBlockTemplate: %v", err)
76 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
77 log.WithFields(log.Fields{
79 "height": block.BlockHeader.Height,
81 "tx": len(block.Transactions),
82 }).Info("Proposer processed block")
84 // Broadcast the block and announce chain insertion event
85 if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
86 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
90 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
99 // proposeWorkerController launches the worker goroutines that are used to
100 // generate block templates. It also provides the ability to
101 // dynamically adjust the number of running worker goroutines.
103 // It must be run as a goroutine.
104 func (b *BlockProposer) proposeWorkerController() {
105 // launchWorkers groups common code to launch a specified number of
106 // workers for generating blocks.
107 var runningWorkers []chan struct{}
108 launchWorkers := func(numWorkers uint64) {
109 for i := uint64(0); i < numWorkers; i++ {
110 quit := make(chan struct{})
111 runningWorkers = append(runningWorkers, quit)
114 go b.generateBlocks(quit)
118 // Launch the current number of workers by default.
119 runningWorkers = make([]chan struct{}, 0, b.numWorkers)
120 launchWorkers(b.numWorkers)
125 // Update the number of running workers.
126 case <-b.updateNumWorkers:
128 numRunning := uint64(len(runningWorkers))
129 if b.numWorkers == numRunning {
134 if b.numWorkers > numRunning {
135 launchWorkers(b.numWorkers - numRunning)
139 // Signal the most recently created goroutines to exit.
140 for i := numRunning - 1; i >= b.numWorkers; i-- {
141 close(runningWorkers[i])
142 runningWorkers[i] = nil
143 runningWorkers = runningWorkers[:i]
147 for _, quit := range runningWorkers {
157 // Start begins the block proposal process as well as the speed monitor used to
158 // track hashing metrics. Calling this function when the block proposer has
159 // already been started will have no effect.
161 // This function is safe for concurrent access.
162 func (b *BlockProposer) Start() {
166 // Nothing to do if the block proposer is already running
171 b.quit = make(chan struct{})
172 go b.proposeWorkerController()
175 log.Infof("block proposer started")
178 // Stop gracefully stops the proposal process by signalling all workers, and the
179 // speed monitor to quit. Calling this function when the block proposer has not
180 // already been started will have no effect.
182 // This function is safe for concurrent access.
183 func (b *BlockProposer) Stop() {
187 // Nothing to do if the block proposer is not currently running
194 log.Info("block proposer stopped")
197 // IsProposing returns whether the block proposer has been started.
199 // This function is safe for concurrent access.
200 func (b *BlockProposer) IsProposing() bool {
207 // SetNumWorkers sets the number of workers to create which solve blocks. Any
208 // negative values will cause a default number of workers to be used which is
209 // based on the number of processor cores in the system. A value of 0 will
210 // cause all block proposing to be stopped.
212 // This function is safe for concurrent access.
213 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
218 // Don't lock until after the first check since Stop does its own
223 // Use default if provided value is negative.
225 b.numWorkers = defaultNumWorkers
227 b.numWorkers = uint64(numWorkers)
230 // When the proposer is already running, notify the controller about the
233 b.updateNumWorkers <- struct{}{}
237 // NumWorkers returns the number of workers which are running to solve blocks.
239 // This function is safe for concurrent access.
240 func (b *BlockProposer) NumWorkers() int32 {
244 return int32(b.numWorkers)
247 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
248 // Use Start to begin the proposal process. See the documentation for BlockProposer
249 // type for more details.
250 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
251 return &BlockProposer{
253 accountManager: accountManager,
255 numWorkers: defaultNumWorkers,
256 updateNumWorkers: make(chan struct{}),
257 eventDispatcher: dispatcher,