OSDN Git Service

Peer add announces new block message num limit block_fetcher
authorYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 5 Aug 2019 09:05:16 +0000 (17:05 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 5 Aug 2019 09:05:16 +0000 (17:05 +0800)
netsync/consensusmgr/block_fetcher.go
netsync/consensusmgr/block_fetcher_test.go
netsync/consensusmgr/handle.go

index 8c28ff9..96f7fed 100644 (file)
@@ -1,7 +1,7 @@
 package consensusmgr
 
 import (
-       "github.com/sirupsen/logrus"
+       log "github.com/sirupsen/logrus"
        "gopkg.in/karalabe/cookiejar.v2/collections/prque"
 
        "github.com/vapor/p2p/security"
@@ -10,8 +10,8 @@ import (
 
 const (
        maxBlockDistance = 64
-       maxMsgSetSize    = 128
        newBlockChSize   = 64
+       msgLimit         = 128 // peer message number limit
 )
 
 // blockFetcher is responsible for accumulating block announcements from various peers
@@ -21,24 +21,24 @@ type blockFetcher struct {
        peers Peers
 
        newBlockCh chan *blockMsg
-       queue      *prque.Prque
-       msgSet     map[bc.Hash]*blockMsg
+       queue      *prque.Prque          // block import priority queue
+       msgSet     map[bc.Hash]*blockMsg // already queued blocks
+       msgCounter map[string]int        // per peer msg counter to prevent DOS
 }
 
 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
 func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
-       f := &blockFetcher{
+       return &blockFetcher{
                chain:      chain,
                peers:      peers,
                newBlockCh: make(chan *blockMsg, newBlockChSize),
                queue:      prque.New(),
                msgSet:     make(map[bc.Hash]*blockMsg),
+               msgCounter: make(map[string]int),
        }
-       go f.blockProcessor()
-       return f
 }
 
-func (f *blockFetcher) blockProcessor() {
+func (f *blockFetcher) blockProcessorLoop() {
        for {
                for !f.queue.Empty() {
                        msg := f.queue.PopItem().(*blockMsg)
@@ -49,14 +49,25 @@ func (f *blockFetcher) blockProcessor() {
 
                        f.insert(msg)
                        delete(f.msgSet, msg.block.Hash())
+                       f.msgCounter[msg.peerID]--
+                       if f.msgCounter[msg.peerID] <= 0 {
+                               delete(f.msgCounter, msg.peerID)
+                       }
                }
-               f.add(<-f.newBlockCh)
+               f.add(<-f.newBlockCh, msgLimit)
        }
 }
 
-func (f *blockFetcher) add(msg *blockMsg) {
+func (f *blockFetcher) add(msg *blockMsg, limit int) {
+       // prevent DOS
+       count := f.msgCounter[msg.peerID] + 1
+       if count > limit {
+               log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
+               return
+       }
+
        bestHeight := f.chain.BestBlockHeight()
-       if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+       if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
                return
        }
 
@@ -64,7 +75,8 @@ func (f *blockFetcher) add(msg *blockMsg) {
        if _, ok := f.msgSet[blockHash]; !ok {
                f.msgSet[blockHash] = msg
                f.queue.Push(msg, -float32(msg.block.Height))
-               logrus.WithFields(logrus.Fields{
+               f.msgCounter[msg.peerID] = count
+               log.WithFields(log.Fields{
                        "module":       logModule,
                        "block height": msg.block.Height,
                        "block hash":   blockHash.String(),
@@ -79,7 +91,6 @@ func (f *blockFetcher) insert(msg *blockMsg) {
                if peer == nil {
                        return
                }
-
                f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
                return
        }
@@ -90,12 +101,12 @@ func (f *blockFetcher) insert(msg *blockMsg) {
 
        proposeMsg, err := NewBlockProposeMsg(msg.block)
        if err != nil {
-               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
                return
        }
 
        if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
-               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
                return
        }
 }
index 8d0df22..9670351 100644 (file)
@@ -127,7 +127,7 @@ func TestBlockFetcher(t *testing.T) {
                },
        }
        fetcher := newBlockFetcher(newChain(), peers)
-
+       go fetcher.blockProcessorLoop()
        for i, c := range testCase {
                fetcher.processNewBlock(c.blockMsg)
                time.Sleep(10 * time.Millisecond)
@@ -137,3 +137,159 @@ func TestBlockFetcher(t *testing.T) {
                }
        }
 }
+
+func TestAddBlockMsg(t *testing.T) {
+       peers := peers.NewPeerSet(&peerMgr{})
+       testPeer := "peer1"
+       testCase := []struct {
+               blocksMsg  []*blockMsg
+               limit      int
+               queueSize  int
+               msgSetSize int
+               msgCounter int
+       }{
+               //normal test
+               {
+                       blocksMsg: []*blockMsg{
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Height: 100,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Height: 101,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Height: 102,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                       },
+                       limit:      5,
+                       queueSize:  3,
+                       msgSetSize: 3,
+                       msgCounter: 3,
+               },
+               // test DOS
+               {
+                       blocksMsg: []*blockMsg{
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 1,
+                                                       Height:  100,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 2,
+                                                       Height:  100,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 3,
+                                                       Height:  100,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 4,
+                                                       Height:  100,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                       },
+                       limit:      3,
+                       queueSize:  3,
+                       msgSetSize: 3,
+                       msgCounter: 3,
+               },
+
+               // test msg height does not meet the requirements
+               {
+                       blocksMsg: []*blockMsg{
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 1,
+                                                       Height:  98,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 2,
+                                                       Height:  97,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 3,
+                                                       Height:  164,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                               {
+                                       block: &types.Block{
+                                               BlockHeader: types.BlockHeader{
+                                                       Version: 4,
+                                                       Height:  165,
+                                               },
+                                       },
+                                       peerID: testPeer,
+                               },
+                       },
+                       limit:      5,
+                       queueSize:  0,
+                       msgSetSize: 0,
+                       msgCounter: 0,
+               },
+       }
+
+       for i, c := range testCase {
+               fetcher := newBlockFetcher(newChain(), peers)
+               for _, msg := range c.blocksMsg {
+                       fetcher.add(msg, c.limit)
+               }
+
+               if fetcher.queue.Size() != c.queueSize {
+                       t.Fatalf("index: %d queue size err got %d: want %d", i, fetcher.queue.Size(), c.queueSize)
+               }
+
+               if len(fetcher.msgSet) != c.msgSetSize {
+                       t.Fatalf("index: %d msg set size err got %d: want %d", i, len(fetcher.msgSet), c.msgSetSize)
+               }
+
+               if fetcher.msgCounter[testPeer] != c.msgCounter {
+                       t.Fatalf("index: %d peer msg counter err got %d: want %d", i, fetcher.msgCounter[testPeer], c.msgCounter)
+               }
+       }
+}
index 37f7ee8..a9e3a63 100644 (file)
@@ -191,6 +191,7 @@ func (m *Manager) removePeer(peerID string) {
 
 //Start consensus manager service.
 func (m *Manager) Start() error {
+       go m.blockFetcher.blockProcessorLoop()
        go m.blockProposeMsgBroadcastLoop()
        go m.blockSignatureMsgBroadcastLoop()
        return nil