6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/vapor/consensus"
9 dbm "github.com/bytom/vapor/database/leveldb"
10 "github.com/bytom/vapor/netsync/peers"
11 "github.com/bytom/vapor/p2p/security"
12 "github.com/bytom/vapor/protocol/bc"
13 "github.com/bytom/vapor/protocol/bc/types"
17 syncCycle = 5 * time.Second
25 maxNumOfBlocksPerMsg = uint64(1000)
26 maxNumOfHeadersPerMsg = uint64(1000)
27 maxNumOfBlocksRegularSync = uint64(128)
30 type FastSync interface {
32 setSyncPeer(peer *peers.Peer)
35 type Fetcher interface {
36 processBlock(peerID string, block *types.Block)
37 processBlocks(peerID string, blocks []*types.Block)
38 processHeaders(peerID string, headers []*types.BlockHeader)
39 requireBlock(peerID string, height uint64) (*types.Block, error)
42 type blockMsg struct {
47 type blocksMsg struct {
52 type headersMsg struct {
53 headers []*types.BlockHeader
57 type blockKeeper struct {
67 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
68 storage := newStorage(fastSyncDB)
69 msgFetcher := newMsgFetcher(storage, peers)
72 fastSync: newFastSync(chain, msgFetcher, storage, peers),
73 msgFetcher: msgFetcher,
75 quit: make(chan struct{}),
79 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
80 headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
85 blocks := []*types.Block{}
86 for _, header := range headers {
87 headerHash := header.Hash()
88 block, err := bk.chain.GetBlockByHash(&headerHash)
93 blocks = append(blocks, block)
98 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
99 startHeader, err := bk.chain.GetHeaderByHeight(0)
104 for _, hash := range locator {
105 header, err := bk.chain.GetHeaderByHash(hash)
106 if err == nil && bk.chain.InMainChain(header.Hash()) {
112 headers := make([]*types.BlockHeader, 0)
113 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
118 if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
122 headers = append(headers, startHeader)
123 if stopHeader.Height == startHeader.Height {
127 for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
129 if index >= stopHeader.Height {
130 headers = append(headers, stopHeader)
134 header, err := bk.chain.GetHeaderByHeight(index)
139 headers = append(headers, header)
145 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
146 bk.msgFetcher.processBlock(peerID, block)
149 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
150 bk.msgFetcher.processBlocks(peerID, blocks)
153 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
154 bk.msgFetcher.processHeaders(peerID, headers)
157 func (bk *blockKeeper) regularBlockSync() error {
158 peerHeight := bk.syncPeer.Height()
159 bestHeight := bk.chain.BestBlockHeight()
160 targetHeight := bestHeight + maxNumOfBlocksRegularSync
161 if targetHeight > peerHeight {
162 targetHeight = peerHeight
165 for i := bestHeight + 1; i <= targetHeight; {
166 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
168 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
172 isOrphan, err := bk.chain.ProcessBlock(block)
174 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
183 //This code is used to preventing the sync peer return a dust block which will not change the node's chain status
184 if bestHeight = bk.chain.BestBlockHeight(); i == bestHeight+1 {
185 log.WithFields(log.Fields{"module": logModule, "height": i}).Warn("stop regular sync due to loop sync same height")
191 log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
195 func (bk *blockKeeper) start() {
199 func (bk *blockKeeper) checkSyncType() int {
200 bestHeight := bk.chain.BestBlockHeight()
201 peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
203 if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
204 bk.fastSync.setSyncPeer(peer)
209 peer = bk.peers.BestPeer(consensus.SFFullNode)
211 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
215 if peer.Height() > bestHeight {
217 return regularSyncType
223 func (bk *blockKeeper) startSync() bool {
224 switch bk.checkSyncType() {
226 if err := bk.fastSync.process(); err != nil {
227 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
230 case regularSyncType:
231 if err := bk.regularBlockSync(); err != nil {
232 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
242 func (bk *blockKeeper) stop() {
246 func (bk *blockKeeper) syncWorker() {
247 syncTicker := time.NewTicker(syncCycle)
248 defer syncTicker.Stop()
253 if update := bk.startSync(); !update {
257 if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
258 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")