
Add mempool sync test (#114)
bytom/vapor.git: netsync/chainmgr/block_keeper.go
package chainmgr

import (
        "container/list"
        "time"

        log "github.com/sirupsen/logrus"

        "github.com/vapor/consensus"
        "github.com/vapor/errors"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
)

const (
        syncCycle            = 5 * time.Second
        blockProcessChSize   = 1024
        blocksProcessChSize  = 128
        headersProcessChSize = 1024
)

var (
        maxBlockPerMsg        = uint64(128)
        maxBlockHeadersPerMsg = uint64(2048)
        syncTimeout           = 30 * time.Second

        errAppendHeaders  = errors.New("fail to append list due to order mismatch")
        errRequestTimeout = errors.New("request timeout")
        errPeerDropped    = errors.New("peer dropped")
)

type blockMsg struct {
        block  *types.Block
        peerID string
}

type blocksMsg struct {
        blocks []*types.Block
        peerID string
}

type headersMsg struct {
        headers []*types.BlockHeader
        peerID  string
}

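// blockKeeper keeps the local chain in sync with the best available peer.
// It drives both the checkpoint-based fast sync and the regular block-by-block
// sync, receiving blocks and headers from peers through its process channels.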
type blockKeeper struct {
        chain Chain
        peers *peers.PeerSet

        syncPeer         *peers.Peer
        blockProcessCh   chan *blockMsg
        blocksProcessCh  chan *blocksMsg
        headersProcessCh chan *headersMsg

        headerList *list.List
}

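// newBlockKeeper creates a blockKeeper, initializes the headers-first state
// and starts the background sync worker.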
func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
                peers:            peers,
                blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
                blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
                headersProcessCh: make(chan *headersMsg, headersProcessChSize),
                headerList:       list.New(),
        }
        bk.resetHeaderState()
        go bk.syncWorker()
        return bk
}

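// appendHeaderList appends headers to the header list, checking that each
// header links to the hash of the one before it.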
func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
        for _, header := range headers {
                prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
                if prevHeader.Hash() != header.PreviousBlockHash {
                        return errAppendHeaders
                }
                bk.headerList.PushBack(header)
        }
        return nil
}

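// blockLocator builds a locator for the local chain: a list of header hashes
// that starts at the best block and walks back toward the genesis block, with
// the step size doubling once the locator holds nine or more hashes.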
func (bk *blockKeeper) blockLocator() []*bc.Hash {
        header := bk.chain.BestBlockHeader()
        locator := []*bc.Hash{}

        step := uint64(1)
        for header != nil {
                headerHash := header.Hash()
                locator = append(locator, &headerHash)
                if header.Height == 0 {
                        break
                }

                var err error
                if header.Height < step {
                        header, err = bk.chain.GetHeaderByHeight(0)
                } else {
                        header, err = bk.chain.GetHeaderByHeight(header.Height - step)
                }
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
                        break
                }

                if len(locator) >= 9 {
                        step *= 2
                }
        }
        return locator
}

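// fastBlockSync downloads headers from the sync peer up to the given
// checkpoint, then requests and processes the corresponding blocks until the
// local best height reaches the checkpoint height.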
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
        bk.resetHeaderState()
        lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
        for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
                if lastHeader.Height >= checkPoint.Height {
                        return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
                }

                lastHash := lastHeader.Hash()
                headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(headers) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
                }

                if err := bk.appendHeaderList(headers); err != nil {
                        return err
                }
        }

        fastHeader := bk.headerList.Front()
        for bk.chain.BestBlockHeight() < checkPoint.Height {
                locator := bk.blockLocator()
                blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(blocks) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
                }

                for _, block := range blocks {
                        if fastHeader = fastHeader.Next(); fastHeader == nil {
                                return errors.New("get block that is higher than checkpoint")
                        }

                        if _, err = bk.chain.ProcessBlock(block); err != nil {
                                return errors.Wrap(err, "fail on fastBlockSync process block")
                        }
                }
        }
        return nil
}

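// locateBlocks resolves the locator into headers and returns the matching
// blocks, capped at maxBlockPerMsg.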
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        headers, err := bk.locateHeaders(locator, stopHash)
        if err != nil {
                return nil, err
        }

        blocks := []*types.Block{}
        for i, header := range headers {
                if uint64(i) >= maxBlockPerMsg {
                        break
                }

                headerHash := header.Hash()
                block, err := bk.chain.GetBlockByHash(&headerHash)
                if err != nil {
                        return nil, err
                }

                blocks = append(blocks, block)
        }
        return blocks, nil
}

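// locateHeaders finds the first locator hash that is on the main chain and
// returns the headers that follow it, bounded by stopHash and capped at
// maxBlockHeadersPerMsg. If no locator hash matches, it starts from the
// genesis header.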
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
        if err != nil {
                return nil, err
        }

        startHeader, err := bk.chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }

        for _, hash := range locator {
                header, err := bk.chain.GetHeaderByHash(hash)
                if err == nil && bk.chain.InMainChain(header.Hash()) {
                        startHeader = header
                        break
                }
        }

        totalHeaders := stopHeader.Height - startHeader.Height
        if totalHeaders > maxBlockHeadersPerMsg {
                totalHeaders = maxBlockHeadersPerMsg
        }

        headers := []*types.BlockHeader{}
        for i := uint64(1); i <= totalHeaders; i++ {
                header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
                if err != nil {
                        return nil, err
                }

                headers = append(headers, header)
        }
        return headers, nil
}

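// nextCheckpoint returns the first checkpoint above the current best height,
// or nil if no checkpoints are configured or the best height is already past
// the last one.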
func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
        height := bk.chain.BestBlockHeader().Height
        checkpoints := consensus.ActiveNetParams.Checkpoints
        if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
                return nil
        }

        nextCheckpoint := &checkpoints[len(checkpoints)-1]
        for i := len(checkpoints) - 2; i >= 0; i-- {
                if height >= checkpoints[i].Height {
                        break
                }
                nextCheckpoint = &checkpoints[i]
        }
        return nextCheckpoint
}

func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
        bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
        bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}

func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
        bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}

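// regularBlockSync requests blocks one at a time from the sync peer until the
// local best height reaches wantHeight. When a received block turns out to be
// an orphan, it steps back one height to fetch the missing parent first.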
func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
        i := bk.chain.BestBlockHeight() + 1
        for i <= wantHeight {
                block, err := bk.requireBlock(i)
                if err != nil {
                        return err
                }

                isOrphan, err := bk.chain.ProcessBlock(block)
                if err != nil {
                        return err
                }

                if isOrphan {
                        i--
                        continue
                }
                i = bk.chain.BestBlockHeight() + 1
        }
        return nil
}

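// requireBlock asks the sync peer for the block at the given height and waits
// for the matching response on the block process channel, giving up after
// syncTimeout. requireBlocks and requireHeaders below follow the same pattern
// for batched blocks and headers.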
func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
        if ok := bk.syncPeer.GetBlockByHeight(height); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blockProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        if msg.block.Height != height {
                                continue
                        }
                        return msg.block, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlock")
                }
        }
}

func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        if ok := bk.syncPeer.GetBlocks(locator, stopHash); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blocksProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        return msg.blocks, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
                }
        }
}

func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        if ok := bk.syncPeer.GetHeaders(locator, stopHash); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.headersProcessCh:
                        if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
                        return msg.headers, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
                }
        }
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (bk *blockKeeper) resetHeaderState() {
        header := bk.chain.BestBlockHeader()
        bk.headerList.Init()
        if bk.nextCheckpoint() != nil {
                bk.headerList.PushBack(header)
        }
}

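// startSync picks a sync strategy: fast sync toward the next checkpoint when a
// fast-sync capable peer is at or above the checkpoint height, otherwise a
// regular sync of up to maxBlockPerMsg blocks from the best full node. It
// reports whether a sync round completed successfully.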
func (bk *blockKeeper) startSync() bool {
        checkPoint := bk.nextCheckpoint()
        peer := bk.peers.BestPeer(consensus.SFFastSync | consensus.SFFullNode)
        if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
                bk.syncPeer = peer
                if err := bk.fastBlockSync(checkPoint); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
                        bk.peers.ErrorHandler(peer.ID(), err)
                        return false
                }
                return true
        }

        blockHeight := bk.chain.BestBlockHeight()
        peer = bk.peers.BestPeer(consensus.SFFullNode)
        if peer != nil && peer.Height() > blockHeight {
                bk.syncPeer = peer
                targetHeight := blockHeight + maxBlockPerMsg
                if targetHeight > peer.Height() {
                        targetHeight = peer.Height()
                }

                if err := bk.regularBlockSync(targetHeight); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
                        bk.peers.ErrorHandler(peer.ID(), err)
                        return false
                }
                return true
        }
        return false
}

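// syncWorker runs in its own goroutine, attempting a sync every syncCycle and
// broadcasting the new best status to peers after a successful round.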
func (bk *blockKeeper) syncWorker() {
        syncTicker := time.NewTicker(syncCycle)
        defer syncTicker.Stop()

        for {
                <-syncTicker.C
                if update := bk.startSync(); !update {
                        continue
                }

                block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
                }

                if err = bk.peers.BroadcastNewStatus(block); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                }
        }
}