OSDN Git Service

Hulk did something
[bytom/vapor.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         "github.com/vapor/p2p/trust"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
23         defaultBanThreshold = uint32(100)
24 )
25
26 //BasePeer is the interface for connection level peer
27 type BasePeer interface {
28         Addr() net.Addr
29         ID() string
30         ServiceFlag() consensus.ServiceFlag
31         TrafficStatus() (*flowrate.Status, *flowrate.Status)
32         TrySend(byte, interface{}) bool
33         IsLAN() bool
34 }
35
36 //BasePeerSet is the intergace for connection level peer manager
37 type BasePeerSet interface {
38         AddBannedPeer(string) error
39         StopPeerGracefully(string)
40 }
41
42 // PeerInfo indicate peer status snap
43 type PeerInfo struct {
44         ID                  string `json:"peer_id"`
45         RemoteAddr          string `json:"remote_addr"`
46         Height              uint64 `json:"height"`
47         Ping                string `json:"ping"`
48         Duration            string `json:"duration"`
49         TotalSent           int64  `json:"total_sent"`
50         TotalReceived       int64  `json:"total_received"`
51         AverageSentRate     int64  `json:"average_sent_rate"`
52         AverageReceivedRate int64  `json:"average_received_rate"`
53         CurrentSentRate     int64  `json:"current_sent_rate"`
54         CurrentReceivedRate int64  `json:"current_received_rate"`
55 }
56
57 type peer struct {
58         BasePeer
59         mtx         sync.RWMutex
60         services    consensus.ServiceFlag
61         height      uint64
62         hash        *bc.Hash
63         banScore    trust.DynamicBanScore
64         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
65         knownBlocks *set.Set // Set of block hashes known to be known by this peer
66         filterAdds  *set.Set // Set of addresses that the spv node cares about.
67 }
68
69 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
70         return &peer{
71                 BasePeer:    basePeer,
72                 services:    basePeer.ServiceFlag(),
73                 height:      height,
74                 hash:        hash,
75                 knownTxs:    set.New(),
76                 knownBlocks: set.New(),
77                 filterAdds:  set.New(),
78         }
79 }
80
81 func (p *peer) Height() uint64 {
82         p.mtx.RLock()
83         defer p.mtx.RUnlock()
84         return p.height
85 }
86
87 func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
88         score := p.banScore.Increase(persistent, transient)
89         if score > defaultBanThreshold {
90                 log.WithFields(log.Fields{
91                         "module":  logModule,
92                         "address": p.Addr(),
93                         "score":   score,
94                         "reason":  reason,
95                 }).Errorf("banning and disconnecting")
96                 return true
97         }
98
99         warnThreshold := defaultBanThreshold >> 1
100         if score > warnThreshold {
101                 log.WithFields(log.Fields{
102                         "module":  logModule,
103                         "address": p.Addr(),
104                         "score":   score,
105                         "reason":  reason,
106                 }).Warning("ban score increasing")
107         }
108         return false
109 }
110
111 func (p *peer) addFilterAddress(address []byte) {
112         p.mtx.Lock()
113         defer p.mtx.Unlock()
114
115         if p.filterAdds.Size() >= maxFilterAddressCount {
116                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
117                 return
118         }
119         if len(address) > maxFilterAddressSize {
120                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
121                 return
122         }
123
124         p.filterAdds.Add(hex.EncodeToString(address))
125 }
126
127 func (p *peer) addFilterAddresses(addresses [][]byte) {
128         if !p.filterAdds.IsEmpty() {
129                 p.filterAdds.Clear()
130         }
131         for _, address := range addresses {
132                 p.addFilterAddress(address)
133         }
134 }
135
136 func (p *peer) getBlockByHeight(height uint64) bool {
137         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
138         return p.TrySend(BlockchainChannel, msg)
139 }
140
141 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
142         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
143         return p.TrySend(BlockchainChannel, msg)
144 }
145
146 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
147         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
148         return p.TrySend(BlockchainChannel, msg)
149 }
150
151 func (p *peer) getPeerInfo() *PeerInfo {
152         p.mtx.RLock()
153         defer p.mtx.RUnlock()
154
155         sentStatus, receivedStatus := p.TrafficStatus()
156         ping := sentStatus.Idle - receivedStatus.Idle
157         if receivedStatus.Idle > sentStatus.Idle {
158                 ping = -ping
159         }
160
161         return &PeerInfo{
162                 ID:                  p.ID(),
163                 RemoteAddr:          p.Addr().String(),
164                 Height:              p.height,
165                 Ping:                ping.String(),
166                 Duration:            sentStatus.Duration.String(),
167                 TotalSent:           sentStatus.Bytes,
168                 TotalReceived:       receivedStatus.Bytes,
169                 AverageSentRate:     sentStatus.AvgRate,
170                 AverageReceivedRate: receivedStatus.AvgRate,
171                 CurrentSentRate:     sentStatus.CurRate,
172                 CurrentReceivedRate: receivedStatus.CurRate,
173         }
174 }
175
176 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
177         var relatedTxs []*types.Tx
178         var relatedStatuses []*bc.TxVerifyResult
179         for i, tx := range txs {
180                 if p.isRelatedTx(tx) {
181                         relatedTxs = append(relatedTxs, tx)
182                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
183                 }
184         }
185         return relatedTxs, relatedStatuses
186 }
187
188 func (p *peer) isRelatedTx(tx *types.Tx) bool {
189         for _, input := range tx.Inputs {
190                 switch inp := input.TypedInput.(type) {
191                 case *types.SpendInput:
192                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
193                                 return true
194                         }
195                 }
196         }
197         for _, output := range tx.Outputs {
198                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
199                         return true
200                 }
201         }
202         return false
203 }
204
205 func (p *peer) isSPVNode() bool {
206         return !p.services.IsEnable(consensus.SFFullNode)
207 }
208
209 func (p *peer) markBlock(hash *bc.Hash) {
210         p.mtx.Lock()
211         defer p.mtx.Unlock()
212
213         for p.knownBlocks.Size() >= maxKnownBlocks {
214                 p.knownBlocks.Pop()
215         }
216         p.knownBlocks.Add(hash.String())
217 }
218
219 func (p *peer) markTransaction(hash *bc.Hash) {
220         p.mtx.Lock()
221         defer p.mtx.Unlock()
222
223         for p.knownTxs.Size() >= maxKnownTxs {
224                 p.knownTxs.Pop()
225         }
226         p.knownTxs.Add(hash.String())
227 }
228
229 func (p *peer) sendBlock(block *types.Block) (bool, error) {
230         msg, err := NewBlockMessage(block)
231         if err != nil {
232                 return false, errors.Wrap(err, "fail on NewBlockMessage")
233         }
234
235         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
236         if ok {
237                 blcokHash := block.Hash()
238                 p.knownBlocks.Add(blcokHash.String())
239         }
240         return ok, nil
241 }
242
243 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
244         msg, err := NewBlocksMessage(blocks)
245         if err != nil {
246                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
247         }
248
249         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
250                 return ok, nil
251         }
252
253         for _, block := range blocks {
254                 blcokHash := block.Hash()
255                 p.knownBlocks.Add(blcokHash.String())
256         }
257         return true, nil
258 }
259
260 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
261         msg, err := NewHeadersMessage(headers)
262         if err != nil {
263                 return false, errors.New("fail on NewHeadersMessage")
264         }
265
266         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
267         return ok, nil
268 }
269
270 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
271         msg := NewMerkleBlockMessage()
272         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
273                 return false, err
274         }
275
276         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
277
278         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
279         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
280                 return false, nil
281         }
282
283         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
284         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
285                 return false, nil
286         }
287
288         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
289         return ok, nil
290 }
291
292 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
293         for _, tx := range txs {
294                 if p.isSPVNode() && !p.isRelatedTx(tx) {
295                         continue
296                 }
297                 msg, err := NewTransactionMessage(tx)
298                 if err != nil {
299                         return false, errors.Wrap(err, "failed to tx msg")
300                 }
301
302                 if p.knownTxs.Has(tx.ID.String()) {
303                         continue
304                 }
305                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
306                         return ok, nil
307                 }
308                 p.knownTxs.Add(tx.ID.String())
309         }
310         return true, nil
311 }
312
313 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
314         p.mtx.Lock()
315         defer p.mtx.Unlock()
316         p.height = height
317         p.hash = hash
318 }
319
320 type peerSet struct {
321         BasePeerSet
322         mtx   sync.RWMutex
323         peers map[string]*peer
324 }
325
326 // newPeerSet creates a new peer set to track the active participants.
327 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
328         return &peerSet{
329                 BasePeerSet: basePeerSet,
330                 peers:       make(map[string]*peer),
331         }
332 }
333
334 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
335         ps.mtx.Lock()
336         peer := ps.peers[peerID]
337         ps.mtx.Unlock()
338
339         if peer == nil {
340                 return
341         }
342         if ban := peer.addBanScore(persistent, transient, reason); !ban {
343                 return
344         }
345         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
346                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
347         }
348         ps.removePeer(peerID)
349 }
350
351 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
352         ps.mtx.Lock()
353         defer ps.mtx.Unlock()
354
355         if _, ok := ps.peers[peer.ID()]; !ok {
356                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
357                 return
358         }
359         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
360 }
361
362 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
363         ps.mtx.RLock()
364         defer ps.mtx.RUnlock()
365
366         var bestPeer *peer
367         for _, p := range ps.peers {
368                 if !p.services.IsEnable(flag) {
369                         continue
370                 }
371                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
372                         bestPeer = p
373                 }
374         }
375         return bestPeer
376 }
377
378 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
379         msg, err := NewMinedBlockMessage(block)
380         if err != nil {
381                 return errors.Wrap(err, "fail on broadcast mined block")
382         }
383
384         hash := block.Hash()
385         peers := ps.peersWithoutBlock(&hash)
386         for _, peer := range peers {
387                 if peer.isSPVNode() {
388                         continue
389                 }
390                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
391                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
392                         ps.removePeer(peer.ID())
393                         continue
394                 }
395                 peer.markBlock(&hash)
396         }
397         return nil
398 }
399
400 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
401         bestBlockHash := bestBlock.Hash()
402         peers := ps.peersWithoutBlock(&bestBlockHash)
403
404         genesisHash := genesisBlock.Hash()
405         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
406         for _, peer := range peers {
407                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
408                         log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
409                         ps.removePeer(peer.ID())
410                         continue
411                 }
412         }
413         return nil
414 }
415
416 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
417         msg, err := NewTransactionMessage(tx)
418         if err != nil {
419                 return errors.Wrap(err, "fail on broadcast tx")
420         }
421
422         peers := ps.peersWithoutTx(&tx.ID)
423         for _, peer := range peers {
424                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
425                         continue
426                 }
427                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
428                         log.WithFields(log.Fields{
429                                 "module":  logModule,
430                                 "peer":    peer.Addr(),
431                                 "type":    reflect.TypeOf(msg),
432                                 "message": msg.String(),
433                         }).Warning("send message to peer error")
434                         ps.removePeer(peer.ID())
435                         continue
436                 }
437                 peer.markTransaction(&tx.ID)
438         }
439         return nil
440 }
441
442 func (ps *peerSet) errorHandler(peerID string, err error) {
443         if errors.Root(err) == errPeerMisbehave {
444                 ps.addBanScore(peerID, 20, 0, err.Error())
445         } else {
446                 ps.removePeer(peerID)
447         }
448 }
449
450 // Peer retrieves the registered peer with the given id.
451 func (ps *peerSet) getPeer(id string) *peer {
452         ps.mtx.RLock()
453         defer ps.mtx.RUnlock()
454         return ps.peers[id]
455 }
456
457 func (ps *peerSet) getPeerInfos() []*PeerInfo {
458         ps.mtx.RLock()
459         defer ps.mtx.RUnlock()
460
461         result := []*PeerInfo{}
462         for _, peer := range ps.peers {
463                 result = append(result, peer.getPeerInfo())
464         }
465         return result
466 }
467
468 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
469         ps.mtx.RLock()
470         defer ps.mtx.RUnlock()
471
472         peers := []*peer{}
473         for _, peer := range ps.peers {
474                 if !peer.knownBlocks.Has(hash.String()) {
475                         peers = append(peers, peer)
476                 }
477         }
478         return peers
479 }
480
481 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
482         ps.mtx.RLock()
483         defer ps.mtx.RUnlock()
484
485         peers := []*peer{}
486         for _, peer := range ps.peers {
487                 if !peer.knownTxs.Has(hash.String()) {
488                         peers = append(peers, peer)
489                 }
490         }
491         return peers
492 }
493
494 func (ps *peerSet) removePeer(peerID string) {
495         ps.mtx.Lock()
496         delete(ps.peers, peerID)
497         ps.mtx.Unlock()
498         ps.StopPeerGracefully(peerID)
499 }