OSDN Git Service

netsync add test case (#365)
[bytom/vapor.git] / netsync / peers / peer.go
index 0f62149..ef90812 100644 (file)
@@ -13,7 +13,6 @@ import (
        "github.com/vapor/consensus"
        "github.com/vapor/errors"
        msgs "github.com/vapor/netsync/messages"
-       "github.com/vapor/p2p/trust"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
@@ -22,7 +21,6 @@ const (
        maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
        maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
        maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
-       defaultBanThreshold   = uint32(100)
        maxFilterAddressSize  = 50
        maxFilterAddressCount = 1000
 
@@ -32,12 +30,14 @@ const (
 var (
        errSendStatusMsg = errors.New("send status msg fail")
        ErrPeerMisbehave = errors.New("peer is misbehave")
+       ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
 )
 
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
        Addr() net.Addr
        ID() string
+       RemoteAddrHost() string
        ServiceFlag() consensus.ServiceFlag
        TrafficStatus() (*flowrate.Status, *flowrate.Status)
        TrySend(byte, interface{}) bool
@@ -46,8 +46,8 @@ type BasePeer interface {
 
 //BasePeerSet is the intergace for connection level peer manager
 type BasePeerSet interface {
-       AddBannedPeer(string) error
        StopPeerGracefully(string)
+       IsBanned(ip string, level byte, reason string) bool
 }
 
 type BroadcastMsg interface {
@@ -75,16 +75,17 @@ type PeerInfo struct {
 
 type Peer struct {
        BasePeer
-       mtx             sync.RWMutex
-       services        consensus.ServiceFlag
-       height          uint64
-       hash            *bc.Hash
-       banScore        trust.DynamicBanScore
-       knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks     *set.Set // Set of block hashes known to be known by this peer
-       knownSignatures *set.Set // Set of block signatures known to be known by this peer
-       knownStatus     uint64   // Set of chain status known to be known by this peer
-       filterAdds      *set.Set // Set of addresses that the spv node cares about.
+       mtx                sync.RWMutex
+       services           consensus.ServiceFlag
+       bestHeight         uint64
+       bestHash           *bc.Hash
+       irreversibleHeight uint64
+       irreversibleHash   *bc.Hash
+       knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks        *set.Set // Set of block hashes known to be known by this peer
+       knownSignatures    *set.Set // Set of block signatures known to be known by this peer
+       knownStatus        uint64   // Set of chain status known to be known by this peer
+       filterAdds         *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(basePeer BasePeer) *Peer {
@@ -101,31 +102,15 @@ func newPeer(basePeer BasePeer) *Peer {
 func (p *Peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
-       return p.height
-}
-
-func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
-       score := p.banScore.Increase(persistent, transient)
-       if score > defaultBanThreshold {
-               log.WithFields(log.Fields{
-                       "module":  logModule,
-                       "address": p.Addr(),
-                       "score":   score,
-                       "reason":  reason,
-               }).Errorf("banning and disconnecting")
-               return true
-       }
-
-       warnThreshold := defaultBanThreshold >> 1
-       if score > warnThreshold {
-               log.WithFields(log.Fields{
-                       "module":  logModule,
-                       "address": p.Addr(),
-                       "score":   score,
-                       "reason":  reason,
-               }).Warning("ban score increasing")
-       }
-       return false
+
+       return p.bestHeight
+}
+
+func (p *Peer) IrreversibleHeight() uint64 {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+
+       return p.irreversibleHeight
 }
 
 func (p *Peer) AddFilterAddress(address []byte) {
@@ -167,8 +152,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
-func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
+       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
@@ -185,7 +170,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo {
        return &PeerInfo{
                ID:                  p.ID(),
                RemoteAddr:          p.Addr().String(),
-               Height:              p.height,
+               Height:              p.bestHeight,
                Ping:                ping.String(),
                Duration:            sentStatus.Duration.String(),
                TotalSent:           sentStatus.Bytes,
@@ -387,20 +372,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error {
        return nil
 }
 
-func (p *Peer) SendStatus(header *types.BlockHeader) error {
-       msg := msgs.NewStatusMessage(header)
+func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
        if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                return errSendStatusMsg
        }
-       p.markNewStatus(header.Height)
+       p.markNewStatus(bestHeader.Height)
        return nil
 }
 
-func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
-       p.height = height
-       p.hash = hash
+
+       p.bestHeight = bestHeight
+       p.bestHash = bestHash
+}
+
+func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.irreversibleHeight = irreversibleHeight
+       p.irreversibleHash = irreversibleHash
 }
 
 type PeerSet struct {
@@ -417,7 +411,7 @@ func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
        }
 }
 
-func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
+func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
        ps.mtx.Lock()
        peer := ps.peers[peerID]
        ps.mtx.Unlock()
@@ -425,13 +419,11 @@ func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reas
        if peer == nil {
                return
        }
-       if ban := peer.addBanScore(persistent, transient, reason); !ban {
-               return
-       }
-       if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
+
+       if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
+               ps.RemovePeer(peerID)
        }
-       ps.RemovePeer(peerID)
+       return
 }
 
 func (ps *PeerSet) AddPeer(peer BasePeer) {
@@ -454,7 +446,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
                if !p.services.IsEnable(flag) {
                        continue
                }
-               if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+               if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
+                       bestPeer = p
+               }
+       }
+       return bestPeer
+}
+
+func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var bestPeer *Peer
+       for _, p := range ps.peers {
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
                        bestPeer = p
                }
        }
@@ -496,16 +504,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
        return nil
 }
 
-func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
-       msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
-       peers := ps.peersWithoutNewStatus(bestBlock.Height)
+func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
+       peers := ps.peersWithoutNewStatus(bestHeader.Height)
        for _, peer := range peers {
                if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                        ps.RemovePeer(peer.ID())
                        continue
                }
 
-               peer.markNewStatus(bestBlock.Height)
+               peer.markNewStatus(bestHeader.Height)
        }
        return nil
 }
@@ -536,9 +544,9 @@ func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
        return nil
 }
 
-func (ps *PeerSet) ErrorHandler(peerID string, err error) {
+func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
        if errors.Root(err) == ErrPeerMisbehave {
-               ps.AddBanScore(peerID, 20, 0, err.Error())
+               ps.ProcessIllegal(peerID, level, err.Error())
        } else {
                ps.RemovePeer(peerID)
        }
@@ -551,6 +559,19 @@ func (ps *PeerSet) GetPeer(id string) *Peer {
        return ps.peers[id]
 }
 
+func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       peers := []*Peer{}
+       for _, peer := range ps.peers {
+               if peer.Height() >= height {
+                       peers = append(peers, peer)
+               }
+       }
+       return peers
+}
+
 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
@@ -649,5 +670,21 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
                return
        }
 
-       peer.SetStatus(height, hash)
+       peer.SetBestStatus(height, hash)
+}
+
+func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       peer.SetIrreversibleStatus(height, hash)
+}
+
+func (ps *PeerSet) Size() int {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       return len(ps.peers)
 }