OSDN Git Service

feat: add node discovery and status check (#374)
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33         ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
34 )
35
36 //BasePeer is the interface for connection level peer
37 type BasePeer interface {
38         Moniker() string
39         Addr() net.Addr
40         ID() string
41         RemoteAddrHost() string
42         ServiceFlag() consensus.ServiceFlag
43         TrafficStatus() (*flowrate.Status, *flowrate.Status)
44         TrySend(byte, interface{}) bool
45         IsLAN() bool
46 }
47
48 //BasePeerSet is the intergace for connection level peer manager
49 type BasePeerSet interface {
50         StopPeerGracefully(string)
51         IsBanned(ip string, level byte, reason string) bool
52 }
53
54 type BroadcastMsg interface {
55         FilterTargetPeers(ps *PeerSet) []string
56         MarkSendRecord(ps *PeerSet, peers []string)
57         GetChan() byte
58         GetMsg() interface{}
59         MsgString() string
60 }
61
62 // PeerInfo indicate peer status snap
63 type PeerInfo struct {
64         ID                  string `json:"peer_id"`
65         Moniker             string `json:"moniker"`
66         RemoteAddr          string `json:"remote_addr"`
67         Height              uint64 `json:"height"`
68         Ping                string `json:"ping"`
69         Duration            string `json:"duration"`
70         TotalSent           int64  `json:"total_sent"`
71         TotalReceived       int64  `json:"total_received"`
72         AverageSentRate     int64  `json:"average_sent_rate"`
73         AverageReceivedRate int64  `json:"average_received_rate"`
74         CurrentSentRate     int64  `json:"current_sent_rate"`
75         CurrentReceivedRate int64  `json:"current_received_rate"`
76 }
77
78 type Peer struct {
79         BasePeer
80         mtx                sync.RWMutex
81         services           consensus.ServiceFlag
82         bestHeight         uint64
83         bestHash           *bc.Hash
84         irreversibleHeight uint64
85         irreversibleHash   *bc.Hash
86         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
87         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
88         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
89         knownStatus        uint64   // Set of chain status known to be known by this peer
90         filterAdds         *set.Set // Set of addresses that the spv node cares about.
91 }
92
93 func newPeer(basePeer BasePeer) *Peer {
94         return &Peer{
95                 BasePeer:        basePeer,
96                 services:        basePeer.ServiceFlag(),
97                 knownTxs:        set.New(),
98                 knownBlocks:     set.New(),
99                 knownSignatures: set.New(),
100                 filterAdds:      set.New(),
101         }
102 }
103
104 func (p *Peer) Height() uint64 {
105         p.mtx.RLock()
106         defer p.mtx.RUnlock()
107
108         return p.bestHeight
109 }
110
111 func (p *Peer) IrreversibleHeight() uint64 {
112         p.mtx.RLock()
113         defer p.mtx.RUnlock()
114
115         return p.irreversibleHeight
116 }
117
118 func (p *Peer) AddFilterAddress(address []byte) {
119         p.mtx.Lock()
120         defer p.mtx.Unlock()
121
122         if p.filterAdds.Size() >= maxFilterAddressCount {
123                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
124                 return
125         }
126         if len(address) > maxFilterAddressSize {
127                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
128                 return
129         }
130
131         p.filterAdds.Add(hex.EncodeToString(address))
132 }
133
134 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
135         if !p.filterAdds.IsEmpty() {
136                 p.filterAdds.Clear()
137         }
138         for _, address := range addresses {
139                 p.AddFilterAddress(address)
140         }
141 }
142
143 func (p *Peer) FilterClear() {
144         p.filterAdds.Clear()
145 }
146
147 func (p *Peer) GetBlockByHeight(height uint64) bool {
148         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
149         return p.TrySend(msgs.BlockchainChannel, msg)
150 }
151
152 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
153         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
154         return p.TrySend(msgs.BlockchainChannel, msg)
155 }
156
157 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
158         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
159         return p.TrySend(msgs.BlockchainChannel, msg)
160 }
161
162 func (p *Peer) GetPeerInfo() *PeerInfo {
163         p.mtx.RLock()
164         defer p.mtx.RUnlock()
165
166         sentStatus, receivedStatus := p.TrafficStatus()
167         ping := sentStatus.Idle - receivedStatus.Idle
168         if receivedStatus.Idle > sentStatus.Idle {
169                 ping = -ping
170         }
171
172         return &PeerInfo{
173                 ID:                  p.ID(),
174                 Moniker:             p.BasePeer.Moniker(),
175                 RemoteAddr:          p.Addr().String(),
176                 Height:              p.bestHeight,
177                 Ping:                ping.String(),
178                 Duration:            sentStatus.Duration.String(),
179                 TotalSent:           sentStatus.Bytes,
180                 TotalReceived:       receivedStatus.Bytes,
181                 AverageSentRate:     sentStatus.AvgRate,
182                 AverageReceivedRate: receivedStatus.AvgRate,
183                 CurrentSentRate:     sentStatus.CurRate,
184                 CurrentReceivedRate: receivedStatus.CurRate,
185         }
186 }
187
188 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
189         var relatedTxs []*types.Tx
190         var relatedStatuses []*bc.TxVerifyResult
191         for i, tx := range txs {
192                 if p.isRelatedTx(tx) {
193                         relatedTxs = append(relatedTxs, tx)
194                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
195                 }
196         }
197         return relatedTxs, relatedStatuses
198 }
199
200 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
201         for _, input := range tx.Inputs {
202                 switch inp := input.TypedInput.(type) {
203                 case *types.SpendInput:
204                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
205                                 return true
206                         }
207                 }
208         }
209         for _, output := range tx.Outputs {
210                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
211                         return true
212                 }
213         }
214         return false
215 }
216
217 func (p *Peer) isSPVNode() bool {
218         return !p.services.IsEnable(consensus.SFFullNode)
219 }
220
221 func (p *Peer) MarkBlock(hash *bc.Hash) {
222         p.mtx.Lock()
223         defer p.mtx.Unlock()
224
225         for p.knownBlocks.Size() >= maxKnownBlocks {
226                 p.knownBlocks.Pop()
227         }
228         p.knownBlocks.Add(hash.String())
229 }
230
231 func (p *Peer) markNewStatus(height uint64) {
232         p.mtx.Lock()
233         defer p.mtx.Unlock()
234
235         p.knownStatus = height
236 }
237
238 func (p *Peer) markSign(signature []byte) {
239         p.mtx.Lock()
240         defer p.mtx.Unlock()
241
242         for p.knownSignatures.Size() >= maxKnownSignatures {
243                 p.knownSignatures.Pop()
244         }
245         p.knownSignatures.Add(hex.EncodeToString(signature))
246 }
247
248 func (p *Peer) markTransaction(hash *bc.Hash) {
249         p.mtx.Lock()
250         defer p.mtx.Unlock()
251
252         for p.knownTxs.Size() >= maxKnownTxs {
253                 p.knownTxs.Pop()
254         }
255         p.knownTxs.Add(hash.String())
256 }
257
258 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
259         msg, err := msgs.NewBlockMessage(block)
260         if err != nil {
261                 return false, errors.Wrap(err, "fail on NewBlockMessage")
262         }
263
264         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
265         if ok {
266                 blcokHash := block.Hash()
267                 p.knownBlocks.Add(blcokHash.String())
268         }
269         return ok, nil
270 }
271
272 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
273         msg, err := msgs.NewBlocksMessage(blocks)
274         if err != nil {
275                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
276         }
277
278         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
279                 return ok, nil
280         }
281
282         for _, block := range blocks {
283                 blcokHash := block.Hash()
284                 p.knownBlocks.Add(blcokHash.String())
285         }
286         return true, nil
287 }
288
289 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
290         msg, err := msgs.NewHeadersMessage(headers)
291         if err != nil {
292                 return false, errors.New("fail on NewHeadersMessage")
293         }
294
295         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
296         return ok, nil
297 }
298
299 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
300         msg := msgs.NewMerkleBlockMessage()
301         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
302                 return false, err
303         }
304
305         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
306
307         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
308         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
309                 return false, nil
310         }
311
312         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
313         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
314                 return false, nil
315         }
316
317         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
318         return ok, nil
319 }
320
321 func (p *Peer) SendTransactions(txs []*types.Tx) error {
322         validTxs := make([]*types.Tx, 0, len(txs))
323         for i, tx := range txs {
324                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
325                         continue
326                 }
327
328                 validTxs = append(validTxs, tx)
329                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
330                         continue
331                 }
332
333                 msg, err := msgs.NewTransactionsMessage(validTxs)
334                 if err != nil {
335                         return err
336                 }
337
338                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
339                         return errors.New("failed to send txs msg")
340                 }
341
342                 for _, validTx := range validTxs {
343                         p.knownTxs.Add(validTx.ID.String())
344                 }
345
346                 validTxs = make([]*types.Tx, 0, len(txs))
347         }
348
349         return nil
350 }
351
352 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
353         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
354         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
355                 return errSendStatusMsg
356         }
357         p.markNewStatus(bestHeader.Height)
358         return nil
359 }
360
361 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
362         p.mtx.Lock()
363         defer p.mtx.Unlock()
364
365         p.bestHeight = bestHeight
366         p.bestHash = bestHash
367 }
368
369 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
370         p.mtx.Lock()
371         defer p.mtx.Unlock()
372
373         p.irreversibleHeight = irreversibleHeight
374         p.irreversibleHash = irreversibleHash
375 }
376
377 type PeerSet struct {
378         BasePeerSet
379         mtx   sync.RWMutex
380         peers map[string]*Peer
381 }
382
383 // newPeerSet creates a new peer set to track the active participants.
384 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
385         return &PeerSet{
386                 BasePeerSet: basePeerSet,
387                 peers:       make(map[string]*Peer),
388         }
389 }
390
391 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
392         ps.mtx.Lock()
393         peer := ps.peers[peerID]
394         ps.mtx.Unlock()
395
396         if peer == nil {
397                 return
398         }
399
400         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
401                 ps.RemovePeer(peerID)
402         }
403         return
404 }
405
406 func (ps *PeerSet) AddPeer(peer BasePeer) {
407         ps.mtx.Lock()
408         defer ps.mtx.Unlock()
409
410         if _, ok := ps.peers[peer.ID()]; !ok {
411                 ps.peers[peer.ID()] = newPeer(peer)
412                 return
413         }
414         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
415 }
416
417 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
418         ps.mtx.RLock()
419         defer ps.mtx.RUnlock()
420
421         var bestPeer *Peer
422         for _, p := range ps.peers {
423                 if !p.services.IsEnable(flag) {
424                         continue
425                 }
426                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
427                         bestPeer = p
428                 }
429         }
430         return bestPeer
431 }
432
433 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
434         ps.mtx.RLock()
435         defer ps.mtx.RUnlock()
436
437         var bestPeer *Peer
438         for _, p := range ps.peers {
439                 if !p.services.IsEnable(flag) {
440                         continue
441                 }
442                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
443                         bestPeer = p
444                 }
445         }
446         return bestPeer
447 }
448
449 //SendMsg send message to the target peer.
450 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
451         peer := ps.GetPeer(peerID)
452         if peer == nil {
453                 return false
454         }
455
456         ok := peer.TrySend(msgChannel, msg)
457         if !ok {
458                 ps.RemovePeer(peerID)
459         }
460         return ok
461 }
462
463 //BroadcastMsg Broadcast message to the target peers
464 // and mark the message send record
465 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
466         //filter target peers
467         peers := bm.FilterTargetPeers(ps)
468
469         //broadcast to target peers
470         peersSuccess := make([]string, 0)
471         for _, peer := range peers {
472                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
473                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
474                         continue
475                 }
476                 peersSuccess = append(peersSuccess, peer)
477         }
478
479         //mark the message send record
480         bm.MarkSendRecord(ps, peersSuccess)
481         return nil
482 }
483
484 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
485         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
486         peers := ps.peersWithoutNewStatus(bestHeader.Height)
487         for _, peer := range peers {
488                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
489                         ps.RemovePeer(peer.ID())
490                         continue
491                 }
492
493                 peer.markNewStatus(bestHeader.Height)
494         }
495         return nil
496 }
497
498 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
499         msg, err := msgs.NewTransactionMessage(tx)
500         if err != nil {
501                 return errors.Wrap(err, "fail on broadcast tx")
502         }
503
504         peers := ps.peersWithoutTx(&tx.ID)
505         for _, peer := range peers {
506                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
507                         continue
508                 }
509                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
510                         log.WithFields(log.Fields{
511                                 "module":  logModule,
512                                 "peer":    peer.Addr(),
513                                 "type":    reflect.TypeOf(msg),
514                                 "message": msg.String(),
515                         }).Warning("send message to peer error")
516                         ps.RemovePeer(peer.ID())
517                         continue
518                 }
519                 peer.markTransaction(&tx.ID)
520         }
521         return nil
522 }
523
524 // Peer retrieves the registered peer with the given id.
525 func (ps *PeerSet) GetPeer(id string) *Peer {
526         ps.mtx.RLock()
527         defer ps.mtx.RUnlock()
528         return ps.peers[id]
529 }
530
531 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
532         ps.mtx.RLock()
533         defer ps.mtx.RUnlock()
534
535         peers := []*Peer{}
536         for _, peer := range ps.peers {
537                 if peer.Height() >= height {
538                         peers = append(peers, peer)
539                 }
540         }
541         return peers
542 }
543
544 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
545         ps.mtx.RLock()
546         defer ps.mtx.RUnlock()
547
548         result := []*PeerInfo{}
549         for _, peer := range ps.peers {
550                 result = append(result, peer.GetPeerInfo())
551         }
552         return result
553 }
554
555 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
556         peer := ps.GetPeer(peerID)
557         if peer == nil {
558                 return
559         }
560         peer.MarkBlock(hash)
561 }
562
563 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
564         peer := ps.GetPeer(peerID)
565         if peer == nil {
566                 return
567         }
568         peer.markSign(signature)
569 }
570
571 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
572         peer := ps.GetPeer(peerID)
573         if peer == nil {
574                 return
575         }
576         peer.markNewStatus(height)
577 }
578
579 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
580         ps.mtx.Lock()
581         peer := ps.peers[peerID]
582         ps.mtx.Unlock()
583
584         if peer == nil {
585                 return
586         }
587         peer.markTransaction(&txHash)
588 }
589
590 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
591         ps.mtx.RLock()
592         defer ps.mtx.RUnlock()
593
594         var peers []string
595         for _, peer := range ps.peers {
596                 if !peer.knownBlocks.Has(hash.String()) {
597                         peers = append(peers, peer.ID())
598                 }
599         }
600         return peers
601 }
602
603 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
604         ps.mtx.RLock()
605         defer ps.mtx.RUnlock()
606
607         var peers []string
608         for _, peer := range ps.peers {
609                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
610                         peers = append(peers, peer.ID())
611                 }
612         }
613         return peers
614 }
615
616 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
617         ps.mtx.RLock()
618         defer ps.mtx.RUnlock()
619
620         var peers []*Peer
621         for _, peer := range ps.peers {
622                 if peer.knownStatus < height {
623                         peers = append(peers, peer)
624                 }
625         }
626         return peers
627 }
628
629 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
630         ps.mtx.RLock()
631         defer ps.mtx.RUnlock()
632
633         peers := []*Peer{}
634         for _, peer := range ps.peers {
635                 if !peer.knownTxs.Has(hash.String()) {
636                         peers = append(peers, peer)
637                 }
638         }
639         return peers
640 }
641
642 func (ps *PeerSet) RemovePeer(peerID string) {
643         ps.mtx.Lock()
644         delete(ps.peers, peerID)
645         ps.mtx.Unlock()
646         ps.StopPeerGracefully(peerID)
647 }
648
649 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
650         peer := ps.GetPeer(peerID)
651         if peer == nil {
652                 return
653         }
654
655         peer.SetBestStatus(height, hash)
656 }
657
658 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
659         peer := ps.GetPeer(peerID)
660         if peer == nil {
661                 return
662         }
663
664         peer.SetIrreversibleStatus(height, hash)
665 }