OSDN Git Service

fk
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33         ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
34 )
35
36 //BasePeer is the interface for connection level peer
37 type BasePeer interface {
38         Addr() net.Addr
39         ID() string
40         RemoteAddrHost() string
41         ServiceFlag() consensus.ServiceFlag
42         TrafficStatus() (*flowrate.Status, *flowrate.Status)
43         TrySend(byte, interface{}) bool
44         IsLAN() bool
45 }
46
47 //BasePeerSet is the intergace for connection level peer manager
48 type BasePeerSet interface {
49         StopPeerGracefully(string)
50         IsBanned(ip string, level byte, reason string) bool
51 }
52
53 type BroadcastMsg interface {
54         FilterTargetPeers(ps *PeerSet) []string
55         MarkSendRecord(ps *PeerSet, peers []string)
56         GetChan() byte
57         GetMsg() interface{}
58         MsgString() string
59 }
60
61 // PeerInfo indicate peer status snap
62 type PeerInfo struct {
63         ID                  string `json:"peer_id"`
64         RemoteAddr          string `json:"remote_addr"`
65         Height              uint64 `json:"height"`
66         Ping                string `json:"ping"`
67         Duration            string `json:"duration"`
68         TotalSent           int64  `json:"total_sent"`
69         TotalReceived       int64  `json:"total_received"`
70         AverageSentRate     int64  `json:"average_sent_rate"`
71         AverageReceivedRate int64  `json:"average_received_rate"`
72         CurrentSentRate     int64  `json:"current_sent_rate"`
73         CurrentReceivedRate int64  `json:"current_received_rate"`
74 }
75
76 type Peer struct {
77         BasePeer
78         mtx                sync.RWMutex
79         services           consensus.ServiceFlag
80         bestHeight         uint64
81         bestHash           *bc.Hash
82         irreversibleHeight uint64
83         irreversibleHash   *bc.Hash
84         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
85         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
86         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
87         knownStatus        uint64   // Set of chain status known to be known by this peer
88         filterAdds         *set.Set // Set of addresses that the spv node cares about.
89 }
90
91 func newPeer(basePeer BasePeer) *Peer {
92         return &Peer{
93                 BasePeer:        basePeer,
94                 services:        basePeer.ServiceFlag(),
95                 knownTxs:        set.New(),
96                 knownBlocks:     set.New(),
97                 knownSignatures: set.New(),
98                 filterAdds:      set.New(),
99         }
100 }
101
102 func (p *Peer) Height() uint64 {
103         p.mtx.RLock()
104         defer p.mtx.RUnlock()
105
106         return p.bestHeight
107 }
108
109 func (p *Peer) IrreversibleHeight() uint64 {
110         p.mtx.RLock()
111         defer p.mtx.RUnlock()
112
113         return p.irreversibleHeight
114 }
115
116 func (p *Peer) AddFilterAddress(address []byte) {
117         p.mtx.Lock()
118         defer p.mtx.Unlock()
119
120         if p.filterAdds.Size() >= maxFilterAddressCount {
121                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
122                 return
123         }
124         if len(address) > maxFilterAddressSize {
125                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
126                 return
127         }
128
129         p.filterAdds.Add(hex.EncodeToString(address))
130 }
131
132 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
133         if !p.filterAdds.IsEmpty() {
134                 p.filterAdds.Clear()
135         }
136         for _, address := range addresses {
137                 p.AddFilterAddress(address)
138         }
139 }
140
141 func (p *Peer) FilterClear() {
142         p.filterAdds.Clear()
143 }
144
145 func (p *Peer) GetBlockByHeight(height uint64) bool {
146         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
147         return p.TrySend(msgs.BlockchainChannel, msg)
148 }
149
150 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
151         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
152         return p.TrySend(msgs.BlockchainChannel, msg)
153 }
154
155 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
156         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
157         return p.TrySend(msgs.BlockchainChannel, msg)
158 }
159
160 func (p *Peer) GetPeerInfo() *PeerInfo {
161         p.mtx.RLock()
162         defer p.mtx.RUnlock()
163
164         sentStatus, receivedStatus := p.TrafficStatus()
165         ping := sentStatus.Idle - receivedStatus.Idle
166         if receivedStatus.Idle > sentStatus.Idle {
167                 ping = -ping
168         }
169
170         return &PeerInfo{
171                 ID:                  p.ID(),
172                 RemoteAddr:          p.Addr().String(),
173                 Height:              p.bestHeight,
174                 Ping:                ping.String(),
175                 Duration:            sentStatus.Duration.String(),
176                 TotalSent:           sentStatus.Bytes,
177                 TotalReceived:       receivedStatus.Bytes,
178                 AverageSentRate:     sentStatus.AvgRate,
179                 AverageReceivedRate: receivedStatus.AvgRate,
180                 CurrentSentRate:     sentStatus.CurRate,
181                 CurrentReceivedRate: receivedStatus.CurRate,
182         }
183 }
184
185 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
186         var relatedTxs []*types.Tx
187         var relatedStatuses []*bc.TxVerifyResult
188         for i, tx := range txs {
189                 if p.isRelatedTx(tx) {
190                         relatedTxs = append(relatedTxs, tx)
191                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
192                 }
193         }
194         return relatedTxs, relatedStatuses
195 }
196
197 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
198         for _, input := range tx.Inputs {
199                 switch inp := input.TypedInput.(type) {
200                 case *types.SpendInput:
201                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
202                                 return true
203                         }
204                 }
205         }
206         for _, output := range tx.Outputs {
207                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
208                         return true
209                 }
210         }
211         return false
212 }
213
214 func (p *Peer) isSPVNode() bool {
215         return !p.services.IsEnable(consensus.SFFullNode)
216 }
217
218 func (p *Peer) MarkBlock(hash *bc.Hash) {
219         p.mtx.Lock()
220         defer p.mtx.Unlock()
221
222         for p.knownBlocks.Size() >= maxKnownBlocks {
223                 p.knownBlocks.Pop()
224         }
225         p.knownBlocks.Add(hash.String())
226 }
227
228 func (p *Peer) markNewStatus(height uint64) {
229         p.mtx.Lock()
230         defer p.mtx.Unlock()
231
232         p.knownStatus = height
233 }
234
235 func (p *Peer) markSign(signature []byte) {
236         p.mtx.Lock()
237         defer p.mtx.Unlock()
238
239         for p.knownSignatures.Size() >= maxKnownSignatures {
240                 p.knownSignatures.Pop()
241         }
242         p.knownSignatures.Add(hex.EncodeToString(signature))
243 }
244
245 func (p *Peer) markTransaction(hash *bc.Hash) {
246         p.mtx.Lock()
247         defer p.mtx.Unlock()
248
249         for p.knownTxs.Size() >= maxKnownTxs {
250                 p.knownTxs.Pop()
251         }
252         p.knownTxs.Add(hash.String())
253 }
254
255 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
256         msg, err := msgs.NewBlockMessage(block)
257         if err != nil {
258                 return false, errors.Wrap(err, "fail on NewBlockMessage")
259         }
260
261         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
262         if ok {
263                 blcokHash := block.Hash()
264                 p.knownBlocks.Add(blcokHash.String())
265         }
266         return ok, nil
267 }
268
269 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
270         msg, err := msgs.NewBlocksMessage(blocks)
271         if err != nil {
272                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
273         }
274
275         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
276                 return ok, nil
277         }
278
279         for _, block := range blocks {
280                 blcokHash := block.Hash()
281                 p.knownBlocks.Add(blcokHash.String())
282         }
283         return true, nil
284 }
285
286 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
287         msg, err := msgs.NewHeadersMessage(headers)
288         if err != nil {
289                 return false, errors.New("fail on NewHeadersMessage")
290         }
291
292         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
293         return ok, nil
294 }
295
296 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
297         msg := msgs.NewMerkleBlockMessage()
298         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
299                 return false, err
300         }
301
302         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
303
304         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
305         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
306                 return false, nil
307         }
308
309         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
310         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
311                 return false, nil
312         }
313
314         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
315         return ok, nil
316 }
317
318 func (p *Peer) SendTransactions(txs []*types.Tx) error {
319         validTxs := make([]*types.Tx, 0, len(txs))
320         for i, tx := range txs {
321                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
322                         continue
323                 }
324
325                 validTxs = append(validTxs, tx)
326                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
327                         continue
328                 }
329
330                 msg, err := msgs.NewTransactionsMessage(validTxs)
331                 if err != nil {
332                         return err
333                 }
334
335                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
336                         return errors.New("failed to send txs msg")
337                 }
338
339                 for _, validTx := range validTxs {
340                         p.knownTxs.Add(validTx.ID.String())
341                 }
342
343                 validTxs = make([]*types.Tx, 0, len(txs))
344         }
345
346         return nil
347 }
348
349 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
350         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
351         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
352                 return errSendStatusMsg
353         }
354         p.markNewStatus(bestHeader.Height)
355         return nil
356 }
357
358 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
359         p.mtx.Lock()
360         defer p.mtx.Unlock()
361
362         p.bestHeight = bestHeight
363         p.bestHash = bestHash
364 }
365
366 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
367         p.mtx.Lock()
368         defer p.mtx.Unlock()
369
370         p.irreversibleHeight = irreversibleHeight
371         p.irreversibleHash = irreversibleHash
372 }
373
374 type PeerSet struct {
375         BasePeerSet
376         mtx   sync.RWMutex
377         peers map[string]*Peer
378 }
379
380 // newPeerSet creates a new peer set to track the active participants.
381 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
382         return &PeerSet{
383                 BasePeerSet: basePeerSet,
384                 peers:       make(map[string]*Peer),
385         }
386 }
387
388 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
389         ps.mtx.Lock()
390         peer := ps.peers[peerID]
391         ps.mtx.Unlock()
392
393         if peer == nil {
394                 return
395         }
396
397         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
398                 ps.RemovePeer(peerID)
399         }
400         return
401 }
402
403 func (ps *PeerSet) AddPeer(peer BasePeer) {
404         ps.mtx.Lock()
405         defer ps.mtx.Unlock()
406
407         if _, ok := ps.peers[peer.ID()]; !ok {
408                 ps.peers[peer.ID()] = newPeer(peer)
409                 return
410         }
411         // TODO: change back to warn
412         log.WithField("module", logModule).Debug("add existing peer to blockKeeper")
413 }
414
415 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
416         ps.mtx.RLock()
417         defer ps.mtx.RUnlock()
418
419         var bestPeer *Peer
420         for _, p := range ps.peers {
421                 if !p.services.IsEnable(flag) {
422                         continue
423                 }
424                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
425                         bestPeer = p
426                 }
427         }
428         return bestPeer
429 }
430
431 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
432         ps.mtx.RLock()
433         defer ps.mtx.RUnlock()
434
435         var bestPeer *Peer
436         for _, p := range ps.peers {
437                 if !p.services.IsEnable(flag) {
438                         continue
439                 }
440                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
441                         bestPeer = p
442                 }
443         }
444         return bestPeer
445 }
446
447 //SendMsg send message to the target peer.
448 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
449         peer := ps.GetPeer(peerID)
450         if peer == nil {
451                 return false
452         }
453
454         ok := peer.TrySend(msgChannel, msg)
455         if !ok {
456                 ps.RemovePeer(peerID)
457         }
458         return ok
459 }
460
461 //BroadcastMsg Broadcast message to the target peers
462 // and mark the message send record
463 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
464         //filter target peers
465         peers := bm.FilterTargetPeers(ps)
466
467         //broadcast to target peers
468         peersSuccess := make([]string, 0)
469         for _, peer := range peers {
470                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
471                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
472                         continue
473                 }
474                 peersSuccess = append(peersSuccess, peer)
475         }
476
477         //mark the message send record
478         bm.MarkSendRecord(ps, peersSuccess)
479         return nil
480 }
481
482 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
483         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
484         peers := ps.peersWithoutNewStatus(bestHeader.Height)
485         for _, peer := range peers {
486                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
487                         ps.RemovePeer(peer.ID())
488                         continue
489                 }
490
491                 peer.markNewStatus(bestHeader.Height)
492         }
493         return nil
494 }
495
496 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
497         msg, err := msgs.NewTransactionMessage(tx)
498         if err != nil {
499                 return errors.Wrap(err, "fail on broadcast tx")
500         }
501
502         peers := ps.peersWithoutTx(&tx.ID)
503         for _, peer := range peers {
504                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
505                         continue
506                 }
507                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
508                         log.WithFields(log.Fields{
509                                 "module":  logModule,
510                                 "peer":    peer.Addr(),
511                                 "type":    reflect.TypeOf(msg),
512                                 "message": msg.String(),
513                         }).Warning("send message to peer error")
514                         ps.RemovePeer(peer.ID())
515                         continue
516                 }
517                 peer.markTransaction(&tx.ID)
518         }
519         return nil
520 }
521
522 // Peer retrieves the registered peer with the given id.
523 func (ps *PeerSet) GetPeer(id string) *Peer {
524         ps.mtx.RLock()
525         defer ps.mtx.RUnlock()
526         return ps.peers[id]
527 }
528
529 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
530         ps.mtx.RLock()
531         defer ps.mtx.RUnlock()
532
533         peers := []*Peer{}
534         for _, peer := range ps.peers {
535                 if peer.Height() >= height {
536                         peers = append(peers, peer)
537                 }
538         }
539         return peers
540 }
541
542 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
543         ps.mtx.RLock()
544         defer ps.mtx.RUnlock()
545
546         result := []*PeerInfo{}
547         for _, peer := range ps.peers {
548                 result = append(result, peer.GetPeerInfo())
549         }
550         return result
551 }
552
553 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
554         peer := ps.GetPeer(peerID)
555         if peer == nil {
556                 return
557         }
558         peer.MarkBlock(hash)
559 }
560
561 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
562         peer := ps.GetPeer(peerID)
563         if peer == nil {
564                 return
565         }
566         peer.markSign(signature)
567 }
568
569 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
570         peer := ps.GetPeer(peerID)
571         if peer == nil {
572                 return
573         }
574         peer.markNewStatus(height)
575 }
576
577 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
578         ps.mtx.Lock()
579         peer := ps.peers[peerID]
580         ps.mtx.Unlock()
581
582         if peer == nil {
583                 return
584         }
585         peer.markTransaction(&txHash)
586 }
587
588 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
589         ps.mtx.RLock()
590         defer ps.mtx.RUnlock()
591
592         var peers []string
593         for _, peer := range ps.peers {
594                 if !peer.knownBlocks.Has(hash.String()) {
595                         peers = append(peers, peer.ID())
596                 }
597         }
598         return peers
599 }
600
601 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
602         ps.mtx.RLock()
603         defer ps.mtx.RUnlock()
604
605         var peers []string
606         for _, peer := range ps.peers {
607                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
608                         peers = append(peers, peer.ID())
609                 }
610         }
611         return peers
612 }
613
614 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
615         ps.mtx.RLock()
616         defer ps.mtx.RUnlock()
617
618         var peers []*Peer
619         for _, peer := range ps.peers {
620                 if peer.knownStatus < height {
621                         peers = append(peers, peer)
622                 }
623         }
624         return peers
625 }
626
627 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
628         ps.mtx.RLock()
629         defer ps.mtx.RUnlock()
630
631         peers := []*Peer{}
632         for _, peer := range ps.peers {
633                 if !peer.knownTxs.Has(hash.String()) {
634                         peers = append(peers, peer)
635                 }
636         }
637         return peers
638 }
639
640 func (ps *PeerSet) RemovePeer(peerID string) {
641         ps.mtx.Lock()
642         delete(ps.peers, peerID)
643         ps.mtx.Unlock()
644         ps.StopPeerGracefully(peerID)
645 }
646
647 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
648         peer := ps.GetPeer(peerID)
649         if peer == nil {
650                 return
651         }
652
653         peer.SetBestStatus(height, hash)
654 }
655
656 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
657         peer := ps.GetPeer(peerID)
658         if peer == nil {
659                 return
660         }
661
662         peer.SetIrreversibleStatus(height, hash)
663 }