4 log "github.com/sirupsen/logrus"
5 "gopkg.in/karalabe/cookiejar.v2/collections/prque"
7 "github.com/vapor/p2p/security"
8 "github.com/vapor/protocol/bc"
14 msgLimit = 128 // peer message number limit
17 // blockFetcher is responsible for accumulating block announcements from various peers
18 // and scheduling them for retrieval.
// NOTE(review): the methods below also read f.chain and f.peers; those fields
// are declared on lines not shown here.
19 type blockFetcher struct {
// newBlockCh carries incoming block messages; blockProcessorLoop receives
// from it and passes each message to add.
23 newBlockCh chan *blockMsg
// queue entries are pushed with priority -float32(block height).
24 queue *prque.Prque // block import priority queue
// msgSet is keyed by block hash; add consults it to skip blocks already queued.
25 msgSet map[bc.Hash]*blockMsg // already queued blocks
// msgCounter is keyed by peer ID; add stores the incremented count when it
// queues a message and blockProcessorLoop decrements it afterwards.
26 msgCounter map[string]int // per peer msg counter to prevent DOS
29 // newBlockFetcher creates a blockFetcher for retrieving newly proposed blocks.
// The initializers visible here allocate newBlockCh as a channel buffered to
// newBlockChSize entries, and empty msgSet and msgCounter maps.
// NOTE(review): chain and peers are presumably stored on the returned fetcher
// (the methods read f.chain and f.peers), and queue must be initialized before
// blockProcessorLoop uses it; those lines are not shown, so confirm.
30 func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
34 newBlockCh: make(chan *blockMsg, newBlockChSize),
36 msgSet: make(map[bc.Hash]*blockMsg),
37 msgCounter: make(map[string]int),
// blockProcessorLoop works through the queued block messages and takes in new
// ones from newBlockCh.
//
// While the queue is non-empty it pops one message at a time. A message whose
// block height is more than one above the chain's best height is pushed back
// with the same priority. The bookkeeping that follows removes the block hash
// from msgSet and decrements the sending peer's counter.
// NOTE(review): the statements between the push-back and the bookkeeping
// (presumably a break and the call that imports the block) are not shown here.
41 func (f *blockFetcher) blockProcessorLoop() {
43 for !f.queue.Empty() {
// PopItem returns an untyped value; every entry is pushed as *blockMsg by add
// or by the push-back below, so the assertion is expected to hold.
44 msg := f.queue.PopItem().(*blockMsg)
// Block is beyond the next height (best+1): requeue it unchanged.
45 if msg.block.Height > f.chain.BestBlockHeight()+1 {
46 f.queue.Push(msg, -float32(msg.block.Height))
// Forget the hash so the same block can be queued again later.
51 delete(f.msgSet, msg.block.Hash())
// Give back one unit of the peer's message count.
52 f.msgCounter[msg.peerID]--
// Remove the peer's entry once its count reaches zero so the map does not
// keep entries for peers with nothing queued.
53 if f.msgCounter[msg.peerID] <= 0 {
54 delete(f.msgCounter, msg.peerID)
// The channel receive blocks until a message is available; the message is then
// offered to add with the per-peer limit msgLimit.
57 f.add(<-f.newBlockCh, msgLimit)
// add offers msg to the import queue.
//
// msg is the received block message and limit is the per-peer message limit
// reported in the warning below. A block whose hash is not yet in msgSet is
// recorded there, pushed onto queue with priority -float32(height), and the
// peer's counter is set to its previous value plus one. A block whose hash is
// already present is left alone and the counter is not changed.
// NOTE(review): the guard around the warning (presumably count > limit) and
// the bodies of the early-exit branches are on lines not shown; confirm.
61 func (f *blockFetcher) add(msg *blockMsg, limit int) {
// Prospective counter value for this peer; it is only stored if the message
// is actually queued.
63 count := f.msgCounter[msg.peerID] + 1
65 log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
69 bestHeight := f.chain.BestBlockHeight()
// Height window check: the block is below the best height, or more than
// maxBlockDistance above it. The subtraction is only evaluated when the first
// operand is false, i.e. when msg.block.Height >= bestHeight.
70 if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
74 blockHash := msg.block.Hash()
// Queue each block hash at most once.
75 if _, ok := f.msgSet[blockHash]; !ok {
76 f.msgSet[blockHash] = msg
// NOTE(review): float32 represents integers exactly only up to 2^24, so
// heights above that can map to equal priorities; confirm this is acceptable.
77 f.queue.Push(msg, -float32(msg.block.Height))
78 f.msgCounter[msg.peerID] = count
79 log.WithFields(log.Fields{
81 "block height": msg.block.Height,
82 "block hash": blockHash.String(),
83 }).Debug("blockFetcher receive propose block")
// insert passes msg.block to the chain through ProcessBlock, then builds a
// block-propose message from the block and broadcasts it on consensusChannel.
// Failures to build or broadcast the propose message are logged at Error level.
// NOTE(review): the conditions guarding each step (the ProcessBlock error
// check, a nil-peer check, how isOrphan is used) are on lines not shown here;
// the step comments below describe only the visible calls.
87 func (f *blockFetcher) insert(msg *blockMsg) {
88 isOrphan, err := f.chain.ProcessBlock(msg.block)
// Look up the peer that sent the message.
90 peer := f.peers.GetPeer(msg.peerID)
// Report the sending peer at security.LevelMsgIllegal, with the error text as
// the reason; presumably reached only when ProcessBlock failed. Confirm.
94 f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
// Wrap the block in a propose message for relaying.
102 proposeMsg, err := NewBlockProposeMsg(msg.block)
104 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
// Broadcast the propose message on the consensus channel.
108 if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
109 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
114 func (f *blockFetcher) processNewBlock(msg *blockMsg) {