OSDN Git Service

3b6c746a40bb6212a3ee98b7186a8b3663b68b5f
[bytom/vapor.git] / netsync / consensusmgr / block_fetcher.go
1 package consensusmgr
2
3 import (
4         "github.com/sirupsen/logrus"
5         "gopkg.in/karalabe/cookiejar.v2/collections/prque"
6
7         "github.com/vapor/netsync/peers"
8         "github.com/vapor/protocol/bc"
9 )
10
// Tuning limits for the block fetcher.
const (
	// maxBlockDistance is the largest allowed gap between a received
	// block's height and the chain's best height; add drops blocks that
	// are further ahead than this.
	maxBlockDistance = 64

	// maxMsgSetSize is the size threshold on the set of pending block
	// messages that add uses to stop accepting new ones.
	maxMsgSetSize = 128

	// newBlockChSize is the buffer capacity of the fetcher's newBlockCh.
	newBlockChSize = 64
)
16
17 // blockFetcher is responsible for accumulating block announcements from various peers
18 // and scheduling them for retrieval.
19 type blockFetcher struct {
20         chain Chain
21         peers *peers.PeerSet
22
23         newBlockCh chan *blockMsg
24         queue      *prque.Prque
25         msgSet     map[bc.Hash]*blockMsg
26 }
27
28 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
29 func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
30         f := &blockFetcher{
31                 chain:      chain,
32                 peers:      peers,
33                 newBlockCh: make(chan *blockMsg, newBlockChSize),
34                 queue:      prque.New(),
35                 msgSet:     make(map[bc.Hash]*blockMsg),
36         }
37         go f.blockProcessor()
38         return f
39 }
40
41 func (f *blockFetcher) blockProcessor() {
42         for {
43                 for !f.queue.Empty() {
44                         msg := f.queue.PopItem().(*blockMsg)
45                         if msg.block.Height > f.chain.BestBlockHeight()+1 {
46                                 f.queue.Push(msg, -float32(msg.block.Height))
47                                 break
48                         }
49
50                         f.insert(msg)
51                         delete(f.msgSet, msg.block.Hash())
52                 }
53                 f.add(<-f.newBlockCh)
54         }
55 }
56
57 func (f *blockFetcher) add(msg *blockMsg) {
58         bestHeight := f.chain.BestBlockHeight()
59         if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
60                 return
61         }
62
63         blockHash := msg.block.Hash()
64         if _, ok := f.msgSet[blockHash]; !ok {
65                 f.msgSet[blockHash] = msg
66                 f.queue.Push(msg, -float32(msg.block.Height))
67                 logrus.WithFields(logrus.Fields{
68                         "module":       logModule,
69                         "block height": msg.block.Height,
70                         "block hash":   blockHash.String(),
71                 }).Debug("blockFetcher receive propose block")
72         }
73 }
74
75 func (f *blockFetcher) insert(msg *blockMsg) {
76         isOrphan, err := f.chain.ProcessBlock(msg.block)
77         if err != nil {
78                 peer := f.peers.GetPeer(msg.peerID)
79                 if peer == nil {
80                         return
81                 }
82
83                 f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
84                 return
85         }
86
87         if isOrphan {
88                 return
89         }
90
91         hash := msg.block.Hash()
92         f.peers.SetStatus(msg.peerID, msg.block.Height, &hash)
93         proposeMsg, err := NewBlockProposeMsg(msg.block)
94         if err != nil {
95                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
96                 return
97         }
98
99         if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
100                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
101                 return
102         }
103 }
104
105 func (f *blockFetcher) processNewBlock(msg *blockMsg) {
106         f.newBlockCh <- msg
107 }