7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/errors"
10 "github.com/vapor/netsync/peers"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// Sizing limits used by the message fetcher.
// Bound used by the worker-spawning loop in parallelFetchBlocks.
17 maxNumOfParallelFetchBlocks = 7
// Buffer capacity of msgFetcher.blockProcessCh (single-block messages).
18 blockProcessChSize = 1024
// Buffer capacity of msgFetcher.blocksProcessCh (multi-block messages).
19 blocksProcessChSize = 128
// Buffer capacity of msgFetcher.headersProcessCh (header messages).
20 headersProcessChSize = 1024
// Capacity of the peer channel in parallelFetchBlocks and the cap on how many
// idle peers are queued into it.
21 maxNumOfFastSyncPeers = 128
// Timer duration used by requireBlock while waiting for a single block.
25 requireBlockTimeout = 20 * time.Second
// Timer duration used by parallelFetchHeaders while waiting for header replies.
26 requireHeadersTimeout = 30 * time.Second
// Timer duration used by requireBlocks while waiting for a blocks reply.
27 requireBlocksTimeout = 50 * time.Second
// Returned by requireBlocks.
29 errRequestBlocksTimeout = errors.New("request blocks timeout")
// Wrapped and returned by requireBlock; also logged by parallelFetchHeaders.
30 errRequestTimeout = errors.New("request timeout")
// Returned by requireBlock and requireBlocks after the peer lookup.
// NOTE(review): message is capitalized unlike the other error strings here;
// changing it would alter runtime output, so it is left as-is.
31 errPeerDropped = errors.New("Peer dropped")
// Returned when peer.GetBlockByHeight or peer.GetBlocks reports !ok.
32 errSendMsg = errors.New("send message error")
// MsgFetcher declares the block and header fetching operations of this package.
// All methods are unexported, so only types in this package can satisfy it;
// msgFetcher in this file defines a method for each name listed here.
35 type MsgFetcher interface {
// addSyncPeer registers a peer, by ID, as a fast sync peer.
37 addSyncPeer(peerID string)
// requireBlock asks one peer for the block at the given height.
38 requireBlock(peerID string, height uint64) (*types.Block, error)
// parallelFetchBlocks downloads the given work items using several workers.
39 parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
// parallelFetchHeaders requests headers from every given peer and returns the
// headers collected per peer ID.
40 parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
// fetchBlocksWork is one unit of block-download work bounded by two headers.
// fetchBlocks uses the hash of startHeader as the request locator and the hash
// of stopHeader as the stop hash; fetchBlocksProcess advances startHeader as
// partial replies arrive.
43 type fetchBlocksWork struct {
44 startHeader, stopHeader *types.BlockHeader
// fetchBlocksResult reports the outcome of one fetchBlocksWork item.
// fetchBlocksWorker fills the heights from the work's start and stop headers
// and also sets an err field, which collectResultLoop inspects.
47 type fetchBlocksResult struct {
48 startHeight, stopHeight uint64
// msgFetcher holds the channels and peer bookkeeping used to request blocks
// and headers from peers and to receive their replies.
52 type msgFetcher struct {
// Set of fast sync peers; replaced with a fresh set by resetParameter.
54 syncPeers *fastSyncPeers
// Fed by processBlock, drained by requireBlock.
56 blockProcessCh chan *blockMsg
// Fed by processBlocks; drained by resetParameter.
57 blocksProcessCh chan *blocksMsg
// Fed by processHeaders, drained by parallelFetchHeaders and resetParameter.
58 headersProcessCh chan *headersMsg
// Per-peer reply channels keyed by peer ID: requireBlocks registers an entry,
// processBlocks looks it up to deliver the received blocks.
// NOTE(review): this map is touched by several methods; confirm that every
// access is guarded by a lock.
59 blocksMsgChanMap map[string]chan []*types.Block
// newMsgFetcher returns a msgFetcher with a fresh fast sync peer set, buffered
// block/blocks/headers process channels sized by the package constants, and an
// empty per-peer reply-channel map.
// storage and peers are presumably stored on the returned value (other methods
// read mf.storage and mf.peers) — TODO confirm.
63 func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
66 syncPeers: newFastSyncPeers(),
68 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
69 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
70 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
71 blocksMsgChanMap: make(map[string]chan []*types.Block),
// addSyncPeer registers peerID in the fast sync peer set by delegating to
// syncPeers.add.
75 func (mf *msgFetcher) addSyncPeer(peerID string) {
76 mf.syncPeers.add(peerID)
// collectResultLoop consumes fetch results from resultCh.
//
// It loops while fewer than workSize iterations have been counted and the fast
// sync peer set is non-empty. workerCloseCh is closed when the function
// returns; parallelFetchBlocks passes the same channel to every worker.
// peerCh presumably receives the idle peer selected after each result, and
// quit presumably aborts the loop — TODO confirm both.
79 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
80 defer close(workerCloseCh)
81 //collect fetch results
82 for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; resultCount++ {
84 case result := <-resultCh:
// A failed work item is logged at Error level with its height range.
85 if result.err != nil {
86 log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
// Pick an idle fast sync peer for further work.
90 peer, err := mf.syncPeers.selectIdlePeer()
// NOTE(review): this log records result.err, while the error just returned by
// selectIdlePeer is err — looks like the wrong variable; confirm and fix.
92 log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
// fetchBlocks requests the blocks between work.startHeader and work.stopHeader
// from peerID and checks the reply with verifyBlocksMsg.
//
// The peer is marked idle again (syncPeers.setIdle) when the function returns.
// Both ProcessIllegal calls report the peer at security.LevelConnException with
// the error text as the reason.
104 func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
105 defer mf.syncPeers.setIdle(peerID)
// Hash results are kept in locals so their addresses can be passed below.
106 startHash := work.startHeader.Hash()
107 stopHash := work.stopHeader.Hash()
// The start hash is the only locator entry; the stop hash bounds the request.
108 blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
// presumably reached only when requireBlocks returned an error — TODO confirm
110 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
// A reply that fails verification also gets the peer reported.
114 if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
115 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
// fetchBlocksProcess completes one work item, possibly over several requests.
//
// Each round takes a peer ID from peerCh, fetches blocks with fetchBlocks,
// stores them through storage.writeBlocks and signals downloadNotifyCh. When
// the reply stops short of the target, work.startHeader is moved to the last
// received block so the next round continues from there; note that this
// mutates the caller's work value.
// closeCh presumably aborts the wait for a peer or a notification — TODO confirm.
122 func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
125 case peerID := <-peerCh:
127 blocks, err := mf.fetchBlocks(work, peerID)
// Fetch failures are logged at Info level with the requested height range.
129 log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
133 if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
134 log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
138 // send to block process pool
140 case downloadNotifyCh <- struct{}{}:
// Compares the last received height with stopHeader.Height-1.
// NOTE(review): Height is unsigned, so Height-1 wraps if stopHeader.Height is
// 0 — confirm that cannot happen. Indexing the last element relies on blocks
// being non-empty; verifyBlocksMsg (called by fetchBlocks) rejects empty replies.
145 if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
149 //unfinished work, continue
150 work.startHeader = &blocks[len(blocks)-1].BlockHeader
// fetchBlocksWorker takes work items from workCh, runs fetchBlocksProcess on
// each and publishes a fetchBlocksResult to resultCh.
//
// The result heights are read from work after processing; because
// fetchBlocksProcess may advance work.startHeader, startHeight can differ from
// the value the work item started with.
// closeCh presumably stops the worker and wg presumably tracks its exit — TODO confirm.
158 func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
161 case work := <-workCh:
162 err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
163 resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
// parallelFetchBlocks downloads the given work items with a pool of
// fetchBlocksWorker goroutines and a collectResultLoop goroutine.
//
// downloadNotifyCh is handed to the workers and closed by this function.
// ProcessStopCh is passed to collectResultLoop as its quit channel.
// wg is presumably used to signal completion to the caller — TODO confirm.
171 func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
172 workSize := len(works)
// workCh and resultCh are buffered to hold one entry per work item.
173 workCh := make(chan *fetchBlocksWork, workSize)
174 peerCh := make(chan string, maxNumOfFastSyncPeers)
175 resultCh := make(chan *fetchBlocksResult, workSize)
// Shared by all workers and closed by collectResultLoop when it returns.
176 closeCh := make(chan struct{})
// presumably queues every work item into workCh — TODO confirm
178 for _, work := range works {
// Queue the currently idle peers, at most maxNumOfFastSyncPeers (the capacity
// of peerCh).
181 syncPeers := mf.syncPeers.selectIdlePeers()
182 for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
183 peerCh <- syncPeers[i]
186 var workWg sync.WaitGroup
// NOTE(review): the condition uses <=, so up to maxNumOfParallelFetchBlocks+1
// workers are started when enough work exists — confirm whether < was intended.
187 for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
189 go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
192 go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
198 close(downloadNotifyCh)
// parallelFetchHeaders sends a GetHeaders request to each peer and gathers the
// replies arriving on headersProcessCh into a map keyed by peer ID.
//
// result holds one entry per peer being waited on; response records which of
// those peers have replied. The wait is bounded by a requireHeadersTimeout timer.
202 func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
203 result := make(map[string][]*types.BlockHeader)
204 response := make(map[string]bool)
205 for _, peer := range peers {
// presumably a peer whose request could not be sent is skipped — TODO confirm
206 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
// A nil entry marks the peer as one whose reply is awaited.
209 result[peer.ID()] = nil
212 timeout := time.NewTimer(requireHeadersTimeout)
216 case msg := <-mf.headersProcessCh:
// Only messages from peers that have an entry in result are accepted.
217 if _, ok := result[msg.peerID]; ok {
218 result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
219 response[msg.peerID] = true
// Every awaited peer has replied at least once.
220 if len(response) == len(result) {
// presumably the timeout branch: logged at Warn level — TODO confirm
225 log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
// processBlock queues a single block received from peerID onto blockProcessCh,
// where requireBlock reads it. The send blocks while the channel buffer is full.
231 func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
232 mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks handles a multi-block message from peerID: it queues the
// message onto blocksProcessCh and forwards the blocks to the reply channel
// registered for that peer in blocksMsgChanMap.
235 func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
236 mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
// requireBlocks registers the per-peer channel before sending its request.
238 blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
// presumably reached when no channel is registered for the peer — TODO confirm
241 mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
// NOTE(review): requireBlocks creates this channel with capacity 1, so a second
// delivery before the first is read would block here — confirm this is acceptable.
245 blocksMsgChan <- blocks
// processHeaders queues headers received from peerID onto headersProcessCh,
// where parallelFetchHeaders reads them. The send blocks while the channel
// buffer is full.
248 func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
249 mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// requireBlock asks peerID for the block at the given height and waits for a
// matching message on blockProcessCh.
//
// Returns the block on success. Returns errPeerDropped after the peer lookup
// (presumably when no peer is found — TODO confirm), errSendMsg when the
// request could not be sent, and errRequestTimeout wrapped with "requireBlock"
// otherwise; the wait uses a requireBlockTimeout timer.
252 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
253 peer := mf.peers.GetPeer(peerID)
255 return nil, errPeerDropped
258 if ok := peer.GetBlockByHeight(height); !ok {
259 return nil, errSendMsg
262 timeout := time.NewTimer(requireBlockTimeout)
267 case msg := <-mf.blockProcessCh:
// Messages from another peer or for another height are not the answer to this
// request; presumably they are skipped — TODO confirm.
268 if msg.peerID != peerID {
271 if msg.block.Height != height {
274 return msg.block, nil
276 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks asks peerID for the blocks selected by locator and stopHash and
// waits for the reply on a channel registered under the peer's ID.
//
// Returns errPeerDropped after removing the peer from the fast sync set
// (presumably when the lookup finds no peer — TODO confirm), errSendMsg when
// the request could not be sent, and errRequestBlocksTimeout otherwise; the
// wait uses a requireBlocksTimeout timer.
281 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
282 peer := mf.peers.GetPeer(peerID)
284 mf.syncPeers.delete(peerID)
285 return nil, errPeerDropped
// Capacity 1 lets processBlocks deliver one reply without waiting for a reader.
288 receiveCh := make(chan []*types.Block, 1)
// Registered before the request is sent so the reply can be routed back here.
// NOTE(review): no removal of this entry is visible in this function — confirm
// whether stale entries are cleaned up elsewhere.
290 mf.blocksMsgChanMap[peerID] = receiveCh
293 if ok := peer.GetBlocks(locator, stopHash); !ok {
294 return nil, errSendMsg
297 timeout := time.NewTimer(requireBlocksTimeout)
300 case blocks := <-receiveCh:
303 return nil, errRequestBlocksTimeout
// resetParameter returns the fetcher to a clean state: it replaces the
// per-peer reply-channel map and the fast sync peer set with fresh ones, resets
// the storage, and receives from blocksProcessCh and headersProcessCh.
307 func (mf *msgFetcher) resetParameter() {
308 mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
309 mf.syncPeers = newFastSyncPeers()
310 mf.storage.resetParameter()
// presumably drains pending messages until both channels are empty — TODO
// confirm. NOTE(review): blockProcessCh is not among these cases.
314 case <-mf.blocksProcessCh:
315 case <-mf.headersProcessCh:
// verifyBlocksMsg checks a blocks reply against the requested header range.
//
// The checks run in order: the slice is non-empty, it holds no more than
// stopHeader.Height-startHeader.Height+1 blocks, the first block's hash equals
// startHeader's hash, and every block's hash equals the PreviousBlockHash of
// the block after it. Each failed check returns a new error with the message
// shown on that branch.
// NOTE(review): the length bound is unsigned arithmetic; if stopHeader.Height
// were below startHeader.Height the subtraction would wrap — confirm callers
// rule this out.
322 func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
324 if len(blocks) == 0 {
325 return errors.New("null blocks msg")
328 // blocks more than request
329 if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
330 return errors.New("exceed length blocks msg")
333 // verify start block
334 if blocks[0].Hash() != startHeader.Hash() {
335 return errors.New("get mismatch blocks msg")
338 // verify blocks continuity
339 for i := 0; i < len(blocks)-1; i++ {
340 if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
341 return errors.New("get discontinuous blocks msg")