From: Yahtoo Ma Date: Mon, 5 Aug 2019 09:05:16 +0000 (+0800) Subject: Peer add announces new block message num limit X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=refs%2Fheads%2Fblock_fetcher Peer add announces new block message num limit --- diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go index 8c28ff90..96f7fed6 100644 --- a/netsync/consensusmgr/block_fetcher.go +++ b/netsync/consensusmgr/block_fetcher.go @@ -1,7 +1,7 @@ package consensusmgr import ( - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "gopkg.in/karalabe/cookiejar.v2/collections/prque" "github.com/vapor/p2p/security" @@ -10,8 +10,8 @@ import ( const ( maxBlockDistance = 64 - maxMsgSetSize = 128 newBlockChSize = 64 + msgLimit = 128 // peer message number limit ) // blockFetcher is responsible for accumulating block announcements from various peers @@ -21,24 +21,24 @@ type blockFetcher struct { peers Peers newBlockCh chan *blockMsg - queue *prque.Prque - msgSet map[bc.Hash]*blockMsg + queue *prque.Prque // block import priority queue + msgSet map[bc.Hash]*blockMsg // already queued blocks + msgCounter map[string]int // per peer msg counter to prevent DOS } //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose. func newBlockFetcher(chain Chain, peers Peers) *blockFetcher { - f := &blockFetcher{ + return &blockFetcher{ chain: chain, peers: peers, newBlockCh: make(chan *blockMsg, newBlockChSize), queue: prque.New(), msgSet: make(map[bc.Hash]*blockMsg), + msgCounter: make(map[string]int), } - go f.blockProcessor() - return f } -func (f *blockFetcher) blockProcessor() { +func (f *blockFetcher) blockProcessorLoop() { for { for !f.queue.Empty() { msg := f.queue.PopItem().(*blockMsg) @@ -49,14 +49,25 @@ func (f *blockFetcher) blockProcessor() { f.insert(msg) delete(f.msgSet, msg.block.Hash()) + f.msgCounter[msg.peerID]-- + if f.msgCounter[msg.peerID] <= 0 { + delete(f.msgCounter, msg.peerID) + } } - f.add(<-f.newBlockCh) + f.add(<-f.newBlockCh, msgLimit) } } -func (f *blockFetcher) add(msg *blockMsg) { +func (f *blockFetcher) add(msg *blockMsg, limit int) { + // prevent DOS + count := f.msgCounter[msg.peerID] + 1 + if count > limit { + log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit") + return + } + bestHeight := f.chain.BestBlockHeight() - if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance { + if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance { return } @@ -64,7 +75,8 @@ func (f *blockFetcher) add(msg *blockMsg) { if _, ok := f.msgSet[blockHash]; !ok { f.msgSet[blockHash] = msg f.queue.Push(msg, -float32(msg.block.Height)) - logrus.WithFields(logrus.Fields{ + f.msgCounter[msg.peerID] = count + log.WithFields(log.Fields{ "module": logModule, "block height": msg.block.Height, "block hash": blockHash.String(), @@ -79,7 +91,6 @@ func (f *blockFetcher) insert(msg *blockMsg) { if peer == nil { return } - f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error()) return } @@ -90,12 +101,12 @@ func (f *blockFetcher) insert(msg *blockMsg) { proposeMsg, err := NewBlockProposeMsg(msg.block) if err != nil { - logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg") return } if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil { - logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block") return } } diff --git a/netsync/consensusmgr/block_fetcher_test.go b/netsync/consensusmgr/block_fetcher_test.go index 8d0df223..96703516 100644 --- a/netsync/consensusmgr/block_fetcher_test.go +++ b/netsync/consensusmgr/block_fetcher_test.go @@ -127,7 +127,7 @@ func TestBlockFetcher(t *testing.T) { }, } fetcher := newBlockFetcher(newChain(), peers) - + go fetcher.blockProcessorLoop() for i, c := range testCase { fetcher.processNewBlock(c.blockMsg) time.Sleep(10 * time.Millisecond) @@ -137,3 +137,159 @@ func TestBlockFetcher(t *testing.T) { } } } + +func TestAddBlockMsg(t *testing.T) { + peers := peers.NewPeerSet(&peerMgr{}) + testPeer := "peer1" + testCase := []struct { + blocksMsg []*blockMsg + limit int + queueSize int + msgSetSize int + msgCounter int + }{ + //normal test + { + blocksMsg: []*blockMsg{ + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Height: 100, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Height: 101, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Height: 102, + }, + }, + peerID: testPeer, + }, + }, + limit: 5, + queueSize: 3, + msgSetSize: 3, + msgCounter: 3, + }, + // test DOS + { + blocksMsg: []*blockMsg{ + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 1, + Height: 100, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 2, + Height: 100, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 3, + Height: 100, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 4, + Height: 100, + }, + }, + peerID: testPeer, + }, + }, + limit: 3, + queueSize: 3, + msgSetSize: 3, + msgCounter: 3, + }, + + // test msg height does not meet the requirements + { + blocksMsg: []*blockMsg{ + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 1, + Height: 98, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 2, + Height: 97, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 3, + Height: 164, + }, + }, + peerID: testPeer, + }, + { + block: &types.Block{ + BlockHeader: types.BlockHeader{ + Version: 4, + Height: 165, + }, + }, + peerID: testPeer, + }, + }, + limit: 5, + queueSize: 0, + msgSetSize: 0, + msgCounter: 0, + }, + } + + for i, c := range testCase { + fetcher := newBlockFetcher(newChain(), peers) + for _, msg := range c.blocksMsg { + fetcher.add(msg, c.limit) + } + + if fetcher.queue.Size() != c.queueSize { + t.Fatalf("index: %d queue size err got %d: want %d", i, fetcher.queue.Size(), c.queueSize) + } + + if len(fetcher.msgSet) != c.msgSetSize { + t.Fatalf("index: %d msg set size err got %d: want %d", i, len(fetcher.msgSet), c.msgSetSize) + } + + if fetcher.msgCounter[testPeer] != c.msgCounter { + t.Fatalf("index: %d peer msg counter err got %d: want %d", i, fetcher.msgCounter[testPeer], c.msgCounter) + } + } +} diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go index 37f7ee82..a9e3a631 100644 --- a/netsync/consensusmgr/handle.go +++ b/netsync/consensusmgr/handle.go @@ -191,6 +191,7 @@ func (m *Manager) removePeer(peerID string) { //Start consensus manager service. func (m *Manager) Start() error { + go m.blockFetcher.blockProcessorLoop() go m.blockProposeMsgBroadcastLoop() go m.blockSignatureMsgBroadcastLoop() return nil