OSDN Git Service

Merge pull request #771 from Bytom/fix-lru-cache-get
[bytom/bytom.git] / netsync / fetcher.go
1 package netsync
2
3 import (
4         "errors"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/karalabe/cookiejar.v2/collections/prque"
8
9         "github.com/bytom/p2p"
10         core "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13 )
14
const (
	maxQueueDist = 1024 // Maximum allowed distance from the chain head to queue
)
18
var (
	// errTerminated is returned by Enqueue when the fetcher has been stopped
	// before the block could be handed to the event loop.
	errTerminated = errors.New("terminated")
)
22
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type Fetcher struct {
	chain *core.Chain // Blockchain used to validate and import queued blocks
	sw    *p2p.Switch // P2P switch used to ban/disconnect misbehaving peers
	peers *peerSet    // Set of active peers for lookups and broadcasting

	// Various event channels
	newMinedBlock chan *blockPending // Inbound mined-block notifications (unbuffered; see Enqueue)
	quit          chan struct{}      // Closed by Stop to terminate the loop goroutine

	// Block cache
	queue  *prque.Prque              // Queue containing the import operations (block number sorted)
	queues map[string]int            // Per peer block counts to prevent memory exhaustion
	queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
}
39
40 //NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
41 func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
42         return &Fetcher{
43                 chain:         chain,
44                 sw:            sw,
45                 peers:         peers,
46                 newMinedBlock: make(chan *blockPending),
47                 quit:          make(chan struct{}),
48                 queue:         prque.New(),
49                 queues:        make(map[string]int),
50                 queued:        make(map[bc.Hash]*blockPending),
51         }
52 }
53
54 // Start boots up the announcement based synchroniser, accepting and processing
55 // hash notifications and block fetches until termination requested.
56 func (f *Fetcher) Start() {
57         go f.loop()
58 }
59
60 // Stop terminates the announcement based synchroniser, canceling all pending
61 // operations.
62 func (f *Fetcher) Stop() {
63         close(f.quit)
64 }
65
66 // Enqueue tries to fill gaps the the fetcher's future import queue.
67 func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
68         op := &blockPending{
69                 peerID: peer,
70                 block:  block,
71         }
72         select {
73         case f.newMinedBlock <- op:
74                 return nil
75         case <-f.quit:
76                 return errTerminated
77         }
78 }
79
80 // Loop is the main fetcher loop, checking and processing various notification
81 // events.
82 func (f *Fetcher) loop() {
83         for {
84                 // Import any queued blocks that could potentially fit
85                 height := f.chain.BestBlockHeight()
86                 for !f.queue.Empty() {
87                         op := f.queue.PopItem().(*blockPending)
88                         // If too high up the chain or phase, continue later
89                         number := op.block.Height
90                         if number > height+1 {
91                                 f.queue.Push(op, -float32(op.block.Height))
92                                 break
93                         }
94                         // Otherwise if fresh and still unknown, try and import
95                         hash := op.block.Hash()
96                         block, _ := f.chain.GetBlockByHash(&hash)
97                         if block != nil {
98                                 f.forgetBlock(hash)
99                                 continue
100                         }
101                         if op.block.PreviousBlockHash.String() != f.chain.BestBlockHash().String() {
102                                 f.forgetBlock(hash)
103                                 continue
104                         }
105                         f.insert(op.peerID, op.block)
106                 }
107                 // Wait for an outside event to occur
108                 select {
109                 case <-f.quit:
110                         // Fetcher terminating, abort all operations
111                         return
112
113                 case op := <-f.newMinedBlock:
114                         // A direct block insertion was requested, try and fill any pending gaps
115                         f.enqueue(op.peerID, op.block)
116                 }
117         }
118 }
119
120 // enqueue schedules a new future import operation, if the block to be imported
121 // has not yet been seen.
122 func (f *Fetcher) enqueue(peer string, block *types.Block) {
123         hash := block.Hash()
124
125         //TODO: Ensure the peer isn't DOSing us
126         // Discard any past or too distant blocks
127         if dist := int64(block.Height) - int64(f.chain.BestBlockHeight()); dist < 0 || dist > maxQueueDist {
128                 log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
129                 return
130         }
131         // Schedule the block for future importing
132         if _, ok := f.queued[hash]; !ok {
133                 op := &blockPending{
134                         peerID: peer,
135                         block:  block,
136                 }
137                 f.queued[hash] = op
138                 f.queue.Push(op, -float32(block.Height))
139                 log.Info("Queued receive mine block.", " peer:", peer, " number:", block.Height, " queued:", f.queue.Size())
140         }
141 }
142
143 // insert spawns a new goroutine to run a block insertion into the chain. If the
144 // block's number is at the same height as the current import phase, it updates
145 // the phase states accordingly.
146 func (f *Fetcher) insert(peerID string, block *types.Block) {
147         // Run the import on a new thread
148         log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
149         // Run the actual import and log any issues
150         if _, err := f.chain.ProcessBlock(block); err != nil {
151                 log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
152                 peer := f.peers.Peer(peerID)
153                 if ban := peer.addBanScore(50, 0, "block process error"); ban {
154                         f.sw.AddBannedPeer(peer.getPeer())
155                         f.sw.StopPeerGracefully(peer.getPeer())
156                 }
157                 return
158         }
159         // If import succeeded, broadcast the block
160         log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
161         peers, err := f.peers.BroadcastMinedBlock(block)
162         if err != nil {
163                 log.Errorf("Broadcast mine block error. %v", err)
164                 return
165         }
166         for _, peer := range peers {
167                 if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
168                         peer := f.peers.Peer(peer.id).getPeer()
169                         f.sw.AddBannedPeer(peer)
170                         f.sw.StopPeerGracefully(peer)
171                 }
172         }
173 }
174
175 // forgetBlock removes all traces of a queued block from the fetcher's internal
176 // state.
177 func (f *Fetcher) forgetBlock(hash bc.Hash) {
178         if insert := f.queued[hash]; insert != nil {
179                 f.queues[insert.peerID]--
180                 if f.queues[insert.peerID] == 0 {
181                         delete(f.queues, insert.peerID)
182                 }
183                 delete(f.queued, hash)
184         }
185 }