7 log "github.com/sirupsen/logrus"
9 "github.com/vapor/errors"
10 "github.com/vapor/netsync/peers"
11 "github.com/vapor/p2p/security"
12 "github.com/vapor/protocol/bc"
13 "github.com/vapor/protocol/bc/types"
// Fast-sync tuning parameters and sentinel errors.
// NOTE(review): the enclosing const/var block delimiters fall outside this
// view; the values below are grouped by purpose.
// Worker-pool and channel-capacity limits.
17 maxNumOfParallelFetchBlocks = 7
18 blockProcessChSize = 1024
19 blocksProcessChSize = 128
20 headersProcessChSize = 1024
// Upper bound on the number of peers drawn on for fast sync.
21 maxNumOfFastSyncPeers = 128
// Per-request timeouts, plus the interval at which the collector
// re-checks how many sync peers remain alive.
25 requireBlockTimeout = 20 * time.Second
26 requireHeadersTimeout = 30 * time.Second
27 requireBlocksTimeout = 50 * time.Second
28 checkSyncPeerNumInterval = 5 * time.Second
// Sentinel errors returned by the require* helpers below.
30 errRequestBlocksTimeout = errors.New("request blocks timeout")
31 errRequestTimeout = errors.New("request timeout")
32 errPeerDropped = errors.New("Peer dropped")
33 errSendMsg = errors.New("send message error")
// MsgFetcher is the consumer-facing contract for fetching blocks and
// headers from remote peers during fast sync. It is implemented by
// *msgFetcher below. NOTE(review): some interface lines may be elided
// from this view.
36 type MsgFetcher interface {
// addSyncPeer registers a peer as available for fast-sync work.
38 addSyncPeer(peerID string)
// requireBlock requests a single block at the given height from one peer.
39 requireBlock(peerID string, height uint64) (*types.Block, error)
// parallelFetchBlocks downloads the given work items concurrently,
// signalling progress on downloadNotifyCh; wg tracks completion.
40 parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
// parallelFetchHeaders asks every listed peer for headers and returns
// the responses keyed by peer ID.
41 parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
// fetchBlocksWork describes one contiguous download task: fetch all
// blocks between startHeader and stopHeader (inclusive bounds are
// enforced by verifyBlocksMsg).
44 type fetchBlocksWork struct {
45 startHeader, stopHeader *types.BlockHeader
// fetchBlocksResult reports the outcome of one fetchBlocksWork item back
// to collectResultLoop. NOTE(review): an err field is referenced by
// callers (result.err) but its declaration is elided from this view.
48 type fetchBlocksResult struct {
49 startHeight, stopHeight uint64
// msgFetcher coordinates block/header downloads across a set of fast-sync
// peers. NOTE(review): some fields (e.g. peers, storage — both used by
// methods below) are elided from this view.
53 type msgFetcher struct {
// Peers currently usable for fast sync, with idle/busy bookkeeping.
55 syncPeers *fastSyncPeers
// Inbound message fan-in channels, drained by the require*/parallel* calls.
57 blockProcessCh chan *blockMsg
58 blocksProcessCh chan *blocksMsg
59 headersProcessCh chan *headersMsg
// Per-peer delivery channel for in-flight GetBlocks requests;
// entries are created in requireBlocks and consumed in processBlocks.
60 blocksMsgChanMap map[string]chan []*types.Block
// newMsgFetcher builds a msgFetcher with empty sync-peer state and
// buffered processing channels sized by the *ProcessChSize constants.
// NOTE(review): the struct-literal opening and return lines are elided
// from this view.
64 func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
67 syncPeers: newFastSyncPeers(),
69 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
70 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
71 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
72 blocksMsgChanMap: make(map[string]chan []*types.Block),
// addSyncPeer registers peerID with the fast-sync peer set, making it
// eligible for block-download work.
76 func (mf *msgFetcher) addSyncPeer(peerID string) {
77 mf.syncPeers.add(peerID)
// collectResultLoop gathers per-work results from the fetch workers until
// either workSize results arrive, the sync peer set empties, or quit
// closes. Closing workerCloseCh (deferred) is the shutdown signal for the
// workers. NOTE(review): the surrounding select/for scaffolding and
// several branches are elided from this view.
80 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
81 defer close(workerCloseCh)
// Periodically re-check how many sync peers are still alive.
82 ticker := time.NewTicker(checkSyncPeerNumInterval)
85 //collect fetch results
86 for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
88 case result := <-resultCh:
90 if result.err != nil {
91 log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
// On failure, try to hand the work to another idle peer.
95 peer, err := mf.syncPeers.selectIdlePeer()
97 log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
// Ticker branch (presumably): abort when no sync peers remain.
102 if mf.syncPeers.size() == 0 {
103 log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
106 case _, ok := <-quit:
// fetchBlocks downloads the blocks for one work item from peerID and
// validates the response. A peer that errors or returns an invalid batch
// is removed from the sync set and reported as illegal. The peer is
// always returned to idle on exit (deferred setIdle). NOTE(review):
// error-return and success-return lines are elided from this view.
114 func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
115 defer mf.syncPeers.setIdle(peerID)
116 startHash := work.startHeader.Hash()
117 stopHash := work.stopHeader.Hash()
118 blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
// Request failed: drop the peer and flag the connection exception.
120 mf.syncPeers.delete(peerID)
121 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
// Response arrived but failed validation: same punishment.
125 if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
126 mf.syncPeers.delete(peerID)
127 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
// fetchBlocksProcess drives one work item to completion: it waits for an
// available peer on peerCh, fetches a batch, persists it via storage, and
// — if the batch stops short of stopHeader — advances work.startHeader to
// the last received block and loops for the remainder. NOTE(review): the
// enclosing for/select scaffolding and the closeCh branch are elided from
// this view.
134 func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
137 case peerID := <-peerCh:
139 blocks, err := mf.fetchBlocks(work, peerID)
141 log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
145 if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
146 log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
150 // send to block process pool
// Non-blocking notify (presumably paired with a default branch) so a
// slow consumer does not stall the download.
152 case downloadNotifyCh <- struct{}{}:
// Work item complete once the last block reaches the stop height
// (the -1 allows for the stop header itself arriving separately).
157 if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
161 //unfinished work, continue
162 work.startHeader = &blocks[len(blocks)-1].BlockHeader
// fetchBlocksWorker is one member of the download pool: it pulls work
// items off workCh, runs fetchBlocksProcess on each, and reports the
// outcome on resultCh. NOTE(review): the for/select scaffolding, the
// closeCh exit branch, and the wg.Done call are elided from this view.
170 func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
173 case work := <-workCh:
174 err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
175 resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
// parallelFetchBlocks fans the given work items out to a bounded pool of
// fetchBlocksWorker goroutines, seeds peerCh with currently idle sync
// peers, and runs collectResultLoop to gather results. Closing
// downloadNotifyCh at the end tells downstream processors the download
// phase is over. NOTE(review): the workCh seeding body, workWg.Add, and
// the waits between L94 and L95 are elided from this view.
183 func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
184 workSize := len(works)
// Buffered so producers never block: one slot per work item / result.
185 workCh := make(chan *fetchBlocksWork, workSize)
186 peerCh := make(chan string, maxNumOfFastSyncPeers)
187 resultCh := make(chan *fetchBlocksResult, workSize)
188 closeCh := make(chan struct{})
190 for _, work := range works {
// Seed the peer channel with idle peers, capped at maxNumOfFastSyncPeers.
193 syncPeers := mf.syncPeers.selectIdlePeers()
194 for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
195 peerCh <- syncPeers[i]
// Start at most maxNumOfParallelFetchBlocks+1 workers, and never more
// workers than work items. NOTE(review): `<=` here yields one extra
// worker beyond the named cap — confirm whether that is intentional.
198 var workWg sync.WaitGroup
199 for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
201 go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
204 go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
// Signal end-of-download to whoever ranges over downloadNotifyCh.
210 close(downloadNotifyCh)
// parallelFetchHeaders broadcasts a GetHeaders request to every listed
// peer, then collects replies from headersProcessCh until all solicited
// peers have responded or requireHeadersTimeout fires. The returned map
// holds whatever headers arrived, keyed by peer ID (nil for peers that
// never answered). NOTE(review): the for/select scaffolding, the timeout
// branch, and the return statements are elided from this view.
214 func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
215 result := make(map[string][]*types.BlockHeader)
// Tracks which solicited peers have replied at least once.
216 response := make(map[string]bool)
217 for _, peer := range peers {
218 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
// Pre-register the peer so stray replies from others are ignored below.
221 result[peer.ID()] = nil
224 timeout := time.NewTimer(requireHeadersTimeout)
228 case msg := <-mf.headersProcessCh:
// Only accept headers from peers we actually solicited.
229 if _, ok := result[msg.peerID]; ok {
230 result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
231 response[msg.peerID] = true
// All solicited peers answered — done early.
232 if len(response) == len(result) {
237 log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
// processBlock forwards a single-block reply from the network layer onto
// blockProcessCh, where requireBlock is waiting for it.
243 func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
244 mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks routes a multi-block reply: it is queued on
// blocksProcessCh, and additionally delivered on the per-peer channel
// registered by requireBlocks. A reply from a peer with no registered
// channel is treated as unsolicited and penalized. NOTE(review): map
// locking and intermediate lines are elided from this view.
247 func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
248 mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
250 blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
// No in-flight request for this peer — the message was not asked for.
253 mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
257 blocksMsgChan <- blocks
// processHeaders forwards a headers reply from the network layer onto
// headersProcessCh, where parallelFetchHeaders is waiting for it.
260 func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
261 mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// requireBlock synchronously requests the block at the given height from
// one peer and waits up to requireBlockTimeout for the matching reply on
// blockProcessCh. Replies from the wrong peer or at the wrong height are
// skipped. NOTE(review): the for/select scaffolding and the branches that
// skip mismatched messages are elided from this view.
264 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
265 peer := mf.peers.GetPeer(peerID)
// Peer disappeared between selection and use.
267 return nil, errPeerDropped
270 if ok := peer.GetBlockByHeight(height); !ok {
271 return nil, errSendMsg
274 timeout := time.NewTimer(requireBlockTimeout)
279 case msg := <-mf.blockProcessCh:
// Ignore replies that are not for this request.
280 if msg.peerID != peerID {
283 if msg.block.Height != height {
286 return msg.block, nil
288 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks synchronously requests a block range from one peer. It
// registers a per-peer delivery channel in blocksMsgChanMap (consumed by
// processBlocks), sends GetBlocks, and waits up to requireBlocksTimeout.
// NOTE(review): map locking, channel cleanup on exit, and the select
// scaffolding are elided from this view.
293 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
294 peer := mf.peers.GetPeer(peerID)
// Peer gone: remove it from the sync set as well.
296 mf.syncPeers.delete(peerID)
297 return nil, errPeerDropped
// Buffer of 1 so processBlocks can deliver without blocking.
300 receiveCh := make(chan []*types.Block, 1)
302 mf.blocksMsgChanMap[peerID] = receiveCh
305 if ok := peer.GetBlocks(locator, stopHash); !ok {
306 return nil, errSendMsg
309 timeout := time.NewTimer(requireBlocksTimeout)
312 case blocks := <-receiveCh:
315 return nil, errRequestBlocksTimeout
// resetParameter clears fetcher state between sync rounds: fresh request
// map, fresh peer set, reset storage, and a drain of any stale messages
// left in the processing channels. NOTE(review): the drain's for/select
// scaffolding (and presumably a default branch that stops draining) is
// elided from this view.
319 func (mf *msgFetcher) resetParameter() {
320 mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
321 mf.syncPeers = newFastSyncPeers()
322 mf.storage.resetParameter()
// Discard stale replies from the previous round.
326 case <-mf.blocksProcessCh:
327 case <-mf.headersProcessCh:
334 func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
336 if len(blocks) == 0 {
337 return errors.New("null blocks msg")
340 // blocks more than request
341 if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
342 return errors.New("exceed length blocks msg")
345 // verify start block
346 if blocks[0].Hash() != startHeader.Hash() {
347 return errors.New("get mismatch blocks msg")
350 // verify blocks continuity
351 for i := 0; i < len(blocks)-1; i++ {
352 if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
353 return errors.New("get discontinuous blocks msg")