OSDN Git Service

Merge pull request #1759 from Bytom/security
[bytom/bytom.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "container/list"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/consensus"
10         "github.com/bytom/errors"
11         "github.com/bytom/mining/tensority"
12         "github.com/bytom/p2p/security"
13         "github.com/bytom/protocol/bc"
14         "github.com/bytom/protocol/bc/types"
15 )
16
const (
	syncCycle            = 5 * time.Second // interval between sync rounds in syncWorker
	blockProcessChSize   = 1024            // buffer size of the single-block response channel
	blocksProcessChSize  = 128             // buffer size of the block-batch response channel
	headersProcessChSize = 1024            // buffer size of the header-batch response channel
)
23
var (
	maxBlockPerMsg        = uint64(128)  // cap on blocks returned per locate/require round
	maxBlockHeadersPerMsg = uint64(2048) // cap on headers returned per locate round
	syncTimeout           = 30 * time.Second

	// ErrPeerMisbehave is the exported sentinel for peer protocol violations,
	// so callers outside this package can match it.
	ErrPeerMisbehave = errors.New("peer is misbehave")

	errAppendHeaders  = errors.New("fail to append list due to order mismatch")
	errRequestTimeout = errors.New("request timeout")
	errPeerDropped    = errors.New("peer dropped")
	// errPeerMisbehave aliases ErrPeerMisbehave so errors returned internally
	// compare equal to the exported sentinel. Previously these were two
	// distinct error values with identical text, so an external equality or
	// errors.Is check against ErrPeerMisbehave could never match.
	errPeerMisbehave = ErrPeerMisbehave
)
35
// blockMsg couples a single received block with the ID of the peer that sent
// it, so requireBlock can filter responses by the current sync peer.
type blockMsg struct {
	block  *types.Block
	peerID string
}
40
// blocksMsg couples a batch of received blocks with the ID of the sending
// peer, consumed by requireBlocks.
type blocksMsg struct {
	blocks []*types.Block
	peerID string
}
45
// headersMsg couples a batch of received block headers with the ID of the
// sending peer, consumed by requireHeaders.
type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}
50
// blockKeeper orchestrates block download and synchronization with peers.
// It runs a background syncWorker goroutine and receives peer responses
// through the three buffered process channels.
type blockKeeper struct {
	chain Chain
	peers *peerSet

	// syncPeer is the peer currently synced against; while a require* call is
	// waiting, responses from any other peer are discarded.
	syncPeer         *peer
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg

	// headerList caches the headers-first chain during fast sync; it is
	// seeded by resetHeaderState and extended by appendHeaderList.
	headerList *list.List
}
62
63 func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
64         bk := &blockKeeper{
65                 chain:            chain,
66                 peers:            peers,
67                 blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
68                 blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
69                 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
70                 headerList:       list.New(),
71         }
72         bk.resetHeaderState()
73         go bk.syncWorker()
74         return bk
75 }
76
77 func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
78         for _, header := range headers {
79                 prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
80                 if prevHeader.Hash() != header.PreviousBlockHash {
81                         return errAppendHeaders
82                 }
83                 bk.headerList.PushBack(header)
84         }
85         return nil
86 }
87
88 func (bk *blockKeeper) blockLocator() []*bc.Hash {
89         header := bk.chain.BestBlockHeader()
90         locator := []*bc.Hash{}
91
92         step := uint64(1)
93         for header != nil {
94                 headerHash := header.Hash()
95                 locator = append(locator, &headerHash)
96                 if header.Height == 0 {
97                         break
98                 }
99
100                 var err error
101                 if header.Height < step {
102                         header, err = bk.chain.GetHeaderByHeight(0)
103                 } else {
104                         header, err = bk.chain.GetHeaderByHeight(header.Height - step)
105                 }
106                 if err != nil {
107                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
108                         break
109                 }
110
111                 if len(locator) >= 9 {
112                         step *= 2
113                 }
114         }
115         return locator
116 }
117
// fastBlockSync performs a headers-first sync up to checkPoint. Phase one
// downloads the header chain from the local best header to the checkpoint
// hash, validating linkage via appendHeaderList; phase two downloads the
// corresponding blocks and processes them, checking each block hash against
// the cached header at the same position. Mismatches are peer misbehavior.
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
	bk.resetHeaderState()
	lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
	for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
		// Passing the checkpoint height without hitting the checkpoint hash
		// means the peer is on a different branch.
		if lastHeader.Height >= checkPoint.Height {
			return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
		}

		lastHash := lastHeader.Hash()
		headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(headers) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
		}

		if err := bk.appendHeaderList(headers); err != nil {
			return err
		}
	}

	fastHeader := bk.headerList.Front()
	for bk.chain.BestBlockHeight() < checkPoint.Height {
		locator := bk.blockLocator()
		blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
		if err != nil {
			return err
		}

		if len(blocks) == 0 {
			return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
		}

		for _, block := range blocks {
			// Advance through the cached headers in lockstep with the blocks;
			// running past the list end means the peer sent blocks beyond the
			// checkpoint.
			if fastHeader = fastHeader.Next(); fastHeader == nil {
				return errors.New("get block than is higher than checkpoint")
			}

			blockHash := block.Hash()
			if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
				return errPeerMisbehave
			}

			seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync calculate seed")
			}

			// A placeholder tensority result is cached for the block before
			// processing and removed afterwards — presumably to bypass the
			// expensive PoW check for checkpointed (trusted) blocks.
			// NOTE(review): confirm this bypass is intended.
			tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
			_, err = bk.chain.ProcessBlock(block)
			tensority.AIHash.RemoveCache(&blockHash, seed)
			if err != nil {
				return errors.Wrap(err, "fail on fastBlockSync process block")
			}
		}
	}
	return nil
}
178
179 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
180         headers, err := bk.locateHeaders(locator, stopHash)
181         if err != nil {
182                 return nil, err
183         }
184
185         blocks := []*types.Block{}
186         for i, header := range headers {
187                 if uint64(i) >= maxBlockPerMsg {
188                         break
189                 }
190
191                 headerHash := header.Hash()
192                 block, err := bk.chain.GetBlockByHash(&headerHash)
193                 if err != nil {
194                         return nil, err
195                 }
196
197                 blocks = append(blocks, block)
198         }
199         return blocks, nil
200 }
201
202 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
203         stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
204         if err != nil {
205                 return nil, err
206         }
207
208         startHeader, err := bk.chain.GetHeaderByHeight(0)
209         if err != nil {
210                 return nil, err
211         }
212
213         for _, hash := range locator {
214                 header, err := bk.chain.GetHeaderByHash(hash)
215                 if err == nil && bk.chain.InMainChain(header.Hash()) {
216                         startHeader = header
217                         break
218                 }
219         }
220
221         totalHeaders := stopHeader.Height - startHeader.Height
222         if totalHeaders > maxBlockHeadersPerMsg {
223                 totalHeaders = maxBlockHeadersPerMsg
224         }
225
226         headers := []*types.BlockHeader{}
227         for i := uint64(1); i <= totalHeaders; i++ {
228                 header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
229                 if err != nil {
230                         return nil, err
231                 }
232
233                 headers = append(headers, header)
234         }
235         return headers, nil
236 }
237
238 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
239         height := bk.chain.BestBlockHeader().Height
240         checkpoints := consensus.ActiveNetParams.Checkpoints
241         if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
242                 return nil
243         }
244
245         nextCheckpoint := &checkpoints[len(checkpoints)-1]
246         for i := len(checkpoints) - 2; i >= 0; i-- {
247                 if height >= checkpoints[i].Height {
248                         break
249                 }
250                 nextCheckpoint = &checkpoints[i]
251         }
252         return nextCheckpoint
253 }
254
// processBlock queues a single block received from peerID for the pending
// requireBlock call. Blocks the caller if the channel buffer is full.
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
	bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}
258
// processBlocks queues a batch of blocks received from peerID for the pending
// requireBlocks call. Blocks the caller if the channel buffer is full.
func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
	bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}
262
// processHeaders queues a batch of headers received from peerID for the
// pending requireHeaders call. Blocks the caller if the channel buffer is full.
func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
	bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}
266
267 func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
268         i := bk.chain.BestBlockHeight() + 1
269         for i <= wantHeight {
270                 block, err := bk.requireBlock(i)
271                 if err != nil {
272                         return err
273                 }
274
275                 isOrphan, err := bk.chain.ProcessBlock(block)
276                 if err != nil {
277                         return err
278                 }
279
280                 if isOrphan {
281                         i--
282                         continue
283                 }
284                 i = bk.chain.BestBlockHeight() + 1
285         }
286         return nil
287 }
288
289 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
290         if ok := bk.syncPeer.getBlockByHeight(height); !ok {
291                 return nil, errPeerDropped
292         }
293
294         timeout := time.NewTimer(syncTimeout)
295         defer timeout.Stop()
296
297         for {
298                 select {
299                 case msg := <-bk.blockProcessCh:
300                         if msg.peerID != bk.syncPeer.ID() {
301                                 continue
302                         }
303                         if msg.block.Height != height {
304                                 continue
305                         }
306                         return msg.block, nil
307                 case <-timeout.C:
308                         return nil, errors.Wrap(errRequestTimeout, "requireBlock")
309                 }
310         }
311 }
312
313 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
314         if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
315                 return nil, errPeerDropped
316         }
317
318         timeout := time.NewTimer(syncTimeout)
319         defer timeout.Stop()
320
321         for {
322                 select {
323                 case msg := <-bk.blocksProcessCh:
324                         if msg.peerID != bk.syncPeer.ID() {
325                                 continue
326                         }
327                         return msg.blocks, nil
328                 case <-timeout.C:
329                         return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
330                 }
331         }
332 }
333
334 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
335         if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
336                 return nil, errPeerDropped
337         }
338
339         timeout := time.NewTimer(syncTimeout)
340         defer timeout.Stop()
341
342         for {
343                 select {
344                 case msg := <-bk.headersProcessCh:
345                         if msg.peerID != bk.syncPeer.ID() {
346                                 continue
347                         }
348                         return msg.headers, nil
349                 case <-timeout.C:
350                         return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
351                 }
352         }
353 }
354
355 // resetHeaderState sets the headers-first mode state to values appropriate for
356 // syncing from a new peer.
357 func (bk *blockKeeper) resetHeaderState() {
358         header := bk.chain.BestBlockHeader()
359         bk.headerList.Init()
360         if bk.nextCheckpoint() != nil {
361                 bk.headerList.PushBack(header)
362         }
363 }
364
365 func (bk *blockKeeper) startSync() bool {
366         checkPoint := bk.nextCheckpoint()
367         peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
368         if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
369                 bk.syncPeer = peer
370                 if err := bk.fastBlockSync(checkPoint); err != nil {
371                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
372                         bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
373                         return false
374                 }
375                 return true
376         }
377
378         blockHeight := bk.chain.BestBlockHeight()
379         peer = bk.peers.bestPeer(consensus.SFFullNode)
380         if peer != nil && peer.Height() > blockHeight {
381                 bk.syncPeer = peer
382                 targetHeight := blockHeight + maxBlockPerMsg
383                 if targetHeight > peer.Height() {
384                         targetHeight = peer.Height()
385                 }
386
387                 if err := bk.regularBlockSync(targetHeight); err != nil {
388                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
389                         bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
390                         return false
391                 }
392                 return true
393         }
394         return false
395 }
396
397 func (bk *blockKeeper) syncWorker() {
398         genesisBlock, err := bk.chain.GetBlockByHeight(0)
399         if err != nil {
400                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
401                 return
402         }
403         syncTicker := time.NewTicker(syncCycle)
404         defer syncTicker.Stop()
405
406         for {
407                 <-syncTicker.C
408                 if update := bk.startSync(); !update {
409                         continue
410                 }
411
412                 block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
413                 if err != nil {
414                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
415                 }
416
417                 if err := bk.peers.broadcastMinedBlock(block); err != nil {
418                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
419                 }
420
421                 if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
422                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
423                 }
424         }
425 }