"github.com/bytom/errors"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/trust"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
errNotRegistered = errors.New("peer is not registered")
)
-const defaultVersion = 1
+const (
+ defaultVersion = 1
+ defaultBanThreshold = uint64(100)
+)
type peer struct {
- mtx sync.RWMutex
- version int // Protocol version negotiated
- id string
- height uint64
- hash *bc.Hash
- *p2p.Peer
+ mtx sync.RWMutex
+ version int // Protocol version negotiated
+ id string
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+
+ swPeer *p2p.Peer
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
return &peer{
version: defaultVersion,
+ id: Peer.Key,
height: height,
hash: hash,
- Peer: Peer,
+ swPeer: Peer,
knownTxs: set.New(),
knownBlocks: set.New(),
}
func (p *peer) requestBlockByHash(hash *bc.Hash) error {
msg := &BlockRequestMessage{RawHash: hash.Byte32()}
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
return nil
}
func (p *peer) requestBlockByHeight(height uint64) error {
msg := &BlockRequestMessage{Height: height}
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
return nil
}
}
hash := &tx.ID
p.knownTxs.Add(hash.String())
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if p.swPeer == nil {
+ return errPeerDropped
+ }
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
}
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.Peer
+ return p.swPeer
}
// MarkTransaction marks a transaction as known for the peer, ensuring that it
p.knownBlocks.Add(hash.String())
}
+// addBanScore increases the persistent and decaying ban score fields by the
+// values passed as parameters. If the resulting score exceeds half of the ban
+// threshold, a warning is logged including the reason provided. Further, if
+// the score is above the ban threshold, the peer will be banned and
+// disconnected.
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+ warnThreshold := defaultBanThreshold >> 1
+ if transient == 0 && persistent == 0 {
+ // The score is not being increased, but a warning message is still
+ // logged if the score is above the warn threshold.
+ score := p.banScore.Int()
+ if score > warnThreshold {
+ log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
+ }
+ return false
+ }
+ score := p.banScore.Increase(persistent, transient)
+ if score > warnThreshold {
+ log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
+ if score > defaultBanThreshold {
+ log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
+ return true
+ }
+ }
+ return false
+}
+
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
return nil
}
-func (ps *peerSet) DropPeer(id string) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[id]
- if !ok {
- return errNotRegistered
- }
- peer.CloseConn()
- return nil
-}
-
// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
ps.lock.RLock()
defer ps.lock.RUnlock()
-
- return ps.peers[id]
+ p, ok := ps.peers[id]
+ return p, ok
}
// Len returns if the current number of peers in the set.
for _, p := range ps.peers {
if bestPeer == nil || p.height > bestHeight {
- bestPeer, bestHeight = p.Peer, p.height
+ bestPeer, bestHeight = p.swPeer, p.height
}
}
defer ps.lock.Unlock()
for _, p := range ps.peers {
- p.CloseConn()
+ p.swPeer.CloseConn()
}
ps.closed = true
}
}
func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
}
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- peer, ok := ps.peers[peerID]
+ peer, ok := ps.Peer(peerID)
if !ok {
return errors.New("Can't find peer. ")
}
return peer.requestBlockByHeight(height)
}
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
msg, err := NewMinedBlockMessage(block)
if err != nil {
- return errors.New("Failed construction block msg")
+ return nil, errors.New("Failed construction block msg")
}
hash := block.Hash()
peers := ps.PeersWithoutBlock(&hash)
+ abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
- ps.MarkBlock(peer.Key, &hash)
- peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ abnormalPeers = append(abnormalPeers, peer)
+ continue
+ }
+ if p, ok := ps.Peer(peer.id); ok {
+ p.MarkBlock(&hash)
+ }
}
- return nil
+ return abnormalPeers, nil
+}
+
+func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
+ return ps.BroadcastMinedBlock(block)
}
-func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
msg, err := NewTransactionNotifyMessage(tx)
if err != nil {
- return errors.New("Failed construction tx msg")
+ return nil, errors.New("Failed construction tx msg")
}
peers := ps.PeersWithoutTx(&tx.ID)
+ abnormalPeers := make([]*peer, 0)
for _, peer := range peers {
- ps.peers[peer.Key].MarkTransaction(&tx.ID)
- peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ abnormalPeers = append(abnormalPeers, peer)
+ continue
+ }
+ if p, ok := ps.Peer(peer.id); ok {
+ p.MarkTransaction(&tx.ID)
+ }
}
- return nil
+ return abnormalPeers, nil
}