4 "github.com/sirupsen/logrus"
5 "gopkg.in/karalabe/cookiejar.v2/collections/prque"
7 "github.com/vapor/netsync/peers"
8 "github.com/vapor/protocol/bc"
17 // blockFetcher is responsible for accumulating block announcements from various peers
18 // and scheduling them for retrieval.
19 type blockFetcher struct {
23 newBlockCh chan *blockMsg
25 msgSet map[bc.Hash]*blockMsg
28 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
29 func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
33 newBlockCh: make(chan *blockMsg, newBlockChSize),
35 msgSet: make(map[bc.Hash]*blockMsg),
41 func (f *blockFetcher) blockProcessor() {
43 height := f.chain.BestBlockHeight()
44 for !f.queue.Empty() {
45 msg := f.queue.PopItem().(*blockMsg)
46 if msg.block.Height > height+1 {
47 f.queue.Push(msg, -float32(msg.block.Height))
52 delete(f.msgSet, msg.block.Hash())
58 func (f *blockFetcher) add(msg *blockMsg) {
59 bestHeight := f.chain.BestBlockHeight()
60 if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
64 blockHash := msg.block.Hash()
65 if _, ok := f.msgSet[blockHash]; !ok {
66 f.msgSet[blockHash] = msg
67 f.queue.Push(msg, -float32(msg.block.Height))
68 logrus.WithFields(logrus.Fields{
70 "block height": msg.block.Height,
71 "block hash": blockHash.String(),
72 }).Debug("blockFetcher receive propose block")
76 func (f *blockFetcher) insert(msg *blockMsg) {
77 isOrphan, err := f.chain.ProcessBlock(msg.block)
79 peer := f.peers.GetPeer(msg.peerID)
84 f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
92 hash := msg.block.Hash()
93 f.peers.SetStatus(msg.peerID, msg.block.Height, &hash)
94 proposeMsg, err := NewBlockProposeMsg(msg.block)
96 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
100 if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
101 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
106 func (f *blockFetcher) processNewBlock(msg *blockMsg) {