4 log "github.com/sirupsen/logrus"
6 "github.com/vapor/errors"
7 "github.com/vapor/netsync/peers"
8 "github.com/vapor/p2p/security"
9 "github.com/vapor/protocol/bc"
10 "github.com/vapor/protocol/bc/types"
// Package-level limits used by fast sync, plus its sentinel error.
//
// maxBlocksPerMsg is passed by locateBlocks to locateHeaders as the upper
// bound on how many headers (and therefore blocks) are collected per request.
14 maxBlocksPerMsg = uint64(1000)
// maxHeadersPerMsg is not referenced in the visible part of this file;
// presumably the matching bound for header-only replies — TODO confirm.
15 maxHeadersPerMsg = uint64(1000)
// fastSyncPivotGap is subtracted from the sync peer's irreversible height
// when findFastSyncRange computes how many blocks to fast sync.
16 fastSyncPivotGap = uint64(64)
// minGapStartFastSync is not referenced in the visible part of this file;
// presumably the minimum height gap before fast sync is attempted — TODO confirm.
17 minGapStartFastSync = uint64(128)
// maxFastSyncBlocksNum caps the length of a single fast sync range
// (see the clamp in findFastSyncRange).
18 maxFastSyncBlocksNum = uint64(10000)
// errOrphanBlock is returned by verifyBlocks after it logs that a fast sync
// block is an orphan.
20 errOrphanBlock = errors.New("fast sync block is orphan")
// MsgFetcher is the interface fast sync uses to request blocks from a peer.
// In this file it is called with the ID of the current sync peer.
23 type MsgFetcher interface {
// requireBlock requests the single block at the given height from peerID.
24 requireBlock(peerID string, height uint64) (*types.Block, error)
// requireBlocks requests a batch of blocks from peerID, described by a block
// locator (list of known hashes) and the hash of the block to stop at.
25 requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
// fastSync holds the state of one fast sync run. Only part of the field list
// is visible here; methods in this file also use fs.chain, fs.msgFetcher,
// fs.peers, fs.syncPeer and fs.length.
28 type fastSync struct {
// stopHeader is the header of the last block of the current fast sync range;
// it is set by findFastSyncRange and read by process.
33 stopHeader *types.BlockHeader
// newFastSync builds a fastSync from the given chain, message fetcher and
// peer set. Only part of the struct literal is visible here.
39 func newFastSync(chain Chain, msgFether MsgFetcher, peers *peers.PeerSet) *fastSync {
42 msgFetcher: msgFether,
// Signalling channel that carries no data; its consumers are not visible in
// this part of the file.
44 quite: make(chan struct{}),
// blockLocator builds a list of block hashes that starts at the local best
// block header and walks back towards height 0, `step` blocks at a time.
// The loop header and the declaration/update of `step` are on lines not
// visible here.
48 func (fs *fastSync) blockLocator() []*bc.Hash {
49 header := fs.chain.BestBlockHeader()
50 locator := []*bc.Hash{}
// Record the hash of the header currently being visited.
54 headerHash := header.Hash()
55 locator = append(locator, &headerHash)
// Height 0 is the last possible entry; presumably the walk ends here (the
// branch body is not visible).
56 if header.Height == 0 {
// Move back by `step`, falling back to height 0 when the step would go
// below it (avoids an unsigned underflow in header.Height - step).
61 if header.Height < step {
62 header, err = fs.chain.GetHeaderByHeight(0)
64 header, err = fs.chain.GetHeaderByHeight(header.Height - step)
// NOTE(review): the message says "blockKeeper" although this method belongs
// to fastSync — looks like a leftover from an earlier home of this code.
67 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
// Once the locator holds 9 or more hashes a different stepping rule applies;
// presumably the step is grown from here on — TODO confirm (body not visible).
71 if len(locator) >= 9 {
// process runs one fast sync: it determines the range to sync, then keeps
// requesting and verifying blocks from the sync peer until the local best
// height reaches the stop header's height. The error checks and returns
// between the visible statements are on lines not shown here.
78 func (fs *fastSync) process() error {
// Determine fs.length and fs.stopHeader for this run.
79 if err := fs.findFastSyncRange(); err != nil {
83 stopHash := fs.stopHeader.Hash()
// Each iteration asks the sync peer for the blocks between the current
// locator and the stop hash.
84 for fs.chain.BestBlockHeight() < fs.stopHeader.Height {
85 blocks, err := fs.msgFetcher.requireBlocks(fs.syncPeer.ID(), fs.blockLocator(), &stopHash)
// A failed fetch is reported against the peer as a connection-level fault.
87 fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelConnException, err)
// Blocks that fail verification are reported against the peer as an illegal
// message.
91 if err := fs.verifyBlocks(blocks); err != nil {
92 fs.peers.ErrorHandler(fs.syncPeer.ID(), security.LevelMsgIllegal, err)
97 log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync success")
// findFastSyncRange computes how many blocks to fast sync (fs.length) and
// fetches the block at the end of that range from the sync peer, storing its
// header in fs.stopHeader.
101 func (fs *fastSync) findFastSyncRange() error {
102 bestHeight := fs.chain.BestBlockHeight()
// Range length = peer irreversible height - pivot gap - local best height.
// NOTE(review): this is unsigned arithmetic (fastSyncPivotGap is a uint64),
// so it wraps to a huge value when the peer's irreversible height is below
// bestHeight + fastSyncPivotGap; the clamp below would then turn that into a
// maxFastSyncBlocksNum-long range. Confirm that callers only start fast sync
// when the gap is large enough.
103 fs.length = fs.syncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight
// Never sync more than maxFastSyncBlocksNum blocks in one run.
104 if fs.length > maxFastSyncBlocksNum {
105 fs.length = maxFastSyncBlocksNum
// Ask the sync peer for the block at the end of the range.
108 stopBlock, err := fs.msgFetcher.requireBlock(fs.syncPeer.ID(), bestHeight+fs.length)
113 fs.stopHeader = &stopBlock.BlockHeader
// locateBlocks resolves a block locator and stop hash to full blocks: it
// gets the matching headers from locateHeaders (no skipping, at most
// maxBlocksPerMsg headers) and loads the block for each header from the
// chain. Error handling and the return are on lines not visible here.
117 func (fs *fastSync) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
118 headers, err := fs.locateHeaders(locator, stopHash, 0, maxBlocksPerMsg)
123 blocks := []*types.Block{}
// Load the full block for each header, keeping the headers' order.
124 for _, header := range headers {
125 headerHash := header.Hash()
126 block, err := fs.chain.GetBlockByHash(&headerHash)
131 blocks = append(blocks, block)
// locateHeaders collects main-chain headers by height, from a start header up
// to and including the header identified by stopHash.
//
// locator is a list of candidate start hashes, stopHash names the last
// header, skip is the number of heights skipped between collected headers,
// and maxNum bounds the loop through the counter `num`, whose declaration
// and increment are on lines not visible here.
136 func (fs *fastSync) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
// The header at height 0 is the initial start header.
137 startHeader, err := fs.chain.GetHeaderByHeight(0)
// Look for a locator hash that is known locally and lies on the main chain;
// presumably the first match replaces startHeader (branch body not visible).
142 for _, hash := range locator {
143 header, err := fs.chain.GetHeaderByHash(hash)
144 if err == nil && fs.chain.InMainChain(header.Hash()) {
150 headers := make([]*types.BlockHeader, 0)
151 stopHeader, err := fs.chain.GetHeaderByHash(stopHash)
// A stop hash that is not on the main chain gets special handling; the
// branch body is not visible here.
156 if !fs.chain.InMainChain(*stopHash) {
// Walk heights from the start header to the stop header (inclusive),
// advancing skip+1 heights per step, while num is below maxNum.
161 for i := startHeader.Height; i <= stopHeader.Height && num < maxNum; i += skip + 1 {
162 header, err := fs.chain.GetHeaderByHeight(i)
167 headers = append(headers, header)
// setSyncPeer takes the peer to fast sync from. The body is not visible
// here; presumably it stores the peer in fs.syncPeer, which process and
// findFastSyncRange read — TODO confirm.
174 func (fs *fastSync) setSyncPeer(peer *peers.Peer) {
// verifyBlocks feeds the received blocks to the chain one by one, in order,
// via ProcessBlock. The conditions guarding the statements below are on
// lines not visible here.
178 func (fs *fastSync) verifyBlocks(blocks []*types.Block) error {
179 for _, block := range blocks {
// ProcessBlock returns an orphan flag and an error for the block.
180 isOrphan, err := fs.chain.ProcessBlock(block)
// Presumably reached only when isOrphan is true: the orphan is logged with
// its height and hash and errOrphanBlock is returned — confirm against the
// missing condition.
186 log.WithFields(log.Fields{"module": logModule, "height": block.Height, "hash": block.Hash()}).Error("fast sync block is orphan")
187 return errOrphanBlock