7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/vapor/errors"
10 "github.com/bytom/vapor/netsync/peers"
11 "github.com/bytom/vapor/p2p/security"
12 "github.com/bytom/vapor/protocol/bc"
13 "github.com/bytom/vapor/protocol/bc/types"
const (
	// maxNumOfParallelFetchBlocks bounds the number of fetchBlocksWorker
	// goroutines started by parallelFetchBlocks.
	maxNumOfParallelFetchBlocks = 7
	// blockProcessChSize is the buffer size of msgFetcher.blockProcessCh.
	blockProcessChSize = 1024
	// blocksProcessChSize is the buffer size of msgFetcher.blocksProcessCh.
	blocksProcessChSize = 128
	// headersProcessChSize is the buffer size of msgFetcher.headersProcessCh.
	headersProcessChSize = 1024
	// maxNumOfFastSyncPeers is the capacity of the idle-peer channel used by
	// parallelFetchBlocks and the cap on peers seeded into it.
	maxNumOfFastSyncPeers = 128
)
var (
	// requireBlockTimeout is how long requireBlock waits for the requested block.
	requireBlockTimeout = 20 * time.Second
	// requireHeadersTimeout is how long parallelFetchHeaders waits for replies.
	requireHeadersTimeout = 30 * time.Second
	// requireBlocksTimeout is how long requireBlocks waits for a blocks message.
	requireBlocksTimeout = 90 * time.Second
	// checkSyncPeerNumInterval is the period at which collectResultLoop
	// re-checks the number of fast sync peers.
	checkSyncPeerNumInterval = 5 * time.Second

	// errRequestBlocksTimeout is returned by requireBlocks on timeout.
	errRequestBlocksTimeout = errors.New("request blocks timeout")
	// errRequestTimeout is wrapped by requireBlock on timeout and logged by
	// parallelFetchHeaders on timeout.
	errRequestTimeout = errors.New("request timeout")
	// errPeerDropped is returned when the peer can no longer be looked up.
	errPeerDropped = errors.New("Peer dropped")
	// errSendMsg is returned when the request could not be sent to the peer.
	errSendMsg = errors.New("send message error")
)
36 // MsgFetcher is the interface for msg fetch struct
37 type MsgFetcher interface {
39 addSyncPeer(peerID string)
40 requireBlock(peerID string, height uint64) (*types.Block, error)
41 parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
42 parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
45 type fetchBlocksWork struct {
46 startHeader, stopHeader *types.BlockHeader
// fetchBlocksResult reports the outcome of one fetchBlocksWork: the height
// range of the work and the error returned by fetchBlocksProcess (nil on
// success).
type fetchBlocksResult struct {
	startHeight, stopHeight uint64
	err                     error
}
54 type msgFetcher struct {
56 syncPeers *fastSyncPeers
58 blockProcessCh chan *blockMsg
59 blocksProcessCh chan *blocksMsg
60 headersProcessCh chan *headersMsg
61 blocksMsgChanMap map[string]chan []*types.Block
65 func newMsgFetcher(storage *storage, peers *peers.PeerSet) *msgFetcher {
68 syncPeers: newFastSyncPeers(),
70 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
71 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
72 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
73 blocksMsgChanMap: make(map[string]chan []*types.Block),
77 func (mf *msgFetcher) addSyncPeer(peerID string) {
78 mf.syncPeers.add(peerID)
81 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
82 defer close(workerCloseCh)
83 ticker := time.NewTicker(checkSyncPeerNumInterval)
86 //collect fetch results
87 for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
89 case result := <-resultCh:
91 if result.err != nil {
92 log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
96 peer, err := mf.syncPeers.selectIdlePeer()
98 log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
103 if mf.syncPeers.size() == 0 {
104 log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
107 case _, ok := <-quit:
115 func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
116 defer mf.syncPeers.setIdle(peerID)
117 startHash := work.startHeader.Hash()
118 stopHash := work.stopHeader.Hash()
119 blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
121 mf.syncPeers.delete(peerID)
122 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
126 if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
127 mf.syncPeers.delete(peerID)
128 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
135 func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
138 case peerID := <-peerCh:
140 blocks, err := mf.fetchBlocks(work, peerID)
142 log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
146 if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
147 log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
151 // send to block process pool
153 case downloadNotifyCh <- struct{}{}:
158 if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
162 //unfinished work, continue
163 work.startHeader = &blocks[len(blocks)-1].BlockHeader
171 func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
174 case work := <-workCh:
175 err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
176 resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
184 func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
185 workSize := len(works)
186 workCh := make(chan *fetchBlocksWork, workSize)
187 peerCh := make(chan string, maxNumOfFastSyncPeers)
188 resultCh := make(chan *fetchBlocksResult, workSize)
189 closeCh := make(chan struct{})
191 for _, work := range works {
194 syncPeers := mf.syncPeers.selectIdlePeers()
195 for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
196 peerCh <- syncPeers[i]
199 var workWg sync.WaitGroup
200 for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
202 go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
205 go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
211 close(downloadNotifyCh)
215 func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
216 result := make(map[string][]*types.BlockHeader)
217 response := make(map[string]bool)
218 for _, peer := range peers {
219 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
222 result[peer.ID()] = nil
225 timeout := time.NewTimer(requireHeadersTimeout)
229 case msg := <-mf.headersProcessCh:
230 if _, ok := result[msg.peerID]; ok {
231 result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
232 response[msg.peerID] = true
233 if len(response) == len(result) {
238 log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
244 func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
245 mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
248 func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
249 mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
251 blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
254 mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
258 blocksMsgChan <- blocks
261 func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
262 mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
265 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
266 peer := mf.peers.GetPeer(peerID)
268 return nil, errPeerDropped
271 if ok := peer.GetBlockByHeight(height); !ok {
272 return nil, errSendMsg
275 timeout := time.NewTimer(requireBlockTimeout)
280 case msg := <-mf.blockProcessCh:
281 if msg.peerID != peerID {
284 if msg.block.Height != height {
287 return msg.block, nil
289 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
294 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
295 peer := mf.peers.GetPeer(peerID)
297 mf.syncPeers.delete(peerID)
298 return nil, errPeerDropped
301 receiveCh := make(chan []*types.Block, 1)
303 mf.blocksMsgChanMap[peerID] = receiveCh
306 if ok := peer.GetBlocks(locator, stopHash); !ok {
307 return nil, errSendMsg
310 timeout := time.NewTimer(requireBlocksTimeout)
313 case blocks := <-receiveCh:
316 return nil, errRequestBlocksTimeout
320 func (mf *msgFetcher) resetParameter() {
321 mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
322 mf.syncPeers = newFastSyncPeers()
323 mf.storage.resetParameter()
327 case <-mf.blocksProcessCh:
328 case <-mf.headersProcessCh:
335 func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
337 if len(blocks) == 0 {
338 return errors.New("null blocks msg")
341 // blocks more than request
342 if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
343 return errors.New("exceed length blocks msg")
346 // verify start block
347 if blocks[0].Hash() != startHeader.Hash() {
348 return errors.New("get mismatch blocks msg")
351 // verify blocks continuity
352 for i := 0; i < len(blocks)-1; i++ {
353 if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
354 return errors.New("get discontinuous blocks msg")