6 log "github.com/sirupsen/logrus"
8 "github.com/bytom/vapor/errors"
9 "github.com/bytom/vapor/netsync/peers"
10 "github.com/bytom/vapor/p2p/security"
11 "github.com/bytom/vapor/protocol/bc"
12 "github.com/bytom/vapor/protocol/bc/types"
16 minSizeOfSyncSkeleton = 2
17 maxSizeOfSyncSkeleton = 11
18 numOfBlocksSkeletonGap = maxNumOfBlocksPerMsg
19 maxNumOfBlocksPerSync = numOfBlocksSkeletonGap * uint64(maxSizeOfSyncSkeleton-1)
20 fastSyncPivotGap = uint64(64)
21 minGapStartFastSync = uint64(128)
23 errNoSyncPeer = errors.New("can't find sync peer")
24 errSkeletonSize = errors.New("fast sync skeleton size wrong")
25 errNoMainSkeleton = errors.New("No main skeleton found")
26 errNoSkeletonFound = errors.New("No skeleton found")
29 type fastSync struct {
32 blockProcessor BlockProcessor
34 mainSyncPeer *peers.Peer
37 func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync {
40 msgFetcher: msgFetcher,
41 blockProcessor: newBlockProcessor(chain, storage, peers),
46 func (fs *fastSync) blockLocator() []*bc.Hash {
47 header := fs.chain.BestBlockHeader()
48 locator := []*bc.Hash{}
52 headerHash := header.Hash()
53 locator = append(locator, &headerHash)
54 if header.Height == 0 {
59 if header.Height < step {
60 header, err = fs.chain.GetHeaderByHeight(0)
62 header, err = fs.chain.GetHeaderByHeight(header.Height - step)
65 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
69 if len(locator) >= 9 {
76 // createFetchBlocksTasks get the skeleton and assign tasks according to the skeleton.
77 func (fs *fastSync) createFetchBlocksTasks(stopBlock *types.Block) ([]*fetchBlocksWork, error) {
78 // Find peers that meet the height requirements.
79 peers := fs.peers.GetPeersByHeight(stopBlock.Height + fastSyncPivotGap)
81 return nil, errNoSyncPeer
84 // parallel fetch the skeleton from peers.
85 stopHash := stopBlock.Hash()
86 skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1)
87 if len(skeletonMap) == 0 {
88 return nil, errNoSkeletonFound
91 mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()]
93 return nil, errNoMainSkeleton
96 if len(mainSkeleton) < minSizeOfSyncSkeleton {
97 fs.peers.ProcessIllegal(fs.mainSyncPeer.ID(), security.LevelMsgIllegal, errSkeletonSize.Error())
98 return nil, errSkeletonSize
101 // collect peers that match the skeleton of the primary sync peer
102 fs.msgFetcher.addSyncPeer(fs.mainSyncPeer.ID())
103 delete(skeletonMap, fs.mainSyncPeer.ID())
104 for peerID, skeleton := range skeletonMap {
105 if len(skeleton) != len(mainSkeleton) {
106 log.WithFields(log.Fields{"module": logModule, "main skeleton": len(mainSkeleton), "got skeleton": len(skeleton)}).Warn("different skeleton length")
110 for i, header := range skeleton {
111 if header.Hash() != mainSkeleton[i].Hash() {
112 log.WithFields(log.Fields{"module": logModule, "header index": i, "main skeleton": mainSkeleton[i].Hash(), "got skeleton": header.Hash()}).Warn("different skeleton hash")
116 fs.msgFetcher.addSyncPeer(peerID)
119 blockFetchTasks := make([]*fetchBlocksWork, 0)
120 // create download task
121 for i := 0; i < len(mainSkeleton)-1 && i < maxSizeOfSyncSkeleton-1; i++ {
122 blockFetchTasks = append(blockFetchTasks, &fetchBlocksWork{startHeader: mainSkeleton[i], stopHeader: mainSkeleton[i+1]})
125 return blockFetchTasks, nil
128 func (fs *fastSync) process() error {
129 stopBlock, err := fs.findSyncRange()
134 tasks, err := fs.createFetchBlocksTasks(stopBlock)
139 downloadNotifyCh := make(chan struct{}, 1)
140 processStopCh := make(chan struct{})
141 var wg sync.WaitGroup
143 go fs.msgFetcher.parallelFetchBlocks(tasks, downloadNotifyCh, processStopCh, &wg)
144 go fs.blockProcessor.process(downloadNotifyCh, processStopCh, tasks[0].startHeader.Height, &wg)
146 fs.msgFetcher.resetParameter()
147 log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync complete")
151 // findSyncRange find the start and end of this sync.
152 // sync length cannot be greater than maxFastSyncBlocksNum.
153 func (fs *fastSync) findSyncRange() (*types.Block, error) {
154 bestHeight := fs.chain.BestBlockHeight()
155 length := fs.mainSyncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight
156 if length > maxNumOfBlocksPerSync {
157 length = maxNumOfBlocksPerSync
160 return fs.msgFetcher.requireBlock(fs.mainSyncPeer.ID(), bestHeight+length)
163 func (fs *fastSync) setSyncPeer(peer *peers.Peer) {
164 fs.mainSyncPeer = peer