OSDN Git Service

add dpos consensus
[bytom/vapor.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "container/list"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/chain"
10         "github.com/vapor/consensus"
11         "github.com/vapor/errors"
12         "github.com/vapor/mining/tensority"
13         "github.com/vapor/protocol/bc"
14         "github.com/vapor/protocol/bc/types"
15 )
16
const (
	// syncCycle is the interval at which syncWorker attempts a new sync round.
	syncCycle = 5 * time.Second

	// Buffer sizes for the keeper's inbound message channels.
	blockProcessChSize   = 1024 // single-block responses
	blocksProcessChSize  = 128  // batched-block responses
	headersProcessChSize = 1024 // header-batch responses
)
23
24 var (
25         maxBlockPerMsg        = uint64(128)
26         maxBlockHeadersPerMsg = uint64(2048)
27         syncTimeout           = 30 * time.Second
28
29         errAppendHeaders  = errors.New("fail to append list due to order dismatch")
30         errRequestTimeout = errors.New("request timeout")
31         errPeerDropped    = errors.New("Peer dropped")
32         errPeerMisbehave  = errors.New("peer is misbehave")
33 )
34
// blockMsg pairs a single received block with the ID of the peer that sent it.
type blockMsg struct {
	block  *types.Block
	peerID string
}

// blocksMsg pairs a batch of received blocks with the ID of the sending peer.
type blocksMsg struct {
	blocks []*types.Block
	peerID string
}

// headersMsg pairs a batch of received block headers with the sending peer's ID.
type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}
49
// blockKeeper drives block synchronization with remote peers: it selects a
// sync peer, requests blocks/headers from it, and feeds results into the chain.
type blockKeeper struct {
	chain chain.Chain
	peers *peerSet

	// syncPeer is the peer currently being synced from; request/response
	// matching in the require* methods is keyed on this peer's ID.
	syncPeer         *peer
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg

	// headerList holds *types.BlockHeader values accumulated during
	// headers-first (fast) sync, ordered by ascending height.
	headerList *list.List
}
61
62 func newBlockKeeper(chain chain.Chain, peers *peerSet) *blockKeeper {
63         bk := &blockKeeper{
64                 chain:            chain,
65                 peers:            peers,
66                 blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
67                 blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
68                 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
69                 headerList:       list.New(),
70         }
71         bk.resetHeaderState()
72         go bk.syncWorker()
73         return bk
74 }
75
76 func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
77         for _, header := range headers {
78                 prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
79                 if prevHeader.Hash() != header.PreviousBlockHash {
80                         return errAppendHeaders
81                 }
82                 bk.headerList.PushBack(header)
83         }
84         return nil
85 }
86
87 func (bk *blockKeeper) blockLocator() []*bc.Hash {
88         header := bk.chain.BestBlockHeader()
89         locator := []*bc.Hash{}
90
91         step := uint64(1)
92         for header != nil {
93                 headerHash := header.Hash()
94                 locator = append(locator, &headerHash)
95                 if header.Height == 0 {
96                         break
97                 }
98
99                 var err error
100                 if header.Height < step {
101                         header, err = bk.chain.GetHeaderByHeight(0)
102                 } else {
103                         header, err = bk.chain.GetHeaderByHeight(header.Height - step)
104                 }
105                 if err != nil {
106                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
107                         break
108                 }
109
110                 if len(locator) >= 9 {
111                         step *= 2
112                 }
113         }
114         return locator
115 }
116
// fastBlockSync performs headers-first sync up to the given checkpoint: it
// first downloads the header chain from the local best header to the
// checkpoint hash, then downloads the corresponding blocks and processes
// them, checking each block's hash against the pre-fetched header at the
// same position.
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
	bk.resetHeaderState()
	// Phase 1: extend headerList until its tail is the checkpoint header.
	lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
	for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
		// Reaching checkpoint height without matching its hash means the
		// peer is on a different branch.
		if lastHeader.Height >= checkPoint.Height {
			return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
		}

		lastHash := lastHeader.Hash()
		headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(headers) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
		}

		if err := bk.appendHeaderList(headers); err != nil {
			return err
		}
	}

	// Phase 2: fetch blocks until the chain reaches the checkpoint height,
	// walking fastHeader in lock-step with the blocks received.
	fastHeader := bk.headerList.Front()
	for bk.chain.BestBlockHeight() < checkPoint.Height {
		locator := bk.blockLocator()
		blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(blocks) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
		}

		for _, block := range blocks {
			if fastHeader = fastHeader.Next(); fastHeader == nil {
				return errors.New("get block than is higher than checkpoint")
			}

			// Each received block must match the header downloaded in phase 1.
			blockHash := block.Hash()
			if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
				return errPeerMisbehave
			}

			seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync calculate seed")
			}

			// NOTE(review): a zero result is inserted into the tensority cache
			// before processing and removed right after — presumably this lets
			// ProcessBlock skip the expensive PoW computation for blocks
			// already vouched for by the checkpoint; confirm against the
			// tensority package.
			tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
			_, err = bk.chain.ProcessBlock(block)
			tensority.AIHash.RemoveCache(&blockHash, seed)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync process block")
			}
		}
	}
	return nil
}
177
178 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
179         headers, err := bk.locateHeaders(locator, stopHash)
180         if err != nil {
181                 return nil, err
182         }
183
184         blocks := []*types.Block{}
185         for i, header := range headers {
186                 if uint64(i) >= maxBlockPerMsg {
187                         break
188                 }
189
190                 headerHash := header.Hash()
191                 block, err := bk.chain.GetBlockByHash(&headerHash)
192                 if err != nil {
193                         return nil, err
194                 }
195
196                 blocks = append(blocks, block)
197         }
198         return blocks, nil
199 }
200
201 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
202         stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
203         if err != nil {
204                 return nil, err
205         }
206
207         startHeader, err := bk.chain.GetHeaderByHeight(0)
208         if err != nil {
209                 return nil, err
210         }
211
212         for _, hash := range locator {
213                 header, err := bk.chain.GetHeaderByHash(hash)
214                 if err == nil && bk.chain.InMainChain(header.Hash()) {
215                         startHeader = header
216                         break
217                 }
218         }
219
220         totalHeaders := stopHeader.Height - startHeader.Height
221         if totalHeaders > maxBlockHeadersPerMsg {
222                 totalHeaders = maxBlockHeadersPerMsg
223         }
224
225         headers := []*types.BlockHeader{}
226         for i := uint64(1); i <= totalHeaders; i++ {
227                 header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
228                 if err != nil {
229                         return nil, err
230                 }
231
232                 headers = append(headers, header)
233         }
234         return headers, nil
235 }
236
237 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
238         height := bk.chain.BestBlockHeader().Height
239         checkpoints := consensus.ActiveNetParams.Checkpoints
240         if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
241                 return nil
242         }
243
244         nextCheckpoint := &checkpoints[len(checkpoints)-1]
245         for i := len(checkpoints) - 2; i >= 0; i-- {
246                 if height >= checkpoints[i].Height {
247                         break
248                 }
249                 nextCheckpoint = &checkpoints[i]
250         }
251         return nextCheckpoint
252 }
253
// processBlock forwards a single received block to the keeper's block channel.
// Blocks if the channel buffer is full.
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
	bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

// processBlocks forwards a batch of received blocks to the blocks channel.
// Blocks if the channel buffer is full.
func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
	bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}

// processHeaders forwards a batch of received headers to the headers channel.
// Blocks if the channel buffer is full.
func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
	bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}
265
266 func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
267         i := bk.chain.BestBlockHeight() + 1
268         for i <= wantHeight {
269                 block, err := bk.requireBlock(i)
270                 if err != nil {
271                         return err
272                 }
273
274                 isOrphan, err := bk.chain.ProcessBlock(block)
275                 if err != nil {
276                         return err
277                 }
278
279                 if isOrphan {
280                         i--
281                         continue
282                 }
283                 i = bk.chain.BestBlockHeight() + 1
284         }
285         return nil
286 }
287
288 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
289         if ok := bk.syncPeer.getBlockByHeight(height); !ok {
290                 return nil, errPeerDropped
291         }
292
293         waitTicker := time.NewTimer(syncTimeout)
294         for {
295                 select {
296                 case msg := <-bk.blockProcessCh:
297                         if msg.peerID != bk.syncPeer.ID() {
298                                 continue
299                         }
300                         if msg.block.Height != height {
301                                 continue
302                         }
303                         return msg.block, nil
304                 case <-waitTicker.C:
305                         return nil, errors.Wrap(errRequestTimeout, "requireBlock")
306                 }
307         }
308 }
309
310 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
311         if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
312                 return nil, errPeerDropped
313         }
314
315         waitTicker := time.NewTimer(syncTimeout)
316         for {
317                 select {
318                 case msg := <-bk.blocksProcessCh:
319                         if msg.peerID != bk.syncPeer.ID() {
320                                 continue
321                         }
322                         return msg.blocks, nil
323                 case <-waitTicker.C:
324                         return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
325                 }
326         }
327 }
328
329 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
330         if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
331                 return nil, errPeerDropped
332         }
333
334         waitTicker := time.NewTimer(syncTimeout)
335         for {
336                 select {
337                 case msg := <-bk.headersProcessCh:
338                         if msg.peerID != bk.syncPeer.ID() {
339                                 continue
340                         }
341                         return msg.headers, nil
342                 case <-waitTicker.C:
343                         return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
344                 }
345         }
346 }
347
348 // resetHeaderState sets the headers-first mode state to values appropriate for
349 // syncing from a new peer.
350 func (bk *blockKeeper) resetHeaderState() {
351         header := bk.chain.BestBlockHeader()
352         bk.headerList.Init()
353         if bk.nextCheckpoint() != nil {
354                 bk.headerList.PushBack(header)
355         }
356 }
357
358 func (bk *blockKeeper) startSync() bool {
359         checkPoint := bk.nextCheckpoint()
360         peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
361         if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
362                 bk.syncPeer = peer
363                 if err := bk.fastBlockSync(checkPoint); err != nil {
364                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
365                         bk.peers.errorHandler(peer.ID(), err)
366                         return false
367                 }
368                 return true
369         }
370
371         blockHeight := bk.chain.BestBlockHeight()
372         peer = bk.peers.bestPeer(consensus.SFFullNode)
373         if peer != nil && peer.Height() > blockHeight {
374                 bk.syncPeer = peer
375                 targetHeight := blockHeight + maxBlockPerMsg
376                 if targetHeight > peer.Height() {
377                         targetHeight = peer.Height()
378                 }
379
380                 if err := bk.regularBlockSync(targetHeight); err != nil {
381                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
382                         bk.peers.errorHandler(peer.ID(), err)
383                         return false
384                 }
385                 return true
386         }
387         return false
388 }
389
390 func (bk *blockKeeper) syncWorker() {
391         genesisBlock, err := bk.chain.GetBlockByHeight(0)
392         if err != nil {
393                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
394                 return
395         }
396         syncTicker := time.NewTicker(syncCycle)
397         for {
398                 <-syncTicker.C
399                 if update := bk.startSync(); !update {
400                         continue
401                 }
402
403                 block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
404                 if err != nil {
405                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
406                 }
407
408                 if err := bk.peers.broadcastMinedBlock(block); err != nil {
409                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
410                 }
411
412                 if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
413                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
414                 }
415         }
416 }