OSDN Git Service

Merge pull request #935 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
index f37cba5..c48e5ee 100644 (file)
@@ -8,6 +8,7 @@ import (
 
        "github.com/bytom/errors"
        "github.com/bytom/p2p"
+       "github.com/bytom/p2p/trust"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
 )
@@ -18,15 +19,20 @@ var (
        errNotRegistered     = errors.New("peer is not registered")
 )
 
-const defaultVersion = 1
+const (
+       defaultVersion      = 1
+       defaultBanThreshold = uint64(100)
+)
 
 type peer struct {
-       mtx     sync.RWMutex
-       version int // Protocol version negotiated
-       id      string
-       height  uint64
-       hash    *bc.Hash
-       *p2p.Peer
+       mtx      sync.RWMutex
+       version  int // Protocol version negotiated
+       id       string
+       height   uint64
+       hash     *bc.Hash
+       banScore trust.DynamicBanScore
+
+       swPeer *p2p.Peer
 
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
@@ -35,9 +41,10 @@ type peer struct {
 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
        return &peer{
                version:     defaultVersion,
+               id:          Peer.Key,
                height:      height,
                hash:        hash,
-               Peer:        Peer,
+               swPeer:      Peer,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
        }
@@ -59,13 +66,13 @@ func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
 
 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
        msg := &BlockRequestMessage{RawHash: hash.Byte32()}
-       p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
        return nil
 }
 
 func (p *peer) requestBlockByHeight(height uint64) error {
        msg := &BlockRequestMessage{Height: height}
-       p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
        return nil
 }
 
@@ -77,7 +84,10 @@ func (p *peer) SendTransactions(txs []*types.Tx) error {
                }
                hash := &tx.ID
                p.knownTxs.Add(hash.String())
-               p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+               if p.swPeer == nil {
+                       return errPeerDropped
+               }
+               p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
        }
        return nil
 }
@@ -86,7 +96,7 @@ func (p *peer) getPeer() *p2p.Peer {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
 
-       return p.Peer
+       return p.swPeer
 }
 
 // MarkTransaction marks a transaction as known for the peer, ensuring that it
@@ -115,6 +125,33 @@ func (p *peer) MarkBlock(hash *bc.Hash) {
        p.knownBlocks.Add(hash.String())
 }
 
+// addBanScore increases the persistent and decaying ban score fields by the
+// values passed as parameters. If the resulting score exceeds half of the ban
+// threshold, a warning is logged including the reason provided. Further, if
+// the score is above the ban threshold, the peer will be banned and
+// disconnected.
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+       warnThreshold := defaultBanThreshold >> 1
+       if transient == 0 && persistent == 0 {
+               // The score is not being increased, but a warning message is still
+               // logged if the score is above the warn threshold.
+               score := p.banScore.Int()
+               if score > warnThreshold {
+                       log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
+               }
+               return false
+       }
+       score := p.banScore.Increase(persistent, transient)
+       if score > warnThreshold {
+               log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
+               if score > defaultBanThreshold {
+                       log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
+                       return true
+               }
+       }
+       return false
+}
+
 type peerSet struct {
        peers  map[string]*peer
        lock   sync.RWMutex
@@ -157,24 +194,12 @@ func (ps *peerSet) Unregister(id string) error {
        return nil
 }
 
-func (ps *peerSet) DropPeer(id string) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[id]
-       if !ok {
-               return errNotRegistered
-       }
-       peer.CloseConn()
-       return nil
-}
-
 // Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) *peer {
+func (ps *peerSet) Peer(id string) (*peer, bool) {
        ps.lock.RLock()
        defer ps.lock.RUnlock()
-
-       return ps.peers[id]
+       p, ok := ps.peers[id]
+       return p, ok
 }
 
 // Len returns if the current number of peers in the set.
@@ -247,7 +272,7 @@ func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
 
        for _, p := range ps.peers {
                if bestPeer == nil || p.height > bestHeight {
-                       bestPeer, bestHeight = p.Peer, p.height
+                       bestPeer, bestHeight = p.swPeer, p.height
                }
        }
 
@@ -261,7 +286,7 @@ func (ps *peerSet) Close() {
        defer ps.lock.Unlock()
 
        for _, p := range ps.peers {
-               p.CloseConn()
+               p.swPeer.CloseConn()
        }
        ps.closed = true
 }
@@ -297,10 +322,7 @@ func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
 }
 
 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
@@ -308,39 +330,52 @@ func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
 }
 
 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       peer, ok := ps.peers[peerID]
+       peer, ok := ps.Peer(peerID)
        if !ok {
                return errors.New("Can't find peer. ")
        }
        return peer.requestBlockByHeight(height)
 }
 
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
        msg, err := NewMinedBlockMessage(block)
        if err != nil {
-               return errors.New("Failed construction block msg")
+               return nil, errors.New("Failed construction block msg")
        }
        hash := block.Hash()
        peers := ps.PeersWithoutBlock(&hash)
+       abnormalPeers := make([]*peer, 0)
        for _, peer := range peers {
-               ps.MarkBlock(peer.Key, &hash)
-               peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       abnormalPeers = append(abnormalPeers, peer)
+                       continue
+               }
+               if p, ok := ps.Peer(peer.id); ok {
+                       p.MarkBlock(&hash)
+               }
        }
-       return nil
+       return abnormalPeers, nil
+}
+
+func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
+       return ps.BroadcastMinedBlock(block)
 }
 
-func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
        msg, err := NewTransactionNotifyMessage(tx)
        if err != nil {
-               return errors.New("Failed construction tx msg")
+               return nil, errors.New("Failed construction tx msg")
        }
        peers := ps.PeersWithoutTx(&tx.ID)
+       abnormalPeers := make([]*peer, 0)
        for _, peer := range peers {
-               ps.peers[peer.Key].MarkTransaction(&tx.ID)
-               peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       abnormalPeers = append(abnormalPeers, peer)
+                       continue
+               }
+               if p, ok := ps.Peer(peer.id); ok {
+                       p.MarkTransaction(&tx.ID)
+               }
        }
-       return nil
+       return abnormalPeers, nil
 }