Add consensus message transfer
[bytom/vapor.git] / netsync / block_keeper.go
package netsync

import (
        "container/list"
        "time"

        log "github.com/sirupsen/logrus"

        "github.com/vapor/consensus"
        "github.com/vapor/errors"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
)

const (
        syncCycle            = 5 * time.Second
        blockProcessChSize   = 1024
        blocksProcessChSize  = 128
        headersProcessChSize = 1024
)

var (
        maxBlockPerMsg        = uint64(128)
        maxBlockHeadersPerMsg = uint64(2048)
        syncTimeout           = 30 * time.Second

        errAppendHeaders  = errors.New("fail to append list due to order mismatch")
        errRequestTimeout = errors.New("request timeout")
        errPeerDropped    = errors.New("Peer dropped")
)

type blockMsg struct {
        block  *types.Block
        peerID string
}

type blocksMsg struct {
        blocks []*types.Block
        peerID string
}

type headersMsg struct {
        headers []*types.BlockHeader
        peerID  string
}

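// blockKeeper keeps the local chain in sync with peers. It tracks the current
// sync peer, buffers incoming block and header replies on channels, and keeps
// a header list that drives checkpoint-based fast sync.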
type blockKeeper struct {
        chain Chain
        peers *peers.PeerSet

        syncPeerID       string
        blockProcessCh   chan *blockMsg
        blocksProcessCh  chan *blocksMsg
        headersProcessCh chan *headersMsg

        headerList *list.List
}

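// newBlockKeeper creates a blockKeeper bound to the given chain and peer set,
// resets the header state and starts the background sync worker goroutine.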
func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
                peers:            peers,
                blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
                blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
                headersProcessCh: make(chan *headersMsg, headersProcessChSize),
                headerList:       list.New(),
        }
        bk.resetHeaderState()
        go bk.syncWorker()
        return bk
}

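// appendHeaderList appends headers to the tail of the header list, checking
// that each header's PreviousBlockHash matches the hash of the header before it.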
func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
        for _, header := range headers {
                prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
                if prevHeader.Hash() != header.PreviousBlockHash {
                        return errAppendHeaders
                }
                bk.headerList.PushBack(header)
        }
        return nil
}

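// blockLocator builds a locator for the local best chain: a list of header
// hashes walking back from the best block toward genesis, doubling the step
// size on every iteration once nine hashes have been collected.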
func (bk *blockKeeper) blockLocator() []*bc.Hash {
        header := bk.chain.BestBlockHeader()
        locator := []*bc.Hash{}

        step := uint64(1)
        for header != nil {
                headerHash := header.Hash()
                locator = append(locator, &headerHash)
                if header.Height == 0 {
                        break
                }

                var err error
                if header.Height < step {
                        header, err = bk.chain.GetHeaderByHeight(0)
                } else {
                        header, err = bk.chain.GetHeaderByHeight(header.Height - step)
                }
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
                        break
                }

                if len(locator) >= 9 {
                        step *= 2
                }
        }
        return locator
}

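// fastBlockSync first downloads headers from the sync peer until the header
// list reaches the checkpoint, then fetches the corresponding blocks and
// processes them until the local best height reaches the checkpoint height.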
func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
        bk.resetHeaderState()
        lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
        for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
                if lastHeader.Height >= checkPoint.Height {
                        return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
                }

                lastHash := lastHeader.Hash()
                headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(headers) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
                }

                if err := bk.appendHeaderList(headers); err != nil {
                        return err
                }
        }

        fastHeader := bk.headerList.Front()
        for bk.chain.BestBlockHeight() < checkPoint.Height {
                locator := bk.blockLocator()
                blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
                if err != nil {
                        return err
                }

                if len(blocks) == 0 {
                        return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
                }

                for _, block := range blocks {
                        if fastHeader = fastHeader.Next(); fastHeader == nil {
                                return errors.New("get block that is higher than checkpoint")
                        }

                        if _, err = bk.chain.ProcessBlock(block); err != nil {
                                return errors.Wrap(err, "fail on fastBlockSync process block")
                        }
                }
        }
        return nil
}

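// locateBlocks resolves the headers selected by locateHeaders into full
// blocks, returning at most maxBlockPerMsg blocks.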
func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        headers, err := bk.locateHeaders(locator, stopHash)
        if err != nil {
                return nil, err
        }

        blocks := []*types.Block{}
        for i, header := range headers {
                if uint64(i) >= maxBlockPerMsg {
                        break
                }

                headerHash := header.Hash()
                block, err := bk.chain.GetBlockByHash(&headerHash)
                if err != nil {
                        return nil, err
                }

                blocks = append(blocks, block)
        }
        return blocks, nil
}

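// locateHeaders picks the first locator hash that is on the main chain (or
// genesis if none match) as the start point and returns the headers that
// follow it up to stopHash, capped at maxBlockHeadersPerMsg.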
func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
        if err != nil {
                return nil, err
        }

        startHeader, err := bk.chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }

        for _, hash := range locator {
                header, err := bk.chain.GetHeaderByHash(hash)
                if err == nil && bk.chain.InMainChain(header.Hash()) {
                        startHeader = header
                        break
                }
        }

        totalHeaders := stopHeader.Height - startHeader.Height
        if totalHeaders > maxBlockHeadersPerMsg {
                totalHeaders = maxBlockHeadersPerMsg
        }

        headers := []*types.BlockHeader{}
        for i := uint64(1); i <= totalHeaders; i++ {
                header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
                if err != nil {
                        return nil, err
                }

                headers = append(headers, header)
        }
        return headers, nil
}

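// nextCheckpoint returns the lowest checkpoint whose height is above the
// current best height, or nil if no checkpoints are configured or the best
// height has already passed the last one.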
func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
        height := bk.chain.BestBlockHeader().Height
        checkpoints := consensus.ActiveNetParams.Checkpoints
        if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
                return nil
        }

        nextCheckpoint := &checkpoints[len(checkpoints)-1]
        for i := len(checkpoints) - 2; i >= 0; i-- {
                if height >= checkpoints[i].Height {
                        break
                }
                nextCheckpoint = &checkpoints[i]
        }
        return nextCheckpoint
}

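// processBlock, processBlocks and processHeaders forward peer replies to the
// channels consumed by the corresponding require* methods.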
func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
        bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
        bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
}

func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
        bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}

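// regularBlockSync requests blocks from the sync peer one height at a time
// until the local best height reaches wantHeight; when an orphan block is
// received, it steps back one height to fetch the missing parent.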
func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
        i := bk.chain.BestBlockHeight() + 1
        for i <= wantHeight {
                block, err := bk.requireBlock(i)
                if err != nil {
                        return err
                }

                isOrphan, err := bk.chain.ProcessBlock(block)
                if err != nil {
                        return err
                }

                if isOrphan {
                        i--
                        continue
                }
                i = bk.chain.BestBlockHeight() + 1
        }
        return nil
}

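// requireBlock sends a get-block request for the given height to the sync peer
// and waits up to syncTimeout for the matching reply.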
func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
        //send get block msg
        msg := struct{ BlockchainMessage }{NewGetBlockMessage(height, [32]byte{})}
        if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blockProcessCh:
                        if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        if msg.block.Height != height {
                                continue
                        }
                        return msg.block, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlock")
                }
        }
}

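// requireBlocks sends a get-blocks request built from the locator and stop
// hash to the sync peer and waits up to syncTimeout for the reply.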
func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
        //send get blocks msg
        msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
        if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.blocksProcessCh:
                        if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        return msg.blocks, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
                }
        }
}

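// requireHeaders sends a get-headers request built from the locator and stop
// hash to the sync peer and waits up to syncTimeout for the reply.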
func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
        //send get headers msg
        msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
        if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }

        timeout := time.NewTimer(syncTimeout)
        defer timeout.Stop()

        for {
                select {
                case msg := <-bk.headersProcessCh:
                        if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        return msg.headers, nil
                case <-timeout.C:
                        return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
                }
        }
}

// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (bk *blockKeeper) resetHeaderState() {
        header := bk.chain.BestBlockHeader()
        bk.headerList.Init()
        if bk.nextCheckpoint() != nil {
                bk.headerList.PushBack(header)
        }
}

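// startSync selects the best peer and runs one sync round: fast sync toward
// the next checkpoint when a fast-sync capable peer has reached its height,
// otherwise a regular block-by-block sync against the best full node. It
// reports whether the local chain was updated.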
func (bk *blockKeeper) startSync() bool {
        checkPoint := bk.nextCheckpoint()
        bestPeerID, bestPeerHeight := bk.peers.BestPeerInfo(consensus.SFFastSync | consensus.SFFullNode)
        if bestPeerID != "" && checkPoint != nil && bestPeerHeight >= checkPoint.Height {
                bk.syncPeerID = bestPeerID
                if err := bk.fastBlockSync(checkPoint); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
                        bk.peers.ErrorHandler(bestPeerID, err)
                        return false
                }
                return true
        }

        blockHeight := bk.chain.BestBlockHeight()
        bestPeerID, bestPeerHeight = bk.peers.BestPeerInfo(consensus.SFFullNode)
        if bestPeerID != "" && bestPeerHeight > blockHeight {
                bk.syncPeerID = bestPeerID
                targetHeight := blockHeight + maxBlockPerMsg
                if targetHeight > bestPeerHeight {
                        targetHeight = bestPeerHeight
                }

                if err := bk.regularBlockSync(targetHeight); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
                        bk.peers.ErrorHandler(bestPeerID, err)
                        return false
                }
                return true
        }
        return false
}

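// syncWorker runs in its own goroutine, kicking off a sync round every
// syncCycle and broadcasting the new best block status to peers after a
// successful update.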
func (bk *blockKeeper) syncWorker() {
        syncTicker := time.NewTicker(syncCycle)
        defer syncTicker.Stop()

        for {
                <-syncTicker.C
                if update := bk.startSync(); !update {
                        continue
                }

                block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
                        continue
                }

                headMsg, _ := newStatusBroadcastMsg(&block.BlockHeader, BlockchainChannel)

                if err = bk.peers.BroadcastMsg(headMsg); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                }
        }
}