6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/consensus"
9 dbm "github.com/vapor/database/leveldb"
10 "github.com/vapor/netsync/peers"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// syncCycle is the period (5 seconds) of the ticker created in syncWorker.
17 syncCycle = 5 * time.Second
// maxNumOfBlocksPerMsg is passed by locateBlocks to locateHeaders as the
// maximum number of headers (and therefore blocks) to collect.
25 maxNumOfBlocksPerMsg = uint64(1000)
// maxNumOfHeadersPerMsg is not referenced in the visible part of this file;
// presumably the equivalent cap for header replies — TODO confirm at call sites.
26 maxNumOfHeadersPerMsg = uint64(1000)
// maxNumOfBlocksRegularSync limits how far beyond the local best height a
// single regularBlockSync round sets its target height.
27 maxNumOfBlocksRegularSync = uint64(128)
// FastSync describes the fast-sync component used by blockKeeper.
// NOTE(review): only part of the method set is visible here; startSync also
// calls process() on bk.fastSync, so further methods are presumably declared
// on lines not shown in this excerpt.
30 type FastSync interface {
// setSyncPeer hands over the peer to sync from; checkSyncType calls it with
// the peer it selected for fast sync.
32 setSyncPeer(peer *peers.Peer)
// Fetcher is the message-fetching dependency of blockKeeper: it receives
// block/header messages attributed to a peer and can request a single block.
35 type Fetcher interface {
// processBlock accepts one block received from the peer identified by peerID.
36 processBlock(peerID string, block *types.Block)
// processBlocks accepts a batch of blocks received from the peer identified by peerID.
37 processBlocks(peerID string, blocks []*types.Block)
// processHeaders accepts a batch of block headers received from the peer identified by peerID.
38 processHeaders(peerID string, headers []*types.BlockHeader)
// requireBlock asks the given peer for the block at height and returns it,
// or an error; regularBlockSync uses it to pull blocks one height at a time.
39 requireBlock(peerID string, height uint64) (*types.Block, error)
// blockMsg is a message struct for a single block.
// NOTE(review): its fields are not visible in this excerpt.
42 type blockMsg struct {
// blocksMsg is a message struct for a batch of blocks.
// NOTE(review): its fields are not visible in this excerpt.
47 type blocksMsg struct {
// headersMsg is a message struct for a batch of block headers.
// NOTE(review): any further fields are not visible in this excerpt.
52 type headersMsg struct {
// headers holds the block headers carried by the message.
53 headers []*types.BlockHeader
// blockKeeper drives block synchronisation with peers. Its methods in this
// file use the fields chain, fastSync, msgFetcher, peers, syncPeer and quit.
// NOTE(review): the field declarations themselves are not visible in this excerpt.
57 type blockKeeper struct {
// newBlockKeeper builds a blockKeeper on top of the given chain, peer set and
// fast-sync database. Some struct-literal lines are not visible in this excerpt.
67 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
// One storage instance, backed by fastSyncDB, is shared by the message
// fetcher and the fast-sync component created below.
68 storage := newStorage(fastSyncDB)
69 msgFetcher := newMsgFetcher(storage, peers)
// The fast-sync component receives the same msgFetcher that the keeper keeps
// in its own msgFetcher field.
72 fastSync: newFastSync(chain, msgFetcher, storage, peers),
73 msgFetcher: msgFetcher,
// quit is an unbuffered signalling channel; presumably closed by stop() to
// end syncWorker — TODO confirm, neither body is fully visible here.
75 quit: make(chan struct{}),
// locateBlocks resolves a block locator to full blocks: it first collects the
// matching headers via locateHeaders, then loads the block for each header
// from the chain. The error-handling and return lines are not visible in this
// excerpt.
79 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
// skip is 0 (consecutive heights) and the result is capped at maxNumOfBlocksPerMsg.
80 headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
85 blocks := []*types.Block{}
86 for _, header := range headers {
// Hash() returns a value; a local copy is needed so its address can be passed.
87 headerHash := header.Hash()
88 block, err := bk.chain.GetBlockByHash(&headerHash)
93 blocks = append(blocks, block)
// locateHeaders collects block headers between a start header derived from
// locator and the header identified by stopHash, returning at most maxNum
// headers. Several lines (error checks, early returns, the index update) are
// not visible in this excerpt, so the notes below hedge where needed.
98 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
// Default start is the header at height 0.
99 startHeader, err := bk.chain.GetHeaderByHeight(0)
// Scan the locator for a hash that is known and on the main chain; the body
// of the if is not visible (presumably it replaces startHeader and stops the
// scan — TODO confirm).
104 for _, hash := range locator {
105 header, err := bk.chain.GetHeaderByHash(hash)
106 if err == nil && bk.chain.InMainChain(header.Hash()) {
112 headers := make([]*types.BlockHeader, 0)
113 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
// Guard: the stop header is off the main chain or lies below the start
// header. The branch body is not visible here.
118 if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
// The start header is always the first element of the result.
122 headers = append(headers, startHeader)
// Start and stop are at the same height; the branch body is not visible here.
123 if stopHeader.Height == startHeader.Height {
// At most maxNum-1 further headers are added, because startHeader already
// occupies one slot.
// NOTE(review): maxNum is uint64, so maxNum-1 wraps to a huge value when
// maxNum is 0 — confirm callers never pass 0.
// NOTE(review): the line advancing index is not visible (presumably it steps
// by skip+1 — TODO confirm).
127 for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
// Once index reaches or passes the stop height, the stop header itself is
// appended as the last element.
129 if index >= stopHeader.Height {
130 headers = append(headers, stopHeader)
134 header, err := bk.chain.GetHeaderByHeight(index)
139 headers = append(headers, header)
// processBlock forwards a single block received from peerID to the message fetcher.
145 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
146 bk.msgFetcher.processBlock(peerID, block)
// processBlocks forwards a batch of blocks received from peerID to the message fetcher.
149 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
150 bk.msgFetcher.processBlocks(peerID, blocks)
// processHeaders forwards a batch of headers received from peerID to the message fetcher.
153 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
154 bk.msgFetcher.processHeaders(peerID, headers)
// regularBlockSync pulls blocks one height at a time from bk.syncPeer and
// submits each to the chain, up to a target height. Error checks, returns and
// the initialisation of i are on lines not visible in this excerpt.
157 func (bk *blockKeeper) regularBlockSync() error {
158 peerHeight := bk.syncPeer.Height()
159 bestHeight := bk.chain.BestBlockHeight()
// Target is at most maxNumOfBlocksRegularSync blocks ahead of the local best
// height, and never above the peer's height.
160 targetHeight := bestHeight + maxNumOfBlocksRegularSync
161 if targetHeight > peerHeight {
162 targetHeight = peerHeight
166 for i <= targetHeight {
167 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
// Reports the sync peer at the connection-exception level with the error
// text; presumably inside an err != nil guard after requireBlock — TODO confirm.
169 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
// NOTE(review): how isOrphan is used is not visible in this excerpt.
173 isOrphan, err := bk.chain.ProcessBlock(block)
// Reports the sync peer at the illegal-message level with the error text;
// presumably inside an err != nil guard after ProcessBlock — TODO confirm.
175 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
// The next height is re-read from the chain's best height instead of
// incrementing i.
183 i = bk.chain.BestBlockHeight() + 1
185 log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
// start begins the keeper's background work.
// NOTE(review): the body is not visible in this excerpt (presumably it
// launches syncWorker — TODO confirm).
189 func (bk *blockKeeper) start() {
// checkSyncType decides which kind of synchronisation to run next and returns
// a sync-type code (regularSyncType is the only return visible here; the
// other return statements are on lines not shown in this excerpt).
193 func (bk *blockKeeper) checkSyncType() int {
194 bestHeight := bk.chain.BestBlockHeight()
// First candidate: the best peer by irreversible height that advertises both
// the full-node and fast-sync service flags.
195 peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
// Fast sync is considered only when that peer's irreversible height is at
// least minGapStartFastSync above the local best height.
197 if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
198 bk.fastSync.setSyncPeer(peer)
// Otherwise fall back to the best full-node peer for regular sync.
203 peer = bk.peers.BestPeer(consensus.SFFullNode)
// Logged at debug level; presumably guarded by a nil check on peer — TODO confirm.
205 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
// Regular sync is chosen only if the peer is ahead of the local chain.
209 if peer.Height() > bestHeight {
211 return regularSyncType
// startSync runs one synchronisation round of the type chosen by
// checkSyncType and reports the outcome as a bool (syncWorker names the result
// "update"). The fast-sync case label and all return statements are on lines
// not visible in this excerpt.
217 func (bk *blockKeeper) startSync() bool {
218 switch bk.checkSyncType() {
// Fast-sync path: a failure is logged as a warning.
220 if err := bk.fastSync.process(); err != nil {
221 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
// Regular-sync path: a failure is logged as a warning.
224 case regularSyncType:
225 if err := bk.regularBlockSync(); err != nil {
226 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
// stop shuts the keeper down.
// NOTE(review): the body is not visible in this excerpt (presumably it
// signals the quit channel — TODO confirm).
236 func (bk *blockKeeper) stop() {
// syncWorker is the periodic sync loop: it creates a ticker with period
// syncCycle, runs startSync, and broadcasts the node's new status to peers.
// The loop/select structure and the end of the function are not visible in
// this excerpt.
240 func (bk *blockKeeper) syncWorker() {
241 syncTicker := time.NewTicker(syncCycle)
// Stop the ticker when the worker returns so its resources are released.
242 defer syncTicker.Stop()
// A false result from startSync takes the branch whose body is not visible
// here (presumably it skips the broadcast — TODO confirm).
247 if update := bk.startSync(); !update {
// Announce the current best block header and last irreversible header to
// peers; a broadcast failure is only logged at error level on the next line.
251 if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
252 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")