OSDN Git Service

2cfda5ec2b3304e23215f8ff424935b00e23f227
[bytom/vapor.git] / netsync / chainmgr / msg_fetcher.go
1 package chainmgr
2
3 import (
4         "sync"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/vapor/errors"
10         "github.com/vapor/netsync/peers"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
const (
	// maxNumOfParallelFetchBlocks caps the number of fetchBlocksWorker
	// goroutines started by parallelFetchBlocks.
	maxNumOfParallelFetchBlocks = 7
	// blockProcessChSize is the buffer size of the single-block response channel.
	blockProcessChSize          = 1024
	// blocksProcessChSize is the buffer size of the multi-block response channel.
	blocksProcessChSize         = 128
	// headersProcessChSize is the buffer size of the headers response channel.
	headersProcessChSize        = 1024
	// maxNumOfFastSyncPeers caps how many idle peers are seeded into the
	// worker peer channel for a fast-sync round.
	maxNumOfFastSyncPeers       = 128
)
23
var (
	// Timeouts for the three request/response round trips. Declared as vars
	// (not consts) so tests can shorten them.
	requireBlockTimeout   = 20 * time.Second
	requireHeadersTimeout = 30 * time.Second
	requireBlocksTimeout  = 50 * time.Second

	// Sentinel errors returned by the require* helpers.
	errRequestBlocksTimeout = errors.New("request blocks timeout")
	errRequestTimeout       = errors.New("request timeout")
	errPeerDropped          = errors.New("Peer dropped")
	errSendMsg              = errors.New("send message error")
)
34
// MsgFetcher is the interface the sync manager uses to request blocks and
// headers from remote peers, either one at a time (requireBlock) or in
// parallel batches during fast sync.
type MsgFetcher interface {
	// resetParameter clears per-round state between fast-sync rounds.
	resetParameter()
	// addSyncPeer registers a peer as a fast-sync candidate.
	addSyncPeer(peerID string)
	// requireBlock synchronously fetches the block at the given height from one peer.
	requireBlock(peerID string, height uint64) (*types.Block, error)
	// parallelFetchBlocks downloads the given work ranges concurrently,
	// signaling downloadNotifyCh as batches land in storage.
	parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup)
	// parallelFetchHeaders asks every listed peer for headers and returns
	// the responses keyed by peer ID.
	parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader
}
42
// fetchBlocksWork describes one contiguous range of blocks to download,
// bounded by its start and stop headers (inclusive endpoints).
type fetchBlocksWork struct {
	startHeader, stopHeader *types.BlockHeader
}
46
// fetchBlocksResult reports the outcome of one fetchBlocksWork item:
// the height range attempted and the terminal error, if any.
type fetchBlocksResult struct {
	startHeight, stopHeight uint64
	err                     error
}
51
// msgFetcher is the concrete MsgFetcher implementation. Incoming network
// messages are pushed into the *ProcessCh channels by the process* methods
// and consumed by the corresponding require*/parallelFetch* calls.
type msgFetcher struct {
	storage          Storage
	syncPeers        *fastSyncPeers
	peers            *peers.PeerSet
	blockProcessCh   chan *blockMsg
	blocksProcessCh  chan *blocksMsg
	headersProcessCh chan *headersMsg
	// blocksMsgChanMap routes a peer's blocks response to the pending
	// requireBlocks call; guarded by mux.
	blocksMsgChanMap map[string]chan []*types.Block
	mux              sync.RWMutex
}
62
63 func newMsgFetcher(storage Storage, peers *peers.PeerSet) *msgFetcher {
64         return &msgFetcher{
65                 storage:          storage,
66                 syncPeers:        newFastSyncPeers(),
67                 peers:            peers,
68                 blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
69                 blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
70                 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
71                 blocksMsgChanMap: make(map[string]chan []*types.Block),
72         }
73 }
74
// addSyncPeer registers peerID as an idle fast-sync peer.
func (mf *msgFetcher) addSyncPeer(peerID string) {
	mf.syncPeers.add(peerID)
}
78
79 func (mf *msgFetcher) collectResultLoop(peerCh chan string, quit chan struct{}, resultCh chan *fetchBlocksResult, workerCloseCh chan struct{}, workSize int) {
80         defer close(workerCloseCh)
81         //collect fetch results
82         for resultCount := 0; resultCount < workSize && mf.syncPeers.size() > 0; resultCount++ {
83                 select {
84                 case result := <-resultCh:
85                         if result.err != nil {
86                                 log.WithFields(log.Fields{"module": logModule, "startHeight": result.startHeight, "stopHeight": result.stopHeight, "err": result.err}).Error("failed on fetch blocks")
87                                 return
88                         }
89
90                         peer, err := mf.syncPeers.selectIdlePeer()
91                         if err != nil {
92                                 log.WithFields(log.Fields{"module": logModule, "err": result.err}).Warn("failed on find fast sync peer")
93                                 break
94                         }
95                         peerCh <- peer
96                 case _, ok := <-quit:
97                         if !ok {
98                                 return
99                         }
100                 }
101         }
102 }
103
104 func (mf *msgFetcher) fetchBlocks(work *fetchBlocksWork, peerID string) ([]*types.Block, error) {
105         defer mf.syncPeers.setIdle(peerID)
106         startHash := work.startHeader.Hash()
107         stopHash := work.stopHeader.Hash()
108         blocks, err := mf.requireBlocks(peerID, []*bc.Hash{&startHash}, &stopHash)
109         if err != nil {
110                 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
111                 return nil, err
112         }
113
114         if err := mf.verifyBlocksMsg(blocks, work.startHeader, work.stopHeader); err != nil {
115                 mf.peers.ProcessIllegal(peerID, security.LevelConnException, err.Error())
116                 return nil, err
117         }
118
119         return blocks, nil
120 }
121
// fetchBlocksProcess drives one work item to completion. It waits for an
// available peer on peerCh, then repeatedly fetches batches from that peer,
// advancing work.startHeader after each batch, until the stop height is
// reached. A fetch error abandons the current peer and waits for another;
// a storage write error aborts the work item.
func (mf *msgFetcher) fetchBlocksProcess(work *fetchBlocksWork, peerCh chan string, downloadNotifyCh chan struct{}, closeCh chan struct{}) error {
	for {
		select {
		case peerID := <-peerCh:
			for {
				blocks, err := mf.fetchBlocks(work, peerID)
				if err != nil {
					log.WithFields(log.Fields{"module": logModule, "startHeight": work.startHeader.Height, "stopHeight": work.stopHeader.Height, "error": err}).Info("failed on fetch blocks")
					// Drop this peer for the current work item; go back to
					// waiting for another peer from peerCh.
					break
				}

				if err := mf.storage.writeBlocks(peerID, blocks); err != nil {
					log.WithFields(log.Fields{"module": logModule, "error": err}).Info("write block error")
					return err
				}

				// Non-blocking nudge to the block process pool; dropped if a
				// notification is already pending.
				select {
				case downloadNotifyCh <- struct{}{}:
				default:
				}

				// work completed
				if blocks[len(blocks)-1].Height >= work.stopHeader.Height-1 {
					return nil
				}

				//unfinished work, continue from the last received block
				work.startHeader = &blocks[len(blocks)-1].BlockHeader
			}
		case <-closeCh:
			return nil
		}
	}
}
157
// fetchBlocksWorker pulls work items off workCh, runs each through
// fetchBlocksProcess, and reports the outcome on resultCh. It exits (and
// marks wg done) when closeCh is closed.
func (mf *msgFetcher) fetchBlocksWorker(workCh chan *fetchBlocksWork, peerCh chan string, resultCh chan *fetchBlocksResult, closeCh chan struct{}, downloadNotifyCh chan struct{}, wg *sync.WaitGroup) {
	for {
		select {
		case work := <-workCh:
			err := mf.fetchBlocksProcess(work, peerCh, downloadNotifyCh, closeCh)
			resultCh <- &fetchBlocksResult{startHeight: work.startHeader.Height, stopHeight: work.stopHeader.Height, err: err}
		case <-closeCh:
			wg.Done()
			return
		}
	}
}
170
171 func (mf *msgFetcher) parallelFetchBlocks(works []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
172         workSize := len(works)
173         workCh := make(chan *fetchBlocksWork, workSize)
174         peerCh := make(chan string, maxNumOfFastSyncPeers)
175         resultCh := make(chan *fetchBlocksResult, workSize)
176         closeCh := make(chan struct{})
177
178         for _, work := range works {
179                 workCh <- work
180         }
181         syncPeers := mf.syncPeers.selectIdlePeers()
182         for i := 0; i < len(syncPeers) && i < maxNumOfFastSyncPeers; i++ {
183                 peerCh <- syncPeers[i]
184         }
185
186         var workWg sync.WaitGroup
187         for i := 0; i <= maxNumOfParallelFetchBlocks && i < workSize; i++ {
188                 workWg.Add(1)
189                 go mf.fetchBlocksWorker(workCh, peerCh, resultCh, closeCh, downloadNotifyCh, &workWg)
190         }
191
192         go mf.collectResultLoop(peerCh, ProcessStopCh, resultCh, closeCh, workSize)
193
194         workWg.Wait()
195         close(resultCh)
196         close(peerCh)
197         close(workCh)
198         close(downloadNotifyCh)
199         wg.Done()
200 }
201
202 func (mf *msgFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
203         result := make(map[string][]*types.BlockHeader)
204         response := make(map[string]bool)
205         for _, peer := range peers {
206                 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
207                         continue
208                 }
209                 result[peer.ID()] = nil
210         }
211
212         timeout := time.NewTimer(requireHeadersTimeout)
213         defer timeout.Stop()
214         for {
215                 select {
216                 case msg := <-mf.headersProcessCh:
217                         if _, ok := result[msg.peerID]; ok {
218                                 result[msg.peerID] = append(result[msg.peerID], msg.headers[:]...)
219                                 response[msg.peerID] = true
220                                 if len(response) == len(result) {
221                                         return result
222                                 }
223                         }
224                 case <-timeout.C:
225                         log.WithFields(log.Fields{"module": logModule, "err": errRequestTimeout}).Warn("failed on parallel fetch headers")
226                         return result
227                 }
228         }
229 }
230
// processBlock enqueues a single-block response from the network layer for
// consumption by requireBlock.
func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
	mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
}
234
// processBlocks enqueues a multi-block response and routes it to the pending
// requireBlocks call registered for this peer. A response from a peer with no
// registered channel is treated as unsolicited and penalized.
func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
	mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
	mf.mux.RLock()
	blocksMsgChan, ok := mf.blocksMsgChanMap[peerID]
	mf.mux.RUnlock()
	if !ok {
		mf.peers.ProcessIllegal(peerID, security.LevelMsgIllegal, "msg from unsolicited peer")
		return
	}

	// NOTE(review): the registered channel is buffered (cap 1 in
	// requireBlocks); a duplicate response from the same peer would block
	// here — presumably the protocol guarantees one response per request.
	blocksMsgChan <- blocks
}
247
// processHeaders enqueues a headers response from the network layer for
// consumption by parallelFetchHeaders.
func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
	mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
}
251
252 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
253         peer := mf.peers.GetPeer(peerID)
254         if peer == nil {
255                 return nil, errPeerDropped
256         }
257
258         if ok := peer.GetBlockByHeight(height); !ok {
259                 return nil, errSendMsg
260         }
261
262         timeout := time.NewTimer(requireBlockTimeout)
263         defer timeout.Stop()
264
265         for {
266                 select {
267                 case msg := <-mf.blockProcessCh:
268                         if msg.peerID != peerID {
269                                 continue
270                         }
271                         if msg.block.Height != height {
272                                 continue
273                         }
274                         return msg.block, nil
275                 case <-timeout.C:
276                         return nil, errors.Wrap(errRequestTimeout, "requireBlock")
277                 }
278         }
279 }
280
281 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
282         peer := mf.peers.GetPeer(peerID)
283         if peer == nil {
284                 mf.syncPeers.delete(peerID)
285                 return nil, errPeerDropped
286         }
287
288         receiveCh := make(chan []*types.Block, 1)
289         mf.mux.Lock()
290         mf.blocksMsgChanMap[peerID] = receiveCh
291         mf.mux.Unlock()
292
293         if ok := peer.GetBlocks(locator, stopHash); !ok {
294                 return nil, errSendMsg
295         }
296
297         timeout := time.NewTimer(requireBlocksTimeout)
298         defer timeout.Stop()
299         select {
300         case blocks := <-receiveCh:
301                 return blocks, nil
302         case <-timeout.C:
303                 return nil, errRequestBlocksTimeout
304         }
305 }
306
307 func (mf *msgFetcher) resetParameter() {
308         mf.blocksMsgChanMap = make(map[string]chan []*types.Block)
309         mf.syncPeers = newFastSyncPeers()
310         mf.storage.resetParameter()
311         //empty chan
312         for {
313                 select {
314                 case <-mf.blocksProcessCh:
315                 case <-mf.headersProcessCh:
316                 default:
317                         return
318                 }
319         }
320 }
321
322 func (mf *msgFetcher) verifyBlocksMsg(blocks []*types.Block, startHeader, stopHeader *types.BlockHeader) error {
323         // null blocks
324         if len(blocks) == 0 {
325                 return errors.New("null blocks msg")
326         }
327
328         // blocks more than request
329         if uint64(len(blocks)) > stopHeader.Height-startHeader.Height+1 {
330                 return errors.New("exceed length blocks msg")
331         }
332
333         // verify start block
334         if blocks[0].Hash() != startHeader.Hash() {
335                 return errors.New("get mismatch blocks msg")
336         }
337
338         // verify blocks continuity
339         for i := 0; i < len(blocks)-1; i++ {
340                 if blocks[i].Hash() != blocks[i+1].PreviousBlockHash {
341                         return errors.New("get discontinuous blocks msg")
342                 }
343         }
344
345         return nil
346 }