9 log "github.com/sirupsen/logrus"
10 "github.com/tendermint/tmlibs/flowrate"
11 "gopkg.in/fatih/set.v0"
13 "github.com/vapor/consensus"
14 "github.com/vapor/errors"
15 msgs "github.com/vapor/netsync/messages"
16 "github.com/vapor/p2p/trust"
17 "github.com/vapor/protocol/bc"
18 "github.com/vapor/protocol/bc/types"
// Limits used by the per-peer bookkeeping and ban logic in this file.
22 maxKnownTxs = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
23 maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
24 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
25 defaultBanThreshold = uint32(100) // Ban score above which addBanScore logs "banning and disconnecting"
26 maxFilterAddressSize = 50 // Byte length above which AddFilterAddress warns about an address
27 maxFilterAddressCount = 1000 // Filter-set size at which AddFilterAddress warns that the limit is reached
33 errSendStatusMsg = errors.New("send status msg fail") // returned by SendStatus when TrySend reports failure
34 ErrPeerMisbehave = errors.New("peer is misbehave") // root error for which ErrorHandler raises the peer's ban score
37 // BasePeer is the interface for a connection-level peer.
38 type BasePeer interface {
41 ServiceFlag() consensus.ServiceFlag // recorded as Peer.services by newPeer
42 TrafficStatus() (*flowrate.Status, *flowrate.Status) // GetPeerInfo treats the results as (sent, received) statistics
43 TrySend(byte, interface{}) bool // (channel, message); callers in this file treat false as a failed send
47 // BasePeerSet is the interface for the connection-level peer manager.
48 type BasePeerSet interface {
49 AddBannedPeer(string) error // PeerSet.AddBanScore passes the peer's address string
50 StopPeerGracefully(string) // PeerSet.RemovePeer passes the peer ID
// BroadcastMsg is the message abstraction consumed by PeerSet.BroadcastMsg,
// which also calls GetChan, GetMsg and MsgString on the value.
53 type BroadcastMsg interface {
54 FilterTargetPeers(ps *PeerSet) []string // IDs of the peers the message is sent to
55 MarkSendRecord(ps *PeerSet, peers []string) // receives the IDs collected by PeerSet.BroadcastMsg after sending
61 // PeerInfo is a snapshot of a peer's status; every field carries a JSON tag.
62 type PeerInfo struct {
63 ID string `json:"peer_id"`
64 RemoteAddr string `json:"remote_addr"` // the peer's Addr().String()
65 Height uint64 `json:"height"`
66 Ping string `json:"ping"` // GetPeerInfo derives it from the send/receive idle times
67 Duration string `json:"duration"` // String() form of the sent-traffic Duration
68 TotalSent int64 `json:"total_sent"` // sent-traffic Bytes
69 TotalReceived int64 `json:"total_received"` // received-traffic Bytes
70 AverageSentRate int64 `json:"average_sent_rate"` // sent-traffic AvgRate
71 AverageReceivedRate int64 `json:"average_received_rate"` // received-traffic AvgRate
72 CurrentSentRate int64 `json:"current_sent_rate"` // sent-traffic CurRate
73 CurrentReceivedRate int64 `json:"current_received_rate"` // received-traffic CurRate
79 services consensus.ServiceFlag // service flags taken from the base peer in newPeer
82 banScore trust.DynamicBanScore // increased by addBanScore
83 knownTxs *set.Set // Set of transaction hashes (String form) this peer is known to have
84 knownBlocks *set.Set // Set of block hashes (String form) this peer is known to have
85 knownSignatures *set.Set // Set of hex-encoded block signatures this peer is known to have
86 knownStatus uint64 // Height last recorded by markNewStatus; compared against new heights in peersWithoutNewStatus
87 filterAdds *set.Set // Set of hex-encoded addresses that the spv node cares about.
// newPeer builds a Peer for basePeer, recording its service flags and
// creating empty known-block, known-signature and filter-address sets.
// Any other field initialisation sits on lines not shown in this view.
90 func newPeer(basePeer BasePeer) *Peer {
93 services: basePeer.ServiceFlag(),
95 knownBlocks: set.New(),
96 knownSignatures: set.New(),
97 filterAdds: set.New(),
// Height returns a uint64 and releases p.mtx's read lock on return.
// Presumably the value is the peer's recorded chain height; the return
// statement is not shown here (TODO confirm).
101 func (p *Peer) Height() uint64 {
103 defer p.mtx.RUnlock()
// addBanScore increases the peer's dynamic ban score by the given persistent
// and transient amounts and logs according to the resulting score: an error
// ("banning and disconnecting") when it exceeds defaultBanThreshold, a warning
// ("ban score increasing") when it exceeds half of that threshold.
// The returned bool is presumably true only in the banning case; the return
// statements are not shown, but PeerSet.AddBanScore names the result `ban`.
107 func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
108 score := p.banScore.Increase(persistent, transient)
109 if score > defaultBanThreshold {
110 log.WithFields(log.Fields{
115 }).Errorf("banning and disconnecting")
// Warn once the score passes half of the ban threshold.
119 warnThreshold := defaultBanThreshold >> 1
120 if score > warnThreshold {
121 log.WithFields(log.Fields{
126 }).Warning("ban score increasing")
// AddFilterAddress stores address, hex-encoded, in the peer's filter set.
// A warning is logged when the set already holds maxFilterAddressCount or
// more entries, and when address is longer than maxFilterAddressSize bytes.
// NOTE(review): presumably both warning paths return without adding; the
// lines following each warning are not shown, so confirm.
131 func (p *Peer) AddFilterAddress(address []byte) {
135 if p.filterAdds.Size() >= maxFilterAddressCount {
136 log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
139 if len(address) > maxFilterAddressSize {
140 log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
144 p.filterAdds.Add(hex.EncodeToString(address))
// AddFilterAddresses passes every entry of addresses to AddFilterAddress.
// When the filter set is not empty, an action on a line not shown here runs
// first (presumably the existing filter is cleared; TODO confirm).
147 func (p *Peer) AddFilterAddresses(addresses [][]byte) {
148 if !p.filterAdds.IsEmpty() {
151 for _, address := range addresses {
152 p.AddFilterAddress(address)
// FilterClear takes no arguments and returns nothing; its body is not shown
// in this view. Presumably it empties the peer's filter-address set (TODO confirm).
156 func (p *Peer) FilterClear() {
// GetBlockByHeight tries to send a GetBlockMessage for the given height to
// the peer on the blockchain channel and returns TrySend's result.
160 func (p *Peer) GetBlockByHeight(height uint64) bool {
161 msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
162 return p.TrySend(msgs.BlockchainChannel, msg)
// GetBlocks tries to send a get-blocks message built from locator and
// stopHash to the peer on the blockchain channel and returns TrySend's result.
165 func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
166 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
167 return p.TrySend(msgs.BlockchainChannel, msg)
// GetHeaders tries to send a get-headers message built from locator and
// stopHash to the peer on the blockchain channel and returns TrySend's result.
170 func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
171 msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
172 return p.TrySend(msgs.BlockchainChannel, msg)
// GetPeerInfo builds a PeerInfo snapshot from the peer's address and its
// sent/received traffic statistics. p.mtx's read lock is released on return.
175 func (p *Peer) GetPeerInfo() *PeerInfo {
177 defer p.mtx.RUnlock()
179 sentStatus, receivedStatus := p.TrafficStatus()
// ping starts as the difference between the send and receive idle times; the
// branch below covers the case where that difference is negative (its body is
// not shown here; presumably it flips the sign, confirm).
180 ping := sentStatus.Idle - receivedStatus.Idle
181 if receivedStatus.Idle > sentStatus.Idle {
187 RemoteAddr: p.Addr().String(),
190 Duration: sentStatus.Duration.String(),
191 TotalSent: sentStatus.Bytes,
192 TotalReceived: receivedStatus.Bytes,
193 AverageSentRate: sentStatus.AvgRate,
194 AverageReceivedRate: receivedStatus.AvgRate,
195 CurrentSentRate: sentStatus.CurRate,
196 CurrentReceivedRate: receivedStatus.CurRate,
// getRelatedTxAndStatus returns the transactions of txs for which isRelatedTx
// reports true, together with the entries of txStatuses.VerifyStatus found at
// the same positions. Both results are nil when no transaction matches.
// txStatuses.VerifyStatus is indexed with each matching transaction's position
// in txs.
200 func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
201 var relatedTxs []*types.Tx
202 var relatedStatuses []*bc.TxVerifyResult
203 for i, tx := range txs {
204 if p.isRelatedTx(tx) {
205 relatedTxs = append(relatedTxs, tx)
206 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
209 return relatedTxs, relatedStatuses
// isRelatedTx checks tx against the peer's filter addresses: it looks up the
// hex-encoded control program of every spend input and of every output in
// filterAdds. The return statements are not shown here; presumably a hit
// yields true and no hit yields false (confirm).
212 func (p *Peer) isRelatedTx(tx *types.Tx) bool {
213 for _, input := range tx.Inputs {
214 switch inp := input.TypedInput.(type) {
// Only spend inputs are matched by the case shown here.
215 case *types.SpendInput:
216 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
221 for _, output := range tx.Outputs {
222 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
// isSPVNode reports whether the peer's service flags do not have
// consensus.SFFullNode enabled.
229 func (p *Peer) isSPVNode() bool {
230 return !p.services.IsEnable(consensus.SFFullNode)
// MarkBlock adds hash (in its String form) to the peer's known-block set.
// While the set holds maxKnownBlocks or more entries a loop body runs first;
// that body is not shown here (presumably it pops an entry, as markSign
// does; confirm).
233 func (p *Peer) MarkBlock(hash *bc.Hash) {
237 for p.knownBlocks.Size() >= maxKnownBlocks {
240 p.knownBlocks.Add(hash.String())
// markNewStatus stores height in p.knownStatus.
243 func (p *Peer) markNewStatus(height uint64) {
247 p.knownStatus = height
// markSign adds the hex-encoded signature to the peer's known-signature set,
// first popping entries while the set holds maxKnownSignatures or more.
250 func (p *Peer) markSign(signature []byte) {
254 for p.knownSignatures.Size() >= maxKnownSignatures {
255 p.knownSignatures.Pop()
257 p.knownSignatures.Add(hex.EncodeToString(signature))
// markTransaction adds hash (in its String form) to the peer's known-tx set.
// While the set holds maxKnownTxs or more entries a loop body runs first; that
// body is not shown here (presumably it pops an entry, as markSign does;
// confirm).
260 func (p *Peer) markTransaction(hash *bc.Hash) {
264 for p.knownTxs.Size() >= maxKnownTxs {
267 p.knownTxs.Add(hash.String())
// PeersWithoutBlock collects the IDs of the peers whose known-block set does
// not contain hash. ps.mtx's read lock is released on return.
270 func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
272 defer ps.mtx.RUnlock()
275 for _, peer := range ps.peers {
276 if !peer.knownBlocks.Has(hash.String()) {
277 peers = append(peers, peer.ID())
// PeersWithoutSign collects the IDs of the peers whose known-signature set
// does not contain the hex-encoded signature. ps.mtx's read lock is released
// on return.
283 func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
285 defer ps.mtx.RUnlock()
288 for _, peer := range ps.peers {
289 if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
290 peers = append(peers, peer.ID())
// SendBlock builds a block message from block and tries to send it to the
// peer on the blockchain channel. A message-construction error is returned
// wrapped with "fail on NewBlockMessage". The block's hash is added to
// knownBlocks; presumably only when the send succeeded, but the guarding line
// is not shown, so confirm.
296 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
297 msg, err := msgs.NewBlockMessage(block)
299 return false, errors.Wrap(err, "fail on NewBlockMessage")
302 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
304 blcokHash := block.Hash()
305 p.knownBlocks.Add(blcokHash.String())
// SendBlocks builds one blocks message from blocks and tries to send it to
// the peer on the blockchain channel. A message-construction error is
// returned wrapped with "fail on NewBlocksMessage". After the send check,
// every block's hash is added to knownBlocks (the failed-send branch body is
// not shown here).
310 func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
311 msg, err := msgs.NewBlocksMessage(blocks)
313 return false, errors.Wrap(err, "fail on NewBlocksMessage")
316 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
320 for _, block := range blocks {
321 blcokHash := block.Hash()
322 p.knownBlocks.Add(blcokHash.String())
// SendHeaders builds a headers message from headers and tries to send it to
// the peer on the blockchain channel.
327 func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
328 msg, err := msgs.NewHeadersMessage(headers)
// NOTE(review): err from NewHeadersMessage is replaced by a fresh error here,
// so its cause is lost; SendBlock and SendBlocks use errors.Wrap(err, ...)
// instead. Consider doing the same.
330 return false, errors.New("fail on NewHeadersMessage")
333 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendMerkleBlock assembles a merkle-block message for block and tries to
// send it to the peer on the blockchain channel. The message is filled with
// the raw block header, the tx merkle proof for the transactions related to
// this peer's filter, and the matching status merkle proof. The handling of
// each setter's error is on lines not shown here.
337 func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
338 msg := msgs.NewMerkleBlockMessage()
339 if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
// Keep only the transactions (and their statuses) that match the peer's filter.
343 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
345 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
346 if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
// The status proof reuses the flags produced by the tx proof.
350 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
351 if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
355 ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
// SendTransactions sends txs to the peer in batches on the blockchain channel.
// A transaction is filtered out when the peer is an SPV node and the tx is
// unrelated to its filter, or when the peer already knows the tx. Each batch
// that is sent has its transaction IDs added to knownTxs, after which a new
// batch buffer is started. A failed send returns "failed to send txs msg".
359 func (p *Peer) SendTransactions(txs []*types.Tx) error {
360 validTxs := make([]*types.Tx, 0, len(txs))
361 for i, tx := range txs {
362 if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
366 validTxs = append(validTxs, tx)
// A batch is flushed when it reaches msgs.TxsMsgMaxTxNum entries or when tx is
// the last element of txs.
// NOTE(review): this check is only reached for transactions that pass the
// filter above. If the last element of txs is filtered out, a partially
// filled batch appears never to be sent. Confirm against the omitted lines
// and consider flushing after the loop.
367 if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
371 msg, err := msgs.NewTransactionsMessage(validTxs)
376 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
377 return errors.New("failed to send txs msg")
380 for _, validTx := range validTxs {
381 p.knownTxs.Add(validTx.ID.String())
384 validTxs = make([]*types.Tx, 0, len(txs))
// SendStatus tries to send a status message built from header to the peer on
// the blockchain channel. It returns errSendStatusMsg when the send fails;
// otherwise it records header.Height through markNewStatus.
390 func (p *Peer) SendStatus(header *types.BlockHeader) error {
391 msg := msgs.NewStatusMessage(header)
392 if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
393 return errSendStatusMsg
395 p.markNewStatus(header.Height)
// SetStatus takes a height and a block hash; its body is not shown in this
// view. Presumably it records them as the peer's current chain status (TODO confirm).
399 func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
// PeerSet keeps the known peers in a map keyed by peer ID. Other fields are
// declared on lines not shown in this view.
406 type PeerSet struct {
409 peers map[string]*Peer // peer ID -> Peer
412 // NewPeerSet creates a new peer set, backed by basePeerSet, with an empty peer map.
413 func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
415 BasePeerSet: basePeerSet,
416 peers: make(map[string]*Peer),
// AddBanScore raises the ban score of the peer registered under peerID via
// Peer.addBanScore. On the ban path it registers the peer's address with
// AddBannedPeer, logging an error if that fails, and then removes the peer.
// The body of the `!ban` branch is not shown here; presumably it returns
// early (confirm).
420 func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
422 peer := ps.peers[peerID]
428 if ban := peer.addBanScore(persistent, transient, reason); !ban {
431 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
432 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
434 ps.RemovePeer(peerID)
// AddPeer registers peer under peer.ID() when that ID is not yet in the set.
// ps.mtx is unlocked on return. The warning below is presumably reached only
// when the ID already exists; the early return is not shown here.
437 func (ps *PeerSet) AddPeer(peer BasePeer) {
439 defer ps.mtx.Unlock()
441 if _, ok := ps.peers[peer.ID()]; !ok {
442 ps.peers[peer.ID()] = newPeer(peer)
445 log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
// BestPeer scans the peer set for the best peer with respect to flag.
// ps.mtx's read lock is released on return. Peers without flag enabled hit a
// branch whose body is not shown (presumably they are skipped). A peer
// satisfies the selection condition when no best peer is chosen yet, when its
// height is greater, or when heights tie and the peer is on the LAN.
448 func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
450 defer ps.mtx.RUnlock()
453 for _, p := range ps.peers {
454 if !p.services.IsEnable(flag) {
457 if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
464 // SendMsg looks up the peer registered under peerID and tries to send msg to
// it on msgChannel. RemovePeer is called on a path whose condition is not
// shown here (presumably when the send fails; confirm).
465 func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
466 peer := ps.GetPeer(peerID)
471 ok := peer.TrySend(msgChannel, msg)
473 ps.RemovePeer(peerID)
478 // BroadcastMsg sends bm's message to the peers chosen by bm.FilterTargetPeers
479 // and then hands the collected peer IDs to bm.MarkSendRecord.
480 func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
481 // Ask the message which peers it should go to.
482 peers := bm.FilterTargetPeers(ps)
484 // Send to each target; a failed send is logged as a warning. Presumably a
// failed peer is skipped before the append below (that line is not shown).
485 peersSuccess := make([]string, 0)
486 for _, peer := range peers {
487 if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
488 log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
491 peersSuccess = append(peersSuccess, peer)
494 // Record the send for the collected peers.
495 bm.MarkSendRecord(ps, peersSuccess)
// BroadcastNewStatus sends a status message for bestBlock's header to every
// peer whose recorded status height is below bestBlock.Height. A peer whose
// send fails is removed from the set; markNewStatus records the new height on
// a peer (presumably only after a successful send; the line between is not
// shown).
499 func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
500 msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
501 peers := ps.peersWithoutNewStatus(bestBlock.Height)
502 for _, peer := range peers {
503 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
504 ps.RemovePeer(peer.ID())
508 peer.markNewStatus(bestBlock.Height)
// BroadcastTx sends tx to the peers that do not already know it. A
// message-construction error is returned wrapped with "fail on broadcast tx".
// SPV peers for which tx is unrelated hit a branch whose body is not shown
// (presumably they are skipped). A peer whose send fails is logged and
// removed; markTransaction records tx.ID on a peer (presumably only after a
// successful send; the line between is not shown).
513 func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
514 msg, err := msgs.NewTransactionMessage(tx)
516 return errors.Wrap(err, "fail on broadcast tx")
519 peers := ps.peersWithoutTx(&tx.ID)
520 for _, peer := range peers {
521 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
524 if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
525 log.WithFields(log.Fields{
528 "type": reflect.TypeOf(msg),
529 "message": msg.String(),
530 }).Warning("send message to peer error")
531 ps.RemovePeer(peer.ID())
534 peer.markTransaction(&tx.ID)
// ErrorHandler reacts to an error attributed to the peer peerID. When the
// root cause of err is ErrPeerMisbehave it adds a persistent ban score of 20
// (transient 0) with err's text as the reason. RemovePeer is called on a path
// whose relation to that condition is not shown here (presumably the
// alternative branch; confirm).
539 func (ps *PeerSet) ErrorHandler(peerID string, err error) {
540 if errors.Root(err) == ErrPeerMisbehave {
541 ps.AddBanScore(peerID, 20, 0, err.Error())
543 ps.RemovePeer(peerID)
547 // GetPeer retrieves the registered peer with the given id.
// ps.mtx's read lock is released on return.
548 func (ps *PeerSet) GetPeer(id string) *Peer {
550 defer ps.mtx.RUnlock()
// GetPeerInfos collects GetPeerInfo() of every peer in the set. The result
// starts as an empty, non-nil slice. ps.mtx's read lock is released on return.
554 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
556 defer ps.mtx.RUnlock()
558 result := []*PeerInfo{}
559 for _, peer := range ps.peers {
560 result = append(result, peer.GetPeerInfo())
// MarkBlock looks up the peer registered under peerID. The rest of the body
// is not shown here; presumably it records hash as known on that peer when
// the peer exists (TODO confirm).
565 func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
566 peer := ps.GetPeer(peerID)
// MarkBlockSignature looks up the peer registered under peerID and records
// signature as known on it via markSign. Handling of a missing peer is on
// lines not shown here.
573 func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
574 peer := ps.GetPeer(peerID)
578 peer.markSign(signature)
// MarkStatus looks up the peer registered under peerID and records height on
// it via markNewStatus. Handling of a missing peer is on lines not shown here.
581 func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
582 peer := ps.GetPeer(peerID)
586 peer.markNewStatus(height)
// MarkTx records txHash as known on the peer registered under peerID via
// markTransaction. The peer is read straight from ps.peers rather than through
// GetPeer; the locking around that read and the handling of a missing peer
// are on lines not shown here.
589 func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
591 peer := ps.peers[peerID]
597 peer.markTransaction(&txHash)
// peersWithoutBlock collects the peers whose known-block set does not contain
// hash. ps.mtx's read lock is released on return.
600 func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
602 defer ps.mtx.RUnlock()
605 for _, peer := range ps.peers {
606 if !peer.knownBlocks.Has(hash.String()) {
607 peers = append(peers, peer)
// peersWithoutNewStatus collects the peers whose recorded status height
// (knownStatus) is lower than height. ps.mtx's read lock is released on return.
613 func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
615 defer ps.mtx.RUnlock()
618 for _, peer := range ps.peers {
619 if peer.knownStatus < height {
620 peers = append(peers, peer)
// peersWithoutTx collects the peers whose known-tx set does not contain hash.
// ps.mtx's read lock is released on return.
626 func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
628 defer ps.mtx.RUnlock()
631 for _, peer := range ps.peers {
632 if !peer.knownTxs.Has(hash.String()) {
633 peers = append(peers, peer)
// RemovePeer deletes peerID from the peer map and then asks the base peer set
// to stop that peer gracefully. The locking around the delete is on lines not
// shown here.
639 func (ps *PeerSet) RemovePeer(peerID string) {
641 delete(ps.peers, peerID)
643 ps.StopPeerGracefully(peerID)
// SetStatus looks up the peer registered under peerID and forwards height and
// hash to its SetStatus method. Handling of a missing peer is on lines not
// shown here.
646 func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
647 peer := ps.GetPeer(peerID)
652 peer.SetStatus(height, hash)