OSDN Git Service

fix next leader time (#98)
[bytom/vapor.git] / proposal / blockproposer / blockproposer.go
1 package blockproposer
2
3 import (
4         "encoding/hex"
5         "sync"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/vapor/account"
11         "github.com/vapor/config"
12         "github.com/vapor/event"
13         "github.com/vapor/proposal"
14         "github.com/vapor/protocol"
15 )
16
17 const (
18         defaultNumWorkers = 1
19         logModule         = "blockproposer"
20 )
21
22 // BlockProposer propose several block in specified time range
23 type BlockProposer struct {
24         sync.Mutex
25         chain            *protocol.Chain
26         accountManager   *account.Manager
27         txPool           *protocol.TxPool
28         numWorkers       uint64
29         started          bool
30         discreteMining   bool
31         workerWg         sync.WaitGroup
32         updateNumWorkers chan struct{}
33         quit             chan struct{}
34         eventDispatcher  *event.Dispatcher
35 }
36
37 // generateBlocks is a worker that is controlled by the proposeWorkerController.
38 // It is self contained in that it creates block templates and attempts to solve
39 // them while detecting when it is performing stale work and reacting
40 // accordingly by generating a new block template.  When a block is verified, it
41 // is submitted.
42 //
43 // It must be run as a goroutine.
44 func (b *BlockProposer) generateBlocks(quit chan struct{}) {
45         ticker := time.NewTicker(time.Millisecond * 100)
46         defer ticker.Stop()
47 out:
48         for {
49                 select {
50                 case <-quit:
51                         break out
52                 case <-ticker.C:
53                 }
54
55                 bestBlockHeader := b.chain.BestBlockHeader()
56                 bestBlockHash := bestBlockHeader.Hash()
57                 pubKey := config.CommonConfig.PrivateKey().XPub()
58                 timeStart, timeEnd, err := b.chain.GetBBFT().NextLeaderTimeRange(pubKey[:], &bestBlockHash)
59                 if err != nil {
60                         log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": hex.EncodeToString(pubKey[:])}).Debug("fail on get next leader time range")
61                         continue
62                 }
63
64                 now := uint64(time.Now().UnixNano() / 1e6)
65                 if timeStart < now {
66                         timeStart = now
67                 }
68
69                 time.Sleep(time.Millisecond * time.Duration(timeStart-now))
70
71                 count := 0
72                 for now = timeStart; now < timeEnd && count < protocol.BlockNumEachNode; now = uint64(time.Now().UnixNano() / 1e6) {
73                         block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, now)
74                         if err != nil {
75                                 log.Errorf("failed on create NewBlockTemplate: %v", err)
76                         } else {
77                                 if isOrphan, err := b.chain.ProcessBlock(block); err == nil {
78                                         log.WithFields(log.Fields{
79                                                 "module":   logModule,
80                                                 "height":   block.BlockHeader.Height,
81                                                 "isOrphan": isOrphan,
82                                                 "tx":       len(block.Transactions),
83                                         }).Info("Proposer processed block")
84
85                                         // Broadcast the block and announce chain insertion event
86                                         if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
87                                                 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on post block")
88                                         }
89                                         count++
90                                 } else {
91                                         log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Errorf("Proposer fail on ProcessBlock")
92                                 }
93                         }
94                 }
95         }
96
97         b.workerWg.Done()
98 }
99
100 // proposeWorkerController launches the worker goroutines that are used to
101 // generate block templates.  It also provides the ability to
102 // dynamically adjust the number of running worker goroutines.
103 //
104 // It must be run as a goroutine.
105 func (b *BlockProposer) proposeWorkerController() {
106         // launchWorkers groups common code to launch a specified number of
107         // workers for generating blocks.
108         var runningWorkers []chan struct{}
109         launchWorkers := func(numWorkers uint64) {
110                 for i := uint64(0); i < numWorkers; i++ {
111                         quit := make(chan struct{})
112                         runningWorkers = append(runningWorkers, quit)
113
114                         b.workerWg.Add(1)
115                         go b.generateBlocks(quit)
116                 }
117         }
118
119         // Launch the current number of workers by default.
120         runningWorkers = make([]chan struct{}, 0, b.numWorkers)
121         launchWorkers(b.numWorkers)
122
123 out:
124         for {
125                 select {
126                 // Update the number of running workers.
127                 case <-b.updateNumWorkers:
128                         // No change.
129                         numRunning := uint64(len(runningWorkers))
130                         if b.numWorkers == numRunning {
131                                 continue
132                         }
133
134                         // Add new workers.
135                         if b.numWorkers > numRunning {
136                                 launchWorkers(b.numWorkers - numRunning)
137                                 continue
138                         }
139
140                         // Signal the most recently created goroutines to exit.
141                         for i := numRunning - 1; i >= b.numWorkers; i-- {
142                                 close(runningWorkers[i])
143                                 runningWorkers[i] = nil
144                                 runningWorkers = runningWorkers[:i]
145                         }
146
147                 case <-b.quit:
148                         for _, quit := range runningWorkers {
149                                 close(quit)
150                         }
151                         break out
152                 }
153         }
154
155         b.workerWg.Wait()
156 }
157
158 // Start begins the block propose process as well as the speed monitor used to
159 // track hashing metrics.  Calling this function when the block proposer has
160 // already been started will have no effect.
161 //
162 // This function is safe for concurrent access.
163 func (b *BlockProposer) Start() {
164         b.Lock()
165         defer b.Unlock()
166
167         // Nothing to do if the miner is already running
168         if b.started {
169                 return
170         }
171
172         b.quit = make(chan struct{})
173         go b.proposeWorkerController()
174
175         b.started = true
176         log.Infof("block proposer started")
177 }
178
179 // Stop gracefully stops the proposal process by signalling all workers, and the
180 // speed monitor to quit.  Calling this function when the block proposer has not
181 // already been started will have no effect.
182 //
183 // This function is safe for concurrent access.
184 func (b *BlockProposer) Stop() {
185         b.Lock()
186         defer b.Unlock()
187
188         // Nothing to do if the miner is not currently running
189         if !b.started {
190                 return
191         }
192
193         close(b.quit)
194         b.started = false
195         log.Info("block proposer stopped")
196 }
197
198 // IsProposing returns whether the block proposer has been started.
199 //
200 // This function is safe for concurrent access.
201 func (b *BlockProposer) IsProposing() bool {
202         b.Lock()
203         defer b.Unlock()
204
205         return b.started
206 }
207
208 // SetNumWorkers sets the number of workers to create which solve blocks.  Any
209 // negative values will cause a default number of workers to be used which is
210 // based on the number of processor cores in the system.  A value of 0 will
211 // cause all block proposer to be stopped.
212 //
213 // This function is safe for concurrent access.
214 func (b *BlockProposer) SetNumWorkers(numWorkers int32) {
215         if numWorkers == 0 {
216                 b.Stop()
217         }
218
219         // Don't lock until after the first check since Stop does its own
220         // locking.
221         b.Lock()
222         defer b.Unlock()
223
224         // Use default if provided value is negative.
225         if numWorkers < 0 {
226                 b.numWorkers = defaultNumWorkers
227         } else {
228                 b.numWorkers = uint64(numWorkers)
229         }
230
231         // When the proposer is already running, notify the controller about the
232         // the change.
233         if b.started {
234                 b.updateNumWorkers <- struct{}{}
235         }
236 }
237
238 // NumWorkers returns the number of workers which are running to solve blocks.
239 //
240 // This function is safe for concurrent access.
241 func (b *BlockProposer) NumWorkers() int32 {
242         b.Lock()
243         defer b.Unlock()
244
245         return int32(b.numWorkers)
246 }
247
248 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
249 // Use Start to begin the proposal process.  See the documentation for BlockProposer
250 // type for more details.
251 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, dispatcher *event.Dispatcher) *BlockProposer {
252         return &BlockProposer{
253                 chain:            c,
254                 accountManager:   accountManager,
255                 txPool:           txPool,
256                 numWorkers:       defaultNumWorkers,
257                 updateNumWorkers: make(chan struct{}),
258                 eventDispatcher:  dispatcher,
259         }
260 }