OSDN Git Service

Add consensus messages transfer (#90)
[bytom/vapor.git] / netsync / consensusmgr / block_fetcher.go
1 package consensusmgr
2
3 import (
4         "github.com/sirupsen/logrus"
5         "gopkg.in/karalabe/cookiejar.v2/collections/prque"
6
7         "github.com/vapor/netsync/peers"
8         "github.com/vapor/protocol/bc"
9 )
10
11 const (
12         maxBlockDistance = 64
13         maxMsgSetSize    = 128
14         newBlockChSize   = 64
15 )
16
17 // blockFetcher is responsible for accumulating block announcements from various peers
18 // and scheduling them for retrieval.
19 type blockFetcher struct {
20         chain Chain
21         peers *peers.PeerSet
22
23         newBlockCh chan *blockMsg
24         queue      *prque.Prque
25         msgSet     map[bc.Hash]*blockMsg
26 }
27
28 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
29 func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
30         f := &blockFetcher{
31                 chain:      chain,
32                 peers:      peers,
33                 newBlockCh: make(chan *blockMsg, newBlockChSize),
34                 queue:      prque.New(),
35                 msgSet:     make(map[bc.Hash]*blockMsg),
36         }
37         go f.blockProcessor()
38         return f
39 }
40
41 func (f *blockFetcher) blockProcessor() {
42         for {
43                 height := f.chain.BestBlockHeight()
44                 for !f.queue.Empty() {
45                         msg := f.queue.PopItem().(*blockMsg)
46                         if msg.block.Height > height+1 {
47                                 f.queue.Push(msg, -float32(msg.block.Height))
48                                 break
49                         }
50
51                         f.insert(msg)
52                         delete(f.msgSet, msg.block.Hash())
53                 }
54                 f.add(<-f.newBlockCh)
55         }
56 }
57
58 func (f *blockFetcher) add(msg *blockMsg) {
59         bestHeight := f.chain.BestBlockHeight()
60         if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
61                 return
62         }
63
64         blockHash := msg.block.Hash()
65         if _, ok := f.msgSet[blockHash]; !ok {
66                 f.msgSet[blockHash] = msg
67                 f.queue.Push(msg, -float32(msg.block.Height))
68                 logrus.WithFields(logrus.Fields{
69                         "module":       logModule,
70                         "block height": msg.block.Height,
71                         "block hash":   blockHash.String(),
72                 }).Debug("blockFetcher receive propose block")
73         }
74 }
75
76 func (f *blockFetcher) insert(msg *blockMsg) {
77         isOrphan, err := f.chain.ProcessBlock(msg.block)
78         if err != nil {
79                 peer := f.peers.GetPeer(msg.peerID)
80                 if peer == nil {
81                         return
82                 }
83
84                 f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
85                 return
86         }
87
88         if isOrphan {
89                 return
90         }
91
92         hash := msg.block.Hash()
93         f.peers.SetStatus(msg.peerID, msg.block.Height, &hash)
94         proposeMsg, err := NewBlockProposeMsg(msg.block)
95         if err != nil {
96                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
97                 return
98         }
99
100         if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
101                 logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
102                 return
103         }
104 }
105
106 func (f *blockFetcher) processNewBlock(msg *blockMsg) {
107         f.newBlockCh <- msg
108 }