OSDN Git Service

modify miner block to propose block (#92)
[bytom/vapor.git] / proposal / blockproposer / blockproposer.go
1 package blockproposer
2
3 import (
4         "sync"
5         "time"
6         "encoding/hex"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/account"
11         "github.com/vapor/event"
12         "github.com/vapor/proposal"
13         "github.com/vapor/protocol"
14 )
15
// defaultNumWorkers is the worker count a new BlockProposer starts with and
// the value SetNumWorkers falls back to when given a negative number.
const defaultNumWorkers = 1

// logModule is the value of the "module" field attached to this package's
// structured log entries.
const logModule = "blockproposer"
20
21 // BlockProposer propose several block in specified time range
22 type BlockProposer struct {
23         sync.Mutex
24         chain            *protocol.Chain
25         accountManager   *account.Manager
26         txPool           *protocol.TxPool
27         numWorkers       uint64
28         started          bool
29         discreteMining   bool
30         workerWg         sync.WaitGroup
31         updateNumWorkers chan struct{}
32         quit             chan struct{}
33         eventDispatcher  *event.Dispatcher
34 }
35
36 // generateBlocks is a worker that is controlled by the proposeWorkerController.
37 // It is self contained in that it creates block templates and attempts to solve
38 // them while detecting when it is performing stale work and reacting
39 // accordingly by generating a new block template.  When a block is verified, it
40 // is submitted.
41 //
42 // It must be run as a goroutine.
43 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
44         ticker := time.NewTicker(time.Millisecond * 100)
45         defer ticker.Stop()
46 out:
47         for {
48                 select {
49                 case <-quit:
50                         break out
51                 case <-ticker.C:
52                 }
53
54                 bestBlockHeader := b.chain.BestBlockHeader()
55                 var pubKey []byte
56                 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey, bestBlockHeader.Timestamp, bestBlockHeader.Height)
57                 if err != nil {
58                         log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey)}).Debug("fail on get next leader time range")
59                         continue
60                 }
61
62                 now := uint64(time.Now().UnixNano() / 1e6)
63                 if timeStart < now {
64                         timeStart = now
65                 }
66
67                 time.Sleep(time.Millisecond * time.Duration(timeStart - now))
68
69                 count := 0
70                 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
71                         block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager)
72                         if err != nil {
73                                 log.Errorf("failed on create NewBlockTemplate: %v", err)
74                         } else {
75                                 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
76                                         log.WithFields(log.Fields{
77                                                 "module":   logModule,
78                                                 "height":   block.BlockHeader.Height,
79                                                 "isOrphan": isOrphan,
80                                                 "tx":       len(block.Transactions),
81                                         }).Info("Proposer processed block")
82
83                                         // Broadcast the block and announce chain insertion event
84                                         if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
85                                                 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
86                                         }
87                                         count++
88                                 } else {
89                                         log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
90                                 }
91                         }
92                 }
93         }
94
95         b.workerWg.Done()
96 }
97
98 // proposeWorkerController launches the worker goroutines that are used to
99 // generate block templates.  It also provides the ability to
100 // dynamically adjust the number of running worker goroutines.
101 //
102 // It must be run as a goroutine.
103 func (b *BlockProposer) proposeWorkerController() {
104         // launchWorkers groups common code to launch a specified number of
105         // workers for generating blocks.
106         var runningWorkers []chan struct{}
107         launchWorkers := func(numWorkers uint64) {
108                 for i := uint64(0); i < numWorkers; i++ {
109                         quit := make(chan struct{})
110                         runningWorkers = append(runningWorkers, quit)
111
112                         b.workerWg.Add(1)
113                         go b.generateBlocks(quit)
114                 }
115         }
116
117         // Launch the current number of workers by default.
118         runningWorkers = make([]chan struct{}, 0, b.numWorkers)
119         launchWorkers(b.numWorkers)
120
121 out:
122         for {
123                 select {
124                 // Update the number of running workers.
125                 case <-b.updateNumWorkers:
126                         // No change.
127                         numRunning := uint64(len(runningWorkers))
128                         if b.numWorkers == numRunning {
129                                 continue
130                         }
131
132                         // Add new workers.
133                         if b.numWorkers > numRunning {
134                                 launchWorkers(b.numWorkers - numRunning)
135                                 continue
136                         }
137
138                         // Signal the most recently created goroutines to exit.
139                         for i := numRunning - 1; i >= b.numWorkers; i-- {
140                                 close(runningWorkers[i])
141                                 runningWorkers[i] = nil
142                                 runningWorkers = runningWorkers[:i]
143                         }
144
145                 case <-b.quit:
146                         for _, quit := range runningWorkers {
147                                 close(quit)
148                         }
149                         break out
150                 }
151         }
152
153         b.workerWg.Wait()
154 }
155
156 // Start begins the block propose process as well as the speed monitor used to
157 // track hashing metrics.  Calling this function when the block proposer has
158 // already been started will have no effect.
159 //
160 // This function is safe for concurrent access.
161 func (b *BlockProposer) Start() {
162         b.Lock()
163         defer b.Unlock()
164
165         // Nothing to do if the miner is already running
166         if b.started {
167                 return
168         }
169
170         b.quit = make(chan struct{})
171         go b.proposeWorkerController()
172
173         b.started = true
174         log.Infof("block proposer started")
175 }
176
177 // Stop gracefully stops the proposal process by signalling all workers, and the
178 // speed monitor to quit.  Calling this function when the block proposer has not
179 // already been started will have no effect.
180 //
181 // This function is safe for concurrent access.
182 func (b *BlockProposer) Stop() {
183         b.Lock()
184         defer b.Unlock()
185
186         // Nothing to do if the miner is not currently running
187         if !b.started {
188                 return
189         }
190
191         close(b.quit)
192         b.started = false
193         log.Info("block proposer stopped")
194 }
195
196 // IsProposing returns whether the block proposer has been started.
197 //
198 // This function is safe for concurrent access.
199 func (b *BlockProposer) IsProposing() bool {
200         b.Lock()
201         defer b.Unlock()
202
203         return b.started
204 }
205
206 // SetNumWorkers sets the number of workers to create which solve blocks.  Any
207 // negative values will cause a default number of workers to be used which is
208 // based on the number of processor cores in the system.  A value of 0 will
209 // cause all block proposer to be stopped.
210 //
211 // This function is safe for concurrent access.
212 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
213         if numWorkers == 0 {
214                 b.Stop()
215         }
216
217         // Don't lock until after the first check since Stop does its own
218         // locking.
219         b.Lock()
220         defer b.Unlock()
221
222         // Use default if provided value is negative.
223         if numWorkers < 0 {
224                 b.numWorkers = defaultNumWorkers
225         } else {
226                 b.numWorkers = uint64(numWorkers)
227         }
228
229         // When the proposer is already running, notify the controller about the
230         // the change.
231         if b.started {
232                 b.updateNumWorkers <- struct{}{}
233         }
234 }
235
236 // NumWorkers returns the number of workers which are running to solve blocks.
237 //
238 // This function is safe for concurrent access.
239 func (b *BlockProposer) NumWorkers() int32 {
240         b.Lock()
241         defer b.Unlock()
242
243         return int32(b.numWorkers)
244 }
245
246 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
247 // Use Start to begin the proposal process.  See the documentation for BlockProposer
248 // type for more details.
249 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
250         return &BlockProposer{
251                 chain:            c,
252                 accountManager:   accountManager,
253                 txPool:           txPool,
254                 numWorkers:       defaultNumWorkers,
255                 updateNumWorkers: make(chan struct{}),
256                 eventDispatcher:  dispatcher,
257         }
258 }