8 log "github.com/sirupsen/logrus"
10 "github.com/vapor/account"
11 "github.com/vapor/config"
12 "github.com/vapor/event"
13 "github.com/vapor/proposal"
14 "github.com/vapor/protocol"
19 logModule = "blockproposer"
22 // BlockProposer propose several block in specified time range
23 type BlockProposer struct {
26 accountManager *account.Manager
27 txPool *protocol.TxPool
31 workerWg sync.WaitGroup
32 updateNumWorkers chan struct{}
34 eventDispatcher *event.Dispatcher
37 // generateBlocks is a worker that is controlled by the proposeWorkerController.
38 // It is self contained in that it creates block templates and attempts to solve
39 // them while detecting when it is performing stale work and reacting
40 // accordingly by generating a new block template. When a block is verified, it
43 // It must be run as a goroutine.
44 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
45 ticker := time.NewTicker(time.Millisecond * 100)
55 bestBlockHeader := b.chain.BestBlockHeader()
56 bestBlockHash := bestBlockHeader.Hash()
57 pubKey := config.CommonConfig.PrivateKey().XPub()
58 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey[:], &bestBlockHash)
60 log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey[:])}).Debug("fail on get next leader time range")
64 now := uint64(time.Now().UnixNano() / 1e6)
69 time.Sleep(time.Millisecond * time.Duration(timeStart-now))
72 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
73 block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, now)
75 log.Errorf("failed on create NewBlockTemplate: %v", err)
77 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
78 log.WithFields(log.Fields{
80 "height": block.BlockHeader.Height,
82 "tx": len(block.Transactions),
83 }).Info("Proposer processed block")
85 // Broadcast the block and announce chain insertion event
86 if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
87 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
91 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
100 // proposeWorkerController launches the worker goroutines that are used to
101 // generate block templates. It also provides the ability to
102 // dynamically adjust the number of running worker goroutines.
104 // It must be run as a goroutine.
105 func (b *BlockProposer) proposeWorkerController() {
106 // launchWorkers groups common code to launch a specified number of
107 // workers for generating blocks.
108 var runningWorkers []chan struct{}
109 launchWorkers := func(numWorkers uint64) {
110 for i := uint64(0); i < numWorkers; i++ {
111 quit := make(chan struct{})
112 runningWorkers = append(runningWorkers, quit)
115 go b.generateBlocks(quit)
119 // Launch the current number of workers by default.
120 runningWorkers = make([]chan struct{}, 0, b.numWorkers)
121 launchWorkers(b.numWorkers)
126 // Update the number of running workers.
127 case <-b.updateNumWorkers:
129 numRunning := uint64(len(runningWorkers))
130 if b.numWorkers == numRunning {
135 if b.numWorkers > numRunning {
136 launchWorkers(b.numWorkers - numRunning)
140 // Signal the most recently created goroutines to exit.
141 for i := numRunning - 1; i >= b.numWorkers; i-- {
142 close(runningWorkers[i])
143 runningWorkers[i] = nil
144 runningWorkers = runningWorkers[:i]
148 for _, quit := range runningWorkers {
158 // Start begins the block propose process as well as the speed monitor used to
159 // track hashing metrics. Calling this function when the block proposer has
160 // already been started will have no effect.
162 // This function is safe for concurrent access.
163 func (b *BlockProposer) Start() {
167 // Nothing to do if the miner is already running
172 b.quit = make(chan struct{})
173 go b.proposeWorkerController()
176 log.Infof("block proposer started")
179 // Stop gracefully stops the proposal process by signalling all workers, and the
180 // speed monitor to quit. Calling this function when the block proposer has not
181 // already been started will have no effect.
183 // This function is safe for concurrent access.
184 func (b *BlockProposer) Stop() {
188 // Nothing to do if the miner is not currently running
195 log.Info("block proposer stopped")
198 // IsProposing returns whether the block proposer has been started.
200 // This function is safe for concurrent access.
201 func (b *BlockProposer) IsProposing() bool {
208 // SetNumWorkers sets the number of workers to create which solve blocks. Any
209 // negative values will cause a default number of workers to be used which is
210 // based on the number of processor cores in the system. A value of 0 will
211 // cause all block proposer to be stopped.
213 // This function is safe for concurrent access.
214 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
219 // Don't lock until after the first check since Stop does its own
224 // Use default if provided value is negative.
226 b.numWorkers = defaultNumWorkers
228 b.numWorkers = uint64(numWorkers)
231 // When the proposer is already running, notify the controller about the
234 b.updateNumWorkers <- struct{}{}
238 // NumWorkers returns the number of workers which are running to solve blocks.
240 // This function is safe for concurrent access.
241 func (b *BlockProposer) NumWorkers() int32 {
245 return int32(b.numWorkers)
248 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
249 // Use Start to begin the proposal process. See the documentation for BlockProposer
250 // type for more details.
251 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
252 return &BlockProposer{
254 accountManager: accountManager,
256 numWorkers: defaultNumWorkers,
257 updateNumWorkers: make(chan struct{}),
258 eventDispatcher: dispatcher,