OSDN Git Service

rename (#465)
[bytom/vapor.git] / netsync / consensusmgr / block_fetcher.go
1 package consensusmgr
2
3 import (
4         log "github.com/sirupsen/logrus"
5         "gopkg.in/karalabe/cookiejar.v2/collections/prque"
6
7         "github.com/bytom/vapor/p2p/security"
8         "github.com/bytom/vapor/protocol/bc"
9 )
10
11 const (
12         maxBlockDistance = 64
13         newBlockChSize   = 64
14         msgLimit         = 128 // peer message number limit
15 )
16
17 // blockFetcher is responsible for accumulating block announcements from various peers
18 // and scheduling them for retrieval.
19 type blockFetcher struct {
20         chain Chain
21         peers Peers
22
23         newBlockCh chan *blockMsg
24         queue      *prque.Prque          // block import priority queue
25         msgSet     map[bc.Hash]*blockMsg // already queued blocks
26         msgCounter map[string]int        // per peer msg counter to prevent DOS
27 }
28
29 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
30 func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
31         return &blockFetcher{
32                 chain:      chain,
33                 peers:      peers,
34                 newBlockCh: make(chan *blockMsg, newBlockChSize),
35                 queue:      prque.New(),
36                 msgSet:     make(map[bc.Hash]*blockMsg),
37                 msgCounter: make(map[string]int),
38         }
39 }
40
41 func (f *blockFetcher) blockProcessorLoop() {
42         for {
43                 for !f.queue.Empty() {
44                         msg := f.queue.PopItem().(*blockMsg)
45                         if msg.block.Height > f.chain.BestBlockHeight()+1 {
46                                 f.queue.Push(msg, -float32(msg.block.Height))
47                                 break
48                         }
49
50                         f.insert(msg)
51                         delete(f.msgSet, msg.block.Hash())
52                         f.msgCounter[msg.peerID]--
53                         if f.msgCounter[msg.peerID] <= 0 {
54                                 delete(f.msgCounter, msg.peerID)
55                         }
56                 }
57                 f.add(<-f.newBlockCh, msgLimit)
58         }
59 }
60
61 func (f *blockFetcher) add(msg *blockMsg, limit int) {
62         // prevent DOS
63         count := f.msgCounter[msg.peerID] + 1
64         if count > limit {
65                 log.WithFields(log.Fields{"module": logModule, "peer": msg.peerID, "limit": limit}).Warn("The number of peer messages exceeds the limit")
66                 return
67         }
68
69         bestHeight := f.chain.BestBlockHeight()
70         if bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
71                 return
72         }
73
74         blockHash := msg.block.Hash()
75         if _, ok := f.msgSet[blockHash]; !ok {
76                 f.msgSet[blockHash] = msg
77                 f.queue.Push(msg, -float32(msg.block.Height))
78                 f.msgCounter[msg.peerID] = count
79                 log.WithFields(log.Fields{
80                         "module":       logModule,
81                         "block height": msg.block.Height,
82                         "block hash":   blockHash.String(),
83                 }).Debug("blockFetcher receive propose block")
84         }
85 }
86
87 func (f *blockFetcher) insert(msg *blockMsg) {
88         isOrphan, err := f.chain.ProcessBlock(msg.block)
89         if err != nil {
90                 peer := f.peers.GetPeer(msg.peerID)
91                 if peer == nil {
92                         return
93                 }
94                 f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
95                 return
96         }
97
98         if isOrphan {
99                 return
100         }
101
102         proposeMsg, err := NewBlockProposeMsg(msg.block)
103         if err != nil {
104                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
105                 return
106         }
107
108         if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
109                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
110                 return
111         }
112 }
113
114 func (f *blockFetcher) processNewBlock(msg *blockMsg) {
115         f.newBlockCh <- msg
116 }