OSDN Git Service

1f0ac2496b9c9c613a9ca8f9f2fa2cf53aa59cba
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33 )
34
35 //BasePeer is the interface for connection level peer
36 type BasePeer interface {
37         Addr() net.Addr
38         ID() string
39         ServiceFlag() consensus.ServiceFlag
40         TrafficStatus() (*flowrate.Status, *flowrate.Status)
41         TrySend(byte, interface{}) bool
42         IsLAN() bool
43 }
44
45 //BasePeerSet is the intergace for connection level peer manager
46 type BasePeerSet interface {
47         StopPeerGracefully(string)
48         IsBanned(peerID string, level byte, reason string) bool
49 }
50
51 type BroadcastMsg interface {
52         FilterTargetPeers(ps *PeerSet) []string
53         MarkSendRecord(ps *PeerSet, peers []string)
54         GetChan() byte
55         GetMsg() interface{}
56         MsgString() string
57 }
58
59 // PeerInfo indicate peer status snap
60 type PeerInfo struct {
61         ID                  string `json:"peer_id"`
62         RemoteAddr          string `json:"remote_addr"`
63         Height              uint64 `json:"height"`
64         Ping                string `json:"ping"`
65         Duration            string `json:"duration"`
66         TotalSent           int64  `json:"total_sent"`
67         TotalReceived       int64  `json:"total_received"`
68         AverageSentRate     int64  `json:"average_sent_rate"`
69         AverageReceivedRate int64  `json:"average_received_rate"`
70         CurrentSentRate     int64  `json:"current_sent_rate"`
71         CurrentReceivedRate int64  `json:"current_received_rate"`
72 }
73
74 type Peer struct {
75         BasePeer
76         mtx             sync.RWMutex
77         services        consensus.ServiceFlag
78         height          uint64
79         hash            *bc.Hash
80         knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
81         knownBlocks     *set.Set // Set of block hashes known to be known by this peer
82         knownSignatures *set.Set // Set of block signatures known to be known by this peer
83         knownStatus     uint64   // Set of chain status known to be known by this peer
84         filterAdds      *set.Set // Set of addresses that the spv node cares about.
85 }
86
87 func newPeer(basePeer BasePeer) *Peer {
88         return &Peer{
89                 BasePeer:        basePeer,
90                 services:        basePeer.ServiceFlag(),
91                 knownTxs:        set.New(),
92                 knownBlocks:     set.New(),
93                 knownSignatures: set.New(),
94                 filterAdds:      set.New(),
95         }
96 }
97
98 func (p *Peer) Height() uint64 {
99         p.mtx.RLock()
100         defer p.mtx.RUnlock()
101         return p.height
102 }
103
104 func (p *Peer) AddFilterAddress(address []byte) {
105         p.mtx.Lock()
106         defer p.mtx.Unlock()
107
108         if p.filterAdds.Size() >= maxFilterAddressCount {
109                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
110                 return
111         }
112         if len(address) > maxFilterAddressSize {
113                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
114                 return
115         }
116
117         p.filterAdds.Add(hex.EncodeToString(address))
118 }
119
120 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
121         if !p.filterAdds.IsEmpty() {
122                 p.filterAdds.Clear()
123         }
124         for _, address := range addresses {
125                 p.AddFilterAddress(address)
126         }
127 }
128
129 func (p *Peer) FilterClear() {
130         p.filterAdds.Clear()
131 }
132
133 func (p *Peer) GetBlockByHeight(height uint64) bool {
134         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
135         return p.TrySend(msgs.BlockchainChannel, msg)
136 }
137
138 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
139         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
140         return p.TrySend(msgs.BlockchainChannel, msg)
141 }
142
143 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
144         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
145         return p.TrySend(msgs.BlockchainChannel, msg)
146 }
147
148 func (p *Peer) GetPeerInfo() *PeerInfo {
149         p.mtx.RLock()
150         defer p.mtx.RUnlock()
151
152         sentStatus, receivedStatus := p.TrafficStatus()
153         ping := sentStatus.Idle - receivedStatus.Idle
154         if receivedStatus.Idle > sentStatus.Idle {
155                 ping = -ping
156         }
157
158         return &PeerInfo{
159                 ID:                  p.ID(),
160                 RemoteAddr:          p.Addr().String(),
161                 Height:              p.height,
162                 Ping:                ping.String(),
163                 Duration:            sentStatus.Duration.String(),
164                 TotalSent:           sentStatus.Bytes,
165                 TotalReceived:       receivedStatus.Bytes,
166                 AverageSentRate:     sentStatus.AvgRate,
167                 AverageReceivedRate: receivedStatus.AvgRate,
168                 CurrentSentRate:     sentStatus.CurRate,
169                 CurrentReceivedRate: receivedStatus.CurRate,
170         }
171 }
172
173 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
174         var relatedTxs []*types.Tx
175         var relatedStatuses []*bc.TxVerifyResult
176         for i, tx := range txs {
177                 if p.isRelatedTx(tx) {
178                         relatedTxs = append(relatedTxs, tx)
179                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
180                 }
181         }
182         return relatedTxs, relatedStatuses
183 }
184
185 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
186         for _, input := range tx.Inputs {
187                 switch inp := input.TypedInput.(type) {
188                 case *types.SpendInput:
189                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
190                                 return true
191                         }
192                 }
193         }
194         for _, output := range tx.Outputs {
195                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
196                         return true
197                 }
198         }
199         return false
200 }
201
202 func (p *Peer) isSPVNode() bool {
203         return !p.services.IsEnable(consensus.SFFullNode)
204 }
205
206 func (p *Peer) MarkBlock(hash *bc.Hash) {
207         p.mtx.Lock()
208         defer p.mtx.Unlock()
209
210         for p.knownBlocks.Size() >= maxKnownBlocks {
211                 p.knownBlocks.Pop()
212         }
213         p.knownBlocks.Add(hash.String())
214 }
215
216 func (p *Peer) markNewStatus(height uint64) {
217         p.mtx.Lock()
218         defer p.mtx.Unlock()
219
220         p.knownStatus = height
221 }
222
223 func (p *Peer) markSign(signature []byte) {
224         p.mtx.Lock()
225         defer p.mtx.Unlock()
226
227         for p.knownSignatures.Size() >= maxKnownSignatures {
228                 p.knownSignatures.Pop()
229         }
230         p.knownSignatures.Add(hex.EncodeToString(signature))
231 }
232
233 func (p *Peer) markTransaction(hash *bc.Hash) {
234         p.mtx.Lock()
235         defer p.mtx.Unlock()
236
237         for p.knownTxs.Size() >= maxKnownTxs {
238                 p.knownTxs.Pop()
239         }
240         p.knownTxs.Add(hash.String())
241 }
242
243 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
244         ps.mtx.RLock()
245         defer ps.mtx.RUnlock()
246
247         var peers []string
248         for _, peer := range ps.peers {
249                 if !peer.knownBlocks.Has(hash.String()) {
250                         peers = append(peers, peer.ID())
251                 }
252         }
253         return peers
254 }
255
256 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
257         ps.mtx.RLock()
258         defer ps.mtx.RUnlock()
259
260         var peers []string
261         for _, peer := range ps.peers {
262                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
263                         peers = append(peers, peer.ID())
264                 }
265         }
266         return peers
267 }
268
269 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
270         msg, err := msgs.NewBlockMessage(block)
271         if err != nil {
272                 return false, errors.Wrap(err, "fail on NewBlockMessage")
273         }
274
275         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
276         if ok {
277                 blcokHash := block.Hash()
278                 p.knownBlocks.Add(blcokHash.String())
279         }
280         return ok, nil
281 }
282
283 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
284         msg, err := msgs.NewBlocksMessage(blocks)
285         if err != nil {
286                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
287         }
288
289         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
290                 return ok, nil
291         }
292
293         for _, block := range blocks {
294                 blcokHash := block.Hash()
295                 p.knownBlocks.Add(blcokHash.String())
296         }
297         return true, nil
298 }
299
300 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
301         msg, err := msgs.NewHeadersMessage(headers)
302         if err != nil {
303                 return false, errors.New("fail on NewHeadersMessage")
304         }
305
306         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
307         return ok, nil
308 }
309
310 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
311         msg := msgs.NewMerkleBlockMessage()
312         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
313                 return false, err
314         }
315
316         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
317
318         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
319         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
320                 return false, nil
321         }
322
323         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
324         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
325                 return false, nil
326         }
327
328         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
329         return ok, nil
330 }
331
332 func (p *Peer) SendTransactions(txs []*types.Tx) error {
333         validTxs := make([]*types.Tx, 0, len(txs))
334         for i, tx := range txs {
335                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
336                         continue
337                 }
338
339                 validTxs = append(validTxs, tx)
340                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
341                         continue
342                 }
343
344                 msg, err := msgs.NewTransactionsMessage(validTxs)
345                 if err != nil {
346                         return err
347                 }
348
349                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
350                         return errors.New("failed to send txs msg")
351                 }
352
353                 for _, validTx := range validTxs {
354                         p.knownTxs.Add(validTx.ID.String())
355                 }
356
357                 validTxs = make([]*types.Tx, 0, len(txs))
358         }
359
360         return nil
361 }
362
363 func (p *Peer) SendStatus(header *types.BlockHeader) error {
364         msg := msgs.NewStatusMessage(header)
365         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
366                 return errSendStatusMsg
367         }
368         p.markNewStatus(header.Height)
369         return nil
370 }
371
372 func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
373         p.mtx.Lock()
374         defer p.mtx.Unlock()
375         p.height = height
376         p.hash = hash
377 }
378
379 type PeerSet struct {
380         BasePeerSet
381         mtx   sync.RWMutex
382         peers map[string]*Peer
383 }
384
385 // newPeerSet creates a new peer set to track the active participants.
386 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
387         return &PeerSet{
388                 BasePeerSet: basePeerSet,
389                 peers:       make(map[string]*Peer),
390         }
391 }
392
393 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
394         ps.mtx.Lock()
395         peer := ps.peers[peerID]
396         ps.mtx.Unlock()
397
398         if peer == nil {
399                 return
400         }
401         if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
402                 ps.RemovePeer(peerID)
403         }
404         return
405 }
406
407 func (ps *PeerSet) AddPeer(peer BasePeer) {
408         ps.mtx.Lock()
409         defer ps.mtx.Unlock()
410
411         if _, ok := ps.peers[peer.ID()]; !ok {
412                 ps.peers[peer.ID()] = newPeer(peer)
413                 return
414         }
415         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
416 }
417
418 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
419         ps.mtx.RLock()
420         defer ps.mtx.RUnlock()
421
422         var bestPeer *Peer
423         for _, p := range ps.peers {
424                 if !p.services.IsEnable(flag) {
425                         continue
426                 }
427                 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
428                         bestPeer = p
429                 }
430         }
431         return bestPeer
432 }
433
434 //SendMsg send message to the target peer.
435 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
436         peer := ps.GetPeer(peerID)
437         if peer == nil {
438                 return false
439         }
440
441         ok := peer.TrySend(msgChannel, msg)
442         if !ok {
443                 ps.RemovePeer(peerID)
444         }
445         return ok
446 }
447
448 //BroadcastMsg Broadcast message to the target peers
449 // and mark the message send record
450 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
451         //filter target peers
452         peers := bm.FilterTargetPeers(ps)
453
454         //broadcast to target peers
455         peersSuccess := make([]string, 0)
456         for _, peer := range peers {
457                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
458                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
459                         continue
460                 }
461                 peersSuccess = append(peersSuccess, peer)
462         }
463
464         //mark the message send record
465         bm.MarkSendRecord(ps, peersSuccess)
466         return nil
467 }
468
469 func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
470         msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
471         peers := ps.peersWithoutNewStatus(bestBlock.Height)
472         for _, peer := range peers {
473                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
474                         ps.RemovePeer(peer.ID())
475                         continue
476                 }
477
478                 peer.markNewStatus(bestBlock.Height)
479         }
480         return nil
481 }
482
483 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
484         msg, err := msgs.NewTransactionMessage(tx)
485         if err != nil {
486                 return errors.Wrap(err, "fail on broadcast tx")
487         }
488
489         peers := ps.peersWithoutTx(&tx.ID)
490         for _, peer := range peers {
491                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
492                         continue
493                 }
494                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
495                         log.WithFields(log.Fields{
496                                 "module":  logModule,
497                                 "peer":    peer.Addr(),
498                                 "type":    reflect.TypeOf(msg),
499                                 "message": msg.String(),
500                         }).Warning("send message to peer error")
501                         ps.RemovePeer(peer.ID())
502                         continue
503                 }
504                 peer.markTransaction(&tx.ID)
505         }
506         return nil
507 }
508
509 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
510         if errors.Root(err) == ErrPeerMisbehave {
511                 ps.ProcessIllegal(peerID, level, err.Error())
512         } else {
513                 ps.RemovePeer(peerID)
514         }
515 }
516
517 // Peer retrieves the registered peer with the given id.
518 func (ps *PeerSet) GetPeer(id string) *Peer {
519         ps.mtx.RLock()
520         defer ps.mtx.RUnlock()
521         return ps.peers[id]
522 }
523
524 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
525         ps.mtx.RLock()
526         defer ps.mtx.RUnlock()
527
528         result := []*PeerInfo{}
529         for _, peer := range ps.peers {
530                 result = append(result, peer.GetPeerInfo())
531         }
532         return result
533 }
534
535 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
536         peer := ps.GetPeer(peerID)
537         if peer == nil {
538                 return
539         }
540         peer.MarkBlock(hash)
541 }
542
543 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
544         peer := ps.GetPeer(peerID)
545         if peer == nil {
546                 return
547         }
548         peer.markSign(signature)
549 }
550
551 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
552         peer := ps.GetPeer(peerID)
553         if peer == nil {
554                 return
555         }
556         peer.markNewStatus(height)
557 }
558
559 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
560         ps.mtx.Lock()
561         peer := ps.peers[peerID]
562         ps.mtx.Unlock()
563
564         if peer == nil {
565                 return
566         }
567         peer.markTransaction(&txHash)
568 }
569
570 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
571         ps.mtx.RLock()
572         defer ps.mtx.RUnlock()
573
574         peers := []*Peer{}
575         for _, peer := range ps.peers {
576                 if !peer.knownBlocks.Has(hash.String()) {
577                         peers = append(peers, peer)
578                 }
579         }
580         return peers
581 }
582
583 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
584         ps.mtx.RLock()
585         defer ps.mtx.RUnlock()
586
587         var peers []*Peer
588         for _, peer := range ps.peers {
589                 if peer.knownStatus < height {
590                         peers = append(peers, peer)
591                 }
592         }
593         return peers
594 }
595
596 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
597         ps.mtx.RLock()
598         defer ps.mtx.RUnlock()
599
600         peers := []*Peer{}
601         for _, peer := range ps.peers {
602                 if !peer.knownTxs.Has(hash.String()) {
603                         peers = append(peers, peer)
604                 }
605         }
606         return peers
607 }
608
609 func (ps *PeerSet) RemovePeer(peerID string) {
610         ps.mtx.Lock()
611         delete(ps.peers, peerID)
612         ps.mtx.Unlock()
613         ps.StopPeerGracefully(peerID)
614 }
615
616 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
617         peer := ps.GetPeer(peerID)
618         if peer == nil {
619                 return
620         }
621
622         peer.SetStatus(height, hash)
623 }