OSDN Git Service

ef90812f8f4e449b909b47cede1adb696d3d5ea5
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33         ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
34 )
35
36 //BasePeer is the interface for connection level peer
37 type BasePeer interface {
38         Addr() net.Addr
39         ID() string
40         RemoteAddrHost() string
41         ServiceFlag() consensus.ServiceFlag
42         TrafficStatus() (*flowrate.Status, *flowrate.Status)
43         TrySend(byte, interface{}) bool
44         IsLAN() bool
45 }
46
47 //BasePeerSet is the intergace for connection level peer manager
48 type BasePeerSet interface {
49         StopPeerGracefully(string)
50         IsBanned(ip string, level byte, reason string) bool
51 }
52
53 type BroadcastMsg interface {
54         FilterTargetPeers(ps *PeerSet) []string
55         MarkSendRecord(ps *PeerSet, peers []string)
56         GetChan() byte
57         GetMsg() interface{}
58         MsgString() string
59 }
60
61 // PeerInfo indicate peer status snap
62 type PeerInfo struct {
63         ID                  string `json:"peer_id"`
64         RemoteAddr          string `json:"remote_addr"`
65         Height              uint64 `json:"height"`
66         Ping                string `json:"ping"`
67         Duration            string `json:"duration"`
68         TotalSent           int64  `json:"total_sent"`
69         TotalReceived       int64  `json:"total_received"`
70         AverageSentRate     int64  `json:"average_sent_rate"`
71         AverageReceivedRate int64  `json:"average_received_rate"`
72         CurrentSentRate     int64  `json:"current_sent_rate"`
73         CurrentReceivedRate int64  `json:"current_received_rate"`
74 }
75
76 type Peer struct {
77         BasePeer
78         mtx                sync.RWMutex
79         services           consensus.ServiceFlag
80         bestHeight         uint64
81         bestHash           *bc.Hash
82         irreversibleHeight uint64
83         irreversibleHash   *bc.Hash
84         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
85         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
86         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
87         knownStatus        uint64   // Set of chain status known to be known by this peer
88         filterAdds         *set.Set // Set of addresses that the spv node cares about.
89 }
90
91 func newPeer(basePeer BasePeer) *Peer {
92         return &Peer{
93                 BasePeer:        basePeer,
94                 services:        basePeer.ServiceFlag(),
95                 knownTxs:        set.New(),
96                 knownBlocks:     set.New(),
97                 knownSignatures: set.New(),
98                 filterAdds:      set.New(),
99         }
100 }
101
102 func (p *Peer) Height() uint64 {
103         p.mtx.RLock()
104         defer p.mtx.RUnlock()
105
106         return p.bestHeight
107 }
108
109 func (p *Peer) IrreversibleHeight() uint64 {
110         p.mtx.RLock()
111         defer p.mtx.RUnlock()
112
113         return p.irreversibleHeight
114 }
115
116 func (p *Peer) AddFilterAddress(address []byte) {
117         p.mtx.Lock()
118         defer p.mtx.Unlock()
119
120         if p.filterAdds.Size() >= maxFilterAddressCount {
121                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
122                 return
123         }
124         if len(address) > maxFilterAddressSize {
125                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
126                 return
127         }
128
129         p.filterAdds.Add(hex.EncodeToString(address))
130 }
131
132 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
133         if !p.filterAdds.IsEmpty() {
134                 p.filterAdds.Clear()
135         }
136         for _, address := range addresses {
137                 p.AddFilterAddress(address)
138         }
139 }
140
141 func (p *Peer) FilterClear() {
142         p.filterAdds.Clear()
143 }
144
145 func (p *Peer) GetBlockByHeight(height uint64) bool {
146         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
147         return p.TrySend(msgs.BlockchainChannel, msg)
148 }
149
150 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
151         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
152         return p.TrySend(msgs.BlockchainChannel, msg)
153 }
154
155 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
156         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
157         return p.TrySend(msgs.BlockchainChannel, msg)
158 }
159
160 func (p *Peer) GetPeerInfo() *PeerInfo {
161         p.mtx.RLock()
162         defer p.mtx.RUnlock()
163
164         sentStatus, receivedStatus := p.TrafficStatus()
165         ping := sentStatus.Idle - receivedStatus.Idle
166         if receivedStatus.Idle > sentStatus.Idle {
167                 ping = -ping
168         }
169
170         return &PeerInfo{
171                 ID:                  p.ID(),
172                 RemoteAddr:          p.Addr().String(),
173                 Height:              p.bestHeight,
174                 Ping:                ping.String(),
175                 Duration:            sentStatus.Duration.String(),
176                 TotalSent:           sentStatus.Bytes,
177                 TotalReceived:       receivedStatus.Bytes,
178                 AverageSentRate:     sentStatus.AvgRate,
179                 AverageReceivedRate: receivedStatus.AvgRate,
180                 CurrentSentRate:     sentStatus.CurRate,
181                 CurrentReceivedRate: receivedStatus.CurRate,
182         }
183 }
184
185 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
186         var relatedTxs []*types.Tx
187         var relatedStatuses []*bc.TxVerifyResult
188         for i, tx := range txs {
189                 if p.isRelatedTx(tx) {
190                         relatedTxs = append(relatedTxs, tx)
191                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
192                 }
193         }
194         return relatedTxs, relatedStatuses
195 }
196
197 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
198         for _, input := range tx.Inputs {
199                 switch inp := input.TypedInput.(type) {
200                 case *types.SpendInput:
201                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
202                                 return true
203                         }
204                 }
205         }
206         for _, output := range tx.Outputs {
207                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
208                         return true
209                 }
210         }
211         return false
212 }
213
214 func (p *Peer) isSPVNode() bool {
215         return !p.services.IsEnable(consensus.SFFullNode)
216 }
217
218 func (p *Peer) MarkBlock(hash *bc.Hash) {
219         p.mtx.Lock()
220         defer p.mtx.Unlock()
221
222         for p.knownBlocks.Size() >= maxKnownBlocks {
223                 p.knownBlocks.Pop()
224         }
225         p.knownBlocks.Add(hash.String())
226 }
227
228 func (p *Peer) markNewStatus(height uint64) {
229         p.mtx.Lock()
230         defer p.mtx.Unlock()
231
232         p.knownStatus = height
233 }
234
235 func (p *Peer) markSign(signature []byte) {
236         p.mtx.Lock()
237         defer p.mtx.Unlock()
238
239         for p.knownSignatures.Size() >= maxKnownSignatures {
240                 p.knownSignatures.Pop()
241         }
242         p.knownSignatures.Add(hex.EncodeToString(signature))
243 }
244
245 func (p *Peer) markTransaction(hash *bc.Hash) {
246         p.mtx.Lock()
247         defer p.mtx.Unlock()
248
249         for p.knownTxs.Size() >= maxKnownTxs {
250                 p.knownTxs.Pop()
251         }
252         p.knownTxs.Add(hash.String())
253 }
254
255 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
256         ps.mtx.RLock()
257         defer ps.mtx.RUnlock()
258
259         var peers []string
260         for _, peer := range ps.peers {
261                 if !peer.knownBlocks.Has(hash.String()) {
262                         peers = append(peers, peer.ID())
263                 }
264         }
265         return peers
266 }
267
268 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
269         ps.mtx.RLock()
270         defer ps.mtx.RUnlock()
271
272         var peers []string
273         for _, peer := range ps.peers {
274                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
275                         peers = append(peers, peer.ID())
276                 }
277         }
278         return peers
279 }
280
281 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
282         msg, err := msgs.NewBlockMessage(block)
283         if err != nil {
284                 return false, errors.Wrap(err, "fail on NewBlockMessage")
285         }
286
287         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
288         if ok {
289                 blcokHash := block.Hash()
290                 p.knownBlocks.Add(blcokHash.String())
291         }
292         return ok, nil
293 }
294
295 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
296         msg, err := msgs.NewBlocksMessage(blocks)
297         if err != nil {
298                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
299         }
300
301         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
302                 return ok, nil
303         }
304
305         for _, block := range blocks {
306                 blcokHash := block.Hash()
307                 p.knownBlocks.Add(blcokHash.String())
308         }
309         return true, nil
310 }
311
312 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
313         msg, err := msgs.NewHeadersMessage(headers)
314         if err != nil {
315                 return false, errors.New("fail on NewHeadersMessage")
316         }
317
318         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
319         return ok, nil
320 }
321
322 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
323         msg := msgs.NewMerkleBlockMessage()
324         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
325                 return false, err
326         }
327
328         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
329
330         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
331         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
332                 return false, nil
333         }
334
335         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
336         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
337                 return false, nil
338         }
339
340         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
341         return ok, nil
342 }
343
344 func (p *Peer) SendTransactions(txs []*types.Tx) error {
345         validTxs := make([]*types.Tx, 0, len(txs))
346         for i, tx := range txs {
347                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
348                         continue
349                 }
350
351                 validTxs = append(validTxs, tx)
352                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
353                         continue
354                 }
355
356                 msg, err := msgs.NewTransactionsMessage(validTxs)
357                 if err != nil {
358                         return err
359                 }
360
361                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
362                         return errors.New("failed to send txs msg")
363                 }
364
365                 for _, validTx := range validTxs {
366                         p.knownTxs.Add(validTx.ID.String())
367                 }
368
369                 validTxs = make([]*types.Tx, 0, len(txs))
370         }
371
372         return nil
373 }
374
375 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
376         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
377         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
378                 return errSendStatusMsg
379         }
380         p.markNewStatus(bestHeader.Height)
381         return nil
382 }
383
384 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
385         p.mtx.Lock()
386         defer p.mtx.Unlock()
387
388         p.bestHeight = bestHeight
389         p.bestHash = bestHash
390 }
391
392 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
393         p.mtx.Lock()
394         defer p.mtx.Unlock()
395
396         p.irreversibleHeight = irreversibleHeight
397         p.irreversibleHash = irreversibleHash
398 }
399
400 type PeerSet struct {
401         BasePeerSet
402         mtx   sync.RWMutex
403         peers map[string]*Peer
404 }
405
406 // newPeerSet creates a new peer set to track the active participants.
407 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
408         return &PeerSet{
409                 BasePeerSet: basePeerSet,
410                 peers:       make(map[string]*Peer),
411         }
412 }
413
414 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
415         ps.mtx.Lock()
416         peer := ps.peers[peerID]
417         ps.mtx.Unlock()
418
419         if peer == nil {
420                 return
421         }
422
423         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
424                 ps.RemovePeer(peerID)
425         }
426         return
427 }
428
429 func (ps *PeerSet) AddPeer(peer BasePeer) {
430         ps.mtx.Lock()
431         defer ps.mtx.Unlock()
432
433         if _, ok := ps.peers[peer.ID()]; !ok {
434                 ps.peers[peer.ID()] = newPeer(peer)
435                 return
436         }
437         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
438 }
439
440 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
441         ps.mtx.RLock()
442         defer ps.mtx.RUnlock()
443
444         var bestPeer *Peer
445         for _, p := range ps.peers {
446                 if !p.services.IsEnable(flag) {
447                         continue
448                 }
449                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
450                         bestPeer = p
451                 }
452         }
453         return bestPeer
454 }
455
456 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
457         ps.mtx.RLock()
458         defer ps.mtx.RUnlock()
459
460         var bestPeer *Peer
461         for _, p := range ps.peers {
462                 if !p.services.IsEnable(flag) {
463                         continue
464                 }
465                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
466                         bestPeer = p
467                 }
468         }
469         return bestPeer
470 }
471
472 //SendMsg send message to the target peer.
473 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
474         peer := ps.GetPeer(peerID)
475         if peer == nil {
476                 return false
477         }
478
479         ok := peer.TrySend(msgChannel, msg)
480         if !ok {
481                 ps.RemovePeer(peerID)
482         }
483         return ok
484 }
485
486 //BroadcastMsg Broadcast message to the target peers
487 // and mark the message send record
488 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
489         //filter target peers
490         peers := bm.FilterTargetPeers(ps)
491
492         //broadcast to target peers
493         peersSuccess := make([]string, 0)
494         for _, peer := range peers {
495                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
496                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
497                         continue
498                 }
499                 peersSuccess = append(peersSuccess, peer)
500         }
501
502         //mark the message send record
503         bm.MarkSendRecord(ps, peersSuccess)
504         return nil
505 }
506
507 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
508         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
509         peers := ps.peersWithoutNewStatus(bestHeader.Height)
510         for _, peer := range peers {
511                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
512                         ps.RemovePeer(peer.ID())
513                         continue
514                 }
515
516                 peer.markNewStatus(bestHeader.Height)
517         }
518         return nil
519 }
520
521 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
522         msg, err := msgs.NewTransactionMessage(tx)
523         if err != nil {
524                 return errors.Wrap(err, "fail on broadcast tx")
525         }
526
527         peers := ps.peersWithoutTx(&tx.ID)
528         for _, peer := range peers {
529                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
530                         continue
531                 }
532                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
533                         log.WithFields(log.Fields{
534                                 "module":  logModule,
535                                 "peer":    peer.Addr(),
536                                 "type":    reflect.TypeOf(msg),
537                                 "message": msg.String(),
538                         }).Warning("send message to peer error")
539                         ps.RemovePeer(peer.ID())
540                         continue
541                 }
542                 peer.markTransaction(&tx.ID)
543         }
544         return nil
545 }
546
547 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
548         if errors.Root(err) == ErrPeerMisbehave {
549                 ps.ProcessIllegal(peerID, level, err.Error())
550         } else {
551                 ps.RemovePeer(peerID)
552         }
553 }
554
555 // Peer retrieves the registered peer with the given id.
556 func (ps *PeerSet) GetPeer(id string) *Peer {
557         ps.mtx.RLock()
558         defer ps.mtx.RUnlock()
559         return ps.peers[id]
560 }
561
562 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
563         ps.mtx.RLock()
564         defer ps.mtx.RUnlock()
565
566         peers := []*Peer{}
567         for _, peer := range ps.peers {
568                 if peer.Height() >= height {
569                         peers = append(peers, peer)
570                 }
571         }
572         return peers
573 }
574
575 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
576         ps.mtx.RLock()
577         defer ps.mtx.RUnlock()
578
579         result := []*PeerInfo{}
580         for _, peer := range ps.peers {
581                 result = append(result, peer.GetPeerInfo())
582         }
583         return result
584 }
585
586 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
587         peer := ps.GetPeer(peerID)
588         if peer == nil {
589                 return
590         }
591         peer.MarkBlock(hash)
592 }
593
594 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
595         peer := ps.GetPeer(peerID)
596         if peer == nil {
597                 return
598         }
599         peer.markSign(signature)
600 }
601
602 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
603         peer := ps.GetPeer(peerID)
604         if peer == nil {
605                 return
606         }
607         peer.markNewStatus(height)
608 }
609
610 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
611         ps.mtx.Lock()
612         peer := ps.peers[peerID]
613         ps.mtx.Unlock()
614
615         if peer == nil {
616                 return
617         }
618         peer.markTransaction(&txHash)
619 }
620
621 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
622         ps.mtx.RLock()
623         defer ps.mtx.RUnlock()
624
625         peers := []*Peer{}
626         for _, peer := range ps.peers {
627                 if !peer.knownBlocks.Has(hash.String()) {
628                         peers = append(peers, peer)
629                 }
630         }
631         return peers
632 }
633
634 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
635         ps.mtx.RLock()
636         defer ps.mtx.RUnlock()
637
638         var peers []*Peer
639         for _, peer := range ps.peers {
640                 if peer.knownStatus < height {
641                         peers = append(peers, peer)
642                 }
643         }
644         return peers
645 }
646
647 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
648         ps.mtx.RLock()
649         defer ps.mtx.RUnlock()
650
651         peers := []*Peer{}
652         for _, peer := range ps.peers {
653                 if !peer.knownTxs.Has(hash.String()) {
654                         peers = append(peers, peer)
655                 }
656         }
657         return peers
658 }
659
660 func (ps *PeerSet) RemovePeer(peerID string) {
661         ps.mtx.Lock()
662         delete(ps.peers, peerID)
663         ps.mtx.Unlock()
664         ps.StopPeerGracefully(peerID)
665 }
666
667 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
668         peer := ps.GetPeer(peerID)
669         if peer == nil {
670                 return
671         }
672
673         peer.SetBestStatus(height, hash)
674 }
675
676 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
677         peer := ps.GetPeer(peerID)
678         if peer == nil {
679                 return
680         }
681
682         peer.SetIrreversibleStatus(height, hash)
683 }
684
685 func (ps *PeerSet) Size() int {
686         ps.mtx.RLock()
687         defer ps.mtx.RUnlock()
688
689         return len(ps.peers)
690 }