OSDN Git Service

fix vote result (#94)
[bytom/vapor.git] / proposal / blockproposer / blockproposer.go
1 package blockproposer
2
3 import (
4         "sync"
5         "time"
6         "encoding/hex"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/account"
11         "github.com/vapor/event"
12         "github.com/vapor/proposal"
13         "github.com/vapor/protocol"
14 )
15
// defaultNumWorkers is the worker count a new BlockProposer starts with and
// the value SetNumWorkers falls back to for negative input.
const defaultNumWorkers = 1

// logModule is the value of the "module" field attached to structured log
// entries emitted by this package.
const logModule = "blockproposer"
20
21 // BlockProposer propose several block in specified time range
22 type BlockProposer struct {
23         sync.Mutex
24         chain            *protocol.Chain
25         accountManager   *account.Manager
26         txPool           *protocol.TxPool
27         numWorkers       uint64
28         started          bool
29         discreteMining   bool
30         workerWg         sync.WaitGroup
31         updateNumWorkers chan struct{}
32         quit             chan struct{}
33         eventDispatcher  *event.Dispatcher
34 }
35
36 // generateBlocks is a worker that is controlled by the proposeWorkerController.
37 // It is self contained in that it creates block templates and attempts to solve
38 // them while detecting when it is performing stale work and reacting
39 // accordingly by generating a new block template.  When a block is verified, it
40 // is submitted.
41 //
42 // It must be run as a goroutine.
43 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
44         ticker := time.NewTicker(time.Millisecond * 100)
45         defer ticker.Stop()
46 out:
47         for {
48                 select {
49                 case <-quit:
50                         break out
51                 case <-ticker.C:
52                 }
53
54                 bestBlockHeader := b.chain.BestBlockHeader()
55                 bestBlockHash := bestBlockHeader.Hash()
56                 var pubKey []byte
57                 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey, &bestBlockHash)
58                 if err != nil {
59                         log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey)}).Debug("fail on get next leader time range")
60                         continue
61                 }
62
63                 now := uint64(time.Now().UnixNano() / 1e6)
64                 if timeStart < now {
65                         timeStart = now
66                 }
67
68                 time.Sleep(time.Millisecond * time.Duration(timeStart - now))
69
70                 count := 0
71                 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
72                         block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager)
73                         if err != nil {
74                                 log.Errorf("failed on create NewBlockTemplate: %v", err)
75                         } else {
76                                 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
77                                         log.WithFields(log.Fields{
78                                                 "module":   logModule,
79                                                 "height":   block.BlockHeader.Height,
80                                                 "isOrphan": isOrphan,
81                                                 "tx":       len(block.Transactions),
82                                         }).Info("Proposer processed block")
83
84                                         // Broadcast the block and announce chain insertion event
85                                         if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
86                                                 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
87                                         }
88                                         count++
89                                 } else {
90                                         log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
91                                 }
92                         }
93                 }
94         }
95
96         b.workerWg.Done()
97 }
98
99 // proposeWorkerController launches the worker goroutines that are used to
100 // generate block templates.  It also provides the ability to
101 // dynamically adjust the number of running worker goroutines.
102 //
103 // It must be run as a goroutine.
104 func (b *BlockProposer) proposeWorkerController() {
105         // launchWorkers groups common code to launch a specified number of
106         // workers for generating blocks.
107         var runningWorkers []chan struct{}
108         launchWorkers := func(numWorkers uint64) {
109                 for i := uint64(0); i < numWorkers; i++ {
110                         quit := make(chan struct{})
111                         runningWorkers = append(runningWorkers, quit)
112
113                         b.workerWg.Add(1)
114                         go b.generateBlocks(quit)
115                 }
116         }
117
118         // Launch the current number of workers by default.
119         runningWorkers = make([]chan struct{}, 0, b.numWorkers)
120         launchWorkers(b.numWorkers)
121
122 out:
123         for {
124                 select {
125                 // Update the number of running workers.
126                 case <-b.updateNumWorkers:
127                         // No change.
128                         numRunning := uint64(len(runningWorkers))
129                         if b.numWorkers == numRunning {
130                                 continue
131                         }
132
133                         // Add new workers.
134                         if b.numWorkers > numRunning {
135                                 launchWorkers(b.numWorkers - numRunning)
136                                 continue
137                         }
138
139                         // Signal the most recently created goroutines to exit.
140                         for i := numRunning - 1; i >= b.numWorkers; i-- {
141                                 close(runningWorkers[i])
142                                 runningWorkers[i] = nil
143                                 runningWorkers = runningWorkers[:i]
144                         }
145
146                 case <-b.quit:
147                         for _, quit := range runningWorkers {
148                                 close(quit)
149                         }
150                         break out
151                 }
152         }
153
154         b.workerWg.Wait()
155 }
156
157 // Start begins the block propose process as well as the speed monitor used to
158 // track hashing metrics.  Calling this function when the block proposer has
159 // already been started will have no effect.
160 //
161 // This function is safe for concurrent access.
162 func (b *BlockProposer) Start() {
163         b.Lock()
164         defer b.Unlock()
165
166         // Nothing to do if the miner is already running
167         if b.started {
168                 return
169         }
170
171         b.quit = make(chan struct{})
172         go b.proposeWorkerController()
173
174         b.started = true
175         log.Infof("block proposer started")
176 }
177
178 // Stop gracefully stops the proposal process by signalling all workers, and the
179 // speed monitor to quit.  Calling this function when the block proposer has not
180 // already been started will have no effect.
181 //
182 // This function is safe for concurrent access.
183 func (b *BlockProposer) Stop() {
184         b.Lock()
185         defer b.Unlock()
186
187         // Nothing to do if the miner is not currently running
188         if !b.started {
189                 return
190         }
191
192         close(b.quit)
193         b.started = false
194         log.Info("block proposer stopped")
195 }
196
197 // IsProposing returns whether the block proposer has been started.
198 //
199 // This function is safe for concurrent access.
200 func (b *BlockProposer) IsProposing() bool {
201         b.Lock()
202         defer b.Unlock()
203
204         return b.started
205 }
206
207 // SetNumWorkers sets the number of workers to create which solve blocks.  Any
208 // negative values will cause a default number of workers to be used which is
209 // based on the number of processor cores in the system.  A value of 0 will
210 // cause all block proposer to be stopped.
211 //
212 // This function is safe for concurrent access.
213 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
214         if numWorkers == 0 {
215                 b.Stop()
216         }
217
218         // Don't lock until after the first check since Stop does its own
219         // locking.
220         b.Lock()
221         defer b.Unlock()
222
223         // Use default if provided value is negative.
224         if numWorkers < 0 {
225                 b.numWorkers = defaultNumWorkers
226         } else {
227                 b.numWorkers = uint64(numWorkers)
228         }
229
230         // When the proposer is already running, notify the controller about the
231         // the change.
232         if b.started {
233                 b.updateNumWorkers <- struct{}{}
234         }
235 }
236
237 // NumWorkers returns the number of workers which are running to solve blocks.
238 //
239 // This function is safe for concurrent access.
240 func (b *BlockProposer) NumWorkers() int32 {
241         b.Lock()
242         defer b.Unlock()
243
244         return int32(b.numWorkers)
245 }
246
247 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
248 // Use Start to begin the proposal process.  See the documentation for BlockProposer
249 // type for more details.
250 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
251         return &BlockProposer{
252                 chain:            c,
253                 accountManager:   accountManager,
254                 txPool:           txPool,
255                 numWorkers:       defaultNumWorkers,
256                 updateNumWorkers: make(chan struct{}),
257                 eventDispatcher:  dispatcher,
258         }
259 }