OSDN Git Service

Merge pull request #201 from Bytom/v0.1
[bytom/vapor.git] / netsync / consensusmgr / block_fetcher.go
diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go
new file mode 100644 (file)
index 0000000..3c92b38
--- /dev/null
@@ -0,0 +1,106 @@
+package consensusmgr
+
+import (
+       "github.com/sirupsen/logrus"
+       "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p/security"
+       "github.com/vapor/protocol/bc"
+)
+
+const (
+       maxBlockDistance = 64
+       maxMsgSetSize    = 128
+       newBlockChSize   = 64
+)
+
+// blockFetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type blockFetcher struct {
+       chain Chain
+       peers *peers.PeerSet
+
+       newBlockCh chan *blockMsg
+       queue      *prque.Prque
+       msgSet     map[bc.Hash]*blockMsg
+}
+
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
+func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
+       f := &blockFetcher{
+               chain:      chain,
+               peers:      peers,
+               newBlockCh: make(chan *blockMsg, newBlockChSize),
+               queue:      prque.New(),
+               msgSet:     make(map[bc.Hash]*blockMsg),
+       }
+       go f.blockProcessor()
+       return f
+}
+
+func (f *blockFetcher) blockProcessor() {
+       for {
+               for !f.queue.Empty() {
+                       msg := f.queue.PopItem().(*blockMsg)
+                       if msg.block.Height > f.chain.BestBlockHeight()+1 {
+                               f.queue.Push(msg, -float32(msg.block.Height))
+                               break
+                       }
+
+                       f.insert(msg)
+                       delete(f.msgSet, msg.block.Hash())
+               }
+               f.add(<-f.newBlockCh)
+       }
+}
+
+func (f *blockFetcher) add(msg *blockMsg) {
+       bestHeight := f.chain.BestBlockHeight()
+       if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+               return
+       }
+
+       blockHash := msg.block.Hash()
+       if _, ok := f.msgSet[blockHash]; !ok {
+               f.msgSet[blockHash] = msg
+               f.queue.Push(msg, -float32(msg.block.Height))
+               logrus.WithFields(logrus.Fields{
+                       "module":       logModule,
+                       "block height": msg.block.Height,
+                       "block hash":   blockHash.String(),
+               }).Debug("blockFetcher receive propose block")
+       }
+}
+
+func (f *blockFetcher) insert(msg *blockMsg) {
+       isOrphan, err := f.chain.ProcessBlock(msg.block)
+       if err != nil {
+               peer := f.peers.GetPeer(msg.peerID)
+               if peer == nil {
+                       return
+               }
+
+               f.peers.ProcessIllegal(msg.peerID, security.LevelMsgIllegal, err.Error())
+               return
+       }
+
+       if isOrphan {
+               return
+       }
+
+       proposeMsg, err := NewBlockProposeMsg(msg.block)
+       if err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on create BlockProposeMsg")
+               return
+       }
+
+       if err := f.peers.BroadcastMsg(NewBroadcastMsg(proposeMsg, consensusChannel)); err != nil {
+               logrus.WithFields(logrus.Fields{"module": logModule, "err": err}).Error("failed on broadcast proposed block")
+               return
+       }
+}
+
+func (f *blockFetcher) processNewBlock(msg *blockMsg) {
+       f.newBlockCh <- msg
+}