6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/vapor/consensus"
9 dbm "github.com/bytom/vapor/database/leveldb"
10 "github.com/bytom/vapor/errors"
11 "github.com/bytom/vapor/netsync/peers"
12 "github.com/bytom/vapor/p2p/security"
13 "github.com/bytom/vapor/protocol"
14 "github.com/bytom/vapor/protocol/bc"
15 "github.com/bytom/vapor/protocol/bc/types"
// syncCycle is the period of the ticker created in syncWorker.
19 syncCycle = 5 * time.Second
// maxNumOfBlocksPerMsg is the maxNum that locateBlocks passes to locateHeaders.
27 maxNumOfBlocksPerMsg = uint64(64)
// maxNumOfHeadersPerMsg is not referenced on the lines visible in this file;
// NOTE(review): presumably the cap on headers per message — confirm at the call site.
28 maxNumOfHeadersPerMsg = uint64(1000)
// maxNumOfBlocksRegularSync is how far above the local best height
// regularBlockSync places its target height before capping it at the peer's height.
29 maxNumOfBlocksRegularSync = uint64(128)
32 // Fetcher is the interface for fetching blocks and headers. Its method set
// matches the calls blockKeeper makes on bk.msgFetcher in this file.
33 type Fetcher interface {
// processBlock receives a single block together with the ID of the peer it is attributed to.
34 processBlock(peerID string, block *types.Block)
// processBlocks receives a batch of blocks together with the ID of the peer it is attributed to.
35 processBlocks(peerID string, blocks []*types.Block)
// processHeaders receives a batch of block headers together with the ID of the peer it is attributed to.
36 processHeaders(peerID string, headers []*types.BlockHeader)
// requireBlock is called by regularBlockSync with the sync peer's ID and a
// height; it yields a block or an error.
37 requireBlock(peerID string, height uint64) (*types.Block, error)
// blockMsg: fields are not visible in this extract.
// NOTE(review): presumably pairs one block with the sending peer's ID — confirm.
40 type blockMsg struct {
// blocksMsg: fields are not visible in this extract.
// NOTE(review): presumably pairs a batch of blocks with the sending peer's ID — confirm.
45 type blocksMsg struct {
// headersMsg carries a batch of block headers. Any further fields are not
// visible in this extract.
50 type headersMsg struct {
// headers is the batch of block headers held by the message.
51 headers []*types.BlockHeader
// blockKeeper runs block synchronization against remote peers.
// The field list is not visible in this extract; the methods in this file
// reference chain, fastSync, msgFetcher, peers, syncPeer and quit.
55 type blockKeeper struct {
// newBlockKeeper builds a blockKeeper for the given chain and peer set.
// fastSyncDB is wrapped by newStorage; the resulting storage is shared by the
// message fetcher and the fast-sync component.
65 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
66 storage := newStorage(fastSyncDB)
// One fetcher instance is handed to fastSync and also stored on the keeper.
67 msgFetcher := newMsgFetcher(storage, peers)
70 fastSync: newFastSync(chain, msgFetcher, storage, peers),
71 msgFetcher: msgFetcher,
// quit is an unbuffered signalling channel.
// NOTE(review): presumably closed by stop — its use is not visible here.
73 quit: make(chan struct{}),
// locateBlocks resolves the headers selected by locator and stopHash through
// locateHeaders (skip 0, at most maxNumOfBlocksPerMsg) and loads the block for
// each header hash from the chain, returning them in header order.
//
// isTimeout is not used on the lines visible in this extract.
// NOTE(review): presumably it cuts the loop short once it reports true — confirm.
77 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash, isTimeout func() bool) ([]*types.Block, error) {
78 headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
83 blocks := []*types.Block{}
84 for _, header := range headers {
// Hash() is stored in a local so its address can be passed to GetBlockByHash.
85 headerHash := header.Hash()
86 block, err := bk.chain.GetBlockByHash(&headerHash)
91 blocks = append(blocks, block)
// locateHeaders collects headers from a start header up to the header
// identified by stopHash.
//
// The start header is initialised to the header at height 0. Locator entries
// are then checked for one that the chain knows and that is on the main chain;
// what happens on a match is not visible in this extract.
//
// skip is not used on the visible lines.
// NOTE(review): presumably the height step between returned headers — confirm.
// maxNum bounds the result: the start header is appended first and the loop
// runs at most maxNum-1 further times.
99 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
100 startHeader, err := bk.chain.GetHeaderByHeight(0)
105 for _, hash := range locator {
106 header, err := bk.chain.GetHeaderByHash(hash)
// Lookup errors are not fatal here: an unknown or side-chain hash simply fails this test.
107 if err == nil && bk.chain.InMainChain(header.Hash()) {
113 headers := make([]*types.BlockHeader, 0)
114 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
// Special case: stop header off the main chain or below the start header (handling not visible here).
119 if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
123 headers = append(headers, startHeader)
// Special case: start and stop are at the same height (handling not visible here).
124 if stopHeader.Height == startHeader.Height {
// NOTE(review): maxNum is a uint64, so maxNum == 0 makes maxNum-1 wrap to the
// maximum uint64 instead of yielding an empty loop — confirm callers never pass 0.
128 for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
// Once index has reached the stop height, the stop header itself is appended.
130 if index >= stopHeader.Height {
131 headers = append(headers, stopHeader)
135 header, err := bk.chain.GetHeaderByHeight(index)
140 headers = append(headers, header)
// processBlock forwards a single block attributed to peerID to the message fetcher.
146 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
147 bk.msgFetcher.processBlock(peerID, block)
// processBlocks forwards a batch of blocks attributed to peerID to the message fetcher.
150 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
151 bk.msgFetcher.processBlocks(peerID, blocks)
// processHeaders forwards a batch of block headers attributed to peerID to the message fetcher.
154 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
155 bk.msgFetcher.processHeaders(peerID, headers)
// regularBlockSync requests blocks from bk.syncPeer one height at a time,
// starting just above the local best height, and submits each to the chain.
// It returns an error on failure; the return statements are not visible in
// this extract.
158 func (bk *blockKeeper) regularBlockSync() error {
159 peerHeight := bk.syncPeer.Height()
160 bestHeight := bk.chain.BestBlockHeight()
// The target is at most maxNumOfBlocksRegularSync above the local best height
// and never above the peer's height.
161 targetHeight := bestHeight + maxNumOfBlocksRegularSync
162 if targetHeight > peerHeight {
163 targetHeight = peerHeight
// The loop has no post statement; how i advances is not visible in this extract.
166 for i := bestHeight + 1; i <= targetHeight; {
167 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
// A failed fetch is reported against the sync peer as a connection-level exception.
169 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
173 isOrphan, err := bk.chain.ProcessBlock(block)
// A double-sign block error does not get the peer reported; any other
// processing error is reported as an illegal message.
175 if errors.Root(err) != protocol.ErrDoubleSignBlock {
176 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
186 // This check prevents looping when the sync peer returns a dust block that does not change the node's chain status.
// bestHeight is refreshed first; i == bestHeight+1 means the chain still
// expects height i.
187 if bestHeight = bk.chain.BestBlockHeight(); i == bestHeight+1 {
188 log.WithFields(log.Fields{"module": logModule, "height": i}).Warn("stop regular sync due to loop sync same height")
194 log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
// start: the body is not visible in this extract.
// NOTE(review): presumably launches syncWorker in its own goroutine — confirm.
198 func (bk *blockKeeper) start() {
// checkSyncType chooses the sync mode for the next round and picks the peer
// for it. It returns an int sync-type code; regularSyncType is the only return
// value visible in this extract.
202 func (bk *blockKeeper) checkSyncType() int {
203 bestHeight := bk.chain.BestBlockHeight()
// First preference: a peer advertising both the full-node and fast-sync flags.
204 peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
// Fast sync is chosen when that peer is at least minGapStartFastSync ahead of the local best height.
// NOTE(review): the variable is named peerIrreversibleHeight but is assigned
// peer.Height() — confirm whether the peer's irreversible height was intended.
206 if peerIrreversibleHeight := peer.Height(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
207 bk.fastSync.setSyncPeer(peer)
// Otherwise fall back to the best peer advertising the full-node flag.
212 peer = bk.peers.BestPeer(consensus.SFFullNode)
214 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
// Regular sync is chosen only when that peer is strictly ahead of the local chain.
218 if peer.Height() > bestHeight {
220 return regularSyncType
// startSync runs one synchronization round in the mode chosen by
// checkSyncType. Failures of either mode are logged as warnings. The bool
// result is read by syncWorker as "update"; the return statements themselves
// are not visible in this extract.
226 func (bk *blockKeeper) startSync() bool {
227 switch bk.checkSyncType() {
// Fast-sync branch (its case label is not visible in this extract).
229 if err := bk.fastSync.process(); err != nil {
230 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
233 case regularSyncType:
234 if err := bk.regularBlockSync(); err != nil {
235 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
// stop: the body is not visible in this extract.
// NOTE(review): presumably closes bk.quit to end syncWorker — confirm.
245 func (bk *blockKeeper) stop() {
// syncWorker is the periodic sync loop: it runs startSync and, when the result
// is true, broadcasts the node's new status to its peers. The loop and select
// structure around the ticker is not visible in this extract.
249 func (bk *blockKeeper) syncWorker() {
250 syncTicker := time.NewTicker(syncCycle)
// The ticker is stopped when the worker returns.
251 defer syncTicker.Stop()
// The body of the !update branch is not visible in this extract.
256 if update := bk.startSync(); !update {
// The broadcast carries the best block header and the last irreversible header;
// a broadcast failure is logged as an error.
260 if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
261 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")