7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/consensus"
10 "github.com/vapor/errors"
11 "github.com/vapor/mining/tensority"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// Sync tunables: the periodic sync tick, buffer sizes for the three inbound
// message channels, per-message batch caps, and the per-request timeout.
17 syncCycle = 5 * time.Second
18 blockProcessChSize = 1024
19 blocksProcessChSize = 128
20 headersProcessChSize = 1024
24 maxBlockPerMsg = uint64(128)
25 maxBlockHeadersPerMsg = uint64(2048)
26 syncTimeout = 30 * time.Second
// Sentinel errors raised by the keeper's request/validation paths.
// NOTE(review): "dismatch" is a typo (should be "mismatch") and "Peer dropped"
// violates the lowercase error-string convention — left unchanged here since
// the strings are runtime behavior and callers may match on them.
28 errAppendHeaders = errors.New("fail to append list due to order dismatch")
29 errRequestTimeout = errors.New("request timeout")
30 errPeerDropped = errors.New("Peer dropped")
31 errPeerMisbehave = errors.New("peer is misbehave")
// blockMsg carries one block received from the network together with the ID
// of the peer that sent it (fields populated in processBlock).
34 type blockMsg struct {
// blocksMsg carries a batch of blocks plus the sending peer's ID
// (fields populated in processBlocks).
39 type blocksMsg struct {
// headersMsg carries a batch of block headers plus the sending peer's ID
// (fields populated in processHeaders).
44 type headersMsg struct {
45 headers []*types.BlockHeader
// blockKeeper coordinates block download for both fast (checkpoint-based)
// and regular sync. Network responses are funneled to the sync goroutine
// through the buffered channels below, one per message kind.
49 type blockKeeper struct {
54 blockProcessCh chan *blockMsg
55 blocksProcessCh chan *blocksMsg
56 headersProcessCh chan *headersMsg
// newBlockKeeper constructs a keeper over the given chain and peer set,
// allocating the buffered response channels and an empty header list used
// by fast sync.
61 func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
65 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
66 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
67 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
68 headerList: list.New(),
// appendHeaderList appends headers to bk.headerList after verifying that each
// one links to the current list tail: the incoming header's PreviousBlockHash
// must equal the hash of the most recently accepted header. Any break in the
// chain returns errAppendHeaders.
75 func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
76 for _, header := range headers {
// The list tail is the last header accepted so far; headers must arrive in
// strictly chained order.
77 prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
78 if prevHeader.Hash() != header.PreviousBlockHash {
79 return errAppendHeaders
81 bk.headerList.PushBack(header)
// blockLocator builds a block-locator list: hashes starting at the local best
// header and walking back toward genesis in increasing steps, so a peer can
// find the fork point cheaply. Stops once genesis (height 0) is reached.
86 func (bk *blockKeeper) blockLocator() []*bc.Hash {
87 header := bk.chain.BestBlockHeader()
88 locator := []*bc.Hash{}
92 headerHash := header.Hash()
93 locator = append(locator, &headerHash)
// Genesis reached — the locator is complete.
94 if header.Height == 0 {
// Clamp the step so we never walk below genesis.
99 if header.Height < step {
100 header, err = bk.chain.GetHeaderByHeight(0)
102 header, err = bk.chain.GetHeaderByHeight(header.Height - step)
105 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
// NOTE(review): the step-update statement is elided in this view; presumably
// the step starts doubling once the locator holds 9 entries — confirm against
// the full source.
109 if len(locator) >= 9 {
// fastBlockSync performs checkpoint-based sync in two phases:
//  1. download the header chain from the local tip up to checkPoint.Hash,
//     validating link order via appendHeaderList;
//  2. download the corresponding blocks and process them, checking each block
//     hash against the pre-fetched header at the same position.
// Any deviation from the checkpoint branch is treated as peer misbehavior.
116 func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
117 bk.resetHeaderState()
118 lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
// Phase 1: keep requesting headers until the list tail is the checkpoint.
119 for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
// Reaching the checkpoint height without matching its hash means the peer
// served a different branch.
120 if lastHeader.Height >= checkPoint.Height {
121 return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
124 lastHash := lastHeader.Hash()
125 headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
130 if len(headers) == 0 {
131 return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
134 if err := bk.appendHeaderList(headers); err != nil {
// Phase 2: walk the header list from the front while downloading blocks
// until the local best height reaches the checkpoint.
139 fastHeader := bk.headerList.Front()
140 for bk.chain.BestBlockHeight() < checkPoint.Height {
141 locator := bk.blockLocator()
142 blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
147 if len(blocks) == 0 {
148 return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
151 for _, block := range blocks {
// More blocks than pre-fetched headers would overshoot the checkpoint.
152 if fastHeader = fastHeader.Next(); fastHeader == nil {
153 return errors.New("get block than is higher than checkpoint")
// Each received block must match the header pre-validated in phase 1.
156 blockHash := block.Hash()
157 if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
158 return errPeerMisbehave
161 seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
163 return errors.Wrap(err, "fail on fastBlockSync calculate seed")
// NOTE(review): seeding the tensority cache with a zero-value hash presumably
// lets ProcessBlock skip the expensive PoW computation for blocks already
// matched against checkpoint headers; the entry is removed immediately after.
// Confirm against tensority.AIHash semantics.
166 tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
167 _, err = bk.chain.ProcessBlock(block)
168 tensority.AIHash.RemoveCache(&blockHash, seed)
170 return errors.Wrap(err, "fail on fastBlockSync process block")
// locateBlocks serves a peer's block request: it resolves the locator and
// stop hash to a header range via locateHeaders, then loads the full block
// for each header, returning at most maxBlockPerMsg blocks.
177 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
178 headers, err := bk.locateHeaders(locator, stopHash)
183 blocks := []*types.Block{}
184 for i, header := range headers {
// Cap the response size even if locateHeaders returned more headers.
185 if uint64(i) >= maxBlockPerMsg {
189 headerHash := header.Hash()
190 block, err := bk.chain.GetBlockByHash(&headerHash)
195 blocks = append(blocks, block)
// locateHeaders serves a peer's header request: it finds the first locator
// hash that lies on the local main chain (falling back to genesis when none
// match), then returns the headers strictly above that point up to the stop
// header, capped at maxBlockHeadersPerMsg.
200 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
201 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
// Default fork point is genesis; overridden by the first locator hash found
// on the main chain below.
206 startHeader, err := bk.chain.GetHeaderByHeight(0)
211 for _, hash := range locator {
212 header, err := bk.chain.GetHeaderByHash(hash)
213 if err == nil && bk.chain.InMainChain(header.Hash()) {
219 totalHeaders := stopHeader.Height - startHeader.Height
220 if totalHeaders > maxBlockHeadersPerMsg {
221 totalHeaders = maxBlockHeadersPerMsg
// Collect headers from just above the fork point (startHeader itself is
// excluded — the requester already has it).
224 headers := []*types.BlockHeader{}
225 for i := uint64(1); i <= totalHeaders; i++ {
226 header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
231 headers = append(headers, header)
// nextCheckpoint returns the first configured checkpoint strictly above the
// current best height, or nil when no checkpoints are configured or the tip
// is already at/past the last one.
236 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
237 height := bk.chain.BestBlockHeader().Height
238 checkpoints := consensus.ActiveNetParams.Checkpoints
239 if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
// Scan downward from the second-to-last checkpoint, keeping the lowest one
// still above the current height. NOTE(review): the loop-exit statement
// between lines 245 and 248 is elided in this view — presumably a break once
// height >= checkpoints[i].Height; confirm against the full source.
243 nextCheckpoint := &checkpoints[len(checkpoints)-1]
244 for i := len(checkpoints) - 2; i >= 0; i-- {
245 if height >= checkpoints[i].Height {
248 nextCheckpoint = &checkpoints[i]
250 return nextCheckpoint
// processBlock hands a single block received from peerID to the sync
// goroutine via the buffered blockProcessCh.
253 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
254 bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks hands a batch of blocks received from peerID to the sync
// goroutine via the buffered blocksProcessCh.
257 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
258 bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
// processHeaders hands a batch of headers received from peerID to the sync
// goroutine via the buffered headersProcessCh.
261 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
262 bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// regularBlockSync advances the chain one block at a time: it requests the
// block at each successive height from the sync peer and processes it until
// the local best height reaches wantHeight.
265 func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
266 i := bk.chain.BestBlockHeight() + 1
267 for i <= wantHeight {
268 block, err := bk.requireBlock(i)
273 isOrphan, err := bk.chain.ProcessBlock(block)
// Re-read the best height rather than incrementing i, so the loop stays
// correct if processing reorganized the chain or the block was an orphan
// (the orphan-handling lines are elided in this view).
282 i = bk.chain.BestBlockHeight() + 1
// requireBlock requests the block at the given height from the sync peer and
// waits on blockProcessCh for the response, failing with errPeerDropped when
// the request cannot be sent and errRequestTimeout after syncTimeout.
287 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
288 if ok := bk.syncPeer.getBlockByHeight(height); !ok {
289 return nil, errPeerDropped
292 timeout := time.NewTimer(syncTimeout)
297 case msg := <-bk.blockProcessCh:
// Discard responses from other peers or at the wrong height; presumably the
// elided branches continue waiting — confirm against the full source.
298 if msg.peerID != bk.syncPeer.ID() {
301 if msg.block.Height != height {
304 return msg.block, nil
306 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks requests a batch of blocks (locator/stop-hash form) from the
// sync peer and waits on blocksProcessCh for the response, failing with
// errPeerDropped when the request cannot be sent and errRequestTimeout after
// syncTimeout.
311 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
312 if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
313 return nil, errPeerDropped
316 timeout := time.NewTimer(syncTimeout)
321 case msg := <-bk.blocksProcessCh:
// Discard responses from peers other than the current sync peer; presumably
// the elided branch continues waiting — confirm against the full source.
322 if msg.peerID != bk.syncPeer.ID() {
325 return msg.blocks, nil
327 return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
// requireHeaders requests a batch of headers (locator/stop-hash form) from
// the sync peer and waits on headersProcessCh for the response, failing with
// errPeerDropped when the request cannot be sent and errRequestTimeout after
// syncTimeout.
332 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
333 if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
334 return nil, errPeerDropped
337 timeout := time.NewTimer(syncTimeout)
342 case msg := <-bk.headersProcessCh:
// Discard responses from peers other than the current sync peer; presumably
// the elided branch continues waiting — confirm against the full source.
343 if msg.peerID != bk.syncPeer.ID() {
346 return msg.headers, nil
348 return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
353 // resetHeaderState sets the headers-first mode state to values appropriate for
354 // syncing from a new peer.
355 func (bk *blockKeeper) resetHeaderState() {
356 header := bk.chain.BestBlockHeader()
// Only seed the list when fast sync is still applicable (a checkpoint lies
// ahead); the current best header becomes the link point that the first
// batch of downloaded headers must chain from (see appendHeaderList).
358 if bk.nextCheckpoint() != nil {
359 bk.headerList.PushBack(header)
// startSync runs one sync round, preferring fast (checkpoint) sync when a
// fast-sync-capable peer is at or above the next checkpoint, and otherwise
// falling back to regular sync against the tallest full node, advancing at
// most maxBlockPerMsg blocks. On failure the offending peer is reported to
// the peer set's error handler. NOTE(review): the return statements are
// elided in this view — the caller treats the result as "chain updated";
// confirm against the full source.
363 func (bk *blockKeeper) startSync() bool {
364 checkPoint := bk.nextCheckpoint()
365 peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
366 if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
368 if err := bk.fastBlockSync(checkPoint); err != nil {
369 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
370 bk.peers.errorHandler(peer.ID(), err)
// Fallback: regular sync toward the best full node, capped so one round
// requests at most maxBlockPerMsg blocks.
376 blockHeight := bk.chain.BestBlockHeight()
377 peer = bk.peers.bestPeer(consensus.SFFullNode)
378 if peer != nil && peer.Height() > blockHeight {
380 targetHeight := blockHeight + maxBlockPerMsg
381 if targetHeight > peer.Height() {
382 targetHeight = peer.Height()
385 if err := bk.regularBlockSync(targetHeight); err != nil {
386 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
387 bk.peers.errorHandler(peer.ID(), err)
// syncWorker is the long-running sync loop: on every syncCycle tick it runs
// startSync, and when the chain advanced it broadcasts the new best block
// and the updated status (best + genesis) to peers. The function's tail runs
// past the end of this view; comments cover only the visible lines.
395 func (bk *blockKeeper) syncWorker() {
// Genesis is fetched once up front — it is part of every status broadcast.
396 genesisBlock, err := bk.chain.GetBlockByHeight(0)
398 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
401 syncTicker := time.NewTicker(syncCycle)
402 defer syncTicker.Stop()
// Skip the broadcasts when this round did not advance the chain.
406 if update := bk.startSync(); !update {
410 block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
412 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
415 if err := bk.peers.broadcastMinedBlock(block); err != nil {
416 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
419 if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
420 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")