6 log "github.com/sirupsen/logrus"
7 "gopkg.in/karalabe/cookiejar.v2/collections/prque"
10 core "github.com/bytom/protocol"
11 "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
16 maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
20 errTerminated = errors.New("terminated")
23 // Fetcher is responsible for accumulating block announcements from various peers
24 // and scheduling them for retrieval.
30 // Various event channels
31 newMinedBlock chan *blockPending
35 queue *prque.Prque // Queue containing the import operations (block number sorted)
36 queues map[string]int // Per peer block counts to prevent memory exhaustion
37 queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
40 //NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
41 func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
46 newMinedBlock: make(chan *blockPending),
47 quit: make(chan struct{}),
49 queues: make(map[string]int),
50 queued: make(map[bc.Hash]*blockPending),
54 // Start boots up the announcement based synchroniser, accepting and processing
55 // hash notifications and block fetches until termination requested.
56 func (f *Fetcher) Start() {
60 // Stop terminates the announcement based synchroniser, canceling all pending
62 func (f *Fetcher) Stop() {
66 // Enqueue tries to fill gaps the the fetcher's future import queue.
67 func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
73 case f.newMinedBlock <- op:
80 // Loop is the main fetcher loop, checking and processing various notification
82 func (f *Fetcher) loop() {
84 // Import any queued blocks that could potentially fit
85 height := f.chain.BestBlockHeight()
86 for !f.queue.Empty() {
87 op := f.queue.PopItem().(*blockPending)
88 // If too high up the chain or phase, continue later
89 number := op.block.Height
90 if number > height+1 {
91 f.queue.Push(op, -float32(op.block.Height))
94 // Otherwise if fresh and still unknown, try and import
95 hash := op.block.Hash()
96 block, _ := f.chain.GetBlockByHash(&hash)
101 if op.block.PreviousBlockHash.String() != f.chain.BestBlockHash().String() {
105 f.insert(op.peerID, op.block)
107 // Wait for an outside event to occur
110 // Fetcher terminating, abort all operations
113 case op := <-f.newMinedBlock:
114 // A direct block insertion was requested, try and fill any pending gaps
115 f.enqueue(op.peerID, op.block)
120 // enqueue schedules a new future import operation, if the block to be imported
121 // has not yet been seen.
122 func (f *Fetcher) enqueue(peer string, block *types.Block) {
125 //TODO: Ensure the peer isn't DOSing us
126 // Discard any past or too distant blocks
127 if dist := int64(block.Height) - int64(f.chain.BestBlockHeight()); dist < 0 || dist > maxQueueDist {
128 log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
131 // Schedule the block for future importing
132 if _, ok := f.queued[hash]; !ok {
138 f.queue.Push(op, -float32(block.Height))
139 log.Info("Queued receive mine block.", " peer:", peer, " number:", block.Height, " queued:", f.queue.Size())
143 // insert spawns a new goroutine to run a block insertion into the chain. If the
144 // block's number is at the same height as the current import phase, it updates
145 // the phase states accordingly.
146 func (f *Fetcher) insert(peerID string, block *types.Block) {
147 // Run the import on a new thread
148 log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
149 // Run the actual import and log any issues
150 if _, err := f.chain.ProcessBlock(block); err != nil {
151 log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
152 fPeer, ok := f.peers.Peer(peerID)
156 swPeer := fPeer.getPeer()
157 if ban := fPeer.addBanScore(20, 0, "block process error"); ban {
158 f.sw.AddBannedPeer(swPeer)
159 f.sw.StopPeerGracefully(swPeer)
163 // If import succeeded, broadcast the block
164 log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
165 peers, err := f.peers.BroadcastMinedBlock(block)
167 log.Errorf("Broadcast mine block error. %v", err)
170 for _, fPeer := range peers {
174 swPeer := fPeer.getPeer()
175 if ban := fPeer.addBanScore(0, 50, "Broadcast block error"); ban {
176 f.sw.AddBannedPeer(swPeer)
177 f.sw.StopPeerGracefully(swPeer)
182 // forgetBlock removes all traces of a queued block from the fetcher's internal
184 func (f *Fetcher) forgetBlock(hash bc.Hash) {
185 if insert := f.queued[hash]; insert != nil {
186 f.queues[insert.peerID]--
187 if f.queues[insert.peerID] == 0 {
188 delete(f.queues, insert.peerID)
190 delete(f.queued, hash)