+++ /dev/null
-package netsync
-
-import (
- log "github.com/sirupsen/logrus"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
-
- "github.com/bytom/protocol/bc"
-)
-
-const (
- maxBlockDistance = 64
- maxMsgSetSize = 128
- newBlockChSize = 64
-)
-
-// blockFetcher is responsible for accumulating block announcements from various peers
-// and scheduling them for retrieval.
-type blockFetcher struct {
- chain Chain
- peers *peerSet
-
- newBlockCh chan *blockMsg
- queue *prque.Prque
- msgSet map[bc.Hash]*blockMsg
-}
-
-//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
-func newBlockFetcher(chain Chain, peers *peerSet) *blockFetcher {
- f := &blockFetcher{
- chain: chain,
- peers: peers,
- newBlockCh: make(chan *blockMsg, newBlockChSize),
- queue: prque.New(),
- msgSet: make(map[bc.Hash]*blockMsg),
- }
- go f.blockProcessor()
- return f
-}
-
-func (f *blockFetcher) blockProcessor() {
- for {
- height := f.chain.BestBlockHeight()
- for !f.queue.Empty() {
- msg := f.queue.PopItem().(*blockMsg)
- if msg.block.Height > height+1 {
- f.queue.Push(msg, -float32(msg.block.Height))
- break
- }
-
- f.insert(msg)
- delete(f.msgSet, msg.block.Hash())
- }
- f.add(<-f.newBlockCh)
- }
-}
-
-func (f *blockFetcher) add(msg *blockMsg) {
- bestHeight := f.chain.BestBlockHeight()
- if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
- return
- }
-
- blockHash := msg.block.Hash()
- if _, ok := f.msgSet[blockHash]; !ok {
- f.msgSet[blockHash] = msg
- f.queue.Push(msg, -float32(msg.block.Height))
- log.WithField("block height", msg.block.Height).Debug("fetcher receive mine block")
- }
-}
-
-func (f *blockFetcher) insert(msg *blockMsg) {
- if _, err := f.chain.ProcessBlock(msg.block); err != nil {
- peer := f.peers.getPeer(msg.peerID)
- if peer == nil {
- return
- }
-
- f.peers.addBanScore(msg.peerID, 20, 0, err.Error())
- return
- }
-
- if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
- log.WithField("err", err).Error("fail on fetcher broadcast new block")
- return
- }
-}
-
-func (f *blockFetcher) processNewBlock(msg *blockMsg) {
- f.newBlockCh <- msg
-}
privKey crypto.PrivKeyEd25519 // local node's p2p key
chain Chain
txPool *core.TxPool
- blockFetcher *blockFetcher
blockKeeper *blockKeeper
peers *peerSet
txPool: txPool,
chain: chain,
privKey: crypto.GenPrivKeyEd25519(),
- blockFetcher: newBlockFetcher(chain, peers),
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
newTxCh: make(chan *types.Tx, maxTxChanSize),
sm.blockKeeper.processHeaders(peer.ID(), headers)
}
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
- block, err := msg.GetMineBlock()
- if err != nil {
- log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
- return
- }
-
- hash := block.Hash()
- peer.markBlock(&hash)
- sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
- peer.setStatus(block.Height, &hash)
-}
-
func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
bestHeader := sm.chain.BestBlockHeader()
genesisBlock, err := sm.chain.GetBlockByHeight(0)