
Add p2p security module (#143)
[bytom/vapor.git] netsync/chainmgr/block_keeper.go
package chainmgr

import (
        "container/list"
        "time"

        log "github.com/sirupsen/logrus"

        "github.com/vapor/consensus"
        "github.com/vapor/errors"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p/security"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
)

const (
        syncCycle            = 5 * time.Second
        blockProcessChSize   = 1024
        blocksProcessChSize  = 128
        headersProcessChSize = 1024
)

var (
        maxBlockPerMsg        = uint64(128)
        maxBlockHeadersPerMsg = uint64(2048)
        syncTimeout           = 30 * time.Second

        errAppendHeaders  = errors.New("fail to append list due to order mismatch")
        errRequestTimeout = errors.New("request timeout")
        errPeerDropped    = errors.New("peer dropped")
)

type blockMsg struct {
        block  *types.Block
        peerID string
}

type blocksMsg struct {
        blocks []*types.Block
        peerID string
}

type headersMsg struct {
        headers []*types.BlockHeader
        peerID  string
}

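// blockKeeper keeps the local chain in sync with remote peers: it requests
// blocks and headers from a chosen sync peer and routes the replies, delivered
// via the process* channels, back to the sync logic.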
type blockKeeper struct {
        chain Chain
        peers *peers.PeerSet

        syncPeer         *peers.Peer
        blockProcessCh   chan *blockMsg
        blocksProcessCh  chan *blocksMsg
        headersProcessCh chan *headersMsg

        headerList *list.List
}

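// newBlockKeeper builds a blockKeeper for the given chain and peer set, resets
// the header-sync state and launches the background sync worker.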
func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
                peers:            peers,
                blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
                blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
                headersProcessCh: make(chan *headersMsg, headersProcessChSize),
                headerList:       list.New(),
        }
        bk.resetHeaderState()
        go bk.syncWorker()
        return bk
}

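// appendHeaderList appends headers to the header list, checking that each
// header's PreviousBlockHash matches the hash of the header currently at the
// back of the list.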
func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
        for _, header := range headers {
                prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
                if prevHeader.Hash() != header.PreviousBlockHash {
                        return errAppendHeaders
                }
                bk.headerList.PushBack(header)
        }
        return nil
}

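// blockLocator builds a locator for the local best chain: it walks back from
// the best header toward the genesis block collecting hashes, doubling the
// step once the locator holds nine or more entries.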
func (bk *blockKeeper) blockLocator() []*bc.Hash {
        header := bk.chain.BestBlockHeader()
        locator := []*bc.Hash{}

        step := uint64(1)
        for header != nil {
                headerHash := header.Hash()
                locator = append(locator, &headerHash)
                if header.Height == 0 {
                        break
                }

                var err error
                if header.Height < step {
                        header, err = bk.chain.GetHeaderByHeight(0)
                } else {
                        header, err = bk.chain.GetHeaderByHeight(header.Height - step)
                }
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
                        break
                }

                if len(locator) >= 9 {
                        step *= 2
                }
        }
        return locator
}

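// fastBlockSync syncs toward a known checkpoint in two phases: it first pulls
// headers from the sync peer until the checkpoint header is linked into the
// header list, then pulls the corresponding blocks and processes them in
// order. A peer that strays from the checkpoint branch is reported as
// misbehaving.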
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
        bk.resetHeaderState()
        lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
        for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
                if lastHeader.Height >= checkPoint.Height {
                        return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
                }

                lastHash := lastHeader.Hash()
                headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(headers) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders returned empty list")
                }

                if err := bk.appendHeaderList(headers); err != nil {
                        return err
                }
        }

        fastHeader := bk.headerList.Front()
        for bk.chain.BestBlockHeight() < checkPoint.Height {
                locator := bk.blockLocator()
                blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(blocks) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks returned empty list")
                }

                for _, block := range blocks {
                        if fastHeader = fastHeader.Next(); fastHeader == nil {
                                return errors.New("got block that is higher than checkpoint")
                        }

                        if _, err = bk.chain.ProcessBlock(block); err != nil {
                                return errors.Wrap(err, "fail on fastBlockSync process block")
                        }
                }
        }
        return nil
}

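// locateBlocks answers a get-blocks request: it locates the headers that
// follow the locator and returns the matching blocks, capped at maxBlockPerMsg.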
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        headers, err := bk.locateHeaders(locator, stopHash)
        if err != nil {
                return nil, err
        }

        blocks := []*types.Block{}
        for i, header := range headers {
                if uint64(i) >= maxBlockPerMsg {
                        break
                }

                headerHash := header.Hash()
                block, err := bk.chain.GetBlockByHash(&headerHash)
                if err != nil {
                        return nil, err
                }

                blocks = append(blocks, block)
        }
        return blocks, nil
}

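// locateHeaders answers a get-headers request: the first locator hash found on
// the main chain (or the genesis block as a fallback) becomes the start point,
// and up to maxBlockHeadersPerMsg headers after it, bounded by stopHash, are
// returned.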
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
        if err != nil {
                return nil, err
        }

        startHeader, err := bk.chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }

        for _, hash := range locator {
                header, err := bk.chain.GetHeaderByHash(hash)
                if err == nil && bk.chain.InMainChain(header.Hash()) {
                        startHeader = header
                        break
                }
        }

        totalHeaders := stopHeader.Height - startHeader.Height
        if totalHeaders > maxBlockHeadersPerMsg {
                totalHeaders = maxBlockHeadersPerMsg
        }

        headers := []*types.BlockHeader{}
        for i := uint64(1); i <= totalHeaders; i++ {
                header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
                if err != nil {
                        return nil, err
                }

                headers = append(headers, header)
        }
        return headers, nil
}

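// nextCheckpoint returns the lowest hard-coded checkpoint above the current
// best height, or nil when no checkpoint remains to be reached.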
func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
        height := bk.chain.BestBlockHeader().Height
        checkpoints := consensus.ActiveNetParams.Checkpoints
        if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
                return nil
        }

        nextCheckpoint := &checkpoints[len(checkpoints)-1]
        for i := len(checkpoints) - 2; i >= 0; i-- {
                if height >= checkpoints[i].Height {
                        break
                }
                nextCheckpoint = &checkpoints[i]
        }
        return nextCheckpoint
}

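// processBlock, processBlocks and processHeaders deliver messages received
// from peers to the channels consumed by the require* calls below.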
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
        bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
        bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}

func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
        bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}

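// regularBlockSync fetches blocks one by one from the sync peer until the
// local best height reaches wantHeight; when a block turns out to be an
// orphan, the height is stepped back so the missing parent is requested first.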
func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
        i := bk.chain.BestBlockHeight() + 1
        for i <= wantHeight {
                block, err := bk.requireBlock(i)
                if err != nil {
                        return err
                }

                isOrphan, err := bk.chain.ProcessBlock(block)
                if err != nil {
                        return err
                }

                if isOrphan {
                        i--
                        continue
                }
                i = bk.chain.BestBlockHeight() + 1
        }
        return nil
}

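// requireBlock requests the block at the given height from the sync peer and
// waits for a matching reply on blockProcessCh, giving up after syncTimeout.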
func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
        if ok := bk.syncPeer.GetBlockByHeight(height); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blockProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        if msg.block.Height != height {
                                continue
                        }
                        return msg.block, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlock")
                }
        }
}

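// requireBlocks requests the blocks described by the locator and stop hash
// from the sync peer and waits for its reply on blocksProcessCh, giving up
// after syncTimeout.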
func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        if ok := bk.syncPeer.GetBlocks(locator, stopHash); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blocksProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        return msg.blocks, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
                }
        }
}

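// requireHeaders requests the headers described by the locator and stop hash
// from the sync peer and waits for its reply on headersProcessCh, giving up
// after syncTimeout.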
func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        if ok := bk.syncPeer.GetHeaders(locator, stopHash); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.headersProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        return msg.headers, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
                }
        }
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (bk *blockKeeper) resetHeaderState() {
        header := bk.chain.BestBlockHeader()
        bk.headerList.Init()
        if bk.nextCheckpoint() != nil {
                bk.headerList.PushBack(header)
        }
}

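// startSync runs one round of syncing: if a checkpoint is still ahead and a
// fast-sync capable peer has reached it, the keeper fast-syncs toward that
// checkpoint; otherwise it falls back to regular sync against the best full
// node. It reports whether a sync round completed, and flags a peer whose
// round failed via the security error handler.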
func (bk *blockKeeper) startSync() bool {
        checkPoint := bk.nextCheckpoint()
        peer := bk.peers.BestPeer(consensus.SFFastSync | consensus.SFFullNode)
        if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
                bk.syncPeer = peer
                if err := bk.fastBlockSync(checkPoint); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
                        bk.peers.ErrorHandler(peer.ID(), security.LevelMsgIllegal, err)
                        return false
                }
                return true
        }

        blockHeight := bk.chain.BestBlockHeight()
        peer = bk.peers.BestPeer(consensus.SFFullNode)
        if peer != nil && peer.Height() > blockHeight {
                bk.syncPeer = peer
                targetHeight := blockHeight + maxBlockPerMsg
                if targetHeight > peer.Height() {
                        targetHeight = peer.Height()
                }

                if err := bk.regularBlockSync(targetHeight); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
                        bk.peers.ErrorHandler(peer.ID(), security.LevelMsgIllegal, err)
                        return false
                }
                return true
        }
        return false
}

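// syncWorker ticks every syncCycle, attempts a sync round, and broadcasts the
// new chain status to peers whenever the round succeeds.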
func (bk *blockKeeper) syncWorker() {
        syncTicker := time.NewTicker(syncCycle)
        defer syncTicker.Stop()

        for {
                <-syncTicker.C
                if update := bk.startSync(); !update {
                        continue
                }

                block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
                }

                if err = bk.peers.BroadcastNewStatus(block); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                }
        }
}