OSDN Git Service

Peer add announces new block message num limit
[bytom/vapor.git] / netsync / chainmgr / msg_fetcher.go
1 package chainmgr
2
3 import (
4         "sync"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/errors"
10         "github.com/vapor/netsync/peers"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
const (
	maxNumOfParallelFetchBlocks = 7    // cap on concurrent block-fetch workers per parallelFetchBlocks call
	blockProcessChSize          = 1024 // buffer for single-block replies (regular sync)
	blocksProcessChSize         = 128  // buffer for multi-block replies (fast sync)
	headersProcessChSize        = 1024 // buffer for headers replies
	maxNumOfFastSyncPeers       = 128  // cap on peers seeded into the fast-sync peer channel
)
23
var (
	// round-trip deadlines for the three request kinds, and the interval at
	// which collectResultLoop re-checks that fast-sync peers remain.
	requireBlockTimeout      = 20 * time.Second
	requireHeadersTimeout    = 30 * time.Second
	requireBlocksTimeout     = 50 * time.Second
	checkSyncPeerNumInterval = 5 * time.Second

	// sentinel errors returned by the require* helpers.
	errRequestBlocksTimeout = errors.New("request blocks timeout")
	errRequestTimeout       = errors.New("request timeout")
	errPeerDropped          = errors.New("Peer dropped")
	errSendMsg              = errors.New("send message error")
)
35
// MsgFetcher is the downloader interface the chain manager uses to pull
// blocks and headers from remote peers during sync.
type MsgFetcher interface {
	// resetParameter clears per-round state (peer set, storage, stale messages).
	resetParameter()
	// addSyncPeer registers a peer as eligible for fast sync.
	addSyncPeer(peerID string)
	// requireBlock synchronously fetches one block at height from peerID.
	requireBlock(peerID string, height uint64) (*types.Block, error)
	// parallelFetchBlocks downloads all work ranges concurrently, notifying
	// downloadNotifyCh as data lands; wg is released when everything is done.
	parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
	// parallelFetchHeaders asks every given peer for headers and returns the
	// per-peer results collected before the timeout.
	parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
}
43
// fetchBlocksWork describes one contiguous range of blocks to download,
// bounded by its start and stop headers (inclusive endpoints of the request).
type fetchBlocksWork struct {
	startHeader, stopHeader *types.BlockHeader
}
47
// fetchBlocksResult reports the outcome of one fetchBlocksWork item:
// the height range attempted and the error, nil on success.
type fetchBlocksResult struct {
	startHeight, stopHeight uint64
	err                     error
}
52
// msgFetcher is the concrete MsgFetcher implementation. Network-layer
// callbacks (processBlock/processBlocks/processHeaders) feed the *ProcessCh
// channels, which the require*/parallelFetch* methods consume.
type msgFetcher struct {
	storage          Storage                        // persistence for downloaded blocks
	syncPeers        *fastSyncPeers                 // peers usable for fast sync
	peers            *peers.PeerSet                 // full peer set (lookup, punishment)
	blockProcessCh   chan *blockMsg                 // single-block replies
	blocksProcessCh  chan *blocksMsg                // multi-block replies
	headersProcessCh chan *headersMsg               // headers replies
	blocksMsgChanMap map[string]chan []*types.Block // per-peer reply channel for in-flight requireBlocks calls
	mux              sync.RWMutex                   // guards blocksMsgChanMap
}
63
64 func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
65         return &msgFetcher{
66                 storage:          storage,
67                 syncPeers:        newFastSyncPeers(),
68                 peers:            peers,
69                 blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
70                 blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
71                 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
72                 blocksMsgChanMap: make(map[string]chan []*types.Block),
73         }
74 }
75
// addSyncPeer registers peerID as a candidate for fast sync.
func (mf *msgFetcher) addSyncPeer(peerID string) {
	mf.syncPeers.add(peerID)
}
79
80 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
81         defer close(workerCloseCh)
82         ticker := time.NewTicker(checkSyncPeerNumInterval)
83         defer ticker.Stop()
84
85         //collect fetch results
86         for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
87                 select {
88                 case result := <-resultCh:
89                         resultCount++
90                         if result.err != nil {
91                                 log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
92                                 return
93                         }
94
95                         peer, err := mf.syncPeers.selectIdlePeer()
96                         if err != nil {
97                                 log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
98                                 break
99                         }
100                         peerCh <- peer
101                 case <-ticker.C:
102                         if mf.syncPeers.size() == 0 {
103                                 log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
104                                 return
105                         }
106                 case _, ok := <-quit:
107                         if !ok {
108                                 return
109                         }
110                 }
111         }
112 }
113
114 func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
115         defer mf.syncPeers.setIdle(peerID)
116         startHash := work.startHeader.Hash()
117         stopHash := work.stopHeader.Hash()
118         blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
119         if err != nil {
120                 mf.syncPeers.delete(peerID)
121                 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
122                 return nil, err
123         }
124
125         if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
126                 mf.syncPeers.delete(peerID)
127                 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
128                 return nil, err
129         }
130
131         return blocks, nil
132 }
133
// fetchBlocksProcess drives a single work item to completion. It waits on
// peerCh for an idle peer, then repeatedly fetches and persists chunks of
// the range from that peer; if the peer fails it returns to waiting for a
// fresh peer with the remaining range. It returns nil when the range is
// done or closeCh fires, and a non-nil error only when writing blocks to
// storage fails.
func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
	for {
		select {
		case peerID := <-peerCh:
			for {
				blocks, err := mf.fetchBlocks(work, peerID)
				if err != nil {
					// this peer failed; break to wait for another peer
					log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
					break
				}

				if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
					log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
					return err
				}

				// non-blocking nudge to the block process pool that new data landed
				select {
				case downloadNotifyCh <- struct{}{}:
				default:
				}

				// work completed
				// NOTE(review): the -1 suggests the stop block itself is covered by
				// the neighbouring work item — confirm against the work splitter.
				if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
					return nil
				}

				//unfinished work, continue from the last block received
				work.startHeader = &blocks[len(blocks)-1].BlockHeader
			}
		case <-closeCh:
			return nil
		}
	}
}
169
170 func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
171         for {
172                 select {
173                 case work := <-workCh:
174                         err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
175                         resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
176                 case <-closeCh:
177                         wg.Done()
178                         return
179                 }
180         }
181 }
182
183 func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
184         workSize := len(works)
185         workCh := make(chan *fetchBlocksWork, workSize)
186         peerCh := make(chan string, maxNumOfFastSyncPeers)
187         resultCh := make(chan *fetchBlocksResult, workSize)
188         closeCh := make(chan struct{})
189
190         for _, work := range works {
191                 workCh <- work
192         }
193         syncPeers := mf.syncPeers.selectIdlePeers()
194         for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
195                 peerCh <- syncPeers[i]
196         }
197
198         var workWg sync.WaitGroup
199         for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
200                 workWg.Add(1)
201                 go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
202         }
203
204         go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
205
206         workWg.Wait()
207         close(resultCh)
208         close(peerCh)
209         close(workCh)
210         close(downloadNotifyCh)
211         wg.Done()
212 }
213
214 func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
215         result := make(map[string][]*types.BlockHeader)
216         response := make(map[string]bool)
217         for _, peer := range peers {
218                 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
219                         continue
220                 }
221                 result[peer.ID()] = nil
222         }
223
224         timeout := time.NewTimer(requireHeadersTimeout)
225         defer timeout.Stop()
226         for {
227                 select {
228                 case msg := <-mf.headersProcessCh:
229                         if _, ok := result[msg.peerID]; ok {
230                                 result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
231                                 response[msg.peerID] = true
232                                 if len(response) == len(result) {
233                                         return result
234                                 }
235                         }
236                 case <-timeout.C:
237                         log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
238                         return result
239                 }
240         }
241 }
242
// processBlock is the network-layer callback for a single-block reply; it
// forwards the block to blockProcessCh for requireBlock to consume.
func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
	mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}
246
// processBlocks is the network-layer callback for a multi-block reply. It
// always forwards the message to blocksProcessCh, then additionally delivers
// it to the per-peer channel registered by requireBlocks. A reply from a
// peer with no registered channel is unsolicited and is reported as illegal.
func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
	mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
	// read the registration under the shared lock, send outside it
	mf.mux.RLock()
	blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
	mf.mux.RUnlock()
	if !ok {
		mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
		return
	}

	blocksMsgChan <- blocks
}
259
// processHeaders is the network-layer callback for a headers reply; it
// forwards the headers to headersProcessCh for parallelFetchHeaders.
func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
	mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}
263
264 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
265         peer := mf.peers.GetPeer(peerID)
266         if peer == nil {
267                 return nil, errPeerDropped
268         }
269
270         if ok := peer.GetBlockByHeight(height); !ok {
271                 return nil, errSendMsg
272         }
273
274         timeout := time.NewTimer(requireBlockTimeout)
275         defer timeout.Stop()
276
277         for {
278                 select {
279                 case msg := <-mf.blockProcessCh:
280                         if msg.peerID != peerID {
281                                 continue
282                         }
283                         if msg.block.Height != height {
284                                 continue
285                         }
286                         return msg.block, nil
287                 case <-timeout.C:
288                         return nil, errors.Wrap(errRequestTimeout, "requireBlock")
289                 }
290         }
291 }
292
293 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
294         peer := mf.peers.GetPeer(peerID)
295         if peer == nil {
296                 mf.syncPeers.delete(peerID)
297                 return nil, errPeerDropped
298         }
299
300         receiveCh := make(chan []*types.Block, 1)
301         mf.mux.Lock()
302         mf.blocksMsgChanMap[peerID] = receiveCh
303         mf.mux.Unlock()
304
305         if ok := peer.GetBlocks(locator, stopHash); !ok {
306                 return nil, errSendMsg
307         }
308
309         timeout := time.NewTimer(requireBlocksTimeout)
310         defer timeout.Stop()
311         select {
312         case blocks := <-receiveCh:
313                 return blocks, nil
314         case <-timeout.C:
315                 return nil, errRequestBlocksTimeout
316         }
317 }
318
// resetParameter returns the fetcher to a clean state between sync rounds:
// fresh per-peer reply-channel map, fresh fast-sync peer set, reset storage,
// and drained fast-sync message channels.
// NOTE(review): blockProcessCh is not drained here — presumably it serves
// regular (non-fast) sync via requireBlock; confirm before changing.
func (mf *msgFetcher) resetParameter() {
	mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
	mf.syncPeers = newFastSyncPeers()
	mf.storage.resetParameter()
	//empty chan: discard stale messages without ever blocking
	for {
		select {
		case <-mf.blocksProcessCh:
		case <-mf.headersProcessCh:
		default:
			return
		}
	}
}
333
334 func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
335         // null blocks
336         if len(blocks) == 0 {
337                 return errors.New("null blocks msg")
338         }
339
340         // blocks more than request
341         if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
342                 return errors.New("exceed length blocks msg")
343         }
344
345         // verify start block
346         if blocks[0].Hash() != startHeader.Hash() {
347                 return errors.New("get mismatch blocks msg")
348         }
349
350         // verify blocks continuity
351         for i := 0; i < len(blocks)-1; i++ {
352                 if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
353                         return errors.New("get discontinuous blocks msg")
354                 }
355         }
356
357         return nil
358 }