OSDN Git Service

0f62149e5dea56826f556e427a4710bd9fbe208f
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/p2p/trust"
17         "github.com/vapor/protocol/bc"
18         "github.com/vapor/protocol/bc/types"
19 )
20
21 const (
22         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
23         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
24         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
25         defaultBanThreshold   = uint32(100)
26         maxFilterAddressSize  = 50
27         maxFilterAddressCount = 1000
28
29         logModule = "peers"
30 )
31
32 var (
33         errSendStatusMsg = errors.New("send status msg fail")
34         ErrPeerMisbehave = errors.New("peer is misbehave")
35 )
36
37 //BasePeer is the interface for connection level peer
38 type BasePeer interface {
39         Addr() net.Addr
40         ID() string
41         ServiceFlag() consensus.ServiceFlag
42         TrafficStatus() (*flowrate.Status, *flowrate.Status)
43         TrySend(byte, interface{}) bool
44         IsLAN() bool
45 }
46
47 //BasePeerSet is the intergace for connection level peer manager
48 type BasePeerSet interface {
49         AddBannedPeer(string) error
50         StopPeerGracefully(string)
51 }
52
53 type BroadcastMsg interface {
54         FilterTargetPeers(ps *PeerSet) []string
55         MarkSendRecord(ps *PeerSet, peers []string)
56         GetChan() byte
57         GetMsg() interface{}
58         MsgString() string
59 }
60
61 // PeerInfo indicate peer status snap
62 type PeerInfo struct {
63         ID                  string `json:"peer_id"`
64         RemoteAddr          string `json:"remote_addr"`
65         Height              uint64 `json:"height"`
66         Ping                string `json:"ping"`
67         Duration            string `json:"duration"`
68         TotalSent           int64  `json:"total_sent"`
69         TotalReceived       int64  `json:"total_received"`
70         AverageSentRate     int64  `json:"average_sent_rate"`
71         AverageReceivedRate int64  `json:"average_received_rate"`
72         CurrentSentRate     int64  `json:"current_sent_rate"`
73         CurrentReceivedRate int64  `json:"current_received_rate"`
74 }
75
76 type Peer struct {
77         BasePeer
78         mtx             sync.RWMutex
79         services        consensus.ServiceFlag
80         height          uint64
81         hash            *bc.Hash
82         banScore        trust.DynamicBanScore
83         knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
84         knownBlocks     *set.Set // Set of block hashes known to be known by this peer
85         knownSignatures *set.Set // Set of block signatures known to be known by this peer
86         knownStatus     uint64   // Set of chain status known to be known by this peer
87         filterAdds      *set.Set // Set of addresses that the spv node cares about.
88 }
89
90 func newPeer(basePeer BasePeer) *Peer {
91         return &Peer{
92                 BasePeer:        basePeer,
93                 services:        basePeer.ServiceFlag(),
94                 knownTxs:        set.New(),
95                 knownBlocks:     set.New(),
96                 knownSignatures: set.New(),
97                 filterAdds:      set.New(),
98         }
99 }
100
101 func (p *Peer) Height() uint64 {
102         p.mtx.RLock()
103         defer p.mtx.RUnlock()
104         return p.height
105 }
106
107 func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
108         score := p.banScore.Increase(persistent, transient)
109         if score > defaultBanThreshold {
110                 log.WithFields(log.Fields{
111                         "module":  logModule,
112                         "address": p.Addr(),
113                         "score":   score,
114                         "reason":  reason,
115                 }).Errorf("banning and disconnecting")
116                 return true
117         }
118
119         warnThreshold := defaultBanThreshold >> 1
120         if score > warnThreshold {
121                 log.WithFields(log.Fields{
122                         "module":  logModule,
123                         "address": p.Addr(),
124                         "score":   score,
125                         "reason":  reason,
126                 }).Warning("ban score increasing")
127         }
128         return false
129 }
130
131 func (p *Peer) AddFilterAddress(address []byte) {
132         p.mtx.Lock()
133         defer p.mtx.Unlock()
134
135         if p.filterAdds.Size() >= maxFilterAddressCount {
136                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
137                 return
138         }
139         if len(address) > maxFilterAddressSize {
140                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
141                 return
142         }
143
144         p.filterAdds.Add(hex.EncodeToString(address))
145 }
146
147 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
148         if !p.filterAdds.IsEmpty() {
149                 p.filterAdds.Clear()
150         }
151         for _, address := range addresses {
152                 p.AddFilterAddress(address)
153         }
154 }
155
156 func (p *Peer) FilterClear() {
157         p.filterAdds.Clear()
158 }
159
160 func (p *Peer) GetBlockByHeight(height uint64) bool {
161         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
162         return p.TrySend(msgs.BlockchainChannel, msg)
163 }
164
165 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
166         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
167         return p.TrySend(msgs.BlockchainChannel, msg)
168 }
169
170 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
171         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
172         return p.TrySend(msgs.BlockchainChannel, msg)
173 }
174
175 func (p *Peer) GetPeerInfo() *PeerInfo {
176         p.mtx.RLock()
177         defer p.mtx.RUnlock()
178
179         sentStatus, receivedStatus := p.TrafficStatus()
180         ping := sentStatus.Idle - receivedStatus.Idle
181         if receivedStatus.Idle > sentStatus.Idle {
182                 ping = -ping
183         }
184
185         return &PeerInfo{
186                 ID:                  p.ID(),
187                 RemoteAddr:          p.Addr().String(),
188                 Height:              p.height,
189                 Ping:                ping.String(),
190                 Duration:            sentStatus.Duration.String(),
191                 TotalSent:           sentStatus.Bytes,
192                 TotalReceived:       receivedStatus.Bytes,
193                 AverageSentRate:     sentStatus.AvgRate,
194                 AverageReceivedRate: receivedStatus.AvgRate,
195                 CurrentSentRate:     sentStatus.CurRate,
196                 CurrentReceivedRate: receivedStatus.CurRate,
197         }
198 }
199
200 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
201         var relatedTxs []*types.Tx
202         var relatedStatuses []*bc.TxVerifyResult
203         for i, tx := range txs {
204                 if p.isRelatedTx(tx) {
205                         relatedTxs = append(relatedTxs, tx)
206                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
207                 }
208         }
209         return relatedTxs, relatedStatuses
210 }
211
212 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
213         for _, input := range tx.Inputs {
214                 switch inp := input.TypedInput.(type) {
215                 case *types.SpendInput:
216                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
217                                 return true
218                         }
219                 }
220         }
221         for _, output := range tx.Outputs {
222                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
223                         return true
224                 }
225         }
226         return false
227 }
228
229 func (p *Peer) isSPVNode() bool {
230         return !p.services.IsEnable(consensus.SFFullNode)
231 }
232
233 func (p *Peer) MarkBlock(hash *bc.Hash) {
234         p.mtx.Lock()
235         defer p.mtx.Unlock()
236
237         for p.knownBlocks.Size() >= maxKnownBlocks {
238                 p.knownBlocks.Pop()
239         }
240         p.knownBlocks.Add(hash.String())
241 }
242
243 func (p *Peer) markNewStatus(height uint64) {
244         p.mtx.Lock()
245         defer p.mtx.Unlock()
246
247         p.knownStatus = height
248 }
249
250 func (p *Peer) markSign(signature []byte) {
251         p.mtx.Lock()
252         defer p.mtx.Unlock()
253
254         for p.knownSignatures.Size() >= maxKnownSignatures {
255                 p.knownSignatures.Pop()
256         }
257         p.knownSignatures.Add(hex.EncodeToString(signature))
258 }
259
260 func (p *Peer) markTransaction(hash *bc.Hash) {
261         p.mtx.Lock()
262         defer p.mtx.Unlock()
263
264         for p.knownTxs.Size() >= maxKnownTxs {
265                 p.knownTxs.Pop()
266         }
267         p.knownTxs.Add(hash.String())
268 }
269
270 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
271         ps.mtx.RLock()
272         defer ps.mtx.RUnlock()
273
274         var peers []string
275         for _, peer := range ps.peers {
276                 if !peer.knownBlocks.Has(hash.String()) {
277                         peers = append(peers, peer.ID())
278                 }
279         }
280         return peers
281 }
282
283 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
284         ps.mtx.RLock()
285         defer ps.mtx.RUnlock()
286
287         var peers []string
288         for _, peer := range ps.peers {
289                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
290                         peers = append(peers, peer.ID())
291                 }
292         }
293         return peers
294 }
295
296 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
297         msg, err := msgs.NewBlockMessage(block)
298         if err != nil {
299                 return false, errors.Wrap(err, "fail on NewBlockMessage")
300         }
301
302         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
303         if ok {
304                 blcokHash := block.Hash()
305                 p.knownBlocks.Add(blcokHash.String())
306         }
307         return ok, nil
308 }
309
310 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
311         msg, err := msgs.NewBlocksMessage(blocks)
312         if err != nil {
313                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
314         }
315
316         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
317                 return ok, nil
318         }
319
320         for _, block := range blocks {
321                 blcokHash := block.Hash()
322                 p.knownBlocks.Add(blcokHash.String())
323         }
324         return true, nil
325 }
326
327 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
328         msg, err := msgs.NewHeadersMessage(headers)
329         if err != nil {
330                 return false, errors.New("fail on NewHeadersMessage")
331         }
332
333         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
334         return ok, nil
335 }
336
337 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
338         msg := msgs.NewMerkleBlockMessage()
339         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
340                 return false, err
341         }
342
343         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
344
345         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
346         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
347                 return false, nil
348         }
349
350         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
351         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
352                 return false, nil
353         }
354
355         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
356         return ok, nil
357 }
358
359 func (p *Peer) SendTransactions(txs []*types.Tx) error {
360         validTxs := make([]*types.Tx, 0, len(txs))
361         for i, tx := range txs {
362                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
363                         continue
364                 }
365
366                 validTxs = append(validTxs, tx)
367                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
368                         continue
369                 }
370
371                 msg, err := msgs.NewTransactionsMessage(validTxs)
372                 if err != nil {
373                         return err
374                 }
375
376                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
377                         return errors.New("failed to send txs msg")
378                 }
379
380                 for _, validTx := range validTxs {
381                         p.knownTxs.Add(validTx.ID.String())
382                 }
383
384                 validTxs = make([]*types.Tx, 0, len(txs))
385         }
386
387         return nil
388 }
389
390 func (p *Peer) SendStatus(header *types.BlockHeader) error {
391         msg := msgs.NewStatusMessage(header)
392         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
393                 return errSendStatusMsg
394         }
395         p.markNewStatus(header.Height)
396         return nil
397 }
398
399 func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
400         p.mtx.Lock()
401         defer p.mtx.Unlock()
402         p.height = height
403         p.hash = hash
404 }
405
406 type PeerSet struct {
407         BasePeerSet
408         mtx   sync.RWMutex
409         peers map[string]*Peer
410 }
411
412 // newPeerSet creates a new peer set to track the active participants.
413 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
414         return &PeerSet{
415                 BasePeerSet: basePeerSet,
416                 peers:       make(map[string]*Peer),
417         }
418 }
419
420 func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
421         ps.mtx.Lock()
422         peer := ps.peers[peerID]
423         ps.mtx.Unlock()
424
425         if peer == nil {
426                 return
427         }
428         if ban := peer.addBanScore(persistent, transient, reason); !ban {
429                 return
430         }
431         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
432                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
433         }
434         ps.RemovePeer(peerID)
435 }
436
437 func (ps *PeerSet) AddPeer(peer BasePeer) {
438         ps.mtx.Lock()
439         defer ps.mtx.Unlock()
440
441         if _, ok := ps.peers[peer.ID()]; !ok {
442                 ps.peers[peer.ID()] = newPeer(peer)
443                 return
444         }
445         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
446 }
447
448 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
449         ps.mtx.RLock()
450         defer ps.mtx.RUnlock()
451
452         var bestPeer *Peer
453         for _, p := range ps.peers {
454                 if !p.services.IsEnable(flag) {
455                         continue
456                 }
457                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
458                         bestPeer = p
459                 }
460         }
461         return bestPeer
462 }
463
464 //SendMsg send message to the target peer.
465 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
466         peer := ps.GetPeer(peerID)
467         if peer == nil {
468                 return false
469         }
470
471         ok := peer.TrySend(msgChannel, msg)
472         if !ok {
473                 ps.RemovePeer(peerID)
474         }
475         return ok
476 }
477
478 //BroadcastMsg Broadcast message to the target peers
479 // and mark the message send record
480 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
481         //filter target peers
482         peers := bm.FilterTargetPeers(ps)
483
484         //broadcast to target peers
485         peersSuccess := make([]string, 0)
486         for _, peer := range peers {
487                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
488                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
489                         continue
490                 }
491                 peersSuccess = append(peersSuccess, peer)
492         }
493
494         //mark the message send record
495         bm.MarkSendRecord(ps, peersSuccess)
496         return nil
497 }
498
499 func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
500         msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
501         peers := ps.peersWithoutNewStatus(bestBlock.Height)
502         for _, peer := range peers {
503                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
504                         ps.RemovePeer(peer.ID())
505                         continue
506                 }
507
508                 peer.markNewStatus(bestBlock.Height)
509         }
510         return nil
511 }
512
513 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
514         msg, err := msgs.NewTransactionMessage(tx)
515         if err != nil {
516                 return errors.Wrap(err, "fail on broadcast tx")
517         }
518
519         peers := ps.peersWithoutTx(&tx.ID)
520         for _, peer := range peers {
521                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
522                         continue
523                 }
524                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
525                         log.WithFields(log.Fields{
526                                 "module":  logModule,
527                                 "peer":    peer.Addr(),
528                                 "type":    reflect.TypeOf(msg),
529                                 "message": msg.String(),
530                         }).Warning("send message to peer error")
531                         ps.RemovePeer(peer.ID())
532                         continue
533                 }
534                 peer.markTransaction(&tx.ID)
535         }
536         return nil
537 }
538
539 func (ps *PeerSet) ErrorHandler(peerID string, err error) {
540         if errors.Root(err) == ErrPeerMisbehave {
541                 ps.AddBanScore(peerID, 20, 0, err.Error())
542         } else {
543                 ps.RemovePeer(peerID)
544         }
545 }
546
547 // Peer retrieves the registered peer with the given id.
548 func (ps *PeerSet) GetPeer(id string) *Peer {
549         ps.mtx.RLock()
550         defer ps.mtx.RUnlock()
551         return ps.peers[id]
552 }
553
554 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
555         ps.mtx.RLock()
556         defer ps.mtx.RUnlock()
557
558         result := []*PeerInfo{}
559         for _, peer := range ps.peers {
560                 result = append(result, peer.GetPeerInfo())
561         }
562         return result
563 }
564
565 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
566         peer := ps.GetPeer(peerID)
567         if peer == nil {
568                 return
569         }
570         peer.MarkBlock(hash)
571 }
572
573 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
574         peer := ps.GetPeer(peerID)
575         if peer == nil {
576                 return
577         }
578         peer.markSign(signature)
579 }
580
581 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
582         peer := ps.GetPeer(peerID)
583         if peer == nil {
584                 return
585         }
586         peer.markNewStatus(height)
587 }
588
589 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
590         ps.mtx.Lock()
591         peer := ps.peers[peerID]
592         ps.mtx.Unlock()
593
594         if peer == nil {
595                 return
596         }
597         peer.markTransaction(&txHash)
598 }
599
600 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
601         ps.mtx.RLock()
602         defer ps.mtx.RUnlock()
603
604         peers := []*Peer{}
605         for _, peer := range ps.peers {
606                 if !peer.knownBlocks.Has(hash.String()) {
607                         peers = append(peers, peer)
608                 }
609         }
610         return peers
611 }
612
613 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
614         ps.mtx.RLock()
615         defer ps.mtx.RUnlock()
616
617         var peers []*Peer
618         for _, peer := range ps.peers {
619                 if peer.knownStatus < height {
620                         peers = append(peers, peer)
621                 }
622         }
623         return peers
624 }
625
626 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
627         ps.mtx.RLock()
628         defer ps.mtx.RUnlock()
629
630         peers := []*Peer{}
631         for _, peer := range ps.peers {
632                 if !peer.knownTxs.Has(hash.String()) {
633                         peers = append(peers, peer)
634                 }
635         }
636         return peers
637 }
638
639 func (ps *PeerSet) RemovePeer(peerID string) {
640         ps.mtx.Lock()
641         delete(ps.peers, peerID)
642         ps.mtx.Unlock()
643         ps.StopPeerGracefully(peerID)
644 }
645
646 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
647         peer := ps.GetPeer(peerID)
648         if peer == nil {
649                 return
650         }
651
652         peer.SetStatus(height, hash)
653 }