OSDN Git Service

Fix fast sync pending when all request blocks timeout (#347)
[bytom/vapor.git] / netsync / chainmgr / msg_fetcher.go
index 2cfda5e..1435602 100644 (file)
@@ -22,9 +22,10 @@ const (
 )
 
 var (
-       requireBlockTimeout   = 20 * time.Second
-       requireHeadersTimeout = 30 * time.Second
-       requireBlocksTimeout  = 50 * time.Second
+       requireBlockTimeout      = 20 * time.Second
+       requireHeadersTimeout    = 30 * time.Second
+       requireBlocksTimeout     = 50 * time.Second
+       checkSyncPeerNumInterval = 5 * time.Second
 
        errRequestBlocksTimeout = errors.New("request blocks timeout")
        errRequestTimeout       = errors.New("request timeout")
@@ -78,10 +79,14 @@ func (mf *msgFetcher) addSyncPeer(peerID string) {
 
 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
        defer close(workerCloseCh)
+       ticker := time.NewTicker(checkSyncPeerNumInterval)
+       defer ticker.Stop()
+
        //collect fetch results
-       for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; resultCount++ {
+       for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
                select {
                case result := <-resultCh:
+                       resultCount++
                        if result.err != nil {
                                log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
                                return
@@ -93,6 +98,11 @@ func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{},
                                break
                        }
                        peerCh <- peer
+               case <-ticker.C:
+                       if mf.syncPeers.size() == 0 {
+                               log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
+                               return
+                       }
                case _, ok := <-quit:
                        if !ok {
                                return
@@ -107,11 +117,13 @@ func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*type
        stopHash := work.stopHeader.Hash()
        blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
        if err != nil {
+               mf.syncPeers.delete(peerID)
                mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
                return nil, err
        }
 
        if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
+               mf.syncPeers.delete(peerID)
                mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
                return nil, err
        }