OSDN Git Service

fix SyncManager Peer and Switch peer not match (#560)
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/fatih/set.v0"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/p2p"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13 )
14
15 var (
16         errClosed            = errors.New("peer set is closed")
17         errAlreadyRegistered = errors.New("peer is already registered")
18         errNotRegistered     = errors.New("peer is not registered")
19 )
20
21 const defaultVersion = 1
22
23 type peer struct {
24         mtx     sync.RWMutex
25         version int // Protocol version negotiated
26         id      string
27         height  uint64
28         hash    *bc.Hash
29         *p2p.Peer
30
31         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
32         knownBlocks *set.Set // Set of block hashes known to be known by this peer
33 }
34
35 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
36         return &peer{
37                 version:     defaultVersion,
38                 height:      height,
39                 hash:        hash,
40                 Peer:        Peer,
41                 knownTxs:    set.New(),
42                 knownBlocks: set.New(),
43         }
44 }
45
46 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
47         p.mtx.RLock()
48         defer p.mtx.RUnlock()
49         return p.height, p.hash
50 }
51
52 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
53         p.mtx.Lock()
54         defer p.mtx.Unlock()
55
56         p.height = height
57         p.hash = hash
58 }
59
60 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
61         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
62         p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
63         return nil
64 }
65
66 func (p *peer) requestBlockByHeight(height uint64) error {
67         msg := &BlockRequestMessage{Height: height}
68         p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
69         return nil
70 }
71
72 func (p *peer) SendTransactions(txs []*types.Tx) error {
73         for _, tx := range txs {
74                 msg, err := NewTransactionNotifyMessage(tx)
75                 if err != nil {
76                         return errors.New("Failed construction tx msg")
77                 }
78                 hash := &tx.ID
79                 p.knownTxs.Add(hash.String())
80                 p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
81         }
82         return nil
83 }
84
85 func (p *peer) getPeer() *p2p.Peer {
86         p.mtx.RLock()
87         defer p.mtx.RUnlock()
88
89         return p.Peer
90 }
91
92 // MarkTransaction marks a transaction as known for the peer, ensuring that it
93 // will never be propagated to this particular peer.
94 func (p *peer) MarkTransaction(hash *bc.Hash) {
95         p.mtx.Lock()
96         defer p.mtx.Unlock()
97
98         // If we reached the memory allowance, drop a previously known transaction hash
99         for p.knownTxs.Size() >= maxKnownTxs {
100                 p.knownTxs.Pop()
101         }
102         p.knownTxs.Add(hash.String())
103 }
104
105 // MarkBlock marks a block as known for the peer, ensuring that the block will
106 // never be propagated to this particular peer.
107 func (p *peer) MarkBlock(hash *bc.Hash) {
108         p.mtx.Lock()
109         defer p.mtx.Unlock()
110
111         // If we reached the memory allowance, drop a previously known block hash
112         for p.knownBlocks.Size() >= maxKnownBlocks {
113                 p.knownBlocks.Pop()
114         }
115         p.knownBlocks.Add(hash.String())
116 }
117
118 type peerSet struct {
119         peers  map[string]*peer
120         lock   sync.RWMutex
121         closed bool
122 }
123
124 // newPeerSet creates a new peer set to track the active participants.
125 func newPeerSet() *peerSet {
126         return &peerSet{
127                 peers: make(map[string]*peer),
128         }
129 }
130
131 // Register injects a new peer into the working set, or returns an error if the
132 // peer is already known.
133 func (ps *peerSet) Register(p *peer) error {
134         ps.lock.Lock()
135         defer ps.lock.Unlock()
136
137         if ps.closed {
138                 return errClosed
139         }
140         if _, ok := ps.peers[p.id]; ok {
141                 return errAlreadyRegistered
142         }
143         ps.peers[p.id] = p
144         return nil
145 }
146
147 // Unregister removes a remote peer from the active set, disabling any further
148 // actions to/from that particular entity.
149 func (ps *peerSet) Unregister(id string) error {
150         ps.lock.Lock()
151         defer ps.lock.Unlock()
152
153         if _, ok := ps.peers[id]; !ok {
154                 return errNotRegistered
155         }
156         delete(ps.peers, id)
157         return nil
158 }
159
160 func (ps *peerSet) DropPeer(id string) error {
161         ps.lock.Lock()
162         defer ps.lock.Unlock()
163
164         peer, ok := ps.peers[id]
165         if !ok {
166                 return errNotRegistered
167         }
168         peer.CloseConn()
169         return nil
170 }
171
172 // Peer retrieves the registered peer with the given id.
173 func (ps *peerSet) Peer(id string) *peer {
174         ps.lock.RLock()
175         defer ps.lock.RUnlock()
176
177         return ps.peers[id]
178 }
179
180 // Len returns if the current number of peers in the set.
181 func (ps *peerSet) Len() int {
182         ps.lock.RLock()
183         defer ps.lock.RUnlock()
184
185         return len(ps.peers)
186 }
187
188 // MarkTransaction marks a transaction as known for the peer, ensuring that it
189 // will never be propagated to this particular peer.
190 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
191         ps.lock.RLock()
192         defer ps.lock.RUnlock()
193
194         if peer, ok := ps.peers[peerID]; ok {
195                 peer.MarkTransaction(hash)
196         }
197 }
198
199 // MarkBlock marks a block as known for the peer, ensuring that the block will
200 // never be propagated to this particular peer.
201 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
202         ps.lock.RLock()
203         defer ps.lock.RUnlock()
204
205         if peer, ok := ps.peers[peerID]; ok {
206                 peer.MarkBlock(hash)
207         }
208 }
209
210 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
211 // their set of known hashes.
212 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
213         ps.lock.RLock()
214         defer ps.lock.RUnlock()
215
216         list := make([]*peer, 0, len(ps.peers))
217         for _, p := range ps.peers {
218                 if !p.knownBlocks.Has(hash.String()) {
219                         list = append(list, p)
220                 }
221         }
222         return list
223 }
224
225 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
226 // in their set of known hashes.
227 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
228         ps.lock.RLock()
229         defer ps.lock.RUnlock()
230
231         list := make([]*peer, 0, len(ps.peers))
232         for _, p := range ps.peers {
233                 if !p.knownTxs.Has(hash.String()) {
234                         list = append(list, p)
235                 }
236         }
237         return list
238 }
239
240 // BestPeer retrieves the known peer with the currently highest total difficulty.
241 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
242         ps.lock.RLock()
243         defer ps.lock.RUnlock()
244
245         var bestPeer *p2p.Peer
246         var bestHeight uint64
247
248         for _, p := range ps.peers {
249                 if bestPeer == nil || p.height > bestHeight {
250                         bestPeer, bestHeight = p.Peer, p.height
251                 }
252         }
253
254         return bestPeer, bestHeight
255 }
256
257 // Close disconnects all peers.
258 // No new peers can be registered after Close has returned.
259 func (ps *peerSet) Close() {
260         ps.lock.Lock()
261         defer ps.lock.Unlock()
262
263         for _, p := range ps.peers {
264                 p.CloseConn()
265         }
266         ps.closed = true
267 }
268
269 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
270         ps.lock.Lock()
271         defer ps.lock.Unlock()
272
273         if _, ok := ps.peers[peer.Key]; !ok {
274                 keeperPeer := newPeer(0, nil, peer)
275                 ps.peers[peer.Key] = keeperPeer
276                 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
277                 return
278         }
279         log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
280 }
281
282 func (ps *peerSet) RemovePeer(peerID string) {
283         ps.lock.Lock()
284         defer ps.lock.Unlock()
285
286         delete(ps.peers, peerID)
287         log.WithField("ID", peerID).Info("Delete peer from peerset")
288 }
289
290 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
291         ps.lock.Lock()
292         defer ps.lock.Unlock()
293
294         if peer, ok := ps.peers[peerID]; ok {
295                 peer.SetStatus(height, hash)
296         }
297 }
298
299 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
300         ps.lock.Lock()
301         defer ps.lock.Unlock()
302
303         peer, ok := ps.peers[peerID]
304         if !ok {
305                 return errors.New("Can't find peer. ")
306         }
307         return peer.requestBlockByHash(hash)
308 }
309
310 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
311         ps.lock.Lock()
312         defer ps.lock.Unlock()
313
314         peer, ok := ps.peers[peerID]
315         if !ok {
316                 return errors.New("Can't find peer. ")
317         }
318         return peer.requestBlockByHeight(height)
319 }
320
321 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
322         msg, err := NewMinedBlockMessage(block)
323         if err != nil {
324                 return errors.New("Failed construction block msg")
325         }
326         hash := block.Hash()
327         peers := ps.PeersWithoutBlock(&hash)
328         for _, peer := range peers {
329                 ps.MarkBlock(peer.Key, &hash)
330                 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
331         }
332         return nil
333 }
334
335 func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
336         msg, err := NewTransactionNotifyMessage(tx)
337         if err != nil {
338                 return errors.New("Failed construction tx msg")
339         }
340         peers := ps.PeersWithoutTx(&tx.ID)
341         for _, peer := range peers {
342                 ps.peers[peer.Key].MarkTransaction(&tx.ID)
343                 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
344         }
345         return nil
346 }