From: yahtoo Date: Thu, 25 Jul 2019 01:23:52 +0000 (+0800) Subject: Fix fast sync pending when all request blocks timeout (#347) X-Git-Tag: v1.0.5~78 X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=a6c760fb2983802d13cea1e245a754502fe660f6 Fix fast sync pending when all request blocks timeout (#347) * Fix fast sync pending when all request blocks timeout * Add num of sync peer periodic detection * Fix review error --- diff --git a/netsync/chainmgr/msg_fetcher.go b/netsync/chainmgr/msg_fetcher.go index 2cfda5ec..1435602c 100644 --- a/netsync/chainmgr/msg_fetcher.go +++ b/netsync/chainmgr/msg_fetcher.go @@ -22,9 +22,10 @@ const ( ) var ( - requireBlockTimeout = 20 * time.Second - requireHeadersTimeout = 30 * time.Second - requireBlocksTimeout = 50 * time.Second + requireBlockTimeout = 20 * time.Second + requireHeadersTimeout = 30 * time.Second + requireBlocksTimeout = 50 * time.Second + checkSyncPeerNumInterval = 5 * time.Second errRequestBlocksTimeout = errors.New("request blocks timeout") errRequestTimeout = errors.New("request timeout") @@ -78,10 +79,14 @@ func (mf *msgFetcher) addSyncPeer(peerID string) { func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) { defer close(workerCloseCh) + ticker := time.NewTicker(checkSyncPeerNumInterval) + defer ticker.Stop() + //collect fetch results - for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; resultCount++ { + for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; { select { case result := <-resultCh: + resultCount++ if result.err != nil { log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks") return @@ -93,6 +98,11 @@ func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, break } peerCh <- peer + case <-ticker.C: + if mf.syncPeers.size() == 0 { + log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0") + return + } case _, ok := <-quit: if !ok { return @@ -107,11 +117,13 @@ func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*type stopHash := work.stopHeader.Hash() blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash) if err != nil { + mf.syncPeers.delete(peerID) mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error()) return nil, err } if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil { + mf.syncPeers.delete(peerID) mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error()) return nil, err }