9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 msgs "github.com/vapor/netsync/messages"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
22 maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
23 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
24 maxFilterAddressSize = 50 // Maximum length in bytes of one filter address; AddFilterAddress warns above it
25 maxFilterAddressCount = 1000 // Filter-set size at which AddFilterAddress warns that the limit is reached
// errSendStatusMsg is returned by Peer.SendStatus when TrySend reports failure.
31 errSendStatusMsg = errors.New("send status msg fail")
// ErrPeerMisbehave marks errors caused by a misbehaving peer; ErrorHandler
// routes errors rooted in it to ProcessIllegal.
32 ErrPeerMisbehave = errors.New("peer is misbehave")
// ErrNoValidPeer is not referenced in the visible code; judging by its message
// it reports that no peer is usable for fast sync — TODO confirm at call sites.
33 ErrNoValidPeer = errors.New("Can't find valid fast sync peer")
36 // BasePeer is the interface for a connection-level peer.
// NOTE(review): Addr, ID and IsLAN are also called on peers elsewhere in this
// file, but their declarations are not visible in this listing.
37 type BasePeer interface {
// RemoteAddrHost returns the string PeerSet.ProcessIllegal hands to IsBanned as the peer's ip.
40 RemoteAddrHost() string
// ServiceFlag returns the peer's service flags; newPeer copies them into Peer.services.
41 ServiceFlag() consensus.ServiceFlag
// TrafficStatus returns two flowrate statuses; GetPeerInfo reads them as (sent, received).
42 TrafficStatus() (*flowrate.Status, *flowrate.Status)
// TrySend submits a message on the given channel; callers treat a false result as a failed send.
43 TrySend(byte, interface{}) bool
47 // BasePeerSet is the interface for the connection-level peer manager.
48 type BasePeerSet interface {
// StopPeerGracefully is called by PeerSet.RemovePeer with the ID of the peer being removed.
49 StopPeerGracefully(string)
// IsBanned is consulted by PeerSet.ProcessIllegal; a true result leads to the peer's removal.
50 IsBanned(ip string, level byte, reason string) bool
// BroadcastMsg describes a message that PeerSet.BroadcastMsg can deliver to a
// selection of peers.
// NOTE(review): PeerSet.BroadcastMsg also calls GetChan, GetMsg and MsgString
// on this interface; their declarations are not visible in this listing.
53 type BroadcastMsg interface {
// FilterTargetPeers returns the IDs of the peers the message should be sent to.
54 FilterTargetPeers(ps *PeerSet) []string
// MarkSendRecord receives the IDs of the peers the message was successfully sent to.
55 MarkSendRecord(ps *PeerSet, peers []string)
61 // PeerInfo is a snapshot of a peer's status, as assembled by Peer.GetPeerInfo.
62 type PeerInfo struct {
63 ID string `json:"peer_id"`
// RemoteAddr is the string form of the peer's Addr().
64 RemoteAddr string `json:"remote_addr"`
// Height is the peer's bestHeight at snapshot time.
65 Height uint64 `json:"height"`
// NOTE(review): the assignment of Ping is not visible in this listing; presumably
// it is derived from the idle-time difference computed in GetPeerInfo — confirm.
66 Ping string `json:"ping"`
// Duration is the string form of the sent-traffic status Duration.
67 Duration string `json:"duration"`
// TotalSent / TotalReceived are the Bytes counters of the sent / received statuses.
68 TotalSent int64 `json:"total_sent"`
69 TotalReceived int64 `json:"total_received"`
// AverageSentRate / AverageReceivedRate are the AvgRate values of the two statuses.
70 AverageSentRate int64 `json:"average_sent_rate"`
71 AverageReceivedRate int64 `json:"average_received_rate"`
// CurrentSentRate / CurrentReceivedRate are the CurRate values of the two statuses.
72 CurrentSentRate int64 `json:"current_sent_rate"`
73 CurrentReceivedRate int64 `json:"current_received_rate"`
// Service flags reported by the underlying BasePeer (copied in newPeer).
79 services consensus.ServiceFlag
// Irreversible height and hash, as last stored by SetIrreversibleStatus.
82 irreversibleHeight uint64
83 irreversibleHash *bc.Hash
84 knownTxs *set.Set // Set of transaction hashes known by this peer
85 knownBlocks *set.Set // Set of block hashes known by this peer
86 knownSignatures *set.Set // Set of hex-encoded block signatures known by this peer
87 knownStatus uint64 // Height of the latest chain status marked as known by this peer (see markNewStatus)
88 filterAdds *set.Set // Set of hex-encoded addresses that the spv node cares about.
// newPeer builds a Peer around basePeer, copying its service flags and
// creating fresh sets for known blocks, known signatures and filter addresses.
// NOTE(review): some initialiser lines are not visible in this listing
// (knownTxs, for one, must also be initialised before use) — confirm.
91 func newPeer(basePeer BasePeer) *Peer {
94 services: basePeer.ServiceFlag(),
96 knownBlocks: set.New(),
97 knownSignatures: set.New(),
98 filterAdds: set.New(),
// Height reports a height for this peer; the deferred RUnlock shows the read
// happens under p.mtx's read lock.
// NOTE(review): the returned expression is not visible in this listing —
// presumably p.bestHeight; confirm.
102 func (p *Peer) Height() uint64 {
104 defer p.mtx.RUnlock()
109 func (p *Peer) IrreversibleHeight() uint64 {
111 defer p.mtx.RUnlock()
113 return p.irreversibleHeight
// AddFilterAddress stores the hex encoding of address in the peer's filter set.
// A warning is logged when the set already holds maxFilterAddressCount entries
// or when address is longer than maxFilterAddressSize bytes.
// NOTE(review): the lines following each warning, and any locking at the top
// of the function, are not visible in this listing — presumably the address is
// rejected (early return) in both warning cases; confirm.
116 func (p *Peer) AddFilterAddress(address []byte) {
120 if p.filterAdds.Size() >= maxFilterAddressCount {
121 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
124 if len(address) > maxFilterAddressSize {
125 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
// Addresses are kept as hex strings, matching the lookups in isRelatedTx.
129 p.filterAdds.Add(hex.EncodeToString(address))
// AddFilterAddresses feeds every entry of addresses through AddFilterAddress.
// NOTE(review): the body of the "filter set is not empty" branch is not
// visible in this listing — presumably the existing filter is cleared first
// so the new list replaces it; confirm.
132 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
133 if !p.filterAdds.IsEmpty() {
136 for _, address := range addresses {
137 p.AddFilterAddress(address)
// FilterClear takes no arguments and returns nothing.
// NOTE(review): its body is not visible in this listing; presumably it empties
// p.filterAdds — confirm.
141 func (p *Peer) FilterClear() {
145 func (p *Peer) GetBlockByHeight(height uint64) bool {
146 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
147 return p.TrySend(msgs.BlockchainChannel, msg)
150 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
151 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
152 return p.TrySend(msgs.BlockchainChannel, msg)
155 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
156 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
157 return p.TrySend(msgs.BlockchainChannel, msg)
// GetPeerInfo builds a PeerInfo snapshot from the peer's address, its best
// height and the sent/received traffic statuses. The deferred RUnlock shows
// the snapshot is taken under p.mtx's read lock.
160 func (p *Peer) GetPeerInfo() *PeerInfo {
162 defer p.mtx.RUnlock()
164 sentStatus, receivedStatus := p.TrafficStatus()
// ping starts as the difference between the two Idle values.
165 ping := sentStatus.Idle - receivedStatus.Idle
// NOTE(review): the body of this branch is not visible in this listing;
// presumably it flips the sign so ping is never negative — confirm.
166 if receivedStatus.Idle > sentStatus.Idle {
// NOTE(review): the literal's opening and its ID / Ping entries are not visible here.
172 RemoteAddr: p.Addr().String(),
173 Height: p.bestHeight,
175 Duration: sentStatus.Duration.String(),
176 TotalSent: sentStatus.Bytes,
177 TotalReceived: receivedStatus.Bytes,
178 AverageSentRate: sentStatus.AvgRate,
179 AverageReceivedRate: receivedStatus.AvgRate,
180 CurrentSentRate: sentStatus.CurRate,
181 CurrentReceivedRate: receivedStatus.CurRate,
185 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
186 var relatedTxs []*types.Tx
187 var relatedStatuses []*bc.TxVerifyResult
188 for i, tx := range txs {
189 if p.isRelatedTx(tx) {
190 relatedTxs = append(relatedTxs, tx)
191 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
194 return relatedTxs, relatedStatuses
197 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
198 for _, input := range tx.Inputs {
199 switch inp := input.TypedInput.(type) {
200 case *types.SpendInput:
201 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
206 for _, output := range tx.Outputs {
207 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
214 func (p *Peer) isSPVNode() bool {
215 return !p.services.IsEnable(consensus.SFFullNode)
// MarkBlock records the string form of hash in the peer's known-block set.
// The loop runs while the set holds maxKnownBlocks or more entries.
// NOTE(review): the loop body and any locking are not visible in this
// listing — presumably an entry is evicted per iteration, as markSign does
// with Pop; confirm.
218 func (p *Peer) MarkBlock(hash *bc.Hash) {
222 for p.knownBlocks.Size() >= maxKnownBlocks {
225 p.knownBlocks.Add(hash.String())
// markNewStatus stores height as the peer's knownStatus, the value that
// peersWithoutNewStatus compares against.
// NOTE(review): any locking around the assignment is not visible in this listing.
228 func (p *Peer) markNewStatus(height uint64) {
232 p.knownStatus = height
// markSign records the hex encoding of signature in the peer's
// known-signature set. While the set holds maxKnownSignatures or more entries,
// one entry is removed with Pop before the new one is added.
// NOTE(review): any locking at the top of the function is not visible in this listing.
235 func (p *Peer) markSign(signature []byte) {
239 for p.knownSignatures.Size() >= maxKnownSignatures {
240 p.knownSignatures.Pop()
242 p.knownSignatures.Add(hex.EncodeToString(signature))
// markTransaction records the string form of hash in the peer's
// known-transaction set. The loop runs while the set holds maxKnownTxs or
// more entries.
// NOTE(review): the loop body and any locking are not visible in this
// listing — presumably an entry is evicted per iteration, as markSign does
// with Pop; confirm.
245 func (p *Peer) markTransaction(hash *bc.Hash) {
249 for p.knownTxs.Size() >= maxKnownTxs {
252 p.knownTxs.Add(hash.String())
// PeersWithoutBlock collects the IDs of all peers whose known-block set does
// not contain the string form of hash. The deferred RUnlock shows the scan
// runs under ps.mtx's read lock.
// NOTE(review): the declaration and return of the result slice are not
// visible in this listing.
255 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
257 defer ps.mtx.RUnlock()
260 for _, peer := range ps.peers {
261 if !peer.knownBlocks.Has(hash.String()) {
262 peers = append(peers, peer.ID())
// PeersWithoutSign collects the IDs of all peers whose known-signature set
// does not contain the hex encoding of signature. The deferred RUnlock shows
// the scan runs under ps.mtx's read lock.
// NOTE(review): the declaration and return of the result slice are not
// visible in this listing.
268 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
270 defer ps.mtx.RUnlock()
273 for _, peer := range ps.peers {
274 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
275 peers = append(peers, peer.ID())
// SendBlock wraps block in a block message and tries to send it on the
// blockchain channel. A failure to build the message is returned wrapped with
// "fail on NewBlockMessage". The block hash's string form is added to
// knownBlocks.
// NOTE(review): the guard around the knownBlocks update and the final return
// are not visible in this listing — presumably the hash is recorded only when
// the send succeeded; confirm. The Add below does not apply the
// maxKnownBlocks bound that MarkBlock checks.
281 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
282 msg, err := msgs.NewBlockMessage(block)
284 return false, errors.Wrap(err, "fail on NewBlockMessage")
287 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
289 blcokHash := block.Hash()
290 p.knownBlocks.Add(blcokHash.String())
// SendBlocks wraps blocks in a single blocks message and tries to send it on
// the blockchain channel. A failure to build the message is returned wrapped
// with "fail on NewBlocksMessage". Every block's hash string is then added to
// knownBlocks.
// NOTE(review): the body of the failed-send branch and the final return are
// not visible in this listing. The Add below does not apply the
// maxKnownBlocks bound that MarkBlock checks.
295 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
296 msg, err := msgs.NewBlocksMessage(blocks)
298 return false, errors.Wrap(err, "fail on NewBlocksMessage")
301 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
305 for _, block := range blocks {
306 blcokHash := block.Hash()
307 p.knownBlocks.Add(blcokHash.String())
312 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
313 msg, err := msgs.NewHeadersMessage(headers)
315 return false, errors.New("fail on NewHeadersMessage")
318 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendMerkleBlock builds a merkle-block message for block and tries to send it
// on the blockchain channel. The message carries the raw block header, the
// transactions accepted by the peer's address filter with their merkle proof,
// and the matching verify statuses with their merkle proof.
// NOTE(review): the bodies of the three error branches and the final return
// are not visible in this listing — confirm which of them propagate err.
322 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
323 msg := msgs.NewMerkleBlockMessage()
324 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
// Keep only the transactions (and their statuses) that isRelatedTx accepts.
328 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
330 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
331 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// The status proof reuses the flags produced for the transaction proof.
335 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
336 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
340 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
344 func (p *Peer) SendTransactions(txs []*types.Tx) error {
345 validTxs := make([]*types.Tx, 0, len(txs))
346 for i, tx := range txs {
347 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
351 validTxs = append(validTxs, tx)
352 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
356 msg, err := msgs.NewTransactionsMessage(validTxs)
361 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
362 return errors.New("failed to send txs msg")
365 for _, validTx := range validTxs {
366 p.knownTxs.Add(validTx.ID.String())
369 validTxs = make([]*types.Tx, 0, len(txs))
375 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
376 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
377 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
378 return errSendStatusMsg
380 p.markNewStatus(bestHeader.Height)
// SetBestStatus stores bestHeight and bestHash as the peer's best chain status.
// NOTE(review): any locking around the two assignments is not visible in this listing.
384 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
388 p.bestHeight = bestHeight
389 p.bestHash = bestHash
// SetIrreversibleStatus stores irreversibleHeight and irreversibleHash as the
// peer's irreversible chain status.
// NOTE(review): any locking around the two assignments is not visible in this listing.
392 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
396 p.irreversibleHeight = irreversibleHeight
397 p.irreversibleHash = irreversibleHash
// PeerSet tracks the registered peers.
// NOTE(review): further fields (NewPeerSet sets BasePeerSet, and the methods
// use a mutex named mtx) are not visible in this listing.
400 type PeerSet struct {
// peers maps a peer ID to its Peer (see AddPeer).
403 peers map[string]*Peer
406 // newPeerSet creates a new peer set to track the active participants.
407 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
409 BasePeerSet: basePeerSet,
410 peers: make(map[string]*Peer),
// ProcessIllegal looks up the peer registered under peerID and asks IsBanned,
// passing the peer's remote host together with level and reason; when the
// answer is true the peer is removed from the set.
// NOTE(review): the lines around the map lookup (locking, handling of an
// unknown peerID) are not visible in this listing — confirm a missing peer
// cannot reach peer.RemoteAddrHost().
414 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
416 peer := ps.peers[peerID]
423 if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
424 ps.RemovePeer(peerID)
// AddPeer registers peer under its ID, wrapping it with newPeer, when no peer
// with that ID is registered yet. The deferred Unlock shows the update runs
// under ps.mtx's write lock.
// NOTE(review): the end of the "not yet registered" branch is not visible in
// this listing — presumably it returns, so the warning below is logged only
// for an already-registered ID; confirm.
429 func (ps *PeerSet) AddPeer(peer BasePeer) {
431 defer ps.mtx.Unlock()
433 if _, ok := ps.peers[peer.ID()]; !ok {
434 ps.peers[peer.ID()] = newPeer(peer)
437 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// BestPeer scans the registered peers for the best one with respect to
// bestHeight, looking at whether each peer has the given service flag enabled.
// The selection condition holds when no best peer has been chosen yet, when
// the candidate's bestHeight is greater, or when the heights are equal and
// the candidate reports IsLAN(). The deferred RUnlock shows the scan runs
// under ps.mtx's read lock.
// NOTE(review): the statements that skip peers without the flag, record the
// candidate and return the result are not visible in this listing.
440 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
442 defer ps.mtx.RUnlock()
445 for _, p := range ps.peers {
446 if !p.services.IsEnable(flag) {
449 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
// BestIrreversiblePeer scans the registered peers for the best one with
// respect to irreversibleHeight, looking at whether each peer has the given
// service flag enabled. The selection condition holds when no best peer has
// been chosen yet, when the candidate's irreversibleHeight is greater, or when
// the heights are equal and the candidate reports IsLAN(). The deferred
// RUnlock shows the scan runs under ps.mtx's read lock.
// NOTE(review): the statements that skip peers without the flag, record the
// candidate and return the result are not visible in this listing.
456 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
458 defer ps.mtx.RUnlock()
461 for _, p := range ps.peers {
462 if !p.services.IsEnable(flag) {
465 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
472 // SendMsg sends msg on msgChannel to the peer registered under peerID.
// NOTE(review): the handling of an unknown peerID, the condition guarding
// RemovePeer and the return statement are not visible in this listing —
// presumably the peer is removed only when TrySend fails; confirm.
473 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
474 peer := ps.GetPeer(peerID)
479 ok := peer.TrySend(msgChannel, msg)
481 ps.RemovePeer(peerID)
486 //BroadcastMsg Broadcast message to the target peers
487 // and mark the message send record
488 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
489 //filter target peers
490 peers := bm.FilterTargetPeers(ps)
492 //broadcast to target peers
493 peersSuccess := make([]string, 0)
494 for _, peer := range peers {
495 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
496 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
499 peersSuccess = append(peersSuccess, peer)
502 //mark the message send record
503 bm.MarkSendRecord(ps, peersSuccess)
507 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
508 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
509 peers := ps.peersWithoutNewStatus(bestHeader.Height)
510 for _, peer := range peers {
511 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
512 ps.RemovePeer(peer.ID())
516 peer.markNewStatus(bestHeader.Height)
// BroadcastTx builds a transaction message for tx and tries to send it to
// every peer that does not yet know tx.ID. A failure to build the message is
// returned wrapped with "fail on broadcast tx". A peer that cannot be reached
// is logged and removed; tx.ID is marked as known on the peer via
// markTransaction.
// NOTE(review): the body of the SPV branch, part of the log fields, the
// statement after RemovePeer and the final return are not visible in this
// listing — presumably unrelated SPV peers and failed peers are skipped with
// continue, so only reached peers are marked; confirm.
521 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
522 msg, err := msgs.NewTransactionMessage(tx)
524 return errors.Wrap(err, "fail on broadcast tx")
527 peers := ps.peersWithoutTx(&tx.ID)
528 for _, peer := range peers {
// SPV peers are only interested in transactions matching their address filter.
529 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
532 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
533 log.WithFields(log.Fields{
536 "type": reflect.TypeOf(msg),
537 "message": msg.String(),
538 }).Warning("send message to peer error")
539 ps.RemovePeer(peer.ID())
542 peer.markTransaction(&tx.ID)
547 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
548 if errors.Root(err) == ErrPeerMisbehave {
549 ps.ProcessIllegal(peerID, level, err.Error())
551 ps.RemovePeer(peerID)
555 // GetPeer retrieves the registered peer with the given id.
// The deferred RUnlock shows the lookup runs under ps.mtx's read lock.
// NOTE(review): the return statement is not visible in this listing; callers
// such as SendMsg use the result directly, so check how an unknown id is reported.
556 func (ps *PeerSet) GetPeer(id string) *Peer {
558 defer ps.mtx.RUnlock()
// GetPeersByHeight collects every registered peer whose Height() is at least
// height. The deferred RUnlock shows the scan runs under ps.mtx's read lock.
// NOTE(review): the declaration and return of the result slice are not
// visible in this listing.
562 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
564 defer ps.mtx.RUnlock()
567 for _, peer := range ps.peers {
568 if peer.Height() >= height {
569 peers = append(peers, peer)
575 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
577 defer ps.mtx.RUnlock()
579 result := []*PeerInfo{}
580 for _, peer := range ps.peers {
581 result = append(result, peer.GetPeerInfo())
// MarkBlock looks up the peer registered under peerID.
// NOTE(review): the rest of the body is not visible in this listing —
// presumably an unknown peerID is ignored and hash is recorded through
// Peer.MarkBlock; confirm.
586 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
587 peer := ps.GetPeer(peerID)
// MarkBlockSignature records signature as known by the peer registered under
// peerID, via Peer.markSign.
// NOTE(review): the lines between the lookup and the markSign call are not
// visible in this listing — presumably they return early for an unknown
// peerID; confirm.
594 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
595 peer := ps.GetPeer(peerID)
599 peer.markSign(signature)
// MarkStatus records height as the known chain status of the peer registered
// under peerID, via Peer.markNewStatus.
// NOTE(review): the lines between the lookup and the markNewStatus call are
// not visible in this listing — presumably they return early for an unknown
// peerID; confirm.
602 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
603 peer := ps.GetPeer(peerID)
607 peer.markNewStatus(height)
// MarkTx records txHash as known by the peer registered under peerID, via
// Peer.markTransaction. Unlike the neighbouring Mark* helpers it reads
// ps.peers directly instead of going through GetPeer.
// NOTE(review): the locking around the map read and the handling of an
// unknown peerID are not visible in this listing — confirm both.
610 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
612 peer := ps.peers[peerID]
618 peer.markTransaction(&txHash)
// peersWithoutBlock collects all peers whose known-block set does not contain
// the string form of hash. The deferred RUnlock shows the scan runs under
// ps.mtx's read lock.
// NOTE(review): the declaration and return of the result slice are not
// visible in this listing.
621 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
623 defer ps.mtx.RUnlock()
626 for _, peer := range ps.peers {
627 if !peer.knownBlocks.Has(hash.String()) {
628 peers = append(peers, peer)
// peersWithoutNewStatus collects all peers whose knownStatus is below height.
// The deferred RUnlock shows the scan runs under ps.mtx's read lock.
// NOTE(review): peer.knownStatus is read here without any visible per-peer
// locking; the declaration and return of the result slice are not visible in
// this listing.
634 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
636 defer ps.mtx.RUnlock()
639 for _, peer := range ps.peers {
640 if peer.knownStatus < height {
641 peers = append(peers, peer)
// peersWithoutTx collects all peers whose known-transaction set does not
// contain the string form of hash. The deferred RUnlock shows the scan runs
// under ps.mtx's read lock.
// NOTE(review): the declaration and return of the result slice are not
// visible in this listing.
647 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
649 defer ps.mtx.RUnlock()
652 for _, peer := range ps.peers {
653 if !peer.knownTxs.Has(hash.String()) {
654 peers = append(peers, peer)
// RemovePeer deletes peerID from the peer map and then asks the underlying
// peer manager to stop that peer gracefully.
// NOTE(review): the locking around the delete is not visible in this listing.
660 func (ps *PeerSet) RemovePeer(peerID string) {
662 delete(ps.peers, peerID)
664 ps.StopPeerGracefully(peerID)
// SetStatus stores height and hash as the best chain status of the peer
// registered under peerID, via Peer.SetBestStatus.
// NOTE(review): the lines between the lookup and the SetBestStatus call are
// not visible in this listing — presumably they return early for an unknown
// peerID; confirm.
667 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
668 peer := ps.GetPeer(peerID)
673 peer.SetBestStatus(height, hash)
// SetIrreversibleStatus stores height and hash as the irreversible chain
// status of the peer registered under peerID, via Peer.SetIrreversibleStatus.
// NOTE(review): the lines between the lookup and the call are not visible in
// this listing — presumably they return early for an unknown peerID; confirm.
676 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
677 peer := ps.GetPeer(peerID)
682 peer.SetIrreversibleStatus(height, hash)
685 func (ps *PeerSet) Size() int {
687 defer ps.mtx.RUnlock()