+ blocksMsgChanMap: make(map[string]chan []*types.Block),
+ }
+}
+
+func (mf *msgFetcher) addSyncPeer(peerID string) {
+ mf.syncPeers.add(peerID)
+}
+
+func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
+ defer close(workerCloseCh)
+ ticker := time.NewTicker(checkSyncPeerNumInterval)
+ defer ticker.Stop()
+
+ //collect fetch results
+ for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
+ select {
+ case result := <-resultCh:
+ resultCount++
+ if result.err != nil {
+ log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
+ return
+ }
+
+ peer, err := mf.syncPeers.selectIdlePeer()
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
+ break
+ }
+ peerCh <- peer
+ case <-ticker.C:
+ if mf.syncPeers.size() == 0 {
+ log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
+ return
+ }
+ case _, ok := <-quit:
+ if !ok {
+ return
+ }
+ }
+ }
+}
+
+func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
+ defer mf.syncPeers.setIdle(peerID)
+ startHash := work.startHeader.Hash()
+ stopHash := work.stopHeader.Hash()
+ blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
+ if err != nil {
+ mf.syncPeers.delete(peerID)
+ mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
+ return nil, err
+ }
+
+ if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
+ mf.syncPeers.delete(peerID)
+ mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
+ return nil, err
+ }
+
+ return blocks, nil
+}
+
+func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
+ for {
+ select {
+ case peerID := <-peerCh:
+ for {
+ blocks, err := mf.fetchBlocks(work, peerID)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
+ break
+ }
+
+ if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
+ return err
+ }
+
+ // send to block process pool
+ select {
+ case downloadNotifyCh <- struct{}{}:
+ default:
+ }
+
+ // work completed
+ if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
+ return nil
+ }
+
+ //unfinished work, continue
+ work.startHeader = &blocks[len(blocks)-1].BlockHeader
+ }
+ case <-closeCh:
+ return nil
+ }
+ }
+}
+
+func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
+ for {
+ select {
+ case work := <-workCh:
+ err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
+ resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
+ case <-closeCh:
+ wg.Done()
+ return
+ }
+ }
+}
+
+func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
+ workSize := len(works)
+ workCh := make(chan *fetchBlocksWork, workSize)
+ peerCh := make(chan string, maxNumOfFastSyncPeers)
+ resultCh := make(chan *fetchBlocksResult, workSize)
+ closeCh := make(chan struct{})
+
+ for _, work := range works {
+ workCh <- work
+ }
+ syncPeers := mf.syncPeers.selectIdlePeers()
+ for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
+ peerCh <- syncPeers[i]
+ }
+
+ var workWg sync.WaitGroup
+ for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
+ workWg.Add(1)
+ go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
+ }
+
+ go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
+
+ workWg.Wait()
+ close(resultCh)
+ close(peerCh)
+ close(workCh)
+ close(downloadNotifyCh)
+ wg.Done()
+}
+
+func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
+ result := make(map[string][]*types.BlockHeader)
+ response := make(map[string]bool)
+ for _, peer := range peers {
+ if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
+ continue
+ }
+ result[peer.ID()] = nil
+ }
+
+ timeout := time.NewTimer(requireHeadersTimeout)
+ defer timeout.Stop()
+ for {
+ select {
+ case msg := <-mf.headersProcessCh:
+ if _, ok := result[msg.peerID]; ok {
+ result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
+ response[msg.peerID] = true
+ if len(response) == len(result) {
+ return result
+ }
+ }
+ case <-timeout.C:
+ log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
+ return result
+ }