6 log "github.com/sirupsen/logrus"
8 "github.com/vapor/consensus"
9 dbm "github.com/vapor/database/leveldb"
10 "github.com/vapor/netsync/peers"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// syncCycle is the period of the ticker that syncWorker creates.
17 syncCycle = 5 * time.Second
// maxNumOfBlocksPerMsg is the limit locateBlocks passes to locateHeaders as maxNum.
25 maxNumOfBlocksPerMsg = uint64(1000)
// NOTE(review): maxNumOfHeadersPerMsg is not referenced in the visible code;
// presumably it is the matching limit for headers messages - confirm at the call site.
26 maxNumOfHeadersPerMsg = uint64(1000)
// FastSync is the fast-sync engine used by blockKeeper: checkSyncType hands it
// a peer through setSyncPeer and startSync runs it through process.
// NOTE(review): only part of the method set is visible in this view.
29 type FastSync interface {
// setSyncPeer receives the peer that checkSyncType selected for fast sync.
31 setSyncPeer(peer *peers.Peer)
// Fetcher lists the message-fetching operations that blockKeeper invokes on
// its msgFetcher field: the three process* methods take data tagged with the
// ID of a peer, and requireBlock asks a peer for the block at a given height.
34 type Fetcher interface {
// processBlock takes a single block together with the ID of its peer.
35 processBlock(peerID string, block *types.Block)
// processBlocks takes a batch of blocks together with the ID of its peer.
36 processBlocks(peerID string, blocks []*types.Block)
// processHeaders takes a batch of headers together with the ID of its peer.
37 processHeaders(peerID string, headers []*types.BlockHeader)
// requireBlock returns the block at height obtained from peerID, or an error.
38 requireBlock(peerID string, height uint64) (*types.Block, error)
// blockMsg is a message record for a single block.
// NOTE(review): its fields are not visible in this view; presumably it pairs
// one block with the ID of the peer it came from - confirm.
41 type blockMsg struct {
// blocksMsg is a message record for a batch of blocks.
// NOTE(review): its fields are not visible in this view; presumably it pairs
// a block slice with the ID of the peer it came from - confirm.
46 type blocksMsg struct {
// headersMsg is a message record for a batch of block headers.
// NOTE(review): any further fields (e.g. a peer ID) are not visible here.
51 type headersMsg struct {
// headers is the batch of block headers carried by the message.
52 headers []*types.BlockHeader
// blockKeeper keeps the local chain in step with remote peers.
// The methods in this file use the following fields: chain (local chain
// access), fastSync (fast-sync engine), msgFetcher (block/header fetching),
// peers (peer set), syncPeer (peer used by regularBlockSync) and quit (a
// chan struct{} created in newBlockKeeper).
// NOTE(review): the field declarations themselves are not visible in this view.
56 type blockKeeper struct {
// newBlockKeeper builds a blockKeeper for the given chain and peer set.
// It creates one storage on top of fastSyncDB and one message fetcher on top
// of that storage, and passes the same two instances to the fast-sync engine,
// so the keeper and the engine share a single fetcher.
66 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
67 storage := newStorage(fastSyncDB)
68 msgFetcher := newMsgFetcher(storage, peers)
// The fast-sync engine gets the same fetcher and storage created above.
71 fastSync: newFastSync(chain, msgFetcher, storage, peers),
72 msgFetcher: msgFetcher,
// quit is an unbuffered signalling channel.
// NOTE(review): its consumer is not visible here; presumably syncWorker - confirm.
74 quit: make(chan struct{}),
// locateBlocks returns the blocks selected by a block locator and a stop hash.
// It resolves the headers with locateHeaders (skip 0, at most
// maxNumOfBlocksPerMsg entries) and then loads the block for each header hash
// from the chain, in header order.
// NOTE(review): the error branches and the final return are not visible here.
78 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
79 headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
84 blocks := []*types.Block{}
85 for _, header := range headers {
// Blocks are looked up by the hash of their header.
86 headerHash := header.Hash()
87 block, err := bk.chain.GetBlockByHash(&headerHash)
92 blocks = append(blocks, block)
// locateHeaders collects main-chain headers from a start header up to the
// header identified by stopHash.
//
// The start header defaults to the header at height 0; the locator hashes are
// then checked in order for one that is known and on the main chain.
// The result begins with the start header, and the loop appends at most
// maxNum-1 further headers, ending with the stop header once the running
// height reaches it.
//
// NOTE(review): several lines (error returns, the body of the locator match,
// the step that advances index, the final return) are not visible in this
// view; the notes below mark where behavior is assumed rather than shown.
97 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
// Fallback start: the header at height 0.
98 startHeader, err := bk.chain.GetHeaderByHeight(0)
103 for _, hash := range locator {
104 header, err := bk.chain.GetHeaderByHash(hash)
// A locator entry qualifies only if it resolves and is on the main chain.
// NOTE(review): presumably startHeader is replaced and the loop exits here - confirm.
105 if err == nil && bk.chain.InMainChain(header.Hash()) {
111 headers := make([]*types.BlockHeader, 0)
112 stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
// Guard: the stop block is off the main chain or below the start header.
// NOTE(review): the branch body is not visible; presumably it returns the empty slice.
117 if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
121 headers = append(headers, startHeader)
// Start and stop are at the same height, so the start header is the only one appended.
122 if stopHeader.Height == startHeader.Height {
// The bound is maxNum-1 because the start header has already been appended.
// NOTE(review): maxNum is a uint64, so maxNum-1 wraps to the maximum uint64
// when maxNum == 0 and the loop is then bounded only by the stop height.
126 for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
// NOTE(review): index is advanced on a line not visible here (presumably by skip+1).
// On reaching or passing the stop height, the stop header itself is appended.
128 if index >= stopHeader.Height {
129 headers = append(headers, stopHeader)
133 header, err := bk.chain.GetHeaderByHeight(index)
138 headers = append(headers, header)
// processBlock forwards block and its peerID unchanged to the message fetcher.
144 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
145 bk.msgFetcher.processBlock(peerID, block)
// processBlocks forwards blocks and their peerID unchanged to the message fetcher.
148 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
149 bk.msgFetcher.processBlocks(peerID, blocks)
// processHeaders forwards headers and their peerID unchanged to the message fetcher.
152 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
153 bk.msgFetcher.processHeaders(peerID, headers)
// regularBlockSync pulls blocks one at a time from bk.syncPeer and feeds them
// to the local chain until the requested height passes the peer's height as
// sampled at entry.
// NOTE(review): the initialisation of i, the error checks, the handling of
// isOrphan and the return statements are not visible in this view.
156 func (bk *blockKeeper) regularBlockSync() error {
// The peer height is read once, before the loop.
157 peerHeight := bk.syncPeer.Height()
158 bestHeight := bk.chain.BestBlockHeight()
160 for i <= peerHeight {
161 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
// Reported against the sync peer at connection-exception level
// (presumably inside the requireBlock error branch).
163 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
167 isOrphan, err := bk.chain.ProcessBlock(block)
// Reported against the sync peer at illegal-message level
// (presumably inside the ProcessBlock error branch).
169 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
// The next height is taken from the chain's current best height rather than
// by incrementing i.
177 i = bk.chain.BestBlockHeight() + 1
179 log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
// start begins the block keeper's work.
// NOTE(review): the body is not visible in this view; presumably it launches
// syncWorker in its own goroutine - confirm.
183 func (bk *blockKeeper) start() {
// checkSyncType decides which kind of synchronisation to run next and returns
// it as an int code.
// It first looks for a fast-sync candidate: if that peer's irreversible height
// is at least minGapStartFastSync above the local best height, the peer is
// handed to the fast-sync engine. Otherwise it looks at the best full-node
// peer and selects regular sync when that peer is ahead of the local chain.
// NOTE(review): the nil checks and every return other than regularSyncType
// are not visible in this view.
187 func (bk *blockKeeper) checkSyncType() int {
// Candidate for fast sync, queried with the SFFullNode|SFFastSync service flags.
188 peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
190 log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
194 bestHeight := bk.chain.BestBlockHeight()
// Fast sync needs the peer's irreversible height to lead the local best
// height by at least minGapStartFastSync.
195 if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
196 bk.fastSync.setSyncPeer(peer)
// Otherwise consider the best peer with the SFFullNode flag.
200 peer = bk.peers.BestPeer(consensus.SFFullNode)
202 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
206 peerHeight := peer.Height()
// Regular sync is chosen when the peer is strictly ahead of the local chain.
// NOTE(review): presumably bk.syncPeer is set on a line not visible here - confirm.
207 if peerHeight > bestHeight {
209 return regularSyncType
// startSync runs one synchronisation attempt of the type picked by
// checkSyncType and returns a bool that syncWorker reads as "update".
// A failure in either sync path is logged as a warning.
// NOTE(review): the fast-sync case label and the return statements are not
// visible in this view.
215 func (bk *blockKeeper) startSync() bool {
216 switch bk.checkSyncType() {
// Fast-sync path: run the fast-sync engine.
218 if err := bk.fastSync.process(); err != nil {
219 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
// Regular path: fetch blocks one at a time from the sync peer.
222 case regularSyncType:
223 if err := bk.regularBlockSync(); err != nil {
224 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
// stop shuts the block keeper down.
// NOTE(review): the body is not visible in this view; presumably it signals
// the quit channel created in newBlockKeeper - confirm.
232 func (bk *blockKeeper) stop() {
// syncWorker is the sync loop: it creates a ticker with period syncCycle,
// calls startSync, and broadcasts the node's best header and last irreversible
// header to the peer set, logging an error if the broadcast fails.
// NOTE(review): the loop/select structure and the exit path are not visible
// in this view.
236 func (bk *blockKeeper) syncWorker() {
237 syncTicker := time.NewTicker(syncCycle)
// Release the ticker's resources when the worker returns.
238 defer syncTicker.Stop()
// NOTE(review): the branch body for !update is not visible; presumably the
// broadcast below is skipped when startSync reports no update - confirm.
243 if update := bk.startSync(); !update {
247 if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
248 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")