OSDN Git Service

Merge pull request #935 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/fatih/set.v0"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/p2p"
11         "github.com/bytom/p2p/trust"
12         "github.com/bytom/protocol/bc"
13         "github.com/bytom/protocol/bc/types"
14 )
15
16 var (
17         errClosed            = errors.New("peer set is closed")
18         errAlreadyRegistered = errors.New("peer is already registered")
19         errNotRegistered     = errors.New("peer is not registered")
20 )
21
22 const (
23         defaultVersion      = 1
24         defaultBanThreshold = uint64(100)
25 )
26
27 type peer struct {
28         mtx      sync.RWMutex
29         version  int // Protocol version negotiated
30         id       string
31         height   uint64
32         hash     *bc.Hash
33         banScore trust.DynamicBanScore
34
35         swPeer *p2p.Peer
36
37         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
38         knownBlocks *set.Set // Set of block hashes known to be known by this peer
39 }
40
41 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
42         return &peer{
43                 version:     defaultVersion,
44                 id:          Peer.Key,
45                 height:      height,
46                 hash:        hash,
47                 swPeer:      Peer,
48                 knownTxs:    set.New(),
49                 knownBlocks: set.New(),
50         }
51 }
52
53 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
54         p.mtx.RLock()
55         defer p.mtx.RUnlock()
56         return p.height, p.hash
57 }
58
59 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
60         p.mtx.Lock()
61         defer p.mtx.Unlock()
62
63         p.height = height
64         p.hash = hash
65 }
66
67 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
68         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
69         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
70         return nil
71 }
72
73 func (p *peer) requestBlockByHeight(height uint64) error {
74         msg := &BlockRequestMessage{Height: height}
75         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
76         return nil
77 }
78
79 func (p *peer) SendTransactions(txs []*types.Tx) error {
80         for _, tx := range txs {
81                 msg, err := NewTransactionNotifyMessage(tx)
82                 if err != nil {
83                         return errors.New("Failed construction tx msg")
84                 }
85                 hash := &tx.ID
86                 p.knownTxs.Add(hash.String())
87                 if p.swPeer == nil {
88                         return errPeerDropped
89                 }
90                 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
91         }
92         return nil
93 }
94
95 func (p *peer) getPeer() *p2p.Peer {
96         p.mtx.RLock()
97         defer p.mtx.RUnlock()
98
99         return p.swPeer
100 }
101
102 // MarkTransaction marks a transaction as known for the peer, ensuring that it
103 // will never be propagated to this particular peer.
104 func (p *peer) MarkTransaction(hash *bc.Hash) {
105         p.mtx.Lock()
106         defer p.mtx.Unlock()
107
108         // If we reached the memory allowance, drop a previously known transaction hash
109         for p.knownTxs.Size() >= maxKnownTxs {
110                 p.knownTxs.Pop()
111         }
112         p.knownTxs.Add(hash.String())
113 }
114
115 // MarkBlock marks a block as known for the peer, ensuring that the block will
116 // never be propagated to this particular peer.
117 func (p *peer) MarkBlock(hash *bc.Hash) {
118         p.mtx.Lock()
119         defer p.mtx.Unlock()
120
121         // If we reached the memory allowance, drop a previously known block hash
122         for p.knownBlocks.Size() >= maxKnownBlocks {
123                 p.knownBlocks.Pop()
124         }
125         p.knownBlocks.Add(hash.String())
126 }
127
128 // addBanScore increases the persistent and decaying ban score fields by the
129 // values passed as parameters. If the resulting score exceeds half of the ban
130 // threshold, a warning is logged including the reason provided. Further, if
131 // the score is above the ban threshold, the peer will be banned and
132 // disconnected.
133 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
134         warnThreshold := defaultBanThreshold >> 1
135         if transient == 0 && persistent == 0 {
136                 // The score is not being increased, but a warning message is still
137                 // logged if the score is above the warn threshold.
138                 score := p.banScore.Int()
139                 if score > warnThreshold {
140                         log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
141                 }
142                 return false
143         }
144         score := p.banScore.Increase(persistent, transient)
145         if score > warnThreshold {
146                 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
147                 if score > defaultBanThreshold {
148                         log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
149                         return true
150                 }
151         }
152         return false
153 }
154
155 type peerSet struct {
156         peers  map[string]*peer
157         lock   sync.RWMutex
158         closed bool
159 }
160
161 // newPeerSet creates a new peer set to track the active participants.
162 func newPeerSet() *peerSet {
163         return &peerSet{
164                 peers: make(map[string]*peer),
165         }
166 }
167
168 // Register injects a new peer into the working set, or returns an error if the
169 // peer is already known.
170 func (ps *peerSet) Register(p *peer) error {
171         ps.lock.Lock()
172         defer ps.lock.Unlock()
173
174         if ps.closed {
175                 return errClosed
176         }
177         if _, ok := ps.peers[p.id]; ok {
178                 return errAlreadyRegistered
179         }
180         ps.peers[p.id] = p
181         return nil
182 }
183
184 // Unregister removes a remote peer from the active set, disabling any further
185 // actions to/from that particular entity.
186 func (ps *peerSet) Unregister(id string) error {
187         ps.lock.Lock()
188         defer ps.lock.Unlock()
189
190         if _, ok := ps.peers[id]; !ok {
191                 return errNotRegistered
192         }
193         delete(ps.peers, id)
194         return nil
195 }
196
197 // Peer retrieves the registered peer with the given id.
198 func (ps *peerSet) Peer(id string) (*peer, bool) {
199         ps.lock.RLock()
200         defer ps.lock.RUnlock()
201         p, ok := ps.peers[id]
202         return p, ok
203 }
204
205 // Len returns if the current number of peers in the set.
206 func (ps *peerSet) Len() int {
207         ps.lock.RLock()
208         defer ps.lock.RUnlock()
209
210         return len(ps.peers)
211 }
212
213 // MarkTransaction marks a transaction as known for the peer, ensuring that it
214 // will never be propagated to this particular peer.
215 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
216         ps.lock.RLock()
217         defer ps.lock.RUnlock()
218
219         if peer, ok := ps.peers[peerID]; ok {
220                 peer.MarkTransaction(hash)
221         }
222 }
223
224 // MarkBlock marks a block as known for the peer, ensuring that the block will
225 // never be propagated to this particular peer.
226 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
227         ps.lock.RLock()
228         defer ps.lock.RUnlock()
229
230         if peer, ok := ps.peers[peerID]; ok {
231                 peer.MarkBlock(hash)
232         }
233 }
234
235 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
236 // their set of known hashes.
237 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
238         ps.lock.RLock()
239         defer ps.lock.RUnlock()
240
241         list := make([]*peer, 0, len(ps.peers))
242         for _, p := range ps.peers {
243                 if !p.knownBlocks.Has(hash.String()) {
244                         list = append(list, p)
245                 }
246         }
247         return list
248 }
249
250 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
251 // in their set of known hashes.
252 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
253         ps.lock.RLock()
254         defer ps.lock.RUnlock()
255
256         list := make([]*peer, 0, len(ps.peers))
257         for _, p := range ps.peers {
258                 if !p.knownTxs.Has(hash.String()) {
259                         list = append(list, p)
260                 }
261         }
262         return list
263 }
264
265 // BestPeer retrieves the known peer with the currently highest total difficulty.
266 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
267         ps.lock.RLock()
268         defer ps.lock.RUnlock()
269
270         var bestPeer *p2p.Peer
271         var bestHeight uint64
272
273         for _, p := range ps.peers {
274                 if bestPeer == nil || p.height > bestHeight {
275                         bestPeer, bestHeight = p.swPeer, p.height
276                 }
277         }
278
279         return bestPeer, bestHeight
280 }
281
282 // Close disconnects all peers.
283 // No new peers can be registered after Close has returned.
284 func (ps *peerSet) Close() {
285         ps.lock.Lock()
286         defer ps.lock.Unlock()
287
288         for _, p := range ps.peers {
289                 p.swPeer.CloseConn()
290         }
291         ps.closed = true
292 }
293
294 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
295         ps.lock.Lock()
296         defer ps.lock.Unlock()
297
298         if _, ok := ps.peers[peer.Key]; !ok {
299                 keeperPeer := newPeer(0, nil, peer)
300                 ps.peers[peer.Key] = keeperPeer
301                 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
302                 return
303         }
304         log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
305 }
306
307 func (ps *peerSet) RemovePeer(peerID string) {
308         ps.lock.Lock()
309         defer ps.lock.Unlock()
310
311         delete(ps.peers, peerID)
312         log.WithField("ID", peerID).Info("Delete peer from peerset")
313 }
314
315 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
316         ps.lock.Lock()
317         defer ps.lock.Unlock()
318
319         if peer, ok := ps.peers[peerID]; ok {
320                 peer.SetStatus(height, hash)
321         }
322 }
323
324 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
325         peer, ok := ps.Peer(peerID)
326         if !ok {
327                 return errors.New("Can't find peer. ")
328         }
329         return peer.requestBlockByHash(hash)
330 }
331
332 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
333         peer, ok := ps.Peer(peerID)
334         if !ok {
335                 return errors.New("Can't find peer. ")
336         }
337         return peer.requestBlockByHeight(height)
338 }
339
340 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
341         msg, err := NewMinedBlockMessage(block)
342         if err != nil {
343                 return nil, errors.New("Failed construction block msg")
344         }
345         hash := block.Hash()
346         peers := ps.PeersWithoutBlock(&hash)
347         abnormalPeers := make([]*peer, 0)
348         for _, peer := range peers {
349                 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
350                         abnormalPeers = append(abnormalPeers, peer)
351                         continue
352                 }
353                 if p, ok := ps.Peer(peer.id); ok {
354                         p.MarkBlock(&hash)
355                 }
356         }
357         return abnormalPeers, nil
358 }
359
360 func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
361         return ps.BroadcastMinedBlock(block)
362 }
363
364 func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
365         msg, err := NewTransactionNotifyMessage(tx)
366         if err != nil {
367                 return nil, errors.New("Failed construction tx msg")
368         }
369         peers := ps.PeersWithoutTx(&tx.ID)
370         abnormalPeers := make([]*peer, 0)
371         for _, peer := range peers {
372                 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
373                         abnormalPeers = append(abnormalPeers, peer)
374                         continue
375                 }
376                 if p, ok := ps.Peer(peer.id); ok {
377                         p.MarkTransaction(&tx.ID)
378                 }
379         }
380         return abnormalPeers, nil
381 }