package consensusmgr
import (
- "github.com/sirupsen/logrus"
+ log "github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/vapor/p2p/security"
const (
maxBlockDistance = 64
- maxMsgSetSize = 128
newBlockChSize = 64
+ msgLimit = 128 // peer message number limit
)
// blockFetcher is responsible for accumulating block announcements from various peers
peers Peers
newBlockCh chan *blockMsg
- queue *prque.Prque
- msgSet map[bc.Hash]*blockMsg
+ queue *prque.Prque // block import priority queue
+ msgSet map[bc.Hash]*blockMsg // already queued blocks
+ msgCounter map[string]int // per peer msg counter to prevent DOS
}
//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
- f := &blockFetcher{
+ return &blockFetcher{
chain: chain,
peers: peers,
newBlockCh: make(chan *blockMsg, newBlockChSize),
queue: prque.New(),
msgSet: make(map[bc.Hash]*blockMsg),
+ msgCounter: make(map[string]int),
}
- go f.blockProcessor()
- return f
}
-func (f *blockFetcher) blockProcessor() {
+func (f *blockFetcher) blockProcessorLoop() {
for {
for !f.queue.Empty() {
msg := f.queue.PopItem().(*blockMsg)
f.insert(msg)
delete(f.msgSet, msg.block.Hash())
+ f.msgCounter[msg.peerID]--
+ if f.msgCounter[msg.peerID] <= 0 {
+ delete(f.msgCounter, msg.peerID)
+ }
}
- f.add(<-f.newBlockCh)
+ f.add(<-f.newBlockCh, msgLimit)
}
}
-func (f *blockFetcher) add(msg *blockMsg) {
+func (f *blockFetcher) add(msg *blockMsg, limit int) {
+ // prevent DOS
+ count := f.msgCounter[msg.peerID] + 1
+ if count > limit {
+ log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
+ return
+ }
+
bestHeight := f.chain.BestBlockHeight()
- if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+ if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
return
}
if _, ok := f.msgSet[blockHash]; !ok {
f.msgSet[blockHash] = msg
f.queue.Push(msg, -float32(msg.block.Height))
- logrus.WithFields(logrus.Fields{
+ f.msgCounter[msg.peerID] = count
+ log.WithFields(log.Fields{
"module": logModule,
"block height": msg.block.Height,
"block hash": blockHash.String(),
if peer == nil {
return
}
-
f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
return
}
proposeMsg, err := NewBlockProposeMsg(msg.block)
if err != nil {
- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
return
}
if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
return
}
}