9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 msgs "github.com/vapor/netsync/messages"
16 "github.com/vapor/protocol/bc"
17 "github.com/vapor/protocol/bc/types"
21 maxKnownTxs = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
22 maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
23 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
// maxFilterAddressSize is the byte length above which AddFilterAddress warns
// about a single filter address.
24 maxFilterAddressSize = 50
// maxFilterAddressCount is the number of stored filter addresses at which
// AddFilterAddress warns that the limit has been reached.
25 maxFilterAddressCount = 1000
// errSendStatusMsg is returned by Peer.SendStatus when TrySend reports failure.
31 errSendStatusMsg = errors.New("send status msg fail")
// ErrPeerMisbehave is the root error PeerSet.ErrorHandler compares against
// before handing the peer to ProcessIllegal.
32 ErrPeerMisbehave = errors.New("peer is misbehave")
35 // BasePeer is the interface a connection-level peer provides to this package.
36 type BasePeer interface {
// RemoteAddrHost returns the host string that PeerSet.ProcessIllegal passes to
// IsBanned as the peer's ip.
39 RemoteAddrHost() string
// ServiceFlag returns the peer's service flags; newPeer copies them into
// Peer.services.
40 ServiceFlag() consensus.ServiceFlag
// TrafficStatus returns two flow-rate snapshots; Peer.GetPeerInfo reads the
// first as sent traffic and the second as received traffic.
41 TrafficStatus() (*flowrate.Status, *flowrate.Status)
// TrySend attempts to send a message on the given channel byte; callers in this
// file treat a false result as a failed send.
42 TrySend(byte, interface{}) bool
46 // BasePeerSet is the interface for the connection-level peer manager.
47 type BasePeerSet interface {
// StopPeerGracefully is called by PeerSet.RemovePeer with the removed peer's ID.
48 StopPeerGracefully(string)
// IsBanned is consulted by PeerSet.ProcessIllegal with the peer's remote host;
// when it returns true the peer is removed from the set.
49 IsBanned(ip string, level byte, reason string) bool
// BroadcastMsg describes a message that PeerSet.BroadcastMsg can deliver to a
// selected subset of peers. PeerSet.BroadcastMsg additionally calls GetChan,
// GetMsg and MsgString on values of this type.
52 type BroadcastMsg interface {
// FilterTargetPeers returns the IDs of the peers the message should be sent to.
53 FilterTargetPeers(ps *PeerSet) []string
// MarkSendRecord receives the peer IDs that PeerSet.BroadcastMsg collected in
// its peersSuccess list.
54 MarkSendRecord(ps *PeerSet, peers []string)
60 // PeerInfo is a snapshot of a peer's status, as assembled by Peer.GetPeerInfo.
61 type PeerInfo struct {
62 ID string `json:"peer_id"`
// RemoteAddr is p.Addr().String() at snapshot time.
63 RemoteAddr string `json:"remote_addr"`
// Height is the peer's bestHeight at snapshot time.
64 Height uint64 `json:"height"`
65 Ping string `json:"ping"`
// Duration is the String() form of the sent-traffic status Duration.
66 Duration string `json:"duration"`
// The remaining fields are copied from the Bytes, AvgRate and CurRate fields of
// the sent and received flowrate.Status values.
67 TotalSent int64 `json:"total_sent"`
68 TotalReceived int64 `json:"total_received"`
69 AverageSentRate int64 `json:"average_sent_rate"`
70 AverageReceivedRate int64 `json:"average_received_rate"`
71 CurrentSentRate int64 `json:"current_sent_rate"`
72 CurrentReceivedRate int64 `json:"current_received_rate"`
// services holds the service flags taken from BasePeer.ServiceFlag() in newPeer.
78 services consensus.ServiceFlag
// irreversibleHeight and irreversibleHash are written together by
// SetIrreversibleStatus.
81 irreversibleHeight uint64
82 irreversibleHash *bc.Hash
83 knownTxs *set.Set // Set of transaction hash strings known to be known by this peer
84 knownBlocks *set.Set // Set of block hash strings known to be known by this peer
85 knownSignatures *set.Set // Set of hex-encoded block signatures known to be known by this peer
86 knownStatus uint64 // Height of the chain status last marked as known by this peer (see markNewStatus)
87 filterAdds *set.Set // Set of hex-encoded addresses that the spv node cares about.
// newPeer builds the package-level Peer wrapper for basePeer: the service flags
// are read once from basePeer.ServiceFlag(), and the known-block,
// known-signature and filter-address sets start out as fresh empty sets.
90 func newPeer(basePeer BasePeer) *Peer {
93 services: basePeer.ServiceFlag(),
95 knownBlocks: set.New(),
96 knownSignatures: set.New(),
97 filterAdds: set.New(),
// Height is a read accessor on the peer; the read lock on p.mtx is released on
// return via defer.
// NOTE(review): presumably returns the peer's best height (p.bestHeight) — confirm.
101 func (p *Peer) Height() uint64 {
103 defer p.mtx.RUnlock()
// IrreversibleHeight returns the irreversible height recorded for this peer.
// The read lock on p.mtx is released on return via defer.
108 func (p *Peer) IrreversibleHeight() uint64 {
110 defer p.mtx.RUnlock()
112 return p.irreversibleHeight
// AddFilterAddress stores the hex encoding of address in the peer's filter set.
// It logs a warning when the set already holds maxFilterAddressCount entries or
// when address is longer than maxFilterAddressSize bytes.
// NOTE(review): both warning branches presumably reject the address instead of
// storing it — confirm.
115 func (p *Peer) AddFilterAddress(address []byte) {
119 if p.filterAdds.Size() >= maxFilterAddressCount {
120 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
123 if len(address) > maxFilterAddressSize {
124 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
// Addresses are stored hex-encoded; isRelatedTx looks them up the same way.
128 p.filterAdds.Add(hex.EncodeToString(address))
// AddFilterAddresses adds every entry of addresses through AddFilterAddress, so
// the per-address size and count checks apply to each one.
// NOTE(review): when the filter set is not empty it is presumably cleared first,
// making this a replace rather than a merge — confirm.
131 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
132 if !p.filterAdds.IsEmpty() {
135 for _, address := range addresses {
136 p.AddFilterAddress(address)
// FilterClear takes no arguments and returns nothing.
// NOTE(review): presumably empties the peer's filter address set — confirm.
140 func (p *Peer) FilterClear() {
144 func (p *Peer) GetBlockByHeight(height uint64) bool {
145 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
146 return p.TrySend(msgs.BlockchainChannel, msg)
149 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
150 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
151 return p.TrySend(msgs.BlockchainChannel, msg)
154 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
155 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
156 return p.TrySend(msgs.BlockchainChannel, msg)
// GetPeerInfo builds a PeerInfo snapshot from the peer's address, best height
// and traffic statistics. The read lock on p.mtx is released on return via defer.
159 func (p *Peer) GetPeerInfo() *PeerInfo {
161 defer p.mtx.RUnlock()
// TrafficStatus yields the sent status first and the received status second.
163 sentStatus, receivedStatus := p.TrafficStatus()
// ping is derived from the difference between the two idle times.
164 ping := sentStatus.Idle - receivedStatus.Idle
// NOTE(review): this branch presumably flips the sign so ping is never
// negative — confirm.
165 if receivedStatus.Idle > sentStatus.Idle {
171 RemoteAddr: p.Addr().String(),
172 Height: p.bestHeight,
174 Duration: sentStatus.Duration.String(),
175 TotalSent: sentStatus.Bytes,
176 TotalReceived: receivedStatus.Bytes,
177 AverageSentRate: sentStatus.AvgRate,
178 AverageReceivedRate: receivedStatus.AvgRate,
179 CurrentSentRate: sentStatus.CurRate,
180 CurrentReceivedRate: receivedStatus.CurRate,
184 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
185 var relatedTxs []*types.Tx
186 var relatedStatuses []*bc.TxVerifyResult
187 for i, tx := range txs {
188 if p.isRelatedTx(tx) {
189 relatedTxs = append(relatedTxs, tx)
190 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
193 return relatedTxs, relatedStatuses
// isRelatedTx checks tx against the peer's filter address set: the control
// program of every spend input and of every output is hex-encoded and looked up
// in filterAdds.
// NOTE(review): a hit presumably reports true and no hit reports false — confirm.
196 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
197 for _, input := range tx.Inputs {
// Only *types.SpendInput inputs are handled by the visible case.
198 switch inp := input.TypedInput.(type) {
199 case *types.SpendInput:
200 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
205 for _, output := range tx.Outputs {
206 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
213 func (p *Peer) isSPVNode() bool {
214 return !p.services.IsEnable(consensus.SFFullNode)
// MarkBlock records hash.String() in the peer's known-block set.
217 func (p *Peer) MarkBlock(hash *bc.Hash) {
// Loops while the set is at or above maxKnownBlocks.
// NOTE(review): the loop body presumably evicts one entry per iteration — confirm.
221 for p.knownBlocks.Size() >= maxKnownBlocks {
224 p.knownBlocks.Add(hash.String())
// markNewStatus records height as the chain status height known by this peer;
// PeerSet.peersWithoutNewStatus compares against this value.
227 func (p *Peer) markNewStatus(height uint64) {
231 p.knownStatus = height
// markSign records the hex encoding of signature in the peer's known-signature
// set, popping entries first while the set is at or above maxKnownSignatures.
234 func (p *Peer) markSign(signature []byte) {
238 for p.knownSignatures.Size() >= maxKnownSignatures {
// Pop removes one entry from the set to make room.
239 p.knownSignatures.Pop()
241 p.knownSignatures.Add(hex.EncodeToString(signature))
// markTransaction records hash.String() in the peer's known-transaction set.
244 func (p *Peer) markTransaction(hash *bc.Hash) {
// Loops while the set is at or above maxKnownTxs.
// NOTE(review): the loop body presumably evicts one entry per iteration — confirm.
248 for p.knownTxs.Size() >= maxKnownTxs {
251 p.knownTxs.Add(hash.String())
// PeersWithoutBlock collects the IDs of the peers whose known-block set does not
// contain hash.String(). The read lock on ps.mtx is released on return via defer.
254 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
256 defer ps.mtx.RUnlock()
259 for _, peer := range ps.peers {
260 if !peer.knownBlocks.Has(hash.String()) {
261 peers = append(peers, peer.ID())
// PeersWithoutSign collects the IDs of the peers whose known-signature set does
// not contain the hex encoding of signature. The read lock on ps.mtx is released
// on return via defer.
267 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
269 defer ps.mtx.RUnlock()
272 for _, peer := range ps.peers {
273 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
274 peers = append(peers, peer.ID())
// SendBlock builds a block message for block and tries to send it on the
// blockchain channel. A failure to build the message is returned wrapped with
// "fail on NewBlockMessage".
280 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
281 msg, err := msgs.NewBlockMessage(block)
283 return false, errors.Wrap(err, "fail on NewBlockMessage")
286 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// The block hash is added to knownBlocks directly, without the size cap that
// MarkBlock checks.
// NOTE(review): presumably only done when the send succeeded — confirm.
288 blcokHash := block.Hash()
289 p.knownBlocks.Add(blcokHash.String())
// SendBlocks builds a single blocks message for blocks and tries to send it on
// the blockchain channel. A failure to build the message is returned wrapped
// with "fail on NewBlocksMessage".
294 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
295 msg, err := msgs.NewBlocksMessage(blocks)
297 return false, errors.Wrap(err, "fail on NewBlocksMessage")
300 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
// Every block hash is added to knownBlocks directly, without the size cap that
// MarkBlock checks.
304 for _, block := range blocks {
305 blcokHash := block.Hash()
306 p.knownBlocks.Add(blcokHash.String())
311 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
312 msg, err := msgs.NewHeadersMessage(headers)
314 return false, errors.New("fail on NewHeadersMessage")
317 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendMerkleBlock builds a merkle block message for block and tries to send it
// on the blockchain channel. The message is filled in three steps: the raw
// block header, the tx merkle proof together with the transactions related to
// this peer's filter, and the status merkle proof together with the related
// statuses.
321 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
322 msg := msgs.NewMerkleBlockMessage()
323 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
// Only transactions accepted by isRelatedTx (and their statuses) are carried in full.
327 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
329 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
330 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// The flags produced by the tx proof are reused to build the status proof.
334 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
335 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
339 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
343 func (p *Peer) SendTransactions(txs []*types.Tx) error {
344 validTxs := make([]*types.Tx, 0, len(txs))
345 for i, tx := range txs {
346 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
350 validTxs = append(validTxs, tx)
351 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
355 msg, err := msgs.NewTransactionsMessage(validTxs)
360 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
361 return errors.New("failed to send txs msg")
364 for _, validTx := range validTxs {
365 p.knownTxs.Add(validTx.ID.String())
368 validTxs = make([]*types.Tx, 0, len(txs))
374 func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
375 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
376 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
377 return errSendStatusMsg
379 p.markNewStatus(bestHeader.Height)
// SetBestStatus stores bestHeight and bestHash as the peer's best chain status.
383 func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
387 p.bestHeight = bestHeight
388 p.bestHash = bestHash
// SetIrreversibleStatus stores irreversibleHeight and irreversibleHash as the
// peer's irreversible chain status.
391 func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
395 p.irreversibleHeight = irreversibleHeight
396 p.irreversibleHash = irreversibleHash
// PeerSet tracks the Peer wrappers known to this package.
399 type PeerSet struct {
// peers maps a peer ID (BasePeer.ID()) to its Peer wrapper; see AddPeer.
402 peers map[string]*Peer
405 // newPeerSet creates a new peer set to track the active participants.
406 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
408 BasePeerSet: basePeerSet,
409 peers: make(map[string]*Peer),
// ProcessIllegal reports a misbehaving peer to the underlying peer manager:
// the peer's remote host, level and reason are passed to IsBanned, and the peer
// is removed from the set when IsBanned returns true.
413 func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
// NOTE(review): an unknown peerID yields a nil peer here; presumably it is
// handled before the IsBanned call — confirm.
415 peer := ps.peers[peerID]
422 if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
423 ps.RemovePeer(peerID)
// AddPeer registers peer under peer.ID(), wrapping it with newPeer, when that ID
// is not in the set yet. The write lock on ps.mtx is released on return via defer.
428 func (ps *PeerSet) AddPeer(peer BasePeer) {
430 defer ps.mtx.Unlock()
432 if _, ok := ps.peers[peer.ID()]; !ok {
433 ps.peers[peer.ID()] = newPeer(peer)
// NOTE(review): presumably only reached when the ID was already registered — confirm.
436 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// BestPeer scans the set for the best peer that has the given service flag
// enabled. The read lock on ps.mtx is released on return via defer.
439 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
441 defer ps.mtx.RUnlock()
444 for _, p := range ps.peers {
// Peers without the requested service flag are not candidates.
445 if !p.services.IsEnable(flag) {
// A candidate wins when there is no best peer yet, when its bestHeight is
// higher, or when the heights tie and the candidate is a LAN peer.
448 if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
// BestIrreversiblePeer scans the set for the best peer, ranked by irreversible
// height, that has the given service flag enabled. The read lock on ps.mtx is
// released on return via defer.
455 func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
457 defer ps.mtx.RUnlock()
460 for _, p := range ps.peers {
// Peers without the requested service flag are not candidates.
461 if !p.services.IsEnable(flag) {
// A candidate wins when there is no best peer yet, when its irreversibleHeight
// is higher, or when the heights tie and the candidate is a LAN peer.
464 if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
471 // SendMsg tries to send msg on msgChannel to the peer registered under peerID.
472 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
473 peer := ps.GetPeer(peerID)
478 ok := peer.TrySend(msgChannel, msg)
// NOTE(review): the peer is presumably removed only when TrySend failed — confirm.
480 ps.RemovePeer(peerID)
485 // BroadcastMsg sends bm to the peers selected by bm.FilterTargetPeers
486 // and passes the collected peer IDs to bm.MarkSendRecord.
487 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
488 // filter target peers
489 peers := bm.FilterTargetPeers(ps)
491 // broadcast to target peers
492 peersSuccess := make([]string, 0)
493 for _, peer := range peers {
// A failed send is logged with the message type and its string form.
494 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
495 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
// NOTE(review): presumably only peers whose send succeeded reach this append — confirm.
498 peersSuccess = append(peersSuccess, peer)
501 // mark the message send record
502 bm.MarkSendRecord(ps, peersSuccess)
// BroadcastNewStatus sends one status message, built from bestHeader and
// irreversibleHeader, to every peer whose known status height is below
// bestHeader.Height.
506 func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
507 msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
508 peers := ps.peersWithoutNewStatus(bestHeader.Height)
509 for _, peer := range peers {
// A peer whose send fails is removed from the set.
510 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
511 ps.RemovePeer(peer.ID())
// NOTE(review): presumably only reached for peers whose send succeeded — confirm.
515 peer.markNewStatus(bestHeader.Height)
// BroadcastTx sends tx to the peers that are not yet known to have it. A failure
// to build the message is returned wrapped with "fail on broadcast tx".
520 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
521 msg, err := msgs.NewTransactionMessage(tx)
523 return errors.Wrap(err, "fail on broadcast tx")
526 peers := ps.peersWithoutTx(&tx.ID)
527 for _, peer := range peers {
// SPV peers are only of interest when tx matches their filter addresses.
528 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
// A failed send is logged and the peer is removed from the set.
531 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
532 log.WithFields(log.Fields{
535 "type": reflect.TypeOf(msg),
536 "message": msg.String(),
537 }).Warning("send message to peer error")
538 ps.RemovePeer(peer.ID())
// NOTE(review): presumably only reached for peers whose send succeeded — confirm.
541 peer.markTransaction(&tx.ID)
// ErrorHandler reacts to an error attributed to peerID. When the root cause of
// err is ErrPeerMisbehave the peer is handed to ProcessIllegal with err's text
// as the reason.
// NOTE(review): RemovePeer is presumably the branch for every other error,
// rather than running unconditionally — confirm.
546 func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
547 if errors.Root(err) == ErrPeerMisbehave {
548 ps.ProcessIllegal(peerID, level, err.Error())
550 ps.RemovePeer(peerID)
554 // GetPeer retrieves the registered peer with the given id.
// The read lock on ps.mtx is released on return via defer.
555 func (ps *PeerSet) GetPeer(id string) *Peer {
557 defer ps.mtx.RUnlock()
// GetPeerInfos collects a PeerInfo snapshot from every peer in the set. The
// result starts as an empty, non-nil slice. The read lock on ps.mtx is released
// on return via defer.
561 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
563 defer ps.mtx.RUnlock()
565 result := []*PeerInfo{}
566 for _, peer := range ps.peers {
567 result = append(result, peer.GetPeerInfo())
// MarkBlock looks up the peer registered under peerID.
// NOTE(review): presumably records hash as known by that peer and ignores
// unknown peer IDs — confirm.
572 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
573 peer := ps.GetPeer(peerID)
// MarkBlockSignature records signature as known by the peer registered under
// peerID, via Peer.markSign.
// NOTE(review): an unknown peerID is presumably ignored before markSign — confirm.
580 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
581 peer := ps.GetPeer(peerID)
585 peer.markSign(signature)
// MarkStatus records height as the chain status known by the peer registered
// under peerID, via Peer.markNewStatus.
// NOTE(review): an unknown peerID is presumably ignored before markNewStatus — confirm.
588 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
589 peer := ps.GetPeer(peerID)
593 peer.markNewStatus(height)
// MarkTx records txHash as known by the peer registered under peerID, via
// Peer.markTransaction. The peer is read straight from ps.peers rather than
// through GetPeer.
// NOTE(review): an unknown peerID is presumably ignored before markTransaction — confirm.
596 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
598 peer := ps.peers[peerID]
604 peer.markTransaction(&txHash)
// peersWithoutBlock collects the peers whose known-block set does not contain
// hash.String(). The read lock on ps.mtx is released on return via defer.
607 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
609 defer ps.mtx.RUnlock()
612 for _, peer := range ps.peers {
613 if !peer.knownBlocks.Has(hash.String()) {
614 peers = append(peers, peer)
// peersWithoutNewStatus collects the peers whose known status height is below
// height. The read lock on ps.mtx is released on return via defer.
620 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
622 defer ps.mtx.RUnlock()
625 for _, peer := range ps.peers {
626 if peer.knownStatus < height {
627 peers = append(peers, peer)
// peersWithoutTx collects the peers whose known-transaction set does not contain
// hash.String(). The read lock on ps.mtx is released on return via defer.
633 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
635 defer ps.mtx.RUnlock()
638 for _, peer := range ps.peers {
639 if !peer.knownTxs.Has(hash.String()) {
640 peers = append(peers, peer)
// RemovePeer deletes peerID from the set and then asks the underlying peer
// manager to stop that peer gracefully. Deleting an unknown ID is a no-op for
// the map, but StopPeerGracefully is still called.
646 func (ps *PeerSet) RemovePeer(peerID string) {
648 delete(ps.peers, peerID)
650 ps.StopPeerGracefully(peerID)
// SetStatus records height and hash as the best chain status of the peer
// registered under peerID, via Peer.SetBestStatus.
// NOTE(review): an unknown peerID is presumably ignored before SetBestStatus — confirm.
653 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
654 peer := ps.GetPeer(peerID)
659 peer.SetBestStatus(height, hash)