X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=netsync%2Fconsensusmgr%2Fblock_fetcher.go;fp=netsync%2Fconsensusmgr%2Fblock_fetcher.go;h=6278c6180a24b04a06af1b4f9c011123ed8cb866;hp=0000000000000000000000000000000000000000;hb=c071c7f9648e8e8f39c1826ccb4e5d2ba5e8efd7;hpb=2d27cad41ef79da3a46142401b5efcf92133ea1a diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go new file mode 100644 index 00000000..6278c618 --- /dev/null +++ b/netsync/consensusmgr/block_fetcher.go @@ -0,0 +1,108 @@ +package consensusmgr + +import ( + "github.com/sirupsen/logrus" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" + + "github.com/vapor/netsync/peers" + "github.com/vapor/protocol/bc" +) + +const ( + maxBlockDistance = 64 + maxMsgSetSize = 128 + newBlockChSize = 64 +) + +// blockFetcher is responsible for accumulating block announcements from various peers +// and scheduling them for retrieval. +type blockFetcher struct { + chain Chain + peers *peers.PeerSet + + newBlockCh chan *blockMsg + queue *prque.Prque + msgSet map[bc.Hash]*blockMsg +} + +//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose. +func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher { + f := &blockFetcher{ + chain: chain, + peers: peers, + newBlockCh: make(chan *blockMsg, newBlockChSize), + queue: prque.New(), + msgSet: make(map[bc.Hash]*blockMsg), + } + go f.blockProcessor() + return f +} + +func (f *blockFetcher) blockProcessor() { + for { + height := f.chain.BestBlockHeight() + for !f.queue.Empty() { + msg := f.queue.PopItem().(*blockMsg) + if msg.block.Height > height+1 { + f.queue.Push(msg, -float32(msg.block.Height)) + break + } + + f.insert(msg) + delete(f.msgSet, msg.block.Hash()) + } + f.add(<-f.newBlockCh) + } +} + +func (f *blockFetcher) add(msg *blockMsg) { + bestHeight := f.chain.BestBlockHeight() + if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance { + return + } + + blockHash := msg.block.Hash() + if _, ok := f.msgSet[blockHash]; !ok { + f.msgSet[blockHash] = msg + f.queue.Push(msg, -float32(msg.block.Height)) + logrus.WithFields(logrus.Fields{ + "module": logModule, + "block height": msg.block.Height, + "block hash": blockHash.String(), + }).Debug("blockFetcher receive propose block") + } +} + +func (f *blockFetcher) insert(msg *blockMsg) { + isOrphan, err := f.chain.ProcessBlock(msg.block) + if err != nil { + peer := f.peers.GetPeer(msg.peerID) + if peer == nil { + return + } + + f.peers.AddBanScore(msg.peerID, 20, 0, err.Error()) + return + } + + if isOrphan { + return + } + + hash := msg.block.Hash() + f.peers.SetStatus(msg.peerID, msg.block.Height, &hash) + proposeMsg, err := NewBlockProposeMsg(msg.block) + if err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg") + return + } + + if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil { + logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block") + return + } +} + +func (f *blockFetcher) processNewBlock(msg *blockMsg) { + f.newBlockCh <- msg +}