OSDN Git Service

fix casper rollback (#1951)
[bytom/bytom.git] / netsync / peers / peer.go
1 package peers
2
3 import (
4         "encoding/hex"
5         "net"
6         "reflect"
7         "sync"
8
9         log "github.com/sirupsen/logrus"
10         "github.com/tendermint/tmlibs/flowrate"
11         "gopkg.in/fatih/set.v0"
12
13         "github.com/bytom/bytom/consensus"
14         "github.com/bytom/bytom/errors"
15         msgs "github.com/bytom/bytom/netsync/messages"
16         "github.com/bytom/bytom/protocol/bc"
17         "github.com/bytom/bytom/protocol/bc/types"
18 )
19
20 const (
21         maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
22         maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
23         maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
24         maxFilterAddressSize  = 50
25         maxFilterAddressCount = 1000
26
27         logModule = "peers"
28 )
29
30 var (
31         errSendStatusMsg = errors.New("send status msg fail")
32         ErrPeerMisbehave = errors.New("peer is misbehave")
33         ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
34 )
35
36 //BasePeer is the interface for connection level peer
37 type BasePeer interface {
38         Moniker() string
39         Addr() net.Addr
40         ID() string
41         RemoteAddrHost() string
42         ServiceFlag() consensus.ServiceFlag
43         TrafficStatus() (*flowrate.Status, *flowrate.Status)
44         TrySend(byte, interface{}) bool
45         IsLAN() bool
46 }
47
48 //BasePeerSet is the intergace for connection level peer manager
49 type BasePeerSet interface {
50         StopPeerGracefully(string)
51         IsBanned(ip string, level byte, reason string) bool
52 }
53
54 type BroadcastMsg interface {
55         FilterTargetPeers(ps *PeerSet) []string
56         MarkSendRecord(ps *PeerSet, peers []string)
57         GetChan() byte
58         GetMsg() interface{}
59         MsgString() string
60 }
61
62 // PeerInfo indicate peer status snap
63 type PeerInfo struct {
64         ID                  string `json:"peer_id"`
65         Moniker             string `json:"moniker"`
66         RemoteAddr          string `json:"remote_addr"`
67         Height              uint64 `json:"height"`
68         Ping                string `json:"ping"`
69         Duration            string `json:"duration"`
70         TotalSent           int64  `json:"total_sent"`
71         TotalReceived       int64  `json:"total_received"`
72         AverageSentRate     int64  `json:"average_sent_rate"`
73         AverageReceivedRate int64  `json:"average_received_rate"`
74         CurrentSentRate     int64  `json:"current_sent_rate"`
75         CurrentReceivedRate int64  `json:"current_received_rate"`
76 }
77
78 type Peer struct {
79         BasePeer
80         mtx             sync.RWMutex
81         services        consensus.ServiceFlag
82         bestHeight      uint64
83         bestHash        *bc.Hash
84         justifiedHeight uint64
85         justifiedHash   *bc.Hash
86         knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
87         knownBlocks     *set.Set // Set of block hashes known to be known by this peer
88         knownSignatures *set.Set // Set of block signatures known to be known by this peer
89         knownStatus     uint64   // Set of chain status known to be known by this peer
90         filterAdds      *set.Set // Set of addresses that the spv node cares about.
91 }
92
93 func newPeer(basePeer BasePeer) *Peer {
94         return &Peer{
95                 BasePeer:        basePeer,
96                 services:        basePeer.ServiceFlag(),
97                 knownTxs:        set.New(set.ThreadSafe).(*set.Set),
98                 knownBlocks:     set.New(set.ThreadSafe).(*set.Set),
99                 knownSignatures: set.New(set.ThreadSafe).(*set.Set),
100                 filterAdds:      set.New(set.ThreadSafe).(*set.Set),
101         }
102 }
103
104 func (p *Peer) Height() uint64 {
105         p.mtx.RLock()
106         defer p.mtx.RUnlock()
107
108         return p.bestHeight
109 }
110
111 func (p *Peer) JustifiedHeight() uint64 {
112         p.mtx.RLock()
113         defer p.mtx.RUnlock()
114
115         return p.justifiedHeight
116 }
117
118 func (p *Peer) AddFilterAddress(address []byte) {
119         p.mtx.Lock()
120         defer p.mtx.Unlock()
121
122         if p.filterAdds.Size() >= maxFilterAddressCount {
123                 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
124                 return
125         }
126         if len(address) > maxFilterAddressSize {
127                 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
128                 return
129         }
130
131         p.filterAdds.Add(hex.EncodeToString(address))
132 }
133
134 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
135         if !p.filterAdds.IsEmpty() {
136                 p.filterAdds.Clear()
137         }
138         for _, address := range addresses {
139                 p.AddFilterAddress(address)
140         }
141 }
142
143 func (p *Peer) FilterClear() {
144         p.filterAdds.Clear()
145 }
146
147 func (p *Peer) GetBlockByHeight(height uint64) bool {
148         msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
149         return p.TrySend(msgs.BlockchainChannel, msg)
150 }
151
152 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
153         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
154         return p.TrySend(msgs.BlockchainChannel, msg)
155 }
156
157 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
158         msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
159         return p.TrySend(msgs.BlockchainChannel, msg)
160 }
161
162 func (p *Peer) GetPeerInfo() *PeerInfo {
163         p.mtx.RLock()
164         defer p.mtx.RUnlock()
165
166         sentStatus, receivedStatus := p.TrafficStatus()
167         ping := sentStatus.Idle - receivedStatus.Idle
168         if receivedStatus.Idle > sentStatus.Idle {
169                 ping = -ping
170         }
171
172         return &PeerInfo{
173                 ID:                  p.ID(),
174                 Moniker:             p.BasePeer.Moniker(),
175                 RemoteAddr:          p.Addr().String(),
176                 Height:              p.bestHeight,
177                 Ping:                ping.String(),
178                 Duration:            sentStatus.Duration.String(),
179                 TotalSent:           sentStatus.Bytes,
180                 TotalReceived:       receivedStatus.Bytes,
181                 AverageSentRate:     sentStatus.AvgRate,
182                 AverageReceivedRate: receivedStatus.AvgRate,
183                 CurrentSentRate:     sentStatus.CurRate,
184                 CurrentReceivedRate: receivedStatus.CurRate,
185         }
186 }
187
188 func (p *Peer) getRelatedTxs(txs []*types.Tx) ([]*types.Tx) {
189         var relatedTxs []*types.Tx
190         for _, tx := range txs {
191                 if p.isRelatedTx(tx) {
192                         relatedTxs = append(relatedTxs, tx)
193                 }
194         }
195         return relatedTxs
196 }
197
198 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
199         for _, input := range tx.Inputs {
200                 switch inp := input.TypedInput.(type) {
201                 case *types.SpendInput:
202                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
203                                 return true
204                         }
205                 }
206         }
207         for _, output := range tx.Outputs {
208                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
209                         return true
210                 }
211         }
212         return false
213 }
214
215 func (p *Peer) isSPVNode() bool {
216         return !p.services.IsEnable(consensus.SFFullNode)
217 }
218
219 func (p *Peer) MarkBlock(hash *bc.Hash) {
220         p.mtx.Lock()
221         defer p.mtx.Unlock()
222
223         for p.knownBlocks.Size() >= maxKnownBlocks {
224                 p.knownBlocks.Pop()
225         }
226         p.knownBlocks.Add(hash.String())
227 }
228
229 func (p *Peer) markNewStatus(height uint64) {
230         p.mtx.Lock()
231         defer p.mtx.Unlock()
232
233         p.knownStatus = height
234 }
235
236 func (p *Peer) markSign(signature []byte) {
237         p.mtx.Lock()
238         defer p.mtx.Unlock()
239
240         for p.knownSignatures.Size() >= maxKnownSignatures {
241                 p.knownSignatures.Pop()
242         }
243         p.knownSignatures.Add(hex.EncodeToString(signature))
244 }
245
246 func (p *Peer) markTransaction(hash *bc.Hash) {
247         p.mtx.Lock()
248         defer p.mtx.Unlock()
249
250         for p.knownTxs.Size() >= maxKnownTxs {
251                 p.knownTxs.Pop()
252         }
253         p.knownTxs.Add(hash.String())
254 }
255
256 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
257         msg, err := msgs.NewBlockMessage(block)
258         if err != nil {
259                 return false, errors.Wrap(err, "fail on NewBlockMessage")
260         }
261
262         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
263         if ok {
264                 blcokHash := block.Hash()
265                 p.knownBlocks.Add(blcokHash.String())
266         }
267         return ok, nil
268 }
269
270 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
271         msg, err := msgs.NewBlocksMessage(blocks)
272         if err != nil {
273                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
274         }
275
276         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
277                 return ok, nil
278         }
279
280         for _, block := range blocks {
281                 blcokHash := block.Hash()
282                 p.knownBlocks.Add(blcokHash.String())
283         }
284         return true, nil
285 }
286
287 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
288         msg, err := msgs.NewHeadersMessage(headers)
289         if err != nil {
290                 return false, errors.New("fail on NewHeadersMessage")
291         }
292
293         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
294         return ok, nil
295 }
296
297 func (p *Peer) SendMerkleBlock(block *types.Block) (bool, error) {
298         msg := msgs.NewMerkleBlockMessage()
299         if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
300                 return false, err
301         }
302
303         relatedTxs := p.getRelatedTxs(block.Transactions)
304
305         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
306         if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
307                 return false, nil
308         }
309
310         ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
311         return ok, nil
312 }
313
314 func (p *Peer) SendTransactions(txs []*types.Tx) error {
315         validTxs := make([]*types.Tx, 0, len(txs))
316         for i, tx := range txs {
317                 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
318                         continue
319                 }
320
321                 validTxs = append(validTxs, tx)
322                 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
323                         continue
324                 }
325
326                 msg, err := msgs.NewTransactionsMessage(validTxs)
327                 if err != nil {
328                         return err
329                 }
330
331                 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
332                         return errors.New("failed to send txs msg")
333                 }
334
335                 for _, validTx := range validTxs {
336                         p.knownTxs.Add(validTx.ID.String())
337                 }
338
339                 validTxs = make([]*types.Tx, 0, len(txs))
340         }
341
342         return nil
343 }
344
345 func (p *Peer) SendStatus(bestHeader, justifiedHeader *types.BlockHeader) error {
346         msg := msgs.NewStatusMessage(bestHeader, justifiedHeader)
347         if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
348                 return errSendStatusMsg
349         }
350         p.markNewStatus(bestHeader.Height)
351         return nil
352 }
353
354 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
355         p.mtx.Lock()
356         defer p.mtx.Unlock()
357
358         p.bestHeight = bestHeight
359         p.bestHash = bestHash
360 }
361
362 func (p *Peer) SetJustifiedStatus(justifiedHeight uint64, justifiedHash *bc.Hash) {
363         p.mtx.Lock()
364         defer p.mtx.Unlock()
365
366         p.justifiedHeight = justifiedHeight
367         p.justifiedHash = justifiedHash
368 }
369
370 type PeerSet struct {
371         BasePeerSet
372         mtx   sync.RWMutex
373         peers map[string]*Peer
374 }
375
376 // newPeerSet creates a new peer set to track the active participants.
377 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
378         return &PeerSet{
379                 BasePeerSet: basePeerSet,
380                 peers:       make(map[string]*Peer),
381         }
382 }
383
384 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
385         ps.mtx.Lock()
386         peer := ps.peers[peerID]
387         ps.mtx.Unlock()
388
389         if peer == nil {
390                 return
391         }
392
393         if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
394                 ps.RemovePeer(peerID)
395         }
396         return
397 }
398
399 func (ps *PeerSet) AddPeer(peer BasePeer) {
400         ps.mtx.Lock()
401         defer ps.mtx.Unlock()
402
403         if _, ok := ps.peers[peer.ID()]; !ok {
404                 ps.peers[peer.ID()] = newPeer(peer)
405                 return
406         }
407         log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
408 }
409
410 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
411         ps.mtx.RLock()
412         defer ps.mtx.RUnlock()
413
414         var bestPeer *Peer
415         for _, p := range ps.peers {
416                 if !p.services.IsEnable(flag) {
417                         continue
418                 }
419                 if bestPeer == nil || p.JustifiedHeight() > bestPeer.JustifiedHeight() ||
420                         (p.JustifiedHeight() == bestPeer.JustifiedHeight() && p.bestHeight > bestPeer.bestHeight) ||
421                         (p.JustifiedHeight() == bestPeer.JustifiedHeight() && p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
422                         bestPeer = p
423                 }
424         }
425         return bestPeer
426 }
427
428 //SendMsg send message to the target peer.
429 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
430         peer := ps.GetPeer(peerID)
431         if peer == nil {
432                 return false
433         }
434
435         ok := peer.TrySend(msgChannel, msg)
436         if !ok {
437                 ps.RemovePeer(peerID)
438         }
439         return ok
440 }
441
442 //BroadcastMsg Broadcast message to the target peers
443 // and mark the message send record
444 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
445         //filter target peers
446         peers := bm.FilterTargetPeers(ps)
447
448         //broadcast to target peers
449         peersSuccess := make([]string, 0)
450         for _, peer := range peers {
451                 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
452                         log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
453                         continue
454                 }
455                 peersSuccess = append(peersSuccess, peer)
456         }
457
458         //mark the message send record
459         bm.MarkSendRecord(ps, peersSuccess)
460         return nil
461 }
462
463 func (ps *PeerSet) BroadcastNewStatus(bestHeader, justifiedHeader *types.BlockHeader) error {
464         msg := msgs.NewStatusMessage(bestHeader, justifiedHeader)
465         peers := ps.peersWithoutNewStatus(bestHeader.Height)
466         for _, peer := range peers {
467                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
468                         ps.RemovePeer(peer.ID())
469                         continue
470                 }
471
472                 peer.markNewStatus(bestHeader.Height)
473         }
474         return nil
475 }
476
477 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
478         msg, err := msgs.NewTransactionMessage(tx)
479         if err != nil {
480                 return errors.Wrap(err, "fail on broadcast tx")
481         }
482
483         peers := ps.peersWithoutTx(&tx.ID)
484         for _, peer := range peers {
485                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
486                         continue
487                 }
488                 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
489                         log.WithFields(log.Fields{
490                                 "module":  logModule,
491                                 "peer":    peer.Addr(),
492                                 "type":    reflect.TypeOf(msg),
493                                 "message": msg.String(),
494                         }).Warning("send message to peer error")
495                         ps.RemovePeer(peer.ID())
496                         continue
497                 }
498                 peer.markTransaction(&tx.ID)
499         }
500         return nil
501 }
502
503 // Peer retrieves the registered peer with the given id.
504 func (ps *PeerSet) GetPeer(id string) *Peer {
505         ps.mtx.RLock()
506         defer ps.mtx.RUnlock()
507         return ps.peers[id]
508 }
509
510 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
511         ps.mtx.RLock()
512         defer ps.mtx.RUnlock()
513
514         peers := []*Peer{}
515         for _, peer := range ps.peers {
516                 if peer.Height() >= height {
517                         peers = append(peers, peer)
518                 }
519         }
520         return peers
521 }
522
523 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
524         ps.mtx.RLock()
525         defer ps.mtx.RUnlock()
526
527         result := []*PeerInfo{}
528         for _, peer := range ps.peers {
529                 result = append(result, peer.GetPeerInfo())
530         }
531         return result
532 }
533
534 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
535         peer := ps.GetPeer(peerID)
536         if peer == nil {
537                 return
538         }
539         peer.MarkBlock(hash)
540 }
541
542 func (ps *PeerSet) MarkBlockVerification(peerID string, signature []byte) {
543         peer := ps.GetPeer(peerID)
544         if peer == nil {
545                 return
546         }
547         peer.markSign(signature)
548 }
549
550 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
551         peer := ps.GetPeer(peerID)
552         if peer == nil {
553                 return
554         }
555         peer.markNewStatus(height)
556 }
557
558 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
559         ps.mtx.Lock()
560         peer := ps.peers[peerID]
561         ps.mtx.Unlock()
562
563         if peer == nil {
564                 return
565         }
566         peer.markTransaction(&txHash)
567 }
568
569 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
570         ps.mtx.RLock()
571         defer ps.mtx.RUnlock()
572
573         var peers []string
574         for _, peer := range ps.peers {
575                 if !peer.knownBlocks.Has(hash.String()) {
576                         peers = append(peers, peer.ID())
577                 }
578         }
579         return peers
580 }
581
582 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
583         ps.mtx.RLock()
584         defer ps.mtx.RUnlock()
585
586         var peers []string
587         for _, peer := range ps.peers {
588                 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
589                         peers = append(peers, peer.ID())
590                 }
591         }
592         return peers
593 }
594
595 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
596         ps.mtx.RLock()
597         defer ps.mtx.RUnlock()
598
599         var peers []*Peer
600         for _, peer := range ps.peers {
601                 if peer.knownStatus < height {
602                         peers = append(peers, peer)
603                 }
604         }
605         return peers
606 }
607
608 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
609         ps.mtx.RLock()
610         defer ps.mtx.RUnlock()
611
612         peers := []*Peer{}
613         for _, peer := range ps.peers {
614                 if !peer.knownTxs.Has(hash.String()) {
615                         peers = append(peers, peer)
616                 }
617         }
618         return peers
619 }
620
621 func (ps *PeerSet) RemovePeer(peerID string) {
622         ps.mtx.Lock()
623         delete(ps.peers, peerID)
624         ps.mtx.Unlock()
625         ps.StopPeerGracefully(peerID)
626 }
627
628 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
629         peer := ps.GetPeer(peerID)
630         if peer == nil {
631                 return
632         }
633
634         peer.SetBestStatus(height, hash)
635 }
636
637 func (ps *PeerSet) SetJustifiedStatus(peerID string, height uint64, hash *bc.Hash) {
638         peer := ps.GetPeer(peerID)
639         if peer == nil {
640                 return
641         }
642
643         peer.SetJustifiedStatus(height, hash)
644 }