OSDN Git Service

add fast sync func (#204)
[bytom/vapor.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/vapor/consensus"
14         "github.com/vapor/errors"
15         msgs "github.com/vapor/netsync/messages"
16         "github.com/vapor/protocol/bc"
17         "github.com/vapor/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33 )
34
35 //BasePeer is the interface for connection level peer
36 type BasePeer interface {
37         Addr() net.Addr
38         ID() string
39         ServiceFlag() consensus.ServiceFlag
40         TrafficStatus() (*flowrate.Status, *flowrate.Status)
41         TrySend(byte, interface{}) bool
42         IsLAN() bool
43 }
44
45 //BasePeerSet is the intergace for connection level peer manager
46 type BasePeerSet interface {
47         StopPeerGracefully(string)
48         IsBanned(peerID string, level byte, reason string) bool
49 }
50
51 type BroadcastMsg interface {
52         FilterTargetPeers(ps *PeerSet) []string
53         MarkSendRecord(ps *PeerSet, peers []string)
54         GetChan() byte
55         GetMsg() interface{}
56         MsgString() string
57 }
58
59 // PeerInfo indicate peer status snap
60 type PeerInfo struct {
61         ID                  string `json:"peer_id"`
62         RemoteAddr          string `json:"remote_addr"`
63         Height              uint64 `json:"height"`
64         Ping                string `json:"ping"`
65         Duration            string `json:"duration"`
66         TotalSent           int64  `json:"total_sent"`
67         TotalReceived       int64  `json:"total_received"`
68         AverageSentRate     int64  `json:"average_sent_rate"`
69         AverageReceivedRate int64  `json:"average_received_rate"`
70         CurrentSentRate     int64  `json:"current_sent_rate"`
71         CurrentReceivedRate int64  `json:"current_received_rate"`
72 }
73
74 type Peer struct {
75         BasePeer
76         mtx                sync.RWMutex
77         services           consensus.ServiceFlag
78         bestHeight         uint64
79         bestHash           *bc.Hash
80         irreversibleHeight uint64
81         irreversibleHash   *bc.Hash
82         knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
83         knownBlocks        *set.Set // Set of block hashes known to be known by this peer
84         knownSignatures    *set.Set // Set of block signatures known to be known by this peer
85         knownStatus        uint64   // Set of chain status known to be known by this peer
86         filterAdds         *set.Set // Set of addresses that the spv node cares about.
87 }
88
89 func newPeer(basePeer BasePeer) *Peer {
90         return &Peer{
91                 BasePeer:        basePeer,
92                 services:        basePeer.ServiceFlag(),
93                 knownTxs:        set.New(),
94                 knownBlocks:     set.New(),
95                 knownSignatures: set.New(),
96                 filterAdds:      set.New(),
97         }
98 }
99
100 func (p *Peer) Height() uint64 {
101         p.mtx.RLock()
102         defer p.mtx.RUnlock()
103
104         return p.bestHeight
105 }
106
107 func (p *Peer) IrreversibleHeight() uint64 {
108         p.mtx.RLock()
109         defer p.mtx.RUnlock()
110
111         return p.irreversibleHeight
112 }
113
114 func (p *Peer) AddFilterAddress(address []byte) {
115         p.mtx.Lock()
116         defer p.mtx.Unlock()
117
118         if p.filterAdds.Size() >= maxFilterAddressCount {
119                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
120                 return
121         }
122         if len(address) > maxFilterAddressSize {
123                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
124                 return
125         }
126
127         p.filterAdds.Add(hex.EncodeToString(address))
128 }
129
130 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
131         if !p.filterAdds.IsEmpty() {
132                 p.filterAdds.Clear()
133         }
134         for _, address := range addresses {
135                 p.AddFilterAddress(address)
136         }
137 }
138
139 func (p *Peer) FilterClear() {
140         p.filterAdds.Clear()
141 }
142
143 func (p *Peer) GetBlockByHeight(height uint64) bool {
144         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
145         return p.TrySend(msgs.BlockchainChannel, msg)
146 }
147
148 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
149         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
150         return p.TrySend(msgs.BlockchainChannel, msg)
151 }
152
153 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
154         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
155         return p.TrySend(msgs.BlockchainChannel, msg)
156 }
157
158 func (p *Peer) GetPeerInfo() *PeerInfo {
159         p.mtx.RLock()
160         defer p.mtx.RUnlock()
161
162         sentStatus, receivedStatus := p.TrafficStatus()
163         ping := sentStatus.Idle - receivedStatus.Idle
164         if receivedStatus.Idle > sentStatus.Idle {
165                 ping = -ping
166         }
167
168         return &PeerInfo{
169                 ID:                  p.ID(),
170                 RemoteAddr:          p.Addr().String(),
171                 Height:              p.bestHeight,
172                 Ping:                ping.String(),
173                 Duration:            sentStatus.Duration.String(),
174                 TotalSent:           sentStatus.Bytes,
175                 TotalReceived:       receivedStatus.Bytes,
176                 AverageSentRate:     sentStatus.AvgRate,
177                 AverageReceivedRate: receivedStatus.AvgRate,
178                 CurrentSentRate:     sentStatus.CurRate,
179                 CurrentReceivedRate: receivedStatus.CurRate,
180         }
181 }
182
183 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
184         var relatedTxs []*types.Tx
185         var relatedStatuses []*bc.TxVerifyResult
186         for i, tx := range txs {
187                 if p.isRelatedTx(tx) {
188                         relatedTxs = append(relatedTxs, tx)
189                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
190                 }
191         }
192         return relatedTxs, relatedStatuses
193 }
194
195 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
196         for _, input := range tx.Inputs {
197                 switch inp := input.TypedInput.(type) {
198                 case *types.SpendInput:
199                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
200                                 return true
201                         }
202                 }
203         }
204         for _, output := range tx.Outputs {
205                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
206                         return true
207                 }
208         }
209         return false
210 }
211
212 func (p *Peer) isSPVNode() bool {
213         return !p.services.IsEnable(consensus.SFFullNode)
214 }
215
216 func (p *Peer) MarkBlock(hash *bc.Hash) {
217         p.mtx.Lock()
218         defer p.mtx.Unlock()
219
220         for p.knownBlocks.Size() >= maxKnownBlocks {
221                 p.knownBlocks.Pop()
222         }
223         p.knownBlocks.Add(hash.String())
224 }
225
226 func (p *Peer) markNewStatus(height uint64) {
227         p.mtx.Lock()
228         defer p.mtx.Unlock()
229
230         p.knownStatus = height
231 }
232
233 func (p *Peer) markSign(signature []byte) {
234         p.mtx.Lock()
235         defer p.mtx.Unlock()
236
237         for p.knownSignatures.Size() >= maxKnownSignatures {
238                 p.knownSignatures.Pop()
239         }
240         p.knownSignatures.Add(hex.EncodeToString(signature))
241 }
242
243 func (p *Peer) markTransaction(hash *bc.Hash) {
244         p.mtx.Lock()
245         defer p.mtx.Unlock()
246
247         for p.knownTxs.Size() >= maxKnownTxs {
248                 p.knownTxs.Pop()
249         }
250         p.knownTxs.Add(hash.String())
251 }
252
253 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
254         ps.mtx.RLock()
255         defer ps.mtx.RUnlock()
256
257         var peers []string
258         for _, peer := range ps.peers {
259                 if !peer.knownBlocks.Has(hash.String()) {
260                         peers = append(peers, peer.ID())
261                 }
262         }
263         return peers
264 }
265
266 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
267         ps.mtx.RLock()
268         defer ps.mtx.RUnlock()
269
270         var peers []string
271         for _, peer := range ps.peers {
272                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
273                         peers = append(peers, peer.ID())
274                 }
275         }
276         return peers
277 }
278
279 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
280         msg, err := msgs.NewBlockMessage(block)
281         if err != nil {
282                 return false, errors.Wrap(err, "fail on NewBlockMessage")
283         }
284
285         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
286         if ok {
287                 blcokHash := block.Hash()
288                 p.knownBlocks.Add(blcokHash.String())
289         }
290         return ok, nil
291 }
292
293 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
294         msg, err := msgs.NewBlocksMessage(blocks)
295         if err != nil {
296                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
297         }
298
299         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
300                 return ok, nil
301         }
302
303         for _, block := range blocks {
304                 blcokHash := block.Hash()
305                 p.knownBlocks.Add(blcokHash.String())
306         }
307         return true, nil
308 }
309
310 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
311         msg, err := msgs.NewHeadersMessage(headers)
312         if err != nil {
313                 return false, errors.New("fail on NewHeadersMessage")
314         }
315
316         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
317         return ok, nil
318 }
319
320 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
321         msg := msgs.NewMerkleBlockMessage()
322         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
323                 return false, err
324         }
325
326         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
327
328         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
329         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
330                 return false, nil
331         }
332
333         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
334         if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
335                 return false, nil
336         }
337
338         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
339         return ok, nil
340 }
341
342 func (p *Peer) SendTransactions(txs []*types.Tx) error {
343         validTxs := make([]*types.Tx, 0, len(txs))
344         for i, tx := range txs {
345                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
346                         continue
347                 }
348
349                 validTxs = append(validTxs, tx)
350                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
351                         continue
352                 }
353
354                 msg, err := msgs.NewTransactionsMessage(validTxs)
355                 if err != nil {
356                         return err
357                 }
358
359                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
360                         return errors.New("failed to send txs msg")
361                 }
362
363                 for _, validTx := range validTxs {
364                         p.knownTxs.Add(validTx.ID.String())
365                 }
366
367                 validTxs = make([]*types.Tx, 0, len(txs))
368         }
369
370         return nil
371 }
372
373 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
374         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
375         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
376                 return errSendStatusMsg
377         }
378         p.markNewStatus(bestHeader.Height)
379         return nil
380 }
381
382 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
383         p.mtx.Lock()
384         defer p.mtx.Unlock()
385
386         p.bestHeight = bestHeight
387         p.bestHash = bestHash
388 }
389
390 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
391         p.mtx.Lock()
392         defer p.mtx.Unlock()
393
394         p.irreversibleHeight = irreversibleHeight
395         p.irreversibleHash = irreversibleHash
396 }
397
398 type PeerSet struct {
399         BasePeerSet
400         mtx   sync.RWMutex
401         peers map[string]*Peer
402 }
403
404 // newPeerSet creates a new peer set to track the active participants.
405 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
406         return &PeerSet{
407                 BasePeerSet: basePeerSet,
408                 peers:       make(map[string]*Peer),
409         }
410 }
411
412 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
413         ps.mtx.Lock()
414         peer := ps.peers[peerID]
415         ps.mtx.Unlock()
416
417         if peer == nil {
418                 return
419         }
420         if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
421                 ps.RemovePeer(peerID)
422         }
423         return
424 }
425
426 func (ps *PeerSet) AddPeer(peer BasePeer) {
427         ps.mtx.Lock()
428         defer ps.mtx.Unlock()
429
430         if _, ok := ps.peers[peer.ID()]; !ok {
431                 ps.peers[peer.ID()] = newPeer(peer)
432                 return
433         }
434         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
435 }
436
437 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
438         ps.mtx.RLock()
439         defer ps.mtx.RUnlock()
440
441         var bestPeer *Peer
442         for _, p := range ps.peers {
443                 if !p.services.IsEnable(flag) {
444                         continue
445                 }
446                 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
447                         bestPeer = p
448                 }
449         }
450         return bestPeer
451 }
452
453 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
454         ps.mtx.RLock()
455         defer ps.mtx.RUnlock()
456
457         var bestPeer *Peer
458         for _, p := range ps.peers {
459                 if !p.services.IsEnable(flag) {
460                         continue
461                 }
462                 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
463                         bestPeer = p
464                 }
465         }
466         return bestPeer
467 }
468
469 //SendMsg send message to the target peer.
470 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
471         peer := ps.GetPeer(peerID)
472         if peer == nil {
473                 return false
474         }
475
476         ok := peer.TrySend(msgChannel, msg)
477         if !ok {
478                 ps.RemovePeer(peerID)
479         }
480         return ok
481 }
482
483 //BroadcastMsg Broadcast message to the target peers
484 // and mark the message send record
485 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
486         //filter target peers
487         peers := bm.FilterTargetPeers(ps)
488
489         //broadcast to target peers
490         peersSuccess := make([]string, 0)
491         for _, peer := range peers {
492                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
493                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
494                         continue
495                 }
496                 peersSuccess = append(peersSuccess, peer)
497         }
498
499         //mark the message send record
500         bm.MarkSendRecord(ps, peersSuccess)
501         return nil
502 }
503
504 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
505         msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
506         peers := ps.peersWithoutNewStatus(bestHeader.Height)
507         for _, peer := range peers {
508                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
509                         ps.RemovePeer(peer.ID())
510                         continue
511                 }
512
513                 peer.markNewStatus(bestHeader.Height)
514         }
515         return nil
516 }
517
518 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
519         msg, err := msgs.NewTransactionMessage(tx)
520         if err != nil {
521                 return errors.Wrap(err, "fail on broadcast tx")
522         }
523
524         peers := ps.peersWithoutTx(&tx.ID)
525         for _, peer := range peers {
526                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
527                         continue
528                 }
529                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
530                         log.WithFields(log.Fields{
531                                 "module":  logModule,
532                                 "peer":    peer.Addr(),
533                                 "type":    reflect.TypeOf(msg),
534                                 "message": msg.String(),
535                         }).Warning("send message to peer error")
536                         ps.RemovePeer(peer.ID())
537                         continue
538                 }
539                 peer.markTransaction(&tx.ID)
540         }
541         return nil
542 }
543
544 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
545         if errors.Root(err) == ErrPeerMisbehave {
546                 ps.ProcessIllegal(peerID, level, err.Error())
547         } else {
548                 ps.RemovePeer(peerID)
549         }
550 }
551
552 // Peer retrieves the registered peer with the given id.
553 func (ps *PeerSet) GetPeer(id string) *Peer {
554         ps.mtx.RLock()
555         defer ps.mtx.RUnlock()
556         return ps.peers[id]
557 }
558
559 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
560         ps.mtx.RLock()
561         defer ps.mtx.RUnlock()
562
563         result := []*PeerInfo{}
564         for _, peer := range ps.peers {
565                 result = append(result, peer.GetPeerInfo())
566         }
567         return result
568 }
569
570 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
571         peer := ps.GetPeer(peerID)
572         if peer == nil {
573                 return
574         }
575         peer.MarkBlock(hash)
576 }
577
578 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
579         peer := ps.GetPeer(peerID)
580         if peer == nil {
581                 return
582         }
583         peer.markSign(signature)
584 }
585
586 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
587         peer := ps.GetPeer(peerID)
588         if peer == nil {
589                 return
590         }
591         peer.markNewStatus(height)
592 }
593
594 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
595         ps.mtx.Lock()
596         peer := ps.peers[peerID]
597         ps.mtx.Unlock()
598
599         if peer == nil {
600                 return
601         }
602         peer.markTransaction(&txHash)
603 }
604
605 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
606         ps.mtx.RLock()
607         defer ps.mtx.RUnlock()
608
609         peers := []*Peer{}
610         for _, peer := range ps.peers {
611                 if !peer.knownBlocks.Has(hash.String()) {
612                         peers = append(peers, peer)
613                 }
614         }
615         return peers
616 }
617
618 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
619         ps.mtx.RLock()
620         defer ps.mtx.RUnlock()
621
622         var peers []*Peer
623         for _, peer := range ps.peers {
624                 if peer.knownStatus < height {
625                         peers = append(peers, peer)
626                 }
627         }
628         return peers
629 }
630
631 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
632         ps.mtx.RLock()
633         defer ps.mtx.RUnlock()
634
635         peers := []*Peer{}
636         for _, peer := range ps.peers {
637                 if !peer.knownTxs.Has(hash.String()) {
638                         peers = append(peers, peer)
639                 }
640         }
641         return peers
642 }
643
644 func (ps *PeerSet) RemovePeer(peerID string) {
645         ps.mtx.Lock()
646         delete(ps.peers, peerID)
647         ps.mtx.Unlock()
648         ps.StopPeerGracefully(peerID)
649 }
650
651 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
652         peer := ps.GetPeer(peerID)
653         if peer == nil {
654                 return
655         }
656
657         peer.SetBestStatus(height, hash)
658 }