OSDN Git Service

ed426b7c644c26c02bc281aa56a321f3c1d89f7f
[bytom/vapor.git] / netsync / chainmgr / fast_sync.go
1 package chainmgr
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/errors"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/protocol/bc"
11         "github.com/vapor/protocol/bc/types"
12 )
13
14 var (
15         maxNumOfSkeletonPerSync = uint64(10)
16         numOfBlocksSkeletonGap  = maxNumOfBlocksPerMsg
17         maxNumOfBlocksPerSync   = numOfBlocksSkeletonGap * maxNumOfSkeletonPerSync
18         fastSyncPivotGap        = uint64(64)
19         minGapStartFastSync     = uint64(128)
20
21         errNoSyncPeer = errors.New("can't find sync peer")
22 )
23
24 type fastSync struct {
25         chain          Chain
26         msgFetcher     MsgFetcher
27         blockProcessor BlockProcessor
28         peers          *peers.PeerSet
29         mainSyncPeer   *peers.Peer
30 }
31
32 func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync {
33         return &fastSync{
34                 chain:          chain,
35                 msgFetcher:     msgFetcher,
36                 blockProcessor: newBlockProcessor(chain, storage, peers),
37                 peers:          peers,
38         }
39 }
40
41 func (fs *fastSync) blockLocator() []*bc.Hash {
42         header := fs.chain.BestBlockHeader()
43         locator := []*bc.Hash{}
44         step := uint64(1)
45
46         for header != nil {
47                 headerHash := header.Hash()
48                 locator = append(locator, &headerHash)
49                 if header.Height == 0 {
50                         break
51                 }
52
53                 var err error
54                 if header.Height < step {
55                         header, err = fs.chain.GetHeaderByHeight(0)
56                 } else {
57                         header, err = fs.chain.GetHeaderByHeight(header.Height - step)
58                 }
59                 if err != nil {
60                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
61                         break
62                 }
63
64                 if len(locator) >= 9 {
65                         step *= 2
66                 }
67         }
68         return locator
69 }
70
71 // createFetchBlocksTasks get the skeleton and assign tasks according to the skeleton.
72 func (fs *fastSync) createFetchBlocksTasks(stopBlock *types.Block) ([]*fetchBlocksWork, error) {
73         // Find peers that meet the height requirements.
74         peers := fs.peers.GetPeersByHeight(stopBlock.Height + fastSyncPivotGap)
75         if len(peers) == 0 {
76                 return nil, errNoSyncPeer
77         }
78
79         // parallel fetch the skeleton from peers.
80         stopHash := stopBlock.Hash()
81         skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1)
82         if len(skeletonMap) == 0 {
83                 return nil, errors.New("No skeleton found")
84         }
85
86         mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()]
87         if !ok {
88                 return nil, errors.New("No main skeleton found")
89         }
90
91         // collect peers that match the skeleton of the primary sync peer
92         fs.msgFetcher.addSyncPeer(fs.mainSyncPeer.ID())
93         delete(skeletonMap, fs.mainSyncPeer.ID())
94         for peerID, skeleton := range skeletonMap {
95                 if len(skeleton) != len(mainSkeleton) {
96                         log.WithFields(log.Fields{"module": logModule, "main skeleton": len(mainSkeleton), "got skeleton": len(skeleton)}).Warn("different skeleton length")
97                         continue
98                 }
99
100                 for i, header := range skeleton {
101                         if header.Hash() != mainSkeleton[i].Hash() {
102                                 log.WithFields(log.Fields{"module": logModule, "header index": i, "main skeleton": mainSkeleton[i].Hash(), "got skeleton": header.Hash()}).Warn("different skeleton hash")
103                                 continue
104                         }
105                 }
106                 fs.msgFetcher.addSyncPeer(peerID)
107         }
108
109         blockFetchTasks := make([]*fetchBlocksWork, 0)
110         // create download task
111         for i := 0; i < len(mainSkeleton)-1; i++ {
112                 blockFetchTasks = append(blockFetchTasks, &fetchBlocksWork{startHeader: mainSkeleton[i], stopHeader: mainSkeleton[i+1]})
113         }
114
115         return blockFetchTasks, nil
116 }
117
118 func (fs *fastSync) process() error {
119         stopBlock, err := fs.findSyncRange()
120         if err != nil {
121                 return err
122         }
123
124         tasks, err := fs.createFetchBlocksTasks(stopBlock)
125         if err != nil {
126                 return err
127         }
128
129         downloadNotifyCh := make(chan struct{}, 1)
130         processStopCh := make(chan struct{})
131         var wg sync.WaitGroup
132         wg.Add(2)
133         go fs.msgFetcher.parallelFetchBlocks(tasks, downloadNotifyCh, processStopCh, &wg)
134         go fs.blockProcessor.process(downloadNotifyCh, processStopCh, tasks[0].startHeader.Height, &wg)
135         wg.Wait()
136         fs.msgFetcher.resetParameter()
137         log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync complete")
138         return nil
139 }
140
141 // findSyncRange find the start and end of this sync.
142 // sync length cannot be greater than maxFastSyncBlocksNum.
143 func (fs *fastSync) findSyncRange() (*types.Block, error) {
144         bestHeight := fs.chain.BestBlockHeight()
145         length := fs.mainSyncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight
146         if length > maxNumOfBlocksPerSync {
147                 length = maxNumOfBlocksPerSync
148         }
149
150         return fs.msgFetcher.requireBlock(fs.mainSyncPeer.ID(), bestHeight+length)
151 }
152
153 func (fs *fastSync) setSyncPeer(peer *peers.Peer) {
154         fs.mainSyncPeer = peer
155 }