package consensusmgr
import (
- "github.com/sirupsen/logrus"
+ log "github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/vapor/p2p/security"
const (
	maxBlockDistance = 64  // reject announcements more than this far above the best height
	newBlockChSize   = 64  // buffer size of the new-block channel
	msgLimit         = 128 // per-peer message number limit, to prevent DOS
)
// blockFetcher is responsible for accumulating block announcements from various peers
peers Peers
newBlockCh chan *blockMsg
- queue *prque.Prque
- msgSet map[bc.Hash]*blockMsg
+ queue *prque.Prque // block import priority queue
+ msgSet map[bc.Hash]*blockMsg // already queued blocks
+ msgCounter map[string]int // per peer msg counter to prevent DOS
}
//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
- f := &blockFetcher{
+ return &blockFetcher{
chain: chain,
peers: peers,
newBlockCh: make(chan *blockMsg, newBlockChSize),
queue: prque.New(),
msgSet: make(map[bc.Hash]*blockMsg),
+ msgCounter: make(map[string]int),
}
- go f.blockProcessor()
- return f
}
-func (f *blockFetcher) blockProcessor() {
+func (f *blockFetcher) blockProcessorLoop() {
for {
for !f.queue.Empty() {
msg := f.queue.PopItem().(*blockMsg)
f.insert(msg)
delete(f.msgSet, msg.block.Hash())
+ f.msgCounter[msg.peerID]--
+ if f.msgCounter[msg.peerID] <= 0 {
+ delete(f.msgCounter, msg.peerID)
+ }
}
- f.add(<-f.newBlockCh)
+ f.add(<-f.newBlockCh, msgLimit)
}
}
-func (f *blockFetcher) add(msg *blockMsg) {
+func (f *blockFetcher) add(msg *blockMsg, limit int) {
+ // prevent DOS
+ count := f.msgCounter[msg.peerID] + 1
+ if count > limit {
+ log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
+ return
+ }
+
bestHeight := f.chain.BestBlockHeight()
- if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+ if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
return
}
if _, ok := f.msgSet[blockHash]; !ok {
f.msgSet[blockHash] = msg
f.queue.Push(msg, -float32(msg.block.Height))
- logrus.WithFields(logrus.Fields{
+ f.msgCounter[msg.peerID] = count
+ log.WithFields(log.Fields{
"module": logModule,
"block height": msg.block.Height,
"block hash": blockHash.String(),
if peer == nil {
return
}
-
f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
return
}
proposeMsg, err := NewBlockProposeMsg(msg.block)
if err != nil {
- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
return
}
if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
- logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
return
}
}
},
}
fetcher := newBlockFetcher(newChain(), peers)
-
+ go fetcher.blockProcessorLoop()
for i, c := range testCase {
fetcher.processNewBlock(c.blockMsg)
time.Sleep(10 * time.Millisecond)
}
}
}
+
+func TestAddBlockMsg(t *testing.T) {
+ peers := peers.NewPeerSet(&peerMgr{})
+ testPeer := "peer1"
+ testCase := []struct {
+ blocksMsg []*blockMsg
+ limit int
+ queueSize int
+ msgSetSize int
+ msgCounter int
+ }{
+ //normal test
+ {
+ blocksMsg: []*blockMsg{
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: 100,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: 101,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: 102,
+ },
+ },
+ peerID: testPeer,
+ },
+ },
+ limit: 5,
+ queueSize: 3,
+ msgSetSize: 3,
+ msgCounter: 3,
+ },
+ // test DOS
+ {
+ blocksMsg: []*blockMsg{
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 1,
+ Height: 100,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 2,
+ Height: 100,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 3,
+ Height: 100,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 4,
+ Height: 100,
+ },
+ },
+ peerID: testPeer,
+ },
+ },
+ limit: 3,
+ queueSize: 3,
+ msgSetSize: 3,
+ msgCounter: 3,
+ },
+
+ // test msg height does not meet the requirements
+ {
+ blocksMsg: []*blockMsg{
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 1,
+ Height: 98,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 2,
+ Height: 97,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 3,
+ Height: 164,
+ },
+ },
+ peerID: testPeer,
+ },
+ {
+ block: &types.Block{
+ BlockHeader: types.BlockHeader{
+ Version: 4,
+ Height: 165,
+ },
+ },
+ peerID: testPeer,
+ },
+ },
+ limit: 5,
+ queueSize: 0,
+ msgSetSize: 0,
+ msgCounter: 0,
+ },
+ }
+
+ for i, c := range testCase {
+ fetcher := newBlockFetcher(newChain(), peers)
+ for _, msg := range c.blocksMsg {
+ fetcher.add(msg, c.limit)
+ }
+
+ if fetcher.queue.Size() != c.queueSize {
+ t.Fatalf("index: %d queue size err got %d: want %d", i, fetcher.queue.Size(), c.queueSize)
+ }
+
+ if len(fetcher.msgSet) != c.msgSetSize {
+ t.Fatalf("index: %d msg set size err got %d: want %d", i, len(fetcher.msgSet), c.msgSetSize)
+ }
+
+ if fetcher.msgCounter[testPeer] != c.msgCounter {
+ t.Fatalf("index: %d peer msg counter err got %d: want %d", i, fetcher.msgCounter[testPeer], c.msgCounter)
+ }
+ }
+}