9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 msgs "github.com/vapor/netsync/messages"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum number of transaction hashes to keep in the known list (prevent DOS)
22 maxKnownSignatures = 1024 // Maximum number of block signatures to keep in the known list (prevent DOS)
23 maxKnownBlocks = 1024 // Maximum number of block hashes to keep in the known list (prevent DOS)
24 maxFilterAddressSize = 50 // Length in bytes above which AddFilterAddress warns that a filter address is too large
25 maxFilterAddressCount = 1000 // Number of stored filter addresses at which AddFilterAddress warns that the limit is reached
// errSendStatusMsg is returned by Peer.SendStatus when TrySend reports failure.
31 errSendStatusMsg = errors.New("send status msg fail")
// ErrPeerMisbehave signals a misbehaving peer.
// NOTE(review): no use of this error is visible in this part of the file.
32 ErrPeerMisbehave = errors.New("peer is misbehave")
// ErrNoValidPeer signals that no valid fast sync peer could be found.
// NOTE(review): no use of this error is visible in this part of the file.
33 ErrNoValidPeer = errors.New("Can't find valid fast sync peer")
36 // BasePeer is the interface for a connection-level peer.
// Peer methods call these on their receiver (e.g. p.TrySend, p.TrafficStatus).
37 type BasePeer interface {
// RemoteAddrHost is the host that PeerSet.ProcessIllegal passes to IsBanned as the ip argument.
40 RemoteAddrHost() string
// ServiceFlag is read by newPeer and stored in Peer.services.
41 ServiceFlag() consensus.ServiceFlag
// TrafficStatus returns two flowrate statuses; GetPeerInfo treats the
// first as sent traffic and the second as received traffic.
42 TrafficStatus() (*flowrate.Status, *flowrate.Status)
// TrySend sends a message on the given channel byte; callers in this file
// treat a false result as a failed send.
43 TrySend(byte, interface{}) bool
47 // BasePeerSet is the interface for the connection-level peer manager.
48 type BasePeerSet interface {
// StopPeerGracefully is called by PeerSet.RemovePeer with the ID of the removed peer.
49 StopPeerGracefully(string)
// IsBanned reports whether the peer at ip is banned for the given level and
// reason; PeerSet.ProcessIllegal removes the peer when it returns true.
50 IsBanned(ip string, level byte, reason string) bool
// BroadcastMsg describes a message that PeerSet.BroadcastMsg can deliver: the
// message chooses its own target peers and is told which sends succeeded.
53 type BroadcastMsg interface {
// FilterTargetPeers returns the IDs of the peers the message should be sent to.
54 FilterTargetPeers(ps *PeerSet) []string
// MarkSendRecord is called after broadcasting with the IDs of the peers
// for which the send succeeded.
55 MarkSendRecord(ps *PeerSet, peers []string)
61 // PeerInfo is a snapshot of a peer's status, as built by Peer.GetPeerInfo.
// The traffic fields are copied from the sent/received flowrate statuses
// (Bytes, AvgRate and CurRate); Duration is the sent status duration as a string.
62 type PeerInfo struct {
63 ID string `json:"peer_id"`
64 RemoteAddr string `json:"remote_addr"`
65 Height uint64 `json:"height"`
66 Ping string `json:"ping"`
67 Duration string `json:"duration"`
68 TotalSent int64 `json:"total_sent"`
69 TotalReceived int64 `json:"total_received"`
70 AverageSentRate int64 `json:"average_sent_rate"`
71 AverageReceivedRate int64 `json:"average_received_rate"`
72 CurrentSentRate int64 `json:"current_sent_rate"`
73 CurrentReceivedRate int64 `json:"current_received_rate"`
79 services consensus.ServiceFlag // Service flags read from the base peer by newPeer
82 irreversibleHeight uint64 // Irreversible height last stored by SetIrreversibleStatus
83 irreversibleHash *bc.Hash // Irreversible block hash last stored by SetIrreversibleStatus
84 knownTxs *set.Set // Set of transaction hashes (string form) known to this peer
85 knownBlocks *set.Set // Set of block hashes (string form) known to this peer
86 knownSignatures *set.Set // Set of hex-encoded block signatures known to this peer
87 knownStatus uint64 // Height of the latest chain status known to this peer (written by markNewStatus)
88 filterAdds *set.Set // Set of hex-encoded addresses that the spv node cares about.
// newPeer builds a Peer for basePeer, caching its service flags and creating
// empty sets for known blocks, known signatures and filter addresses.
// NOTE(review): the other field initialisers (e.g. knownTxs) are not visible
// here — confirm they are set before the sets are used.
91 func newPeer(basePeer BasePeer) *Peer {
94 services: basePeer.ServiceFlag(),
96 knownBlocks: set.New(),
97 knownSignatures: set.New(),
98 filterAdds: set.New(),
// Height reports the peer's height as a uint64.
// NOTE(review): only the deferred read-unlock is visible; presumably this
// returns p.bestHeight under p.mtx's read lock — confirm.
102 func (p *Peer) Height() uint64 {
104 defer p.mtx.RUnlock()
// IrreversibleHeight returns the peer's recorded irreversible height.
// The deferred RUnlock releases p.mtx's read lock when the method returns.
109 func (p *Peer) IrreversibleHeight() uint64 {
111 defer p.mtx.RUnlock()
113 return p.irreversibleHeight
// AddFilterAddress stores address, hex-encoded, in the peer's filter set.
// A warning is logged when the set already holds maxFilterAddressCount entries
// or when address is longer than maxFilterAddressSize bytes.
// NOTE(review): presumably both warning cases return without adding — confirm.
116 func (p *Peer) AddFilterAddress(address []byte) {
120 if p.filterAdds.Size() >= maxFilterAddressCount {
121 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
124 if len(address) > maxFilterAddressSize {
125 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
// Addresses are stored as hex strings; isRelatedTx looks them up in the same form.
129 p.filterAdds.Add(hex.EncodeToString(address))
// AddFilterAddresses adds every entry of addresses through AddFilterAddress.
// NOTE(review): the body of the non-empty check is not visible; presumably an
// existing filter is cleared before the new addresses are added — confirm.
132 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
133 if !p.filterAdds.IsEmpty() {
136 for _, address := range addresses {
137 p.AddFilterAddress(address)
// FilterClear takes no arguments and returns nothing.
// NOTE(review): the body is not visible; presumably it empties the peer's
// filter address set — TODO confirm.
141 func (p *Peer) FilterClear() {
// GetBlockByHeight sends a GetBlockMessage for height to the peer on the
// blockchain channel and returns the result of TrySend.
145 func (p *Peer) GetBlockByHeight(height uint64) bool {
// The message is wrapped in an anonymous struct embedding msgs.BlockchainMessage.
146 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
147 return p.TrySend(msgs.BlockchainChannel, msg)
// GetBlocks sends a get-blocks message built from locator and stopHash to the
// peer on the blockchain channel and returns the result of TrySend.
150 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
151 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
152 return p.TrySend(msgs.BlockchainChannel, msg)
// GetHeaders sends a get-headers message built from locator, stopHash and skip
// to the peer on the blockchain channel and returns the result of TrySend.
155 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
156 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
157 return p.TrySend(msgs.BlockchainChannel, msg)
// GetPeerInfo builds a PeerInfo snapshot from the peer's address, best height
// and the sent/received traffic statuses returned by TrafficStatus.
160 func (p *Peer) GetPeerInfo() *PeerInfo {
162 defer p.mtx.RUnlock()
164 sentStatus, receivedStatus := p.TrafficStatus()
// ping starts as the difference between the sent and received idle durations.
165 ping := sentStatus.Idle - receivedStatus.Idle
// NOTE(review): the branch body is not visible; presumably it flips the
// subtraction so ping is never negative — confirm.
166 if receivedStatus.Idle > sentStatus.Idle {
172 RemoteAddr: p.Addr().String(),
173 Height: p.bestHeight,
175 Duration: sentStatus.Duration.String(),
176 TotalSent: sentStatus.Bytes,
177 TotalReceived: receivedStatus.Bytes,
178 AverageSentRate: sentStatus.AvgRate,
179 AverageReceivedRate: receivedStatus.AvgRate,
180 CurrentSentRate: sentStatus.CurRate,
181 CurrentReceivedRate: receivedStatus.CurRate,
// getRelatedTxAndStatus returns the transactions of txs accepted by
// isRelatedTx, together with the verify result found at the same index in
// txStatuses.VerifyStatus. Both results stay nil when nothing matches.
// Assumes txStatuses.VerifyStatus has at least len(txs) entries: the index
// below is not bounds-checked here — verify against callers.
185 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
186 var relatedTxs []*types.Tx
187 var relatedStatuses []*bc.TxVerifyResult
188 for i, tx := range txs {
189 if p.isRelatedTx(tx) {
190 relatedTxs = append(relatedTxs, tx)
191 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
194 return relatedTxs, relatedStatuses
// isRelatedTx checks tx against the peer's filter addresses: it looks up the
// hex-encoded control program of each spend input and of each output in
// filterAdds.
// NOTE(review): the return statements are not visible; presumably a match
// returns true and falling through returns false — confirm.
197 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
198 for _, input := range tx.Inputs {
// Only spend inputs have a case in the visible part of this switch.
199 switch inp := input.TypedInput.(type) {
200 case *types.SpendInput:
201 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
206 for _, output := range tx.Outputs {
207 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
// isSPVNode reports whether the peer's service flags lack consensus.SFFullNode.
214 func (p *Peer) isSPVNode() bool {
215 return !p.services.IsEnable(consensus.SFFullNode)
// MarkBlock records hash, in its string form, in the peer's known-block set.
218 func (p *Peer) MarkBlock(hash *bc.Hash) {
// Loops while the set holds maxKnownBlocks or more entries.
// NOTE(review): the loop body is not visible; presumably it pops entries as
// markSign does — confirm.
222 for p.knownBlocks.Size() >= maxKnownBlocks {
225 p.knownBlocks.Add(hash.String())
// markNewStatus records height as the latest chain status known to the peer.
228 func (p *Peer) markNewStatus(height uint64) {
232 p.knownStatus = height
// markSign records signature, hex-encoded, in the peer's known-signature set.
235 func (p *Peer) markSign(signature []byte) {
// Pop entries until the set is below maxKnownSignatures, so the insertion
// below cannot grow it past the cap.
239 for p.knownSignatures.Size() >= maxKnownSignatures {
240 p.knownSignatures.Pop()
242 p.knownSignatures.Add(hex.EncodeToString(signature))
// markTransaction records hash, in its string form, in the peer's
// known-transaction set.
245 func (p *Peer) markTransaction(hash *bc.Hash) {
// Loops while the set holds maxKnownTxs or more entries.
// NOTE(review): the loop body is not visible; presumably it pops entries as
// markSign does — confirm.
249 for p.knownTxs.Size() >= maxKnownTxs {
252 p.knownTxs.Add(hash.String())
// SendBlock wraps block in a block message and sends it to the peer on the
// blockchain channel. A message construction failure is returned wrapped with
// "fail on NewBlockMessage" and a false send result.
255 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
256 msg, err := msgs.NewBlockMessage(block)
258 return false, errors.Wrap(err, "fail on NewBlockMessage")
261 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// Record the block hash as known to this peer.
// NOTE(review): presumably done only when ok is true — confirm.
263 blcokHash := block.Hash()
264 p.knownBlocks.Add(blcokHash.String())
// SendBlocks wraps blocks in a single blocks message and sends it to the peer
// on the blockchain channel, then records every block hash as known to the
// peer. A message construction failure is returned wrapped with
// "fail on NewBlocksMessage" and a false send result.
269 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
270 msg, err := msgs.NewBlocksMessage(blocks)
272 return false, errors.Wrap(err, "fail on NewBlocksMessage")
// NOTE(review): the failure branch body is not visible; presumably it returns
// before the hashes are recorded — confirm.
275 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
279 for _, block := range blocks {
280 blcokHash := block.Hash()
281 p.knownBlocks.Add(blcokHash.String())
// SendHeaders wraps headers in a headers message and sends it to the peer on
// the blockchain channel.
286 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
287 msg, err := msgs.NewHeadersMessage(headers)
// NOTE(review): unlike SendBlock and SendBlocks, which use errors.Wrap, this
// returns a fresh error and drops the detail carried by err; consider
// errors.Wrap(err, ...) for consistency.
289 return false, errors.New("fail on NewHeadersMessage")
292 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendMerkleBlock builds a merkle block message for block, restricted to the
// transactions that match the peer's filter, and sends it on the blockchain
// channel.
// NOTE(review): the error branches and the final return are not visible;
// presumably each setter error is returned with a false send result — confirm.
296 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
297 msg := msgs.NewMerkleBlockMessage()
298 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
// Select the transactions (and their verify results) related to this peer.
302 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
// Transaction merkle proof for the related transactions within the block.
304 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
305 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// Status merkle proof, computed with the same flags as the transaction proof.
309 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
310 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
314 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendTransactions sends txs to the peer in batches on the blockchain channel
// and records each sent transaction ID as known to the peer. It returns
// "failed to send txs msg" when TrySend reports failure.
318 func (p *Peer) SendTransactions(txs []*types.Tx) error {
319 validTxs := make([]*types.Tx, 0, len(txs))
320 for i, tx := range txs {
// Filter: transactions unrelated to an SPV peer, or already known to the peer.
// NOTE(review): presumably these are skipped with continue — confirm.
321 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
325 validTxs = append(validTxs, tx)
// True while the batch is below msgs.TxsMsgMaxTxNum and tx is not the last element.
// NOTE(review): presumably this keeps accumulating. If the final element of
// txs is filtered out above, a partially filled batch looks like it would
// never be sent — confirm against the full loop.
326 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
330 msg, err := msgs.NewTransactionsMessage(validTxs)
335 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
336 return errors.New("failed to send txs msg")
339 for _, validTx := range validTxs {
340 p.knownTxs.Add(validTx.ID.String())
// Start a fresh batch for the remaining transactions.
343 validTxs = make([]*types.Tx, 0, len(txs))
// SendStatus sends a status message built from bestHeader and
// irreversibleHeader to the peer on the blockchain channel. It returns
// errSendStatusMsg when TrySend reports failure; otherwise it records
// bestHeader.Height as the status known to the peer.
349 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
350 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
351 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
352 return errSendStatusMsg
354 p.markNewStatus(bestHeader.Height)
// SetBestStatus stores bestHeight and bestHash as the peer's best chain status.
// NOTE(review): lock handling is not visible here — confirm the writes are
// made under p.mtx.
358 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
362 p.bestHeight = bestHeight
363 p.bestHash = bestHash
// SetIrreversibleStatus stores irreversibleHeight and irreversibleHash as the
// peer's irreversible chain status.
// NOTE(review): lock handling is not visible here — confirm the writes are
// made under p.mtx.
366 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
370 p.irreversibleHeight = irreversibleHeight
371 p.irreversibleHash = irreversibleHash
// PeerSet tracks the registered peers and delegates connection-level
// operations (IsBanned, StopPeerGracefully) to its BasePeerSet.
374 type PeerSet struct {
377 peers map[string]*Peer // Registered peers keyed by peer ID (see AddPeer)
380 // NewPeerSet creates a new peer set to track the active participants.
// It stores basePeerSet and starts with an empty peer map.
381 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
383 BasePeerSet: basePeerSet,
384 peers: make(map[string]*Peer),
// ProcessIllegal reports an offence by the peer with ID peerID: it passes the
// peer's remote host, level and reason to IsBanned and removes the peer from
// the set when IsBanned returns true.
388 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
// NOTE(review): the map is read directly; locking and the handling of an
// unknown peerID (nil peer) are not visible here — confirm.
390 peer := ps.peers[peerID]
397 if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
398 ps.RemovePeer(peerID)
// AddPeer registers peer under its ID, wrapped by newPeer, when that ID is not
// yet present. The write lock on ps.mtx is released by the deferred Unlock.
403 func (ps *PeerSet) AddPeer(peer BasePeer) {
405 defer ps.mtx.Unlock()
407 if _, ok := ps.peers[peer.ID()]; !ok {
408 ps.peers[peer.ID()] = newPeer(peer)
// NOTE(review): presumably reached only when the ID was already registered
// (the branch above returning early) — confirm.
411 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// BestPeer selects a peer by best height among the peers that have flag
// enabled.
// NOTE(review): the candidate assignment and return are not visible;
// presumably it returns nil when no peer qualifies — confirm.
414 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
416 defer ps.mtx.RUnlock()
419 for _, p := range ps.peers {
// Peers without the requested service flag are not considered.
420 if !p.services.IsEnable(flag) {
// A peer qualifies as the new candidate when there is none yet, when its
// height is greater, or when the heights tie and the peer is on the LAN.
423 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
// BestIrreversiblePeer selects a peer by irreversible height among the peers
// that have flag enabled.
// NOTE(review): the candidate assignment and return are not visible;
// presumably it returns nil when no peer qualifies — confirm.
430 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
432 defer ps.mtx.RUnlock()
435 for _, p := range ps.peers {
// Peers without the requested service flag are not considered.
436 if !p.services.IsEnable(flag) {
// A peer qualifies as the new candidate when there is none yet, when its
// irreversible height is greater, or when the heights tie and the peer is on
// the LAN.
439 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
446 // SendMsg sends msg on msgChannel to the peer with ID peerID.
// NOTE(review): the nil-peer check and the condition around RemovePeer are
// not visible; presumably the peer is removed only when TrySend fails —
// confirm.
447 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
448 peer := ps.GetPeer(peerID)
453 ok := peer.TrySend(msgChannel, msg)
455 ps.RemovePeer(peerID)
460 // BroadcastMsg sends bm to the peers selected by bm.FilterTargetPeers
461 // and reports the peers that were reached to bm.MarkSendRecord.
// A failed send is logged as a warning with the peer, message type and message text.
462 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
463 // Filter target peers.
464 peers := bm.FilterTargetPeers(ps)
466 // Broadcast to target peers, collecting the IDs that were sent to.
467 peersSuccess := make([]string, 0)
468 for _, peer := range peers {
469 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
470 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
// NOTE(review): presumably failed peers are skipped before this append — confirm.
473 peersSuccess = append(peersSuccess, peer)
476 // Mark the message send record.
477 bm.MarkSendRecord(ps, peersSuccess)
// BroadcastNewStatus sends a status message built from bestHeader and
// irreversibleHeader to every peer whose known status is below
// bestHeader.Height. A peer whose send fails is removed from the set;
// bestHeader.Height is recorded as the status known to a peer.
// NOTE(review): presumably the height is recorded only after a successful
// send — confirm.
481 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
482 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
483 peers := ps.peersWithoutNewStatus(bestHeader.Height)
484 for _, peer := range peers {
485 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
486 ps.RemovePeer(peer.ID())
490 peer.markNewStatus(bestHeader.Height)
// BroadcastTx sends tx to the peers that do not already know its ID. A message
// construction failure is returned wrapped with "fail on broadcast tx". A peer
// whose send fails is logged and removed from the set; tx.ID is recorded as
// known to a peer.
// NOTE(review): presumably the ID is recorded only after a successful send —
// confirm.
495 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
496 msg, err := msgs.NewTransactionMessage(tx)
498 return errors.Wrap(err, "fail on broadcast tx")
501 peers := ps.peersWithoutTx(&tx.ID)
502 for _, peer := range peers {
// SPV peers only care about transactions matching their filter.
// NOTE(review): presumably unrelated transactions are skipped with continue — confirm.
503 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
506 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
507 log.WithFields(log.Fields{
510 "type": reflect.TypeOf(msg),
511 "message": msg.String(),
512 }).Warning("send message to peer error")
513 ps.RemovePeer(peer.ID())
516 peer.markTransaction(&tx.ID)
521 // GetPeer retrieves the registered peer with the given id.
// Callers in this file check the result before using it, so a nil result for
// an unknown id is presumably possible — confirm.
522 func (ps *PeerSet) GetPeer(id string) *Peer {
524 defer ps.mtx.RUnlock()
// GetPeersByHeight collects the registered peers whose Height() is at least
// height.
528 func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
530 defer ps.mtx.RUnlock()
533 for _, peer := range ps.peers {
534 if peer.Height() >= height {
535 peers = append(peers, peer)
// GetPeerInfos collects a PeerInfo snapshot from every registered peer.
541 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
543 defer ps.mtx.RUnlock()
// Starts from an empty, non-nil slice.
545 result := []*PeerInfo{}
546 for _, peer := range ps.peers {
547 result = append(result, peer.GetPeerInfo())
// MarkBlock looks up the peer with ID peerID.
// NOTE(review): the rest of the body is not visible; presumably it records
// hash as known to that peer via Peer.MarkBlock and ignores unknown IDs —
// confirm.
552 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
553 peer := ps.GetPeer(peerID)
// MarkBlockSignature records signature as known to the peer with ID peerID.
// NOTE(review): the nil-peer guard is not visible; presumably unknown IDs are
// ignored — confirm.
560 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
561 peer := ps.GetPeer(peerID)
565 peer.markSign(signature)
// MarkStatus records height as the chain status known to the peer with ID
// peerID.
// NOTE(review): the nil-peer guard is not visible; presumably unknown IDs are
// ignored — confirm.
568 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
569 peer := ps.GetPeer(peerID)
573 peer.markNewStatus(height)
// MarkTx records txHash as known to the peer with ID peerID.
576 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
// Reads the map directly instead of going through GetPeer.
// NOTE(review): locking and the nil-peer guard are not visible — confirm.
578 peer := ps.peers[peerID]
584 peer.markTransaction(&txHash)
// PeersWithoutBlock collects the IDs of the registered peers whose known-block
// set does not contain hash (compared in string form).
587 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
589 defer ps.mtx.RUnlock()
592 for _, peer := range ps.peers {
593 if !peer.knownBlocks.Has(hash.String()) {
594 peers = append(peers, peer.ID())
// PeersWithoutSignature collects the IDs of the registered peers whose
// known-signature set does not contain signature (compared hex-encoded).
600 func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
602 defer ps.mtx.RUnlock()
605 for _, peer := range ps.peers {
606 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
607 peers = append(peers, peer.ID())
// peersWithoutNewStatus collects the registered peers whose known status
// height is below height.
613 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
615 defer ps.mtx.RUnlock()
618 for _, peer := range ps.peers {
619 if peer.knownStatus < height {
620 peers = append(peers, peer)
// peersWithoutTx collects the registered peers whose known-transaction set
// does not contain hash (compared in string form).
626 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
628 defer ps.mtx.RUnlock()
631 for _, peer := range ps.peers {
632 if !peer.knownTxs.Has(hash.String()) {
633 peers = append(peers, peer)
// RemovePeer deletes the peer with ID peerID from the set and then asks the
// base peer set to stop it gracefully.
// NOTE(review): the locking around the map delete is not visible — confirm.
639 func (ps *PeerSet) RemovePeer(peerID string) {
641 delete(ps.peers, peerID)
643 ps.StopPeerGracefully(peerID)
// SetStatus stores height and hash as the best chain status of the peer with
// ID peerID.
// NOTE(review): the nil-peer guard is not visible; presumably unknown IDs are
// ignored — confirm.
646 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
647 peer := ps.GetPeer(peerID)
652 peer.SetBestStatus(height, hash)
// SetIrreversibleStatus stores height and hash as the irreversible chain
// status of the peer with ID peerID.
// NOTE(review): the nil-peer guard is not visible; presumably unknown IDs are
// ignored — confirm.
655 func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
656 peer := ps.GetPeer(peerID)
661 peer.SetIrreversibleStatus(height, hash)