OSDN Git Service

Peer add announces new block message num limit
[bytom/vapor.git] / netsync / chainmgr / fast_sync.go
1 package chainmgr
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/errors"
9         "github.com/vapor/netsync/peers"
10         "github.com/vapor/p2p/security"
11         "github.com/vapor/protocol/bc"
12         "github.com/vapor/protocol/bc/types"
13 )
14
15 var (
16         minSizeOfSyncSkeleton  = 2
17         maxSizeOfSyncSkeleton  = 11
18         numOfBlocksSkeletonGap = maxNumOfBlocksPerMsg
19         maxNumOfBlocksPerSync  = numOfBlocksSkeletonGap * uint64(maxSizeOfSyncSkeleton-1)
20         fastSyncPivotGap       = uint64(64)
21         minGapStartFastSync    = uint64(128)
22
23         errNoSyncPeer      = errors.New("can't find sync peer")
24         errSkeletonSize    = errors.New("fast sync skeleton size wrong")
25         errNoMainSkeleton  = errors.New("No main skeleton found")
26         errNoSkeletonFound = errors.New("No skeleton found")
27 )
28
29 type fastSync struct {
30         chain          Chain
31         msgFetcher     MsgFetcher
32         blockProcessor BlockProcessor
33         peers          *peers.PeerSet
34         mainSyncPeer   *peers.Peer
35 }
36
37 func newFastSync(chain Chain, msgFetcher MsgFetcher, storage Storage, peers *peers.PeerSet) *fastSync {
38         return &fastSync{
39                 chain:          chain,
40                 msgFetcher:     msgFetcher,
41                 blockProcessor: newBlockProcessor(chain, storage, peers),
42                 peers:          peers,
43         }
44 }
45
46 func (fs *fastSync) blockLocator() []*bc.Hash {
47         header := fs.chain.BestBlockHeader()
48         locator := []*bc.Hash{}
49         step := uint64(1)
50
51         for header != nil {
52                 headerHash := header.Hash()
53                 locator = append(locator, &headerHash)
54                 if header.Height == 0 {
55                         break
56                 }
57
58                 var err error
59                 if header.Height < step {
60                         header, err = fs.chain.GetHeaderByHeight(0)
61                 } else {
62                         header, err = fs.chain.GetHeaderByHeight(header.Height - step)
63                 }
64                 if err != nil {
65                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockKeeper fail on get blockLocator")
66                         break
67                 }
68
69                 if len(locator) >= 9 {
70                         step *= 2
71                 }
72         }
73         return locator
74 }
75
76 // createFetchBlocksTasks get the skeleton and assign tasks according to the skeleton.
77 func (fs *fastSync) createFetchBlocksTasks(stopBlock *types.Block) ([]*fetchBlocksWork, error) {
78         // Find peers that meet the height requirements.
79         peers := fs.peers.GetPeersByHeight(stopBlock.Height + fastSyncPivotGap)
80         if len(peers) == 0 {
81                 return nil, errNoSyncPeer
82         }
83
84         // parallel fetch the skeleton from peers.
85         stopHash := stopBlock.Hash()
86         skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1)
87         if len(skeletonMap) == 0 {
88                 return nil, errNoSkeletonFound
89         }
90
91         mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()]
92         if !ok {
93                 return nil, errNoMainSkeleton
94         }
95
96         if len(mainSkeleton) < minSizeOfSyncSkeleton || len(mainSkeleton) > maxSizeOfSyncSkeleton {
97                 fs.peers.ProcessIllegal(fs.mainSyncPeer.ID(), security.LevelMsgIllegal, errSkeletonSize.Error())
98                 return nil, errSkeletonSize
99         }
100
101         // collect peers that match the skeleton of the primary sync peer
102         fs.msgFetcher.addSyncPeer(fs.mainSyncPeer.ID())
103         delete(skeletonMap, fs.mainSyncPeer.ID())
104         for peerID, skeleton := range skeletonMap {
105                 if len(skeleton) != len(mainSkeleton) {
106                         log.WithFields(log.Fields{"module": logModule, "main skeleton": len(mainSkeleton), "got skeleton": len(skeleton)}).Warn("different skeleton length")
107                         continue
108                 }
109
110                 for i, header := range skeleton {
111                         if header.Hash() != mainSkeleton[i].Hash() {
112                                 log.WithFields(log.Fields{"module": logModule, "header index": i, "main skeleton": mainSkeleton[i].Hash(), "got skeleton": header.Hash()}).Warn("different skeleton hash")
113                                 continue
114                         }
115                 }
116                 fs.msgFetcher.addSyncPeer(peerID)
117         }
118
119         blockFetchTasks := make([]*fetchBlocksWork, 0)
120         // create download task
121         for i := 0; i < len(mainSkeleton)-1; i++ {
122                 blockFetchTasks = append(blockFetchTasks, &fetchBlocksWork{startHeader: mainSkeleton[i], stopHeader: mainSkeleton[i+1]})
123         }
124
125         return blockFetchTasks, nil
126 }
127
128 func (fs *fastSync) process() error {
129         stopBlock, err := fs.findSyncRange()
130         if err != nil {
131                 return err
132         }
133
134         tasks, err := fs.createFetchBlocksTasks(stopBlock)
135         if err != nil {
136                 return err
137         }
138
139         downloadNotifyCh := make(chan struct{}, 1)
140         processStopCh := make(chan struct{})
141         var wg sync.WaitGroup
142         wg.Add(2)
143         go fs.msgFetcher.parallelFetchBlocks(tasks, downloadNotifyCh, processStopCh, &wg)
144         go fs.blockProcessor.process(downloadNotifyCh, processStopCh, tasks[0].startHeader.Height, &wg)
145         wg.Wait()
146         fs.msgFetcher.resetParameter()
147         log.WithFields(log.Fields{"module": logModule, "height": fs.chain.BestBlockHeight()}).Info("fast sync complete")
148         return nil
149 }
150
151 // findSyncRange find the start and end of this sync.
152 // sync length cannot be greater than maxFastSyncBlocksNum.
153 func (fs *fastSync) findSyncRange() (*types.Block, error) {
154         bestHeight := fs.chain.BestBlockHeight()
155         length := fs.mainSyncPeer.IrreversibleHeight() - fastSyncPivotGap - bestHeight
156         if length > maxNumOfBlocksPerSync {
157                 length = maxNumOfBlocksPerSync
158         }
159
160         return fs.msgFetcher.requireBlock(fs.mainSyncPeer.ID(), bestHeight+length)
161 }
162
163 func (fs *fastSync) setSyncPeer(peer *peers.Peer) {
164         fs.mainSyncPeer = peer
165 }