6 "github.com/vapor/errors"
7 "github.com/vapor/netsync/peers"
8 "github.com/vapor/protocol/bc"
9 "github.com/vapor/protocol/bc/types"
// Buffer capacities of the message channels allocated in newMsgFetcher.

// blockProcessChSize is the capacity of the single-block channel.
13 blockProcessChSize = 1024
// blocksProcessChSize is the capacity of the block-batch channel.
14 blocksProcessChSize = 128
// headersProcessChSize is the capacity of the header-batch channel.
15 headersProcessChSize = 1024
// msgFetcher pairs outgoing sync requests with the replies that arrive for
// them. It keeps one buffered channel per reply kind: the process* methods
// send into these channels and the require* methods receive from them.
// NOTE(review): not every field is visible in this view; the require* methods
// also read mf.peers, so a peers field is declared on a line not shown here.
18 type msgFetcher struct {
// blockProcessCh carries single-block messages sent by processBlock.
21 blockProcessCh chan *blockMsg
// blocksProcessCh carries block-slice messages sent by processBlocks.
22 blocksProcessCh chan *blocksMsg
// headersProcessCh carries header-slice messages sent by processHeaders.
23 headersProcessCh chan *headersMsg
// newMsgFetcher returns a *msgFetcher whose three message channels are
// allocated with their respective *ProcessChSize buffer capacities.
// NOTE(review): the lines that open the returned literal and that handle the
// peers argument are not visible here — presumably peers is stored on the
// result; confirm against the full file.
26 func newMsgFetcher(peers *peers.PeerSet) *msgFetcher {
// Each channel is buffered with the capacity named by its matching constant.
29 blockProcessCh: make(chan *blockMsg, blockProcessChSize),
30 blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
31 headersProcessCh: make(chan *headersMsg, headersProcessChSize),
// processBlock wraps block and the sending peerID in a blockMsg and sends it
// on blockProcessCh, the channel requireBlock receives from.
35 func (mf *msgFetcher) processBlock(peerID string, block *types.Block) {
// Plain channel send: this blocks while the channel's buffer is full.
36 mf.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
// processBlocks wraps the blocks slice and the sending peerID in a blocksMsg
// and sends it on blocksProcessCh, the channel requireBlocks receives from.
39 func (mf *msgFetcher) processBlocks(peerID string, blocks []*types.Block) {
// Plain channel send: this blocks while the channel's buffer is full.
40 mf.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
// processHeaders wraps the headers slice and the sending peerID in a
// headersMsg and sends it on headersProcessCh, the channel requireHeaders
// receives from.
43 func (mf *msgFetcher) processHeaders(peerID string, headers []*types.BlockHeader) {
// Plain channel send: this blocks while the channel's buffer is full.
44 mf.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
// requireBlock looks up peerID in the peer set, calls GetBlockByHeight(height)
// on that peer, and then receives from blockProcessCh, comparing each
// message's peerID and block height with the requested ones.
//
// Visible error returns: errPeerDropped (after the peer lookup and when
// GetBlockByHeight reports false) and errRequestTimeout wrapped with the
// "requireBlock" context.
// NOTE(review): the loop/select scaffolding, the bodies of the two mismatch
// checks and the success return are on lines not visible in this view —
// presumably mismatching messages are skipped and the matching block is
// returned; confirm against the full file.
47 func (mf *msgFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
48 peer := mf.peers.GetPeer(peerID)
// NOTE(review): the condition guarding this return is not shown — presumably
// a nil check on peer; confirm.
50 return nil, errPeerDropped
// A false result from GetBlockByHeight is reported as a dropped peer.
53 if ok := peer.GetBlockByHeight(height); !ok {
54 return nil, errPeerDropped
// Timer created with syncTimeout (declared outside this view).
57 timeout := time.NewTimer(syncTimeout)
62 case msg := <-mf.blockProcessCh:
// Check that the message came from the peer that was asked.
63 if msg.peerID != peerID {
// Check that the block is at the requested height.
66 if msg.block.Height != height {
// NOTE(review): presumably reached from the timer case of the select, which
// is not shown — confirm.
71 return nil, errors.Wrap(errRequestTimeout, "requireBlock")
// requireBlocks looks up peerID in the peer set, calls
// GetBlocks(locator, stopHash) on that peer, and then receives from
// blocksProcessCh, returning the blocks slice carried by a received message.
//
// Visible error returns: errPeerDropped (after the peer lookup and when
// GetBlocks reports false) and errRequestTimeout wrapped with the
// "requireBlocks" context.
// NOTE(review): the loop/select scaffolding and the body of the peerID
// mismatch check are on lines not visible in this view — presumably messages
// from other peers are skipped; confirm against the full file.
76 func (mf *msgFetcher) requireBlocks(peerID string, locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
77 peer := mf.peers.GetPeer(peerID)
// NOTE(review): the condition guarding this return is not shown — presumably
// a nil check on peer; confirm.
79 return nil, errPeerDropped
// A false result from GetBlocks is reported as a dropped peer.
82 if ok := peer.GetBlocks(locator, stopHash); !ok {
83 return nil, errPeerDropped
// Timer created with syncTimeout (declared outside this view).
86 timeout := time.NewTimer(syncTimeout)
91 case msg := <-mf.blocksProcessCh:
// Check that the message came from the peer that was asked.
92 if msg.peerID != peerID {
96 return msg.blocks, nil
// NOTE(review): presumably reached from the timer case of the select, which
// is not shown — confirm.
98 return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
// requireHeaders looks up peerID in the peer set, calls
// GetHeaders(locator, stopHash, skip) on that peer, and then receives from
// headersProcessCh, returning the headers slice carried by a received message.
//
// Visible error returns: errPeerDropped (after the peer lookup and when
// GetHeaders reports false) and errRequestTimeout wrapped with the
// "requireHeaders" context.
// NOTE(review): the loop/select scaffolding and the body of the peerID
// mismatch check are on lines not visible in this view — presumably messages
// from other peers are skipped; confirm against the full file.
103 func (mf *msgFetcher) requireHeaders(peerID string, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) ([]*types.BlockHeader, error) {
104 peer := mf.peers.GetPeer(peerID)
// NOTE(review): the condition guarding this return is not shown — presumably
// a nil check on peer; confirm.
106 return nil, errPeerDropped
// A false result from GetHeaders is reported as a dropped peer.
109 if ok := peer.GetHeaders(locator, stopHash, skip); !ok {
110 return nil, errPeerDropped
// Timer created with syncTimeout (declared outside this view).
113 timeout := time.NewTimer(syncTimeout)
118 case msg := <-mf.headersProcessCh:
// Check that the message came from the peer that was asked.
119 if msg.peerID != peerID {
123 return msg.headers, nil
// NOTE(review): presumably reached from the timer case of the select, which
// is not shown — confirm.
125 return nil, errors.Wrap(errRequestTimeout, "requireHeaders")