-package netsync
+package peers
import (
"encoding/hex"
"github.com/vapor/consensus"
"github.com/vapor/errors"
+ msgs "github.com/vapor/netsync/messages"
"github.com/vapor/p2p/trust"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- defaultBanThreshold = uint32(100)
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownSignatures = 1024 // Maximum block signatures to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ defaultBanThreshold = uint32(100)
+ maxFilterAddressSize = 50
+ maxFilterAddressCount = 1000
+
+ logModule = "peers"
)
-var errSendStatusMsg = errors.New("send status msg fail")
+var (
+ errSendStatusMsg = errors.New("send status msg fail")
+ ErrPeerMisbehave = errors.New("peer is misbehave")
+)
//BasePeer is the interface for connection level peer
type BasePeer interface {
StopPeerGracefully(string)
}
+type BroadcastMsg interface {
+ FilterTargetPeers(ps *PeerSet) []string
+ MarkSendRecord(ps *PeerSet, peers []string)
+ GetChan() byte
+ GetMsg() interface{}
+ MsgString() string
+}
+
// PeerInfo indicate peer status snap
type PeerInfo struct {
ID string `json:"peer_id"`
CurrentReceivedRate int64 `json:"current_received_rate"`
}
-type peer struct {
+type Peer struct {
BasePeer
- mtx sync.RWMutex
- services consensus.ServiceFlag
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
- knownStatus uint64 // Set of chain status known to be known by this peer
- filterAdds *set.Set // Set of addresses that the spv node cares about.
-}
-
-func newPeer(basePeer BasePeer) *peer {
- return &peer{
- BasePeer: basePeer,
- services: basePeer.ServiceFlag(),
- knownTxs: set.New(),
- knownBlocks: set.New(),
- filterAdds: set.New(),
- }
-}
-
-func (p *peer) Height() uint64 {
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownSignatures *set.Set // Set of block signatures known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
+}
+
+func newPeer(basePeer BasePeer) *Peer {
+ return &Peer{
+ BasePeer: basePeer,
+ services: basePeer.ServiceFlag(),
+ knownTxs: set.New(),
+ knownBlocks: set.New(),
+ knownSignatures: set.New(),
+ filterAdds: set.New(),
+ }
+}
+
+func (p *Peer) Height() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()
return p.height
}
-func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
+func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
score := p.banScore.Increase(persistent, transient)
if score > defaultBanThreshold {
log.WithFields(log.Fields{
return false
}
-func (p *peer) addFilterAddress(address []byte) {
+func (p *Peer) AddFilterAddress(address []byte) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.filterAdds.Add(hex.EncodeToString(address))
}
-func (p *peer) addFilterAddresses(addresses [][]byte) {
+func (p *Peer) AddFilterAddresses(addresses [][]byte) {
if !p.filterAdds.IsEmpty() {
p.filterAdds.Clear()
}
for _, address := range addresses {
- p.addFilterAddress(address)
+ p.AddFilterAddress(address)
}
}
-func (p *peer) getBlockByHeight(height uint64) bool {
- msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
- return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) FilterClear() {
+ p.filterAdds.Clear()
}
-func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
- return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) GetBlockByHeight(height uint64) bool {
+ msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
+ return p.TrySend(msgs.BlockchainChannel, msg)
}
-func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
- return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
+ return p.TrySend(msgs.BlockchainChannel, msg)
}
-func (p *peer) getPeerInfo() *PeerInfo {
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+ return p.TrySend(msgs.BlockchainChannel, msg)
+}
+
+func (p *Peer) GetPeerInfo() *PeerInfo {
p.mtx.RLock()
defer p.mtx.RUnlock()
}
}
-func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
var relatedTxs []*types.Tx
var relatedStatuses []*bc.TxVerifyResult
for i, tx := range txs {
return relatedTxs, relatedStatuses
}
-func (p *peer) isRelatedTx(tx *types.Tx) bool {
+func (p *Peer) isRelatedTx(tx *types.Tx) bool {
for _, input := range tx.Inputs {
switch inp := input.TypedInput.(type) {
case *types.SpendInput:
return false
}
-func (p *peer) isSPVNode() bool {
+func (p *Peer) isSPVNode() bool {
return !p.services.IsEnable(consensus.SFFullNode)
}
-func (p *peer) markBlock(hash *bc.Hash) {
+func (p *Peer) MarkBlock(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.knownBlocks.Add(hash.String())
}
-func (p *peer) markNewStatus(height uint64) {
+func (p *Peer) markNewStatus(height uint64) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.knownStatus = height
}
-func (p *peer) markTransaction(hash *bc.Hash) {
+func (p *Peer) markSign(signature []byte) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ for p.knownSignatures.Size() >= maxKnownSignatures {
+ p.knownSignatures.Pop()
+ }
+ p.knownSignatures.Add(signature)
+}
+
+func (p *Peer) markTransaction(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.knownTxs.Add(hash.String())
}
-func (p *peer) sendBlock(block *types.Block) (bool, error) {
- msg, err := NewBlockMessage(block)
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownBlocks.Has(hash.String()) {
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownSignatures.Has(signature) {
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (p *Peer) SendBlock(block *types.Block) (bool, error) {
+ msg, err := msgs.NewBlockMessage(block)
if err != nil {
return false, errors.Wrap(err, "fail on NewBlockMessage")
}
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
if ok {
blcokHash := block.Hash()
p.knownBlocks.Add(blcokHash.String())
return ok, nil
}
-func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
- msg, err := NewBlocksMessage(blocks)
+func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
+ msg, err := msgs.NewBlocksMessage(blocks)
if err != nil {
return false, errors.Wrap(err, "fail on NewBlocksMessage")
}
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
return ok, nil
}
return true, nil
}
-func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
- msg, err := NewHeadersMessage(headers)
+func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
+ msg, err := msgs.NewHeadersMessage(headers)
if err != nil {
return false, errors.New("fail on NewHeadersMessage")
}
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
return ok, nil
}
-func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
- msg := NewMerkleBlockMessage()
- if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+ msg := msgs.NewMerkleBlockMessage()
+ if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
return false, err
}
relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
- if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+ if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
return false, nil
}
statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
- if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+ if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
return false, nil
}
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
return ok, nil
}
-func (p *peer) sendTransactions(txs []*types.Tx) error {
+func (p *Peer) SendTransactions(txs []*types.Tx) error {
validTxs := make([]*types.Tx, 0, len(txs))
for i, tx := range txs {
if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
}
validTxs = append(validTxs, tx)
- if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
+ if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
continue
}
- msg, err := NewTransactionsMessage(validTxs)
+ msg, err := msgs.NewTransactionsMessage(validTxs)
if err != nil {
return err
}
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
return errors.New("failed to send txs msg")
}
return nil
}
-func (p *peer) sendStatus(header *types.BlockHeader) error {
- msg := NewStatusMessage(header)
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+func (p *Peer) SendStatus(header *types.BlockHeader) error {
+ msg := msgs.NewStatusMessage(header)
+ if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
return errSendStatusMsg
}
p.markNewStatus(header.Height)
return nil
}
-func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.height = height
p.hash = hash
}
-type peerSet struct {
+type PeerSet struct {
BasePeerSet
mtx sync.RWMutex
- peers map[string]*peer
+ peers map[string]*Peer
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet(basePeerSet BasePeerSet) *peerSet {
- return &peerSet{
+func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
+ return &PeerSet{
BasePeerSet: basePeerSet,
- peers: make(map[string]*peer),
+ peers: make(map[string]*Peer),
}
}
-func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
+func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
ps.mtx.Lock()
peer := ps.peers[peerID]
ps.mtx.Unlock()
if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
}
- ps.removePeer(peerID)
+ ps.RemovePeer(peerID)
}
-func (ps *peerSet) addPeer(peer BasePeer) {
+func (ps *PeerSet) AddPeer(peer BasePeer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
}
-func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- var bestPeer *peer
+ var bestPeer *Peer
for _, p := range ps.peers {
if !p.services.IsEnable(flag) {
continue
return bestPeer
}
-func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
- msg, err := NewMinedBlockMessage(block)
- if err != nil {
- return errors.Wrap(err, "fail on broadcast mined block")
+//SendMsg send message to the target peer.
+func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return false
}
- hash := block.Hash()
- peers := ps.peersWithoutBlock(&hash)
+ ok := peer.TrySend(msgChannel, msg)
+ if !ok {
+ ps.RemovePeer(peerID)
+ }
+ return ok
+}
+
+//BroadcastMsg Broadcast message to the target peers
+// and mark the message send record
+func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
+ //filter target peers
+ peers := bm.FilterTargetPeers(ps)
+
+ //broadcast to target peers
+ peersSuccess := make([]string, 0)
for _, peer := range peers {
- if peer.isSPVNode() {
+ if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
+ log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
continue
}
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
- ps.removePeer(peer.ID())
- continue
- }
- peer.markBlock(&hash)
- peer.markNewStatus(block.Height)
+ peersSuccess = append(peersSuccess, peer)
}
+
+ //mark the message send record
+ bm.MarkSendRecord(ps, peersSuccess)
return nil
}
-func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
- msg := NewStatusMessage(&bestBlock.BlockHeader)
+func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
+ msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
peers := ps.peersWithoutNewStatus(bestBlock.Height)
for _, peer := range peers {
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- ps.removePeer(peer.ID())
+ if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
+ ps.RemovePeer(peer.ID())
continue
}
return nil
}
-func (ps *peerSet) broadcastTx(tx *types.Tx) error {
- msg, err := NewTransactionMessage(tx)
+func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
+ msg, err := msgs.NewTransactionMessage(tx)
if err != nil {
return errors.Wrap(err, "fail on broadcast tx")
}
if peer.isSPVNode() && !peer.isRelatedTx(tx) {
continue
}
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
log.WithFields(log.Fields{
"module": logModule,
"peer": peer.Addr(),
"type": reflect.TypeOf(msg),
"message": msg.String(),
}).Warning("send message to peer error")
- ps.removePeer(peer.ID())
+ ps.RemovePeer(peer.ID())
continue
}
peer.markTransaction(&tx.ID)
return nil
}
-func (ps *peerSet) errorHandler(peerID string, err error) {
- if errors.Root(err) == errPeerMisbehave {
- ps.addBanScore(peerID, 20, 0, err.Error())
+func (ps *PeerSet) ErrorHandler(peerID string, err error) {
+ if errors.Root(err) == ErrPeerMisbehave {
+ ps.AddBanScore(peerID, 20, 0, err.Error())
} else {
- ps.removePeer(peerID)
+ ps.RemovePeer(peerID)
}
}
// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) getPeer(id string) *peer {
+func (ps *PeerSet) GetPeer(id string) *Peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
return ps.peers[id]
}
-func (ps *peerSet) getPeerInfos() []*PeerInfo {
+func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
result := []*PeerInfo{}
for _, peer := range ps.peers {
- result = append(result, peer.getPeerInfo())
+ result = append(result, peer.GetPeerInfo())
}
return result
}
-func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.MarkBlock(hash)
+}
+
+func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.markSign(signature)
+}
+
+func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.markNewStatus(height)
+}
+
+func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
ps.mtx.Lock()
peer := ps.peers[peerID]
ps.mtx.Unlock()
peer.markTransaction(&txHash)
}
-func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- peers := []*peer{}
+ peers := []*Peer{}
for _, peer := range ps.peers {
if !peer.knownBlocks.Has(hash.String()) {
peers = append(peers, peer)
return peers
}
-func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- var peers []*peer
+ var peers []*Peer
for _, peer := range ps.peers {
if peer.knownStatus < height {
peers = append(peers, peer)
return peers
}
-func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- peers := []*peer{}
+ peers := []*Peer{}
for _, peer := range ps.peers {
if !peer.knownTxs.Has(hash.String()) {
peers = append(peers, peer)
return peers
}
-func (ps *peerSet) removePeer(peerID string) {
+func (ps *PeerSet) RemovePeer(peerID string) {
ps.mtx.Lock()
delete(ps.peers, peerID)
ps.mtx.Unlock()
ps.StopPeerGracefully(peerID)
}
+
+func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+
+ peer.SetStatus(height, hash)
+}