OSDN Git Service

fix ban node failed (#256)
[bytom/vapor.git] / netsync / chainmgr / msg_fetcher.go
1 package chainmgr
2
3 import (
4         "time"
5
6         "github.com/vapor/errors"
7         "github.com/vapor/netsync/peers"
8         "github.com/vapor/protocol/bc"
9         "github.com/vapor/protocol/bc/types"
10 )
11
12 const (
13         blockProcessChSize   = 1024
14         blocksProcessChSize  = 128
15         headersProcessChSize = 1024
16 )
17
18 type msgFetcher struct {
19         peers *peers.PeerSet
20
21         blockProcessCh   chan *blockMsg
22         blocksProcessCh  chan *blocksMsg
23         headersProcessCh chan *headersMsg
24 }
25
26 func newMsgFetcher(peers *peers.PeerSet) *msgFetcher {
27         return &msgFetcher{
28                 peers:            peers,
29                 blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
30                 blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
31                 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
32         }
33 }
34
35 func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
36         mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
37 }
38
39 func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
40         mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
41 }
42
43 func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
44         mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
45 }
46
47 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
48         peer := mf.peers.GetPeer(peerID)
49         if peer == nil {
50                 return nil, errPeerDropped
51         }
52
53         if ok := peer.GetBlockByHeight(height); !ok {
54                 return nil, errPeerDropped
55         }
56
57         timeout := time.NewTimer(syncTimeout)
58         defer timeout.Stop()
59
60         for {
61                 select {
62                 case msg := <-mf.blockProcessCh:
63                         if msg.peerID != peerID {
64                                 continue
65                         }
66                         if msg.block.Height != height {
67                                 continue
68                         }
69                         return msg.block, nil
70                 case <-timeout.C:
71                         return nil, errors.Wrap(errRequestTimeout, "requireBlock")
72                 }
73         }
74 }
75
76 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
77         peer := mf.peers.GetPeer(peerID)
78         if peer == nil {
79                 return nil, errPeerDropped
80         }
81
82         if ok := peer.GetBlocks(locator, stopHash); !ok {
83                 return nil, errPeerDropped
84         }
85
86         timeout := time.NewTimer(syncTimeout)
87         defer timeout.Stop()
88
89         for {
90                 select {
91                 case msg := <-mf.blocksProcessCh:
92                         if msg.peerID != peerID {
93                                 continue
94                         }
95
96                         return msg.blocks, nil
97                 case <-timeout.C:
98                         return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
99                 }
100         }
101 }
102
103 func (mf *msgFetcher) requireHeaders(peerID string, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) ([]*types.BlockHeader, error) {
104         peer := mf.peers.GetPeer(peerID)
105         if peer == nil {
106                 return nil, errPeerDropped
107         }
108
109         if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
110                 return nil, errPeerDropped
111         }
112
113         timeout := time.NewTimer(syncTimeout)
114         defer timeout.Stop()
115
116         for {
117                 select {
118                 case msg := <-mf.headersProcessCh:
119                         if msg.peerID != peerID {
120                                 continue
121                         }
122
123                         return msg.headers, nil
124                 case <-timeout.C:
125                         return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
126                 }
127         }
128 }