[bytom/vapor.git] / netsync / block_keeper.go
package netsync

import (
	"container/list"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/vapor/consensus"
	"github.com/vapor/errors"
	"github.com/vapor/mining/tensority"
	"github.com/vapor/protocol/bc"
	"github.com/vapor/protocol/bc/types"
)

const (
	syncCycle            = 5 * time.Second
	blockProcessChSize   = 1024
	blocksProcessChSize  = 128
	headersProcessChSize = 1024
)

var (
	maxBlockPerMsg        = uint64(128)
	maxBlockHeadersPerMsg = uint64(2048)
	syncTimeout           = 30 * time.Second

	errAppendHeaders  = errors.New("fail to append list due to order mismatch")
	errRequestTimeout = errors.New("request timeout")
	errPeerDropped    = errors.New("peer dropped")
	errPeerMisbehave  = errors.New("peer misbehaved")
)

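// blockMsg, blocksMsg and headersMsg carry data received from the network
// together with the ID of the peer that sent it.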
type blockMsg struct {
	block  *types.Block
	peerID string
}

type blocksMsg struct {
	blocks []*types.Block
	peerID string
}

type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}

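// blockKeeper keeps the local chain in sync with the network. It tracks the
// current sync peer and receives blocks and headers from the message handlers
// through buffered channels.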
type blockKeeper struct {
	chain Chain
	peers *peerSet

	syncPeer         *peer
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg

	headerList *list.List
}

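// newBlockKeeper creates a blockKeeper bound to the given chain and peer set,
// resets the header-list state and starts the background sync worker.
//
// A minimal usage sketch (the Chain implementation, peer set and peer ID are
// assumed to be provided elsewhere in the package):
//
//	keeper := newBlockKeeper(chain, peers)
//	keeper.processBlock(peerID, block) // deliver a block received from peerID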
func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
	bk := &blockKeeper{
		chain:            chain,
		peers:            peers,
		blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
		blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
		headersProcessCh: make(chan *headersMsg, headersProcessChSize),
		headerList:       list.New(),
	}
	bk.resetHeaderState()
	go bk.syncWorker()
	return bk
}

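// appendHeaderList appends the received headers to the in-memory header list,
// checking that every header links to the hash of the one before it.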
func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
	for _, header := range headers {
		prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
		if prevHeader.Hash() != header.PreviousBlockHash {
			return errAppendHeaders
		}
		bk.headerList.PushBack(header)
	}
	return nil
}

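// blockLocator builds a locator for the local best chain: a list of block
// hashes that starts at the best block and walks back toward genesis with
// exponentially growing steps, letting a remote peer find a common ancestor.
// For example, with a best height of 100 the locator covers heights 100
// through 92, then 90, 86, 78, 62, 30 and finally 0.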
func (bk *blockKeeper) blockLocator() []*bc.Hash {
	header := bk.chain.BestBlockHeader()
	locator := []*bc.Hash{}

	step := uint64(1)
	for header != nil {
		headerHash := header.Hash()
		locator = append(locator, &headerHash)
		if header.Height == 0 {
			break
		}

		var err error
		if header.Height < step {
			header, err = bk.chain.GetHeaderByHeight(0)
		} else {
			header, err = bk.chain.GetHeaderByHeight(header.Height - step)
		}
		if err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
			break
		}

		if len(locator) >= 9 {
			step *= 2
		}
	}
	return locator
}

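// fastBlockSync syncs up to the given checkpoint in headers-first mode: it
// downloads and links headers until the checkpoint hash is reached, then
// fetches the matching blocks and processes them, pre-seeding the tensority
// AIHash cache so the proof-of-work is not recomputed for trusted blocks.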
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
	bk.resetHeaderState()
	lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
	for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
		if lastHeader.Height >= checkPoint.Height {
			return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
		}

		lastHash := lastHeader.Hash()
		headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(headers) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireHeaders returned empty list")
		}

		if err := bk.appendHeaderList(headers); err != nil {
			return err
		}
	}

	fastHeader := bk.headerList.Front()
	for bk.chain.BestBlockHeight() < checkPoint.Height {
		locator := bk.blockLocator()
		blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(blocks) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireBlocks returned empty list")
		}

		for _, block := range blocks {
			if fastHeader = fastHeader.Next(); fastHeader == nil {
				return errors.New("get block that is higher than checkpoint")
			}

			blockHash := block.Hash()
			if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
				return errPeerMisbehave
			}

			seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync calculate seed")
			}

			tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
			_, err = bk.chain.ProcessBlock(block)
			tensority.AIHash.RemoveCache(&blockHash, seed)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync process block")
			}
		}
	}
	return nil
}

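// locateBlocks answers a peer's request for blocks: it resolves the locator
// into headers and returns the corresponding blocks, at most maxBlockPerMsg.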
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
	headers, err := bk.locateHeaders(locator, stopHash)
	if err != nil {
		return nil, err
	}

	blocks := []*types.Block{}
	for i, header := range headers {
		if uint64(i) >= maxBlockPerMsg {
			break
		}

		headerHash := header.Hash()
		block, err := bk.chain.GetBlockByHash(&headerHash)
		if err != nil {
			return nil, err
		}

		blocks = append(blocks, block)
	}
	return blocks, nil
}

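// locateHeaders picks the first locator hash found on the main chain as the
// starting point and returns the headers that follow it, up to stopHash and
// at most maxBlockHeadersPerMsg.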
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
	stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
	if err != nil {
		return nil, err
	}

	startHeader, err := bk.chain.GetHeaderByHeight(0)
	if err != nil {
		return nil, err
	}

	for _, hash := range locator {
		header, err := bk.chain.GetHeaderByHash(hash)
		if err == nil && bk.chain.InMainChain(header.Hash()) {
			startHeader = header
			break
		}
	}

	totalHeaders := stopHeader.Height - startHeader.Height
	if totalHeaders > maxBlockHeadersPerMsg {
		totalHeaders = maxBlockHeadersPerMsg
	}

	headers := []*types.BlockHeader{}
	for i := uint64(1); i <= totalHeaders; i++ {
		header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
		if err != nil {
			return nil, err
		}

		headers = append(headers, header)
	}
	return headers, nil
}

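// nextCheckpoint returns the first hard-coded checkpoint above the current
// best height, or nil if the chain has already passed the last checkpoint or
// no checkpoints are configured.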
func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
	height := bk.chain.BestBlockHeader().Height
	checkpoints := consensus.ActiveNetParams.Checkpoints
	if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
		return nil
	}

	nextCheckpoint := &checkpoints[len(checkpoints)-1]
	for i := len(checkpoints) - 2; i >= 0; i-- {
		if height >= checkpoints[i].Height {
			break
		}
		nextCheckpoint = &checkpoints[i]
	}
	return nextCheckpoint
}

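// processBlock forwards a block received from a peer to the sync routines.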
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
	bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

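// processBlocks forwards a batch of blocks received from a peer to the sync routines.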
func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
	bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}

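// processHeaders forwards a batch of headers received from a peer to the sync routines.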
func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
	bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}

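// regularBlockSync requests blocks from the sync peer one height at a time and
// processes them until the local best height reaches wantHeight; if a block
// turns out to be an orphan, it steps back one height to fetch its parent.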
func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
	i := bk.chain.BestBlockHeight() + 1
	for i <= wantHeight {
		block, err := bk.requireBlock(i)
		if err != nil {
			return err
		}

		isOrphan, err := bk.chain.ProcessBlock(block)
		if err != nil {
			return err
		}

		if isOrphan {
			i--
			continue
		}
		i = bk.chain.BestBlockHeight() + 1
	}
	return nil
}

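// requireBlock asks the sync peer for the block at the given height and waits
// for a matching reply, failing with errRequestTimeout after syncTimeout.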
func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
	if ok := bk.syncPeer.getBlockByHeight(height); !ok {
		return nil, errPeerDropped
	}

	timeout := time.NewTimer(syncTimeout)
	defer timeout.Stop()

	for {
		select {
		case msg := <-bk.blockProcessCh:
			if msg.peerID != bk.syncPeer.ID() {
				continue
			}
			if msg.block.Height != height {
				continue
			}
			return msg.block, nil
		case <-timeout.C:
			return nil, errors.Wrap(errRequestTimeout, "requireBlock")
		}
	}
}

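// requireBlocks asks the sync peer for the blocks selected by the locator up
// to stopHash and waits for its reply, failing after syncTimeout.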
func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
	if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
		return nil, errPeerDropped
	}

	timeout := time.NewTimer(syncTimeout)
	defer timeout.Stop()

	for {
		select {
		case msg := <-bk.blocksProcessCh:
			if msg.peerID != bk.syncPeer.ID() {
				continue
			}
			return msg.blocks, nil
		case <-timeout.C:
			return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
		}
	}
}

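// requireHeaders asks the sync peer for the headers selected by the locator up
// to stopHash and waits for its reply, failing after syncTimeout.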
func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
	if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
		return nil, errPeerDropped
	}

	timeout := time.NewTimer(syncTimeout)
	defer timeout.Stop()

	for {
		select {
		case msg := <-bk.headersProcessCh:
			if msg.peerID != bk.syncPeer.ID() {
				continue
			}
			return msg.headers, nil
		case <-timeout.C:
			return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
		}
	}
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (bk *blockKeeper) resetHeaderState() {
	header := bk.chain.BestBlockHeader()
	bk.headerList.Init()
	if bk.nextCheckpoint() != nil {
		bk.headerList.PushBack(header)
	}
}

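// startSync runs one round of syncing: fast sync toward the next checkpoint
// when a fast-sync capable peer is ahead of it, otherwise regular sync against
// the best full node that is ahead of the local chain. It reports whether a
// sync round completed successfully.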
func (bk *blockKeeper) startSync() bool {
	checkPoint := bk.nextCheckpoint()
	peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
	if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
		bk.syncPeer = peer
		if err := bk.fastBlockSync(checkPoint); err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
			bk.peers.errorHandler(peer.ID(), err)
			return false
		}
		return true
	}

	blockHeight := bk.chain.BestBlockHeight()
	peer = bk.peers.bestPeer(consensus.SFFullNode)
	if peer != nil && peer.Height() > blockHeight {
		bk.syncPeer = peer
		targetHeight := blockHeight + maxBlockPerMsg
		if targetHeight > peer.Height() {
			targetHeight = peer.Height()
		}

		if err := bk.regularBlockSync(targetHeight); err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
			bk.peers.errorHandler(peer.ID(), err)
			return false
		}
		return true
	}
	return false
}

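// syncWorker runs in its own goroutine, triggering a sync attempt every
// syncCycle; after a successful round it broadcasts the new best block and the
// updated chain status to the connected peers.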
func (bk *blockKeeper) syncWorker() {
	genesisBlock, err := bk.chain.GetBlockByHeight(0)
	if err != nil {
		log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get genesis block")
		return
	}
	syncTicker := time.NewTicker(syncCycle)
	defer syncTicker.Stop()

	for {
		<-syncTicker.C
		if update := bk.startSync(); !update {
			continue
		}

		block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
		if err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
			// skip broadcasting when the best block could not be loaded
			continue
		}

		if err := bk.peers.broadcastMinedBlock(block); err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
		}

		if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
			log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
		}
	}
}