OSDN Git Service

Peer add announces new block message num limit
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33         ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
34 )
35
36 //BasePeer is the interface for connection level peer
37 type BasePeer interface {
38         Addr() net.Addr
39         ID() string
40         RemoteAddrHost() string
41         ServiceFlag() consensus.ServiceFlag
42         TrafficStatus() (*flowrate.Status, *flowrate.Status)
43         TrySend(byte, interface{}) bool
44         IsLAN() bool
45 }
46
47 //BasePeerSet is the intergace for connection level peer manager
48 type BasePeerSet interface {
49         StopPeerGracefully(string)
50         IsBanned(ip string, level byte, reason string) bool
51 }
52
53 type BroadcastMsg interface {
54         FilterTargetPeers(ps *PeerSet) []string
55         MarkSendRecord(ps *PeerSet, peers []string)
56         GetChan() byte
57         GetMsg() interface{}
58         MsgString() string
59 }
60
61 // PeerInfo indicate peer status snap
62 type PeerInfo struct {
63         ID                  string `json:"peer_id"`
64         RemoteAddr          string `json:"remote_addr"`
65         Height              uint64 `json:"height"`
66         Ping                string `json:"ping"`
67         Duration            string `json:"duration"`
68         TotalSent           int64  `json:"total_sent"`
69         TotalReceived       int64  `json:"total_received"`
70         AverageSentRate     int64  `json:"average_sent_rate"`
71         AverageReceivedRate int64  `json:"average_received_rate"`
72         CurrentSentRate     int64  `json:"current_sent_rate"`
73         CurrentReceivedRate int64  `json:"current_received_rate"`
74 }
75
76 type Peer struct {
77         BasePeer
78         mtx                sync.RWMutex
79         services           consensus.ServiceFlag
80         bestHeight         uint64
81         bestHash           *bc.Hash
82         irreversibleHeight uint64
83         irreversibleHash   *bc.Hash
84         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
85         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
86         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
87         knownStatus        uint64   // Set of chain status known to be known by this peer
88         filterAdds         *set.Set // Set of addresses that the spv node cares about.
89 }
90
91 func newPeer(basePeer BasePeer) *Peer {
92         return &Peer{
93                 BasePeer:        basePeer,
94                 services:        basePeer.ServiceFlag(),
95                 knownTxs:        set.New(),
96                 knownBlocks:     set.New(),
97                 knownSignatures: set.New(),
98                 filterAdds:      set.New(),
99         }
100 }
101
102 func (p *Peer) Height() uint64 {
103         p.mtx.RLock()
104         defer p.mtx.RUnlock()
105
106         return p.bestHeight
107 }
108
109 func (p *Peer) IrreversibleHeight() uint64 {
110         p.mtx.RLock()
111         defer p.mtx.RUnlock()
112
113         return p.irreversibleHeight
114 }
115
116 func (p *Peer) AddFilterAddress(address []byte) {
117         p.mtx.Lock()
118         defer p.mtx.Unlock()
119
120         if p.filterAdds.Size() >= maxFilterAddressCount {
121                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
122                 return
123         }
124         if len(address) > maxFilterAddressSize {
125                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
126                 return
127         }
128
129         p.filterAdds.Add(hex.EncodeToString(address))
130 }
131
132 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
133         if !p.filterAdds.IsEmpty() {
134                 p.filterAdds.Clear()
135         }
136         for _, address := range addresses {
137                 p.AddFilterAddress(address)
138         }
139 }
140
141 func (p *Peer) FilterClear() {
142         p.filterAdds.Clear()
143 }
144
145 func (p *Peer) GetBlockByHeight(height uint64) bool {
146         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
147         return p.TrySend(msgs.BlockchainChannel, msg)
148 }
149
150 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
151         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
152         return p.TrySend(msgs.BlockchainChannel, msg)
153 }
154
155 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
156         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
157         return p.TrySend(msgs.BlockchainChannel, msg)
158 }
159
160 func (p *Peer) GetPeerInfo() *PeerInfo {
161         p.mtx.RLock()
162         defer p.mtx.RUnlock()
163
164         sentStatus, receivedStatus := p.TrafficStatus()
165         ping := sentStatus.Idle - receivedStatus.Idle
166         if receivedStatus.Idle > sentStatus.Idle {
167                 ping = -ping
168         }
169
170         return &PeerInfo{
171                 ID:                  p.ID(),
172                 RemoteAddr:          p.Addr().String(),
173                 Height:              p.bestHeight,
174                 Ping:                ping.String(),
175                 Duration:            sentStatus.Duration.String(),
176                 TotalSent:           sentStatus.Bytes,
177                 TotalReceived:       receivedStatus.Bytes,
178                 AverageSentRate:     sentStatus.AvgRate,
179                 AverageReceivedRate: receivedStatus.AvgRate,
180                 CurrentSentRate:     sentStatus.CurRate,
181                 CurrentReceivedRate: receivedStatus.CurRate,
182         }
183 }
184
185 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
186         var relatedTxs []*types.Tx
187         var relatedStatuses []*bc.TxVerifyResult
188         for i, tx := range txs {
189                 if p.isRelatedTx(tx) {
190                         relatedTxs = append(relatedTxs, tx)
191                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
192                 }
193         }
194         return relatedTxs, relatedStatuses
195 }
196
197 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
198         for _, input := range tx.Inputs {
199                 switch inp := input.TypedInput.(type) {
200                 case *types.SpendInput:
201                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
202                                 return true
203                         }
204                 }
205         }
206         for _, output := range tx.Outputs {
207                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
208                         return true
209                 }
210         }
211         return false
212 }
213
214 func (p *Peer) isSPVNode() bool {
215         return !p.services.IsEnable(consensus.SFFullNode)
216 }
217
218 func (p *Peer) MarkBlock(hash *bc.Hash) {
219         p.mtx.Lock()
220         defer p.mtx.Unlock()
221
222         for p.knownBlocks.Size() >= maxKnownBlocks {
223                 p.knownBlocks.Pop()
224         }
225         p.knownBlocks.Add(hash.String())
226 }
227
228 func (p *Peer) markNewStatus(height uint64) {
229         p.mtx.Lock()
230         defer p.mtx.Unlock()
231
232         p.knownStatus = height
233 }
234
235 func (p *Peer) markSign(signature []byte) {
236         p.mtx.Lock()
237         defer p.mtx.Unlock()
238
239         for p.knownSignatures.Size() >= maxKnownSignatures {
240                 p.knownSignatures.Pop()
241         }
242         p.knownSignatures.Add(hex.EncodeToString(signature))
243 }
244
245 func (p *Peer) markTransaction(hash *bc.Hash) {
246         p.mtx.Lock()
247         defer p.mtx.Unlock()
248
249         for p.knownTxs.Size() >= maxKnownTxs {
250                 p.knownTxs.Pop()
251         }
252         p.knownTxs.Add(hash.String())
253 }
254
255 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
256         msg, err := msgs.NewBlockMessage(block)
257         if err != nil {
258                 return false, errors.Wrap(err, "fail on NewBlockMessage")
259         }
260
261         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
262         if ok {
263                 blcokHash := block.Hash()
264                 p.knownBlocks.Add(blcokHash.String())
265         }
266         return ok, nil
267 }
268
269 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
270         msg, err := msgs.NewBlocksMessage(blocks)
271         if err != nil {
272                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
273         }
274
275         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
276                 return ok, nil
277         }
278
279         for _, block := range blocks {
280                 blcokHash := block.Hash()
281                 p.knownBlocks.Add(blcokHash.String())
282         }
283         return true, nil
284 }
285
286 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
287         msg, err := msgs.NewHeadersMessage(headers)
288         if err != nil {
289                 return false, errors.New("fail on NewHeadersMessage")
290         }
291
292         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
293         return ok, nil
294 }
295
296 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
297         msg := msgs.NewMerkleBlockMessage()
298         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
299                 return false, err
300         }
301
302         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
303
304         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
305         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
306                 return false, nil
307         }
308
309         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
310         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
311                 return false, nil
312         }
313
314         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
315         return ok, nil
316 }
317
318 func (p *Peer) SendTransactions(txs []*types.Tx) error {
319         validTxs := make([]*types.Tx, 0, len(txs))
320         for i, tx := range txs {
321                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
322                         continue
323                 }
324
325                 validTxs = append(validTxs, tx)
326                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
327                         continue
328                 }
329
330                 msg, err := msgs.NewTransactionsMessage(validTxs)
331                 if err != nil {
332                         return err
333                 }
334
335                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
336                         return errors.New("failed to send txs msg")
337                 }
338
339                 for _, validTx := range validTxs {
340                         p.knownTxs.Add(validTx.ID.String())
341                 }
342
343                 validTxs = make([]*types.Tx, 0, len(txs))
344         }
345
346         return nil
347 }
348
349 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
350         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
351         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
352                 return errSendStatusMsg
353         }
354         p.markNewStatus(bestHeader.Height)
355         return nil
356 }
357
358 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
359         p.mtx.Lock()
360         defer p.mtx.Unlock()
361
362         p.bestHeight = bestHeight
363         p.bestHash = bestHash
364 }
365
366 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
367         p.mtx.Lock()
368         defer p.mtx.Unlock()
369
370         p.irreversibleHeight = irreversibleHeight
371         p.irreversibleHash = irreversibleHash
372 }
373
374 type PeerSet struct {
375         BasePeerSet
376         mtx   sync.RWMutex
377         peers map[string]*Peer
378 }
379
380 // newPeerSet creates a new peer set to track the active participants.
381 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
382         return &PeerSet{
383                 BasePeerSet: basePeerSet,
384                 peers:       make(map[string]*Peer),
385         }
386 }
387
388 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
389         ps.mtx.Lock()
390         peer := ps.peers[peerID]
391         ps.mtx.Unlock()
392
393         if peer == nil {
394                 return
395         }
396
397         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
398                 ps.RemovePeer(peerID)
399         }
400         return
401 }
402
403 func (ps *PeerSet) AddPeer(peer BasePeer) {
404         ps.mtx.Lock()
405         defer ps.mtx.Unlock()
406
407         if _, ok := ps.peers[peer.ID()]; !ok {
408                 ps.peers[peer.ID()] = newPeer(peer)
409                 return
410         }
411         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
412 }
413
414 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
415         ps.mtx.RLock()
416         defer ps.mtx.RUnlock()
417
418         var bestPeer *Peer
419         for _, p := range ps.peers {
420                 if !p.services.IsEnable(flag) {
421                         continue
422                 }
423                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
424                         bestPeer = p
425                 }
426         }
427         return bestPeer
428 }
429
430 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
431         ps.mtx.RLock()
432         defer ps.mtx.RUnlock()
433
434         var bestPeer *Peer
435         for _, p := range ps.peers {
436                 if !p.services.IsEnable(flag) {
437                         continue
438                 }
439                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
440                         bestPeer = p
441                 }
442         }
443         return bestPeer
444 }
445
446 //SendMsg send message to the target peer.
447 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
448         peer := ps.GetPeer(peerID)
449         if peer == nil {
450                 return false
451         }
452
453         ok := peer.TrySend(msgChannel, msg)
454         if !ok {
455                 ps.RemovePeer(peerID)
456         }
457         return ok
458 }
459
460 //BroadcastMsg Broadcast message to the target peers
461 // and mark the message send record
462 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
463         //filter target peers
464         peers := bm.FilterTargetPeers(ps)
465
466         //broadcast to target peers
467         peersSuccess := make([]string, 0)
468         for _, peer := range peers {
469                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
470                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
471                         continue
472                 }
473                 peersSuccess = append(peersSuccess, peer)
474         }
475
476         //mark the message send record
477         bm.MarkSendRecord(ps, peersSuccess)
478         return nil
479 }
480
481 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
482         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
483         peers := ps.peersWithoutNewStatus(bestHeader.Height)
484         for _, peer := range peers {
485                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
486                         ps.RemovePeer(peer.ID())
487                         continue
488                 }
489
490                 peer.markNewStatus(bestHeader.Height)
491         }
492         return nil
493 }
494
495 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
496         msg, err := msgs.NewTransactionMessage(tx)
497         if err != nil {
498                 return errors.Wrap(err, "fail on broadcast tx")
499         }
500
501         peers := ps.peersWithoutTx(&tx.ID)
502         for _, peer := range peers {
503                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
504                         continue
505                 }
506                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
507                         log.WithFields(log.Fields{
508                                 "module":  logModule,
509                                 "peer":    peer.Addr(),
510                                 "type":    reflect.TypeOf(msg),
511                                 "message": msg.String(),
512                         }).Warning("send message to peer error")
513                         ps.RemovePeer(peer.ID())
514                         continue
515                 }
516                 peer.markTransaction(&tx.ID)
517         }
518         return nil
519 }
520
521 // Peer retrieves the registered peer with the given id.
522 func (ps *PeerSet) GetPeer(id string) *Peer {
523         ps.mtx.RLock()
524         defer ps.mtx.RUnlock()
525         return ps.peers[id]
526 }
527
528 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
529         ps.mtx.RLock()
530         defer ps.mtx.RUnlock()
531
532         peers := []*Peer{}
533         for _, peer := range ps.peers {
534                 if peer.Height() >= height {
535                         peers = append(peers, peer)
536                 }
537         }
538         return peers
539 }
540
541 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
542         ps.mtx.RLock()
543         defer ps.mtx.RUnlock()
544
545         result := []*PeerInfo{}
546         for _, peer := range ps.peers {
547                 result = append(result, peer.GetPeerInfo())
548         }
549         return result
550 }
551
552 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
553         peer := ps.GetPeer(peerID)
554         if peer == nil {
555                 return
556         }
557         peer.MarkBlock(hash)
558 }
559
560 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
561         peer := ps.GetPeer(peerID)
562         if peer == nil {
563                 return
564         }
565         peer.markSign(signature)
566 }
567
568 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
569         peer := ps.GetPeer(peerID)
570         if peer == nil {
571                 return
572         }
573         peer.markNewStatus(height)
574 }
575
576 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
577         ps.mtx.Lock()
578         peer := ps.peers[peerID]
579         ps.mtx.Unlock()
580
581         if peer == nil {
582                 return
583         }
584         peer.markTransaction(&txHash)
585 }
586
587 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
588         ps.mtx.RLock()
589         defer ps.mtx.RUnlock()
590
591         var peers []string
592         for _, peer := range ps.peers {
593                 if !peer.knownBlocks.Has(hash.String()) {
594                         peers = append(peers, peer.ID())
595                 }
596         }
597         return peers
598 }
599
600 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
601         ps.mtx.RLock()
602         defer ps.mtx.RUnlock()
603
604         var peers []string
605         for _, peer := range ps.peers {
606                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
607                         peers = append(peers, peer.ID())
608                 }
609         }
610         return peers
611 }
612
613 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
614         ps.mtx.RLock()
615         defer ps.mtx.RUnlock()
616
617         var peers []*Peer
618         for _, peer := range ps.peers {
619                 if peer.knownStatus < height {
620                         peers = append(peers, peer)
621                 }
622         }
623         return peers
624 }
625
626 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
627         ps.mtx.RLock()
628         defer ps.mtx.RUnlock()
629
630         peers := []*Peer{}
631         for _, peer := range ps.peers {
632                 if !peer.knownTxs.Has(hash.String()) {
633                         peers = append(peers, peer)
634                 }
635         }
636         return peers
637 }
638
639 func (ps *PeerSet) RemovePeer(peerID string) {
640         ps.mtx.Lock()
641         delete(ps.peers, peerID)
642         ps.mtx.Unlock()
643         ps.StopPeerGracefully(peerID)
644 }
645
646 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
647         peer := ps.GetPeer(peerID)
648         if peer == nil {
649                 return
650         }
651
652         peer.SetBestStatus(height, hash)
653 }
654
655 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
656         peer := ps.GetPeer(peerID)
657         if peer == nil {
658                 return
659         }
660
661         peer.SetIrreversibleStatus(height, hash)
662 }