version 1.1.9 (#594)
[bytom/vapor.git] / netsync/chainmgr/msg_fetcher.go
package chainmgr

import (
	"sync"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/bytom/vapor/errors"
	"github.com/bytom/vapor/netsync/peers"
	"github.com/bytom/vapor/p2p/security"
	"github.com/bytom/vapor/protocol/bc"
	"github.com/bytom/vapor/protocol/bc/types"
)

const (
	maxNumOfParallelFetchBlocks = 7
	blockProcessChSize          = 1024
	blocksProcessChSize         = 128
	headersProcessChSize        = 1024
	maxNumOfFastSyncPeers       = 128
)

var (
	requireBlockTimeout      = 20 * time.Second
	requireHeadersTimeout    = 30 * time.Second
	requireBlocksTimeout     = 90 * time.Second
	checkSyncPeerNumInterval = 5 * time.Second

	errRequestBlocksTimeout = errors.New("request blocks timeout")
	errRequestTimeout       = errors.New("request timeout")
	errPeerDropped          = errors.New("Peer dropped")
	errSendMsg              = errors.New("send message error")
)

// MsgFetcher is the interface for fetching blocks and headers from remote peers during sync.
type MsgFetcher interface {
	resetParameter()
	addSyncPeer(peerID string)
	requireBlock(peerID string, height uint64) (*types.Block, error)
	parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
	parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
}

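// Note: the call order is driven by the fast-sync caller outside this file;
// roughly, headers are fetched via parallelFetchHeaders, split into
// fetchBlocksWork ranges, downloaded with parallelFetchBlocks, and the
// per-round state is cleared with resetParameter.
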
type fetchBlocksWork struct {
	startHeader, stopHeader *types.BlockHeader
}

type fetchBlocksResult struct {
	startHeight, stopHeight uint64
	err                     error
}

type msgFetcher struct {
	storage          *storage
	syncPeers        *fastSyncPeers
	peers            *peers.PeerSet
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg
	blocksMsgChanMap map[string]chan []*types.Block
	mux              sync.RWMutex
}

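// newMsgFetcher creates a msgFetcher backed by the given storage and peer set,
// with a fresh fast-sync peer list and buffered message-processing channels.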
func newMsgFetcher(storage *storage, peers *peers.PeerSet) *msgFetcher {
	return &msgFetcher{
		storage:          storage,
		syncPeers:        newFastSyncPeers(),
		peers:            peers,
		blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
		blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
		headersProcessCh: make(chan *headersMsg, headersProcessChSize),
		blocksMsgChanMap: make(map[string]chan []*types.Block),
	}
}

func (mf *msgFetcher) addSyncPeer(peerID string) {
	mf.syncPeers.add(peerID)
}

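// collectResultLoop collects results from the fetch-blocks workers and, after
// each successful result, hands an idle peer back to peerCh. It returns (and,
// via the deferred close, signals the workers through workerCloseCh) once all
// work items have reported, a result carries an error, the fast-sync peer set
// is empty, or the quit channel is closed.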
func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
	defer close(workerCloseCh)
	ticker := time.NewTicker(checkSyncPeerNumInterval)
	defer ticker.Stop()

	// collect fetch results
	for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; {
		select {
		case result := <-resultCh:
			resultCount++
			if result.err != nil {
				log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
				return
			}

			peer, err := mf.syncPeers.selectIdlePeer()
			if err != nil {
				log.WithFields(log.Fields{"module": logModule, "err": err}).Warn("failed on find fast sync peer")
				break
			}
			peerCh <- peer
		case <-ticker.C:
			if mf.syncPeers.size() == 0 {
				log.WithFields(log.Fields{"module": logModule}).Warn("num of fast sync peer is 0")
				return
			}
		case _, ok := <-quit:
			if !ok {
				return
			}
		}
	}
}

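// fetchBlocks requests the block range described by work from the given peer
// and verifies the reply. The peer is marked idle again when the call returns;
// on a bad reply the peer is dropped from the fast-sync set and reported as illegal.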
func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
	defer mf.syncPeers.setIdle(peerID)
	startHash := work.startHeader.Hash()
	stopHash := work.stopHeader.Hash()
	blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
	if err != nil {
		mf.syncPeers.delete(peerID)
		mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
		return nil, err
	}

	if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
		mf.syncPeers.delete(peerID)
		mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
		return nil, err
	}

	return blocks, nil
}

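// fetchBlocksProcess downloads one work item. It takes an available peer from
// peerCh, repeatedly fetches batches of blocks, writes them to storage and
// (non-blockingly) notifies the download consumer, advancing startHeader after
// each batch. It returns when the blocks reach the stop header, a storage
// write fails, or closeCh is closed; if a peer fails, it waits for the next peer.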
func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
	for {
		select {
		case peerID := <-peerCh:
			for {
				blocks, err := mf.fetchBlocks(work, peerID)
				if err != nil {
					log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
					break
				}

				if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
					log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
					return err
				}

				// send to block process pool
				select {
				case downloadNotifyCh <- struct{}{}:
				default:
				}

				// work completed
				if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
					return nil
				}

				// unfinished work, continue
				work.startHeader = &blocks[len(blocks)-1].BlockHeader
			}
		case <-closeCh:
			return nil
		}
	}
}

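// fetchBlocksWorker is a worker goroutine for parallelFetchBlocks: it takes
// work items off workCh, runs fetchBlocksProcess for each, and reports the
// outcome on resultCh until closeCh is closed.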
func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
	for {
		select {
		case work := <-workCh:
			err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
			resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
		case <-closeCh:
			wg.Done()
			return
		}
	}
}

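// parallelFetchBlocks downloads the given work ranges concurrently: it seeds
// the work and peer channels, spawns the worker goroutines, and runs
// collectResultLoop. Once the workers have exited it closes the shared
// channels and signals completion through wg.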
func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
	workSize := len(works)
	workCh := make(chan *fetchBlocksWork, workSize)
	peerCh := make(chan string, maxNumOfFastSyncPeers)
	resultCh := make(chan *fetchBlocksResult, workSize)
	closeCh := make(chan struct{})

	for _, work := range works {
		workCh <- work
	}
	syncPeers := mf.syncPeers.selectIdlePeers()
	for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
		peerCh <- syncPeers[i]
	}

	var workWg sync.WaitGroup
	for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
		workWg.Add(1)
		go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
	}

	go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)

	workWg.Wait()
	close(resultCh)
	close(peerCh)
	close(workCh)
	close(downloadNotifyCh)
	wg.Done()
}

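// parallelFetchHeaders sends a GetHeaders request to each of the given peers
// and gathers their replies from headersProcessCh, returning whatever headers
// have arrived once every queried peer has answered or requireHeadersTimeout fires.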
func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
	result := make(map[string][]*types.BlockHeader)
	response := make(map[string]bool)
	for _, peer := range peers {
		if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
			continue
		}
		result[peer.ID()] = nil
	}

	timeout := time.NewTimer(requireHeadersTimeout)
	defer timeout.Stop()
	for {
		select {
		case msg := <-mf.headersProcessCh:
			if _, ok := result[msg.peerID]; ok {
				result[msg.peerID] = append(result[msg.peerID], msg.headers...)
				response[msg.peerID] = true
				if len(response) == len(result) {
					return result
				}
			}
		case <-timeout.C:
			log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
			return result
		}
	}
}

func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
	mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}

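// processBlocks forwards a received blocks message to blocksProcessCh and, when
// the sending peer has a pending requireBlocks call, to that peer's receive
// channel; peers that send unsolicited blocks are reported as illegal.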
func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
	mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
	mf.mux.RLock()
	blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
	mf.mux.RUnlock()
	if !ok {
		mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
		return
	}

	blocksMsgChan <- blocks
}

func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
	mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}

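// requireBlock asks the given peer for the block at the given height and waits
// on blockProcessCh for a matching reply, giving up after requireBlockTimeout.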
func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
	peer := mf.peers.GetPeer(peerID)
	if peer == nil {
		return nil, errPeerDropped
	}

	if ok := peer.GetBlockByHeight(height); !ok {
		return nil, errSendMsg
	}

	timeout := time.NewTimer(requireBlockTimeout)
	defer timeout.Stop()

	for {
		select {
		case msg := <-mf.blockProcessCh:
			if msg.peerID != peerID {
				continue
			}
			if msg.block.Height != height {
				continue
			}
			return msg.block, nil
		case <-timeout.C:
			return nil, errors.Wrap(errRequestTimeout, "requireBlock")
		}
	}
}

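// requireBlocks registers a receive channel for the peer, sends it a GetBlocks
// request for the range described by locator and stopHash, and waits for the
// reply delivered by processBlocks or for requireBlocksTimeout.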
func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
	peer := mf.peers.GetPeer(peerID)
	if peer == nil {
		mf.syncPeers.delete(peerID)
		return nil, errPeerDropped
	}

	receiveCh := make(chan []*types.Block, 1)
	mf.mux.Lock()
	mf.blocksMsgChanMap[peerID] = receiveCh
	mf.mux.Unlock()

	if ok := peer.GetBlocks(locator, stopHash); !ok {
		return nil, errSendMsg
	}

	timeout := time.NewTimer(requireBlocksTimeout)
	defer timeout.Stop()
	select {
	case blocks := <-receiveCh:
		return blocks, nil
	case <-timeout.C:
		return nil, errRequestBlocksTimeout
	}
}

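// resetParameter clears the per-round fast-sync state: the per-peer receive
// channels, the fast-sync peer set, the download storage, and any messages
// still buffered in the blocks and headers process channels.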
func (mf *msgFetcher) resetParameter() {
	mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
	mf.syncPeers = newFastSyncPeers()
	mf.storage.resetParameter()
	// drain any buffered messages from the process channels
	for {
		select {
		case <-mf.blocksProcessCh:
		case <-mf.headersProcessCh:
		default:
			return
		}
	}
}

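// verifyBlocksMsg sanity-checks a blocks reply: it must be non-empty, no longer
// than the requested range, start at startHeader, and be hash-continuous.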
func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
	// empty blocks
	if len(blocks) == 0 {
		return errors.New("null blocks msg")
	}

	// more blocks than requested
	if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
		return errors.New("exceed length blocks msg")
	}

	// verify start block
	if blocks[0].Hash() != startHeader.Hash() {
		return errors.New("get mismatch blocks msg")
	}

	// verify blocks continuity
	for i := 0; i < len(blocks)-1; i++ {
		if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
			return errors.New("get discontinuous blocks msg")
		}
	}

	return nil
}