7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/consensus"
10 "github.com/bytom/errors"
11 "github.com/bytom/mining/tensority"
12 "github.com/bytom/p2p/security"
13 "github.com/bytom/protocol/bc"
14 "github.com/bytom/protocol/bc/types"
// Sync cadence and buffer capacities for the blockKeeper message channels.
18 syncCycle = 5 * time.Second // period of the ticker created in syncWorker
19 blockProcessChSize = 1024 // capacity of blockKeeper.blockProcessCh
20 blocksProcessChSize = 128 // capacity of blockKeeper.blocksProcessCh
21 headersProcessChSize = 1024 // capacity of blockKeeper.headersProcessCh
// Message-size limits and the per-request timeout used by the sync routines.
25 maxBlockPerMsg = uint64(128) // limit checked per header index in locateBlocks; also the regular-sync batch size in startSync
26 maxBlockHeadersPerMsg = uint64(2048) // upper bound on the number of headers collected by locateHeaders
27 syncTimeout = 30 * time.Second // duration of the timer created in requireBlock, requireBlocks and requireHeaders
// Sentinel errors used by the sync routines.
// errAppendHeaders is returned by appendHeaderList when a header does not
// link to the current tail of the header list.
29 errAppendHeaders = errors.New("fail to append list due to order dismatch")
// errRequestTimeout is wrapped and returned by the require* helpers
// (presumably when the syncTimeout timer fires; the select cases are not
// shown in this excerpt).
30 errRequestTimeout = errors.New("request timeout")
// errPeerDropped is returned by the require* helpers when the request call on
// syncPeer reports !ok.
31 errPeerDropped = errors.New("Peer dropped")
// NOTE(review): errPeerMisbehave and ErrPeerMisbehave are both present with
// the same message. The code visible in this excerpt only references the
// unexported name (fastBlockSync); confirm whether both declarations are
// intended or one is a leftover.
32 errPeerMisbehave = errors.New("peer is misbehave")
33 ErrPeerMisbehave = errors.New("peer is misbehave")
// blockMsg is the element type of blockProcessCh. Its fields are not shown in
// this excerpt; processBlock populates `block` and `peerID`.
36 type blockMsg struct {
// blocksMsg is the element type of blocksProcessCh. Its fields are not shown
// in this excerpt; processBlocks populates `blocks` and `peerID`.
41 type blocksMsg struct {
// headersMsg is the element type of headersProcessCh. processHeaders also
// sets a `peerID` field, which is declared on a line not shown here.
46 type headersMsg struct {
47 headers []*types.BlockHeader
// blockKeeper holds the state used by the block/header sync routines in this
// file. Only the channel fields are visible in this excerpt; the methods also
// use bk.chain, bk.peers, bk.syncPeer and bk.headerList.
51 type blockKeeper struct {
56 blockProcessCh chan *blockMsg // written by processBlock, read by requireBlock
57 blocksProcessCh chan *blocksMsg // written by processBlocks, read by requireBlocks
58 headersProcessCh chan *headersMsg // written by processHeaders, read by requireHeaders
// newBlockKeeper constructs a blockKeeper. The visible part of the struct
// literal creates the three buffered message channels (sized by the
// *ProcessChSize constants) and an empty header list.
// NOTE(review): the lines that open the literal, store chain/peers, and
// return the value are missing from this excerpt.
63 func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
67 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
68 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
69 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
70 headerList: list.New(),
// appendHeaderList appends headers, in order, to the back of bk.headerList.
// Each header's PreviousBlockHash must equal the hash of the current list
// tail; on the first mismatch it returns errAppendHeaders. Headers appended
// earlier in the same call stay in the list.
//
// The list must be non-empty on entry: Back() returns nil for an empty list
// and the .Value access would panic.
77 func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
78 for _, header := range headers {
// The tail is re-read every iteration, so each header is checked against the
// one pushed just before it.
79 prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
80 if prevHeader.Hash() != header.PreviousBlockHash {
81 return errAppendHeaders
83 bk.headerList.PushBack(header)
// blockLocator builds a list of block hashes starting at the chain's best
// block header and walking back towards height 0 in jumps of `step` heights.
// NOTE(review): the declaration of `step`, the loop header, the branch bodies
// and the return are on lines missing from this excerpt.
88 func (bk *blockKeeper) blockLocator() []*bc.Hash {
89 header := bk.chain.BestBlockHeader()
90 locator := []*bc.Hash{}
// Record the hash of the current header.
94 headerHash := header.Hash()
95 locator = append(locator, &headerHash)
// Height 0 is the lowest header; presumably the walk stops here (branch body
// not shown).
96 if header.Height == 0 {
// Step back by `step` heights, falling back to height 0 when the subtraction
// would go below zero.
101 if header.Height < step {
102 header, err = bk.chain.GetHeaderByHeight(0)
104 header, err = bk.chain.GetHeaderByHeight(header.Height - step)
107 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
// Presumably the step size is increased once 9 hashes have been collected;
// the branch body is not shown — confirm against the full file.
111 if len(locator) >= 9 {
// fastBlockSync syncs up to checkPoint in two phases:
//
//  1. Headers: after resetHeaderState, request headers from the sync peer and
//     append them to bk.headerList until the list tail hashes to
//     checkPoint.Hash.
//  2. Blocks: while the chain's best height is below checkPoint.Height,
//     request blocks, match each one against the next entry of the header
//     list, and hand it to bk.chain.ProcessBlock.
//
// It returns errPeerMisbehave (wrapped in most cases) when the peer's data
// cannot belong to the checkpoint branch, and wraps errors from seed
// calculation and block processing.
// NOTE(review): several `if err != nil {` guards, closing braces and the
// final return are missing from this excerpt.
118 func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
119 bk.resetHeaderState()
// Assumes resetHeaderState left at least one element in the list; otherwise
// Back() is nil and .Value panics — TODO confirm.
120 lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
// Phase 1: extend the header list until its tail is the checkpoint header.
121 for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
// Reaching the checkpoint height without matching its hash means the headers
// received are not on the checkpoint branch.
122 if lastHeader.Height >= checkPoint.Height {
123 return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
// Ask for headers following the current tail, up to the checkpoint hash.
126 lastHash := lastHeader.Hash()
127 headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
// An empty reply is treated as peer misbehaviour.
132 if len(headers) == 0 {
133 return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
136 if err := bk.appendHeaderList(headers); err != nil {
// Phase 2: fetch blocks and check them against the collected headers.
// fastHeader starts at the list front and is advanced before each block is
// compared.
141 fastHeader := bk.headerList.Front()
142 for bk.chain.BestBlockHeight() < checkPoint.Height {
143 locator := bk.blockLocator()
144 blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
149 if len(blocks) == 0 {
150 return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
153 for _, block := range blocks {
// Running off the end of the header list means more blocks were received
// than headers were collected.
154 if fastHeader = fastHeader.Next(); fastHeader == nil {
155 return errors.New("get block than is higher than checkpoint")
// The block must hash to the header at the same position in the list.
158 blockHash := block.Hash()
159 if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
160 return errPeerMisbehave
163 seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
165 return errors.Wrap(err, "fail on fastBlockSync calculate seed")
// A zero-value hash is cached for (blockHash, seed) around ProcessBlock and
// removed right after. Presumably this lets ProcessBlock skip the tensority
// computation for blocks already pinned by the checkpoint — confirm against
// tensority.AIHash.
168 tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
169 _, err = bk.chain.ProcessBlock(block)
170 tensority.AIHash.RemoveCache(&blockHash, seed)
172 return errors.Wrap(err, "fail on fastBlockSync process block")
// locateBlocks resolves locator/stopHash to a header range via locateHeaders
// and loads the corresponding blocks from the chain by hash.
// NOTE(review): error guards, the body of the maxBlockPerMsg branch and the
// return are missing from this excerpt.
179 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
180 headers, err := bk.locateHeaders(locator, stopHash)
185 blocks := []*types.Block{}
186 for i, header := range headers {
// Presumably stops collecting once maxBlockPerMsg blocks have been gathered
// (branch body not shown).
187 if uint64(i) >= maxBlockPerMsg {
191 headerHash := header.Hash()
192 block, err := bk.chain.GetBlockByHash(&headerHash)
197 blocks = append(blocks, block)
// locateHeaders collects main-chain headers that follow a start header, up to
// the header identified by stopHash and at most maxBlockHeadersPerMsg
// entries. The start header defaults to the header at height 0; the locator
// loop looks for a hash that is known and on the main chain.
// NOTE(review): error guards, the body of the locator match and the return
// are missing from this excerpt.
202 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
203 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
// Default starting point when no locator entry matches.
208 startHeader, err := bk.chain.GetHeaderByHeight(0)
213 for _, hash := range locator {
214 header, err := bk.chain.GetHeaderByHash(hash)
// Unknown hashes (err != nil) and hashes off the main chain are skipped.
// Presumably a match replaces startHeader and ends the loop (body not shown).
215 if err == nil && bk.chain.InMainChain(header.Hash()) {
// NOTE(review): Height looks unsigned (it is added to a uint64 below). If
// stopHeader.Height < startHeader.Height this subtraction would wrap and then
// be clamped to maxBlockHeadersPerMsg — confirm a guard exists on the lines
// not shown.
221 totalHeaders := stopHeader.Height - startHeader.Height
222 if totalHeaders > maxBlockHeadersPerMsg {
223 totalHeaders = maxBlockHeadersPerMsg
226 headers := []*types.BlockHeader{}
// Heights startHeader.Height+1 .. startHeader.Height+totalHeaders; the start
// header itself is not included.
227 for i := uint64(1); i <= totalHeaders; i++ {
228 header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
233 headers = append(headers, header)
// nextCheckpoint picks a checkpoint from consensus.ActiveNetParams.Checkpoints
// relative to the chain's best block height. Callers compare the result
// against nil, so a nil return means "no checkpoint ahead".
// Assumes the checkpoint slice is ordered by ascending height — TODO confirm.
// NOTE(review): the bodies of both `if` statements are missing from this
// excerpt (presumably `return nil` and `break`).
238 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
239 height := bk.chain.BestBlockHeader().Height
240 checkpoints := consensus.ActiveNetParams.Checkpoints
// No checkpoints configured, or the chain is already at/after the last one.
241 if len(checkpoints) == 0 || height >= checkpoints[len(checkpoints)-1].Height {
// Start from the last checkpoint and walk backwards while the checkpoint is
// still above the current height.
245 nextCheckpoint := &checkpoints[len(checkpoints)-1]
246 for i := len(checkpoints) - 2; i >= 0; i-- {
247 if height >= checkpoints[i].Height {
250 nextCheckpoint = &checkpoints[i]
252 return nextCheckpoint
// processBlock wraps block and peerID in a blockMsg and sends it on
// bk.blockProcessCh. The send blocks while the channel buffer is full.
255 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
256 bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks wraps blocks and peerID in a blocksMsg and sends it on
// bk.blocksProcessCh. The send blocks while the channel buffer is full.
259 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
260 bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
// processHeaders wraps headers and peerID in a headersMsg and sends it on
// bk.headersProcessCh. The send blocks while the channel buffer is full.
263 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
264 bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// regularBlockSync requests blocks one height at a time, starting just above
// the chain's best height, and passes each to bk.chain.ProcessBlock until the
// next height to fetch exceeds wantHeight.
// NOTE(review): the error guards, the handling of isOrphan and the return
// are missing from this excerpt.
267 func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
268 i := bk.chain.BestBlockHeight() + 1
269 for i <= wantHeight {
270 block, err := bk.requireBlock(i)
275 isOrphan, err := bk.chain.ProcessBlock(block)
// The next height is re-derived from the chain rather than incremented, so
// it follows whatever best height ProcessBlock left behind.
284 i = bk.chain.BestBlockHeight() + 1
// requireBlock asks bk.syncPeer for the block at the given height and waits
// for a matching message on bk.blockProcessCh.
//
// Returns errPeerDropped when the request call reports !ok, and a wrapped
// errRequestTimeout (presumably when the syncTimeout timer fires).
// NOTE(review): the for/select structure, the timer case and the bodies of
// the two filter branches are missing from this excerpt.
289 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
290 if ok := bk.syncPeer.getBlockByHeight(height); !ok {
291 return nil, errPeerDropped
294 timeout := time.NewTimer(syncTimeout)
299 case msg := <-bk.blockProcessCh:
// Messages from a peer other than the sync peer are presumably skipped.
300 if msg.peerID != bk.syncPeer.ID() {
// Blocks at a different height than requested are presumably skipped.
303 if msg.block.Height != height {
306 return msg.block, nil
308 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks sends a getBlocks request (locator, stopHash) to bk.syncPeer
// and waits for a reply on bk.blocksProcessCh.
//
// Returns errPeerDropped when the request call reports !ok, and a wrapped
// errRequestTimeout (presumably when the syncTimeout timer fires).
// NOTE(review): the for/select structure, the timer case and the body of the
// peer filter are missing from this excerpt.
313 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
314 if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
315 return nil, errPeerDropped
318 timeout := time.NewTimer(syncTimeout)
323 case msg := <-bk.blocksProcessCh:
// Messages from a peer other than the sync peer are presumably skipped.
324 if msg.peerID != bk.syncPeer.ID() {
327 return msg.blocks, nil
329 return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
// requireHeaders sends a getHeaders request (locator, stopHash) to
// bk.syncPeer and waits for a reply on bk.headersProcessCh.
//
// Returns errPeerDropped when the request call reports !ok, and a wrapped
// errRequestTimeout (presumably when the syncTimeout timer fires).
// NOTE(review): the for/select structure, the timer case and the body of the
// peer filter are missing from this excerpt.
334 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
335 if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
336 return nil, errPeerDropped
339 timeout := time.NewTimer(syncTimeout)
344 case msg := <-bk.headersProcessCh:
// Messages from a peer other than the sync peer are presumably skipped.
345 if msg.peerID != bk.syncPeer.ID() {
348 return msg.headers, nil
350 return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
355 // resetHeaderState sets the headers-first mode state to values appropriate for
356 // syncing from a new peer.
// In the visible code it pushes the chain's best block header onto
// bk.headerList when a next checkpoint exists.
// NOTE(review): a line between the two statements below is missing from this
// excerpt (presumably the list is cleared first — confirm).
357 func (bk *blockKeeper) resetHeaderState() {
358 header := bk.chain.BestBlockHeader()
360 if bk.nextCheckpoint() != nil {
361 bk.headerList.PushBack(header)
// startSync runs one sync attempt and reports a bool (named `update` by the
// caller in syncWorker).
//
// It first tries fast sync: when a checkpoint lies ahead and the best peer
// advertising SFFastSync|SFFullNode is at or above the checkpoint height, it
// calls fastBlockSync. Otherwise, when the best SFFullNode peer is above the
// local height, it calls regularBlockSync for at most maxBlockPerMsg blocks.
// A failure in either path is logged as a warning and reported against the
// peer via ProcessIllegal.
// NOTE(review): the lines that set bk.syncPeer and all return statements are
// missing from this excerpt.
365 func (bk *blockKeeper) startSync() bool {
366 checkPoint := bk.nextCheckpoint()
367 peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
368 if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
370 if err := bk.fastBlockSync(checkPoint); err != nil {
371 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
372 bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
// Regular sync path.
378 blockHeight := bk.chain.BestBlockHeight()
379 peer = bk.peers.bestPeer(consensus.SFFullNode)
380 if peer != nil && peer.Height() > blockHeight {
// Target is min(blockHeight+maxBlockPerMsg, peer.Height()).
382 targetHeight := blockHeight + maxBlockPerMsg
383 if targetHeight > peer.Height() {
384 targetHeight = peer.Height()
387 if err := bk.regularBlockSync(targetHeight); err != nil {
388 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
389 bk.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, err.Error())
// syncWorker loads the genesis block, creates a ticker with period syncCycle,
// and calls startSync. When startSync reports an update it loads the current
// best block and broadcasts it with broadcastMinedBlock and
// broadcastNewStatus; each failure is logged at error level.
// NOTE(review): the loop header, the error guards, the body of the `!update`
// branch and the end of the function are missing from this excerpt.
397 func (bk *blockKeeper) syncWorker() {
398 genesisBlock, err := bk.chain.GetBlockByHeight(0)
// NOTE(review): this message names handleStatusRequestMsg although it is
// logged from syncWorker — looks like a copy/paste leftover; confirm.
400 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
403 syncTicker := time.NewTicker(syncCycle)
404 defer syncTicker.Stop()
// Presumably nothing is broadcast when startSync reports no update (branch
// body not shown).
408 if update := bk.startSync(); !update {
412 block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
414 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
417 if err := bk.peers.broadcastMinedBlock(block); err != nil {
418 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new block")
421 if err = bk.peers.broadcastNewStatus(block, genesisBlock); err != nil {
422 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")