8 log "github.com/sirupsen/logrus"
10 "github.com/vapor/account"
11 "github.com/vapor/event"
12 "github.com/vapor/proposal"
13 "github.com/vapor/protocol"
18 logModule = "blockproposer"
21 // BlockProposer propose several block in specified time range
22 type BlockProposer struct {
25 accountManager *account.Manager
26 txPool *protocol.TxPool
30 workerWg sync.WaitGroup
31 updateNumWorkers chan struct{}
33 eventDispatcher *event.Dispatcher
36 // generateBlocks is a worker that is controlled by the proposeWorkerController.
37 // It is self contained in that it creates block templates and attempts to solve
38 // them while detecting when it is performing stale work and reacting
39 // accordingly by generating a new block template. When a block is verified, it
42 // It must be run as a goroutine.
43 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
44 ticker := time.NewTicker(time.Millisecond * 100)
54 bestBlockHeader := b.chain.BestBlockHeader()
56 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey, bestBlockHeader.Timestamp, bestBlockHeader.Height)
58 log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey)}).Debug("fail on get next leader time range")
62 now := uint64(time.Now().UnixNano() / 1e6)
67 time.Sleep(time.Millisecond * time.Duration(timeStart - now))
70 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
71 block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager)
73 log.Errorf("failed on create NewBlockTemplate: %v", err)
75 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
76 log.WithFields(log.Fields{
78 "height": block.BlockHeader.Height,
80 "tx": len(block.Transactions),
81 }).Info("Proposer processed block")
83 // Broadcast the block and announce chain insertion event
84 if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
85 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
89 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
98 // proposeWorkerController launches the worker goroutines that are used to
99 // generate block templates. It also provides the ability to
100 // dynamically adjust the number of running worker goroutines.
102 // It must be run as a goroutine.
103 func (b *BlockProposer) proposeWorkerController() {
104 // launchWorkers groups common code to launch a specified number of
105 // workers for generating blocks.
106 var runningWorkers []chan struct{}
107 launchWorkers := func(numWorkers uint64) {
108 for i := uint64(0); i < numWorkers; i++ {
109 quit := make(chan struct{})
110 runningWorkers = append(runningWorkers, quit)
113 go b.generateBlocks(quit)
117 // Launch the current number of workers by default.
118 runningWorkers = make([]chan struct{}, 0, b.numWorkers)
119 launchWorkers(b.numWorkers)
124 // Update the number of running workers.
125 case <-b.updateNumWorkers:
127 numRunning := uint64(len(runningWorkers))
128 if b.numWorkers == numRunning {
133 if b.numWorkers > numRunning {
134 launchWorkers(b.numWorkers - numRunning)
138 // Signal the most recently created goroutines to exit.
139 for i := numRunning - 1; i >= b.numWorkers; i-- {
140 close(runningWorkers[i])
141 runningWorkers[i] = nil
142 runningWorkers = runningWorkers[:i]
146 for _, quit := range runningWorkers {
156 // Start begins the block propose process as well as the speed monitor used to
157 // track hashing metrics. Calling this function when the block proposer has
158 // already been started will have no effect.
160 // This function is safe for concurrent access.
161 func (b *BlockProposer) Start() {
165 // Nothing to do if the miner is already running
170 b.quit = make(chan struct{})
171 go b.proposeWorkerController()
174 log.Infof("block proposer started")
177 // Stop gracefully stops the proposal process by signalling all workers, and the
178 // speed monitor to quit. Calling this function when the block proposer has not
179 // already been started will have no effect.
181 // This function is safe for concurrent access.
182 func (b *BlockProposer) Stop() {
186 // Nothing to do if the miner is not currently running
193 log.Info("block proposer stopped")
196 // IsProposing returns whether the block proposer has been started.
198 // This function is safe for concurrent access.
199 func (b *BlockProposer) IsProposing() bool {
206 // SetNumWorkers sets the number of workers to create which solve blocks. Any
207 // negative values will cause a default number of workers to be used which is
208 // based on the number of processor cores in the system. A value of 0 will
209 // cause all block proposer to be stopped.
211 // This function is safe for concurrent access.
212 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
217 // Don't lock until after the first check since Stop does its own
222 // Use default if provided value is negative.
224 b.numWorkers = defaultNumWorkers
226 b.numWorkers = uint64(numWorkers)
229 // When the proposer is already running, notify the controller about the
232 b.updateNumWorkers <- struct{}{}
236 // NumWorkers returns the number of workers which are running to solve blocks.
238 // This function is safe for concurrent access.
239 func (b *BlockProposer) NumWorkers() int32 {
243 return int32(b.numWorkers)
246 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
247 // Use Start to begin the proposal process. See the documentation for BlockProposer
248 // type for more details.
249 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
250 return &BlockProposer{
252 accountManager: accountManager,
254 numWorkers: defaultNumWorkers,
255 updateNumWorkers: make(chan struct{}),
256 eventDispatcher: dispatcher,