* Fix fast sync pending when all request blocks timeout
* Add num of sync peer periodic detection
* Fix review error
- requireBlockTimeout = 20 * time.Second
- requireHeadersTimeout = 30 * time.Second
- requireBlocksTimeout = 50 * time.Second
+ requireBlockTimeout = 20 * time.Second
+ requireHeadersTimeout = 30 * time.Second
+ requireBlocksTimeout = 50 * time.Second
+ checkSyncPeerNumInterval = 5 * time.Second
errRequestBlocksTimeout = errors.New("request blocks timeout")
errRequestTimeout = errors.New("request timeout")
errRequestBlocksTimeout = errors.New("request blocks timeout")
errRequestTimeout = errors.New("request timeout")
func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
defer close(workerCloseCh)
func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
defer close(workerCloseCh)
+ ticker := time.NewTicker(checkSyncPeerNumInterval)
+ defer ticker.Stop()
+
- for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; resultCount++ {
+ for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
select {
case result := <-resultCh:
select {
case result := <-resultCh:
if result.err != nil {
log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
return
if result.err != nil {
log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
return
+ case <-ticker.C:
+ if mf.syncPeers.size() == 0 {
+ log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
+ return
+ }
case _, ok := <-quit:
if !ok {
return
case _, ok := <-quit:
if !ok {
return
stopHash := work.stopHeader.Hash()
blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
if err != nil {
stopHash := work.stopHeader.Hash()
blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
if err != nil {
+ mf.syncPeers.delete(peerID)
mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
return nil, err
}
if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
return nil, err
}
if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
+ mf.syncPeers.delete(peerID)
mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
return nil, err
}
mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
return nil, err
}