OSDN Git Service

e74a458927f398f63e47f942961e7252dae20603
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33 )
34
35 //BasePeer is the interface for connection level peer
36 type BasePeer interface {
37         Addr() net.Addr
38         ID() string
39         RemoteAddrHost() string
40         ServiceFlag() consensus.ServiceFlag
41         TrafficStatus() (*flowrate.Status, *flowrate.Status)
42         TrySend(byte, interface{}) bool
43         IsLAN() bool
44 }
45
46 //BasePeerSet is the intergace for connection level peer manager
47 type BasePeerSet interface {
48         StopPeerGracefully(string)
49         IsBanned(ip string, level byte, reason string) bool
50 }
51
52 type BroadcastMsg interface {
53         FilterTargetPeers(ps *PeerSet) []string
54         MarkSendRecord(ps *PeerSet, peers []string)
55         GetChan() byte
56         GetMsg() interface{}
57         MsgString() string
58 }
59
60 // PeerInfo indicate peer status snap
61 type PeerInfo struct {
62         ID                  string `json:"peer_id"`
63         RemoteAddr          string `json:"remote_addr"`
64         Height              uint64 `json:"height"`
65         Ping                string `json:"ping"`
66         Duration            string `json:"duration"`
67         TotalSent           int64  `json:"total_sent"`
68         TotalReceived       int64  `json:"total_received"`
69         AverageSentRate     int64  `json:"average_sent_rate"`
70         AverageReceivedRate int64  `json:"average_received_rate"`
71         CurrentSentRate     int64  `json:"current_sent_rate"`
72         CurrentReceivedRate int64  `json:"current_received_rate"`
73 }
74
75 type Peer struct {
76         BasePeer
77         mtx                sync.RWMutex
78         services           consensus.ServiceFlag
79         bestHeight         uint64
80         bestHash           *bc.Hash
81         irreversibleHeight uint64
82         irreversibleHash   *bc.Hash
83         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
84         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
85         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
86         knownStatus        uint64   // Set of chain status known to be known by this peer
87         filterAdds         *set.Set // Set of addresses that the spv node cares about.
88 }
89
90 func newPeer(basePeer BasePeer) *Peer {
91         return &Peer{
92                 BasePeer:        basePeer,
93                 services:        basePeer.ServiceFlag(),
94                 knownTxs:        set.New(),
95                 knownBlocks:     set.New(),
96                 knownSignatures: set.New(),
97                 filterAdds:      set.New(),
98         }
99 }
100
101 func (p *Peer) Height() uint64 {
102         p.mtx.RLock()
103         defer p.mtx.RUnlock()
104
105         return p.bestHeight
106 }
107
108 func (p *Peer) IrreversibleHeight() uint64 {
109         p.mtx.RLock()
110         defer p.mtx.RUnlock()
111
112         return p.irreversibleHeight
113 }
114
115 func (p *Peer) AddFilterAddress(address []byte) {
116         p.mtx.Lock()
117         defer p.mtx.Unlock()
118
119         if p.filterAdds.Size() >= maxFilterAddressCount {
120                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
121                 return
122         }
123         if len(address) > maxFilterAddressSize {
124                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
125                 return
126         }
127
128         p.filterAdds.Add(hex.EncodeToString(address))
129 }
130
131 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
132         if !p.filterAdds.IsEmpty() {
133                 p.filterAdds.Clear()
134         }
135         for _, address := range addresses {
136                 p.AddFilterAddress(address)
137         }
138 }
139
140 func (p *Peer) FilterClear() {
141         p.filterAdds.Clear()
142 }
143
144 func (p *Peer) GetBlockByHeight(height uint64) bool {
145         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
146         return p.TrySend(msgs.BlockchainChannel, msg)
147 }
148
149 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
150         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
151         return p.TrySend(msgs.BlockchainChannel, msg)
152 }
153
154 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
155         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
156         return p.TrySend(msgs.BlockchainChannel, msg)
157 }
158
159 func (p *Peer) GetPeerInfo() *PeerInfo {
160         p.mtx.RLock()
161         defer p.mtx.RUnlock()
162
163         sentStatus, receivedStatus := p.TrafficStatus()
164         ping := sentStatus.Idle - receivedStatus.Idle
165         if receivedStatus.Idle > sentStatus.Idle {
166                 ping = -ping
167         }
168
169         return &PeerInfo{
170                 ID:                  p.ID(),
171                 RemoteAddr:          p.Addr().String(),
172                 Height:              p.bestHeight,
173                 Ping:                ping.String(),
174                 Duration:            sentStatus.Duration.String(),
175                 TotalSent:           sentStatus.Bytes,
176                 TotalReceived:       receivedStatus.Bytes,
177                 AverageSentRate:     sentStatus.AvgRate,
178                 AverageReceivedRate: receivedStatus.AvgRate,
179                 CurrentSentRate:     sentStatus.CurRate,
180                 CurrentReceivedRate: receivedStatus.CurRate,
181         }
182 }
183
184 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
185         var relatedTxs []*types.Tx
186         var relatedStatuses []*bc.TxVerifyResult
187         for i, tx := range txs {
188                 if p.isRelatedTx(tx) {
189                         relatedTxs = append(relatedTxs, tx)
190                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
191                 }
192         }
193         return relatedTxs, relatedStatuses
194 }
195
196 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
197         for _, input := range tx.Inputs {
198                 switch inp := input.TypedInput.(type) {
199                 case *types.SpendInput:
200                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
201                                 return true
202                         }
203                 }
204         }
205         for _, output := range tx.Outputs {
206                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
207                         return true
208                 }
209         }
210         return false
211 }
212
213 func (p *Peer) isSPVNode() bool {
214         return !p.services.IsEnable(consensus.SFFullNode)
215 }
216
217 func (p *Peer) MarkBlock(hash *bc.Hash) {
218         p.mtx.Lock()
219         defer p.mtx.Unlock()
220
221         for p.knownBlocks.Size() >= maxKnownBlocks {
222                 p.knownBlocks.Pop()
223         }
224         p.knownBlocks.Add(hash.String())
225 }
226
227 func (p *Peer) markNewStatus(height uint64) {
228         p.mtx.Lock()
229         defer p.mtx.Unlock()
230
231         p.knownStatus = height
232 }
233
234 func (p *Peer) markSign(signature []byte) {
235         p.mtx.Lock()
236         defer p.mtx.Unlock()
237
238         for p.knownSignatures.Size() >= maxKnownSignatures {
239                 p.knownSignatures.Pop()
240         }
241         p.knownSignatures.Add(hex.EncodeToString(signature))
242 }
243
244 func (p *Peer) markTransaction(hash *bc.Hash) {
245         p.mtx.Lock()
246         defer p.mtx.Unlock()
247
248         for p.knownTxs.Size() >= maxKnownTxs {
249                 p.knownTxs.Pop()
250         }
251         p.knownTxs.Add(hash.String())
252 }
253
254 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
255         ps.mtx.RLock()
256         defer ps.mtx.RUnlock()
257
258         var peers []string
259         for _, peer := range ps.peers {
260                 if !peer.knownBlocks.Has(hash.String()) {
261                         peers = append(peers, peer.ID())
262                 }
263         }
264         return peers
265 }
266
267 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
268         ps.mtx.RLock()
269         defer ps.mtx.RUnlock()
270
271         var peers []string
272         for _, peer := range ps.peers {
273                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
274                         peers = append(peers, peer.ID())
275                 }
276         }
277         return peers
278 }
279
280 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
281         msg, err := msgs.NewBlockMessage(block)
282         if err != nil {
283                 return false, errors.Wrap(err, "fail on NewBlockMessage")
284         }
285
286         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
287         if ok {
288                 blcokHash := block.Hash()
289                 p.knownBlocks.Add(blcokHash.String())
290         }
291         return ok, nil
292 }
293
294 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
295         msg, err := msgs.NewBlocksMessage(blocks)
296         if err != nil {
297                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
298         }
299
300         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
301                 return ok, nil
302         }
303
304         for _, block := range blocks {
305                 blcokHash := block.Hash()
306                 p.knownBlocks.Add(blcokHash.String())
307         }
308         return true, nil
309 }
310
311 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
312         msg, err := msgs.NewHeadersMessage(headers)
313         if err != nil {
314                 return false, errors.New("fail on NewHeadersMessage")
315         }
316
317         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
318         return ok, nil
319 }
320
321 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
322         msg := msgs.NewMerkleBlockMessage()
323         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
324                 return false, err
325         }
326
327         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
328
329         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
330         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
331                 return false, nil
332         }
333
334         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
335         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
336                 return false, nil
337         }
338
339         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
340         return ok, nil
341 }
342
343 func (p *Peer) SendTransactions(txs []*types.Tx) error {
344         validTxs := make([]*types.Tx, 0, len(txs))
345         for i, tx := range txs {
346                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
347                         continue
348                 }
349
350                 validTxs = append(validTxs, tx)
351                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
352                         continue
353                 }
354
355                 msg, err := msgs.NewTransactionsMessage(validTxs)
356                 if err != nil {
357                         return err
358                 }
359
360                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
361                         return errors.New("failed to send txs msg")
362                 }
363
364                 for _, validTx := range validTxs {
365                         p.knownTxs.Add(validTx.ID.String())
366                 }
367
368                 validTxs = make([]*types.Tx, 0, len(txs))
369         }
370
371         return nil
372 }
373
374 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
375         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
376         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
377                 return errSendStatusMsg
378         }
379         p.markNewStatus(bestHeader.Height)
380         return nil
381 }
382
383 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
384         p.mtx.Lock()
385         defer p.mtx.Unlock()
386
387         p.bestHeight = bestHeight
388         p.bestHash = bestHash
389 }
390
391 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
392         p.mtx.Lock()
393         defer p.mtx.Unlock()
394
395         p.irreversibleHeight = irreversibleHeight
396         p.irreversibleHash = irreversibleHash
397 }
398
399 type PeerSet struct {
400         BasePeerSet
401         mtx   sync.RWMutex
402         peers map[string]*Peer
403 }
404
405 // newPeerSet creates a new peer set to track the active participants.
406 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
407         return &PeerSet{
408                 BasePeerSet: basePeerSet,
409                 peers:       make(map[string]*Peer),
410         }
411 }
412
413 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
414         ps.mtx.Lock()
415         peer := ps.peers[peerID]
416         ps.mtx.Unlock()
417
418         if peer == nil {
419                 return
420         }
421
422         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
423                 ps.RemovePeer(peerID)
424         }
425         return
426 }
427
428 func (ps *PeerSet) AddPeer(peer BasePeer) {
429         ps.mtx.Lock()
430         defer ps.mtx.Unlock()
431
432         if _, ok := ps.peers[peer.ID()]; !ok {
433                 ps.peers[peer.ID()] = newPeer(peer)
434                 return
435         }
436         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
437 }
438
439 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
440         ps.mtx.RLock()
441         defer ps.mtx.RUnlock()
442
443         var bestPeer *Peer
444         for _, p := range ps.peers {
445                 if !p.services.IsEnable(flag) {
446                         continue
447                 }
448                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
449                         bestPeer = p
450                 }
451         }
452         return bestPeer
453 }
454
455 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
456         ps.mtx.RLock()
457         defer ps.mtx.RUnlock()
458
459         var bestPeer *Peer
460         for _, p := range ps.peers {
461                 if !p.services.IsEnable(flag) {
462                         continue
463                 }
464                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
465                         bestPeer = p
466                 }
467         }
468         return bestPeer
469 }
470
471 //SendMsg send message to the target peer.
472 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
473         peer := ps.GetPeer(peerID)
474         if peer == nil {
475                 return false
476         }
477
478         ok := peer.TrySend(msgChannel, msg)
479         if !ok {
480                 ps.RemovePeer(peerID)
481         }
482         return ok
483 }
484
485 //BroadcastMsg Broadcast message to the target peers
486 // and mark the message send record
487 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
488         //filter target peers
489         peers := bm.FilterTargetPeers(ps)
490
491         //broadcast to target peers
492         peersSuccess := make([]string, 0)
493         for _, peer := range peers {
494                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
495                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
496                         continue
497                 }
498                 peersSuccess = append(peersSuccess, peer)
499         }
500
501         //mark the message send record
502         bm.MarkSendRecord(ps, peersSuccess)
503         return nil
504 }
505
506 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
507         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
508         peers := ps.peersWithoutNewStatus(bestHeader.Height)
509         for _, peer := range peers {
510                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
511                         ps.RemovePeer(peer.ID())
512                         continue
513                 }
514
515                 peer.markNewStatus(bestHeader.Height)
516         }
517         return nil
518 }
519
520 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
521         msg, err := msgs.NewTransactionMessage(tx)
522         if err != nil {
523                 return errors.Wrap(err, "fail on broadcast tx")
524         }
525
526         peers := ps.peersWithoutTx(&tx.ID)
527         for _, peer := range peers {
528                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
529                         continue
530                 }
531                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
532                         log.WithFields(log.Fields{
533                                 "module":  logModule,
534                                 "peer":    peer.Addr(),
535                                 "type":    reflect.TypeOf(msg),
536                                 "message": msg.String(),
537                         }).Warning("send message to peer error")
538                         ps.RemovePeer(peer.ID())
539                         continue
540                 }
541                 peer.markTransaction(&tx.ID)
542         }
543         return nil
544 }
545
546 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
547         if errors.Root(err) == ErrPeerMisbehave {
548                 ps.ProcessIllegal(peerID, level, err.Error())
549         } else {
550                 ps.RemovePeer(peerID)
551         }
552 }
553
554 // Peer retrieves the registered peer with the given id.
555 func (ps *PeerSet) GetPeer(id string) *Peer {
556         ps.mtx.RLock()
557         defer ps.mtx.RUnlock()
558         return ps.peers[id]
559 }
560
561 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
562         ps.mtx.RLock()
563         defer ps.mtx.RUnlock()
564
565         result := []*PeerInfo{}
566         for _, peer := range ps.peers {
567                 result = append(result, peer.GetPeerInfo())
568         }
569         return result
570 }
571
572 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
573         peer := ps.GetPeer(peerID)
574         if peer == nil {
575                 return
576         }
577         peer.MarkBlock(hash)
578 }
579
580 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
581         peer := ps.GetPeer(peerID)
582         if peer == nil {
583                 return
584         }
585         peer.markSign(signature)
586 }
587
588 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
589         peer := ps.GetPeer(peerID)
590         if peer == nil {
591                 return
592         }
593         peer.markNewStatus(height)
594 }
595
596 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
597         ps.mtx.Lock()
598         peer := ps.peers[peerID]
599         ps.mtx.Unlock()
600
601         if peer == nil {
602                 return
603         }
604         peer.markTransaction(&txHash)
605 }
606
607 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
608         ps.mtx.RLock()
609         defer ps.mtx.RUnlock()
610
611         peers := []*Peer{}
612         for _, peer := range ps.peers {
613                 if !peer.knownBlocks.Has(hash.String()) {
614                         peers = append(peers, peer)
615                 }
616         }
617         return peers
618 }
619
620 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
621         ps.mtx.RLock()
622         defer ps.mtx.RUnlock()
623
624         var peers []*Peer
625         for _, peer := range ps.peers {
626                 if peer.knownStatus < height {
627                         peers = append(peers, peer)
628                 }
629         }
630         return peers
631 }
632
633 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
634         ps.mtx.RLock()
635         defer ps.mtx.RUnlock()
636
637         peers := []*Peer{}
638         for _, peer := range ps.peers {
639                 if !peer.knownTxs.Has(hash.String()) {
640                         peers = append(peers, peer)
641                 }
642         }
643         return peers
644 }
645
646 func (ps *PeerSet) RemovePeer(peerID string) {
647         ps.mtx.Lock()
648         delete(ps.peers, peerID)
649         ps.mtx.Unlock()
650         ps.StopPeerGracefully(peerID)
651 }
652
653 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
654         peer := ps.GetPeer(peerID)
655         if peer == nil {
656                 return
657         }
658
659         peer.SetBestStatus(height, hash)
660 }