OSDN Git Service

Add MOV subprotol framework support
[bytom/vapor.git] / proposal / blockproposer / blockproposer.go
1 package blockproposer
2
3 import (
4         "encoding/hex"
5         "sort"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/account"
12         "github.com/vapor/config"
13         "github.com/vapor/consensus"
14         "github.com/vapor/event"
15         "github.com/vapor/proposal"
16         "github.com/vapor/protocol"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         logModule = "blockproposer"
22 )
23
24 type Preprocessor interface {
25         BeforeProposalBlock(txs []*types.Tx) ([]*types.Tx, error)
26 }
27
28 // BlockProposer propose several block in specified time range
29 type BlockProposer struct {
30         sync.Mutex
31         chain           *protocol.Chain
32         accountManager  *account.Manager
33         txPool          *protocol.TxPool
34         Preprocessors   []Preprocessor
35         started         bool
36         quit            chan struct{}
37         eventDispatcher *event.Dispatcher
38 }
39
40 // generateBlocks is a worker that is controlled by the proposeWorkerController.
41 // It is self contained in that it creates block templates and attempts to solve
42 // them while detecting when it is performing stale work and reacting
43 // accordingly by generating a new block template.  When a block is verified, it
44 // is submitted.
45 //
46 // It must be run as a goroutine.
47 func (b *BlockProposer) generateBlocks() {
48         xpub := config.CommonConfig.PrivateKey().XPub()
49         xpubStr := hex.EncodeToString(xpub[:])
50         ticker := time.NewTicker(time.Duration(consensus.ActiveNetParams.BlockTimeInterval) * time.Millisecond)
51         defer ticker.Stop()
52
53         for {
54                 select {
55                 case <-b.quit:
56                         return
57                 case <-ticker.C:
58                 }
59
60                 bestBlockHeader := b.chain.BestBlockHeader()
61                 bestBlockHash := bestBlockHeader.Hash()
62
63                 now := uint64(time.Now().UnixNano() / 1e6)
64                 base := now
65                 if now < bestBlockHeader.Timestamp {
66                         base = bestBlockHeader.Timestamp
67                 }
68                 minTimeToNextBlock := consensus.ActiveNetParams.BlockTimeInterval - base%consensus.ActiveNetParams.BlockTimeInterval
69                 nextBlockTime := base + minTimeToNextBlock
70                 if (nextBlockTime - now) < consensus.ActiveNetParams.BlockTimeInterval/10 {
71                         nextBlockTime += consensus.ActiveNetParams.BlockTimeInterval
72                 }
73
74                 blocker, err := b.chain.GetBlocker(&bestBlockHash, nextBlockTime)
75                 if err != nil {
76                         log.WithFields(log.Fields{"module": logModule, "error": err, "pubKey": xpubStr}).Error("fail on check is next blocker")
77                         continue
78                 }
79
80                 if xpubStr != blocker {
81                         continue
82                 }
83
84                 packageTxs := []*types.Tx{}
85                 txs := b.txPool.GetTransactions()
86                 sort.Sort(byTime(txs))
87                 for _, txDesc := range txs {
88                         packageTxs = append(packageTxs, txDesc.Tx)
89                 }
90
91                 for i, p := range b.Preprocessors {
92                         txs, err := p.BeforeProposalBlock(packageTxs)
93                         if err != nil {
94                                 log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
95                                 continue
96                         }
97                         packageTxs = append(packageTxs, txs...)
98                 }
99
100                 block, err := proposal.NewBlockTemplate(b.chain, b.txPool, b.accountManager, packageTxs, nextBlockTime)
101                 if err != nil {
102                         log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
103                         continue
104                 }
105
106                 isOrphan, err := b.chain.ProcessBlock(block)
107                 if err != nil {
108                         log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on ProcessBlock")
109                         continue
110                 }
111
112                 log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "isOrphan": isOrphan, "tx": len(block.Transactions)}).Info("proposer processed block")
113                 // Broadcast the block and announce chain insertion event
114                 if err = b.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block}); err != nil {
115                         log.WithFields(log.Fields{"module": logModule, "height": block.BlockHeader.Height, "error": err}).Error("proposer fail on post block")
116                 }
117         }
118 }
119
120 // Start begins the block propose process as well as the speed monitor used to
121 // track hashing metrics.  Calling this function when the block proposer has
122 // already been started will have no effect.
123 //
124 // This function is safe for concurrent access.
125 func (b *BlockProposer) Start() {
126         b.Lock()
127         defer b.Unlock()
128
129         // Nothing to do if the miner is already running
130         if b.started {
131                 return
132         }
133
134         b.quit = make(chan struct{})
135         go b.generateBlocks()
136
137         b.started = true
138         log.Infof("block proposer started")
139 }
140
141 // Stop gracefully stops the proposal process by signalling all workers, and the
142 // speed monitor to quit.  Calling this function when the block proposer has not
143 // already been started will have no effect.
144 //
145 // This function is safe for concurrent access.
146 func (b *BlockProposer) Stop() {
147         b.Lock()
148         defer b.Unlock()
149
150         // Nothing to do if the miner is not currently running
151         if !b.started {
152                 return
153         }
154
155         close(b.quit)
156         b.started = false
157         log.Info("block proposer stopped")
158 }
159
160 // IsProposing returns whether the block proposer has been started.
161 //
162 // This function is safe for concurrent access.
163 func (b *BlockProposer) IsProposing() bool {
164         b.Lock()
165         defer b.Unlock()
166
167         return b.started
168 }
169
170 // NewBlockProposer returns a new instance of a block proposer for the provided configuration.
171 // Use Start to begin the proposal process.  See the documentation for BlockProposer
172 // type for more details.
173 func NewBlockProposer(c *protocol.Chain, accountManager *account.Manager, txPool *protocol.TxPool, preprocessors []Preprocessor, dispatcher *event.Dispatcher) *BlockProposer {
174         return &BlockProposer{
175                 chain:           c,
176                 accountManager:  accountManager,
177                 txPool:          txPool,
178                 Preprocessors:   preprocessors,
179                 eventDispatcher: dispatcher,
180         }
181 }