OSDN Git Service

fix ban node failed (#256)
[bytom/vapor.git] / netsync / peers / peer.go
index 1f0ac24..e74a458 100644 (file)
@@ -36,6 +36,7 @@ var (
 type BasePeer interface {
        Addr() net.Addr
        ID() string
+       RemoteAddrHost() string
        ServiceFlag() consensus.ServiceFlag
        TrafficStatus() (*flowrate.Status, *flowrate.Status)
        TrySend(byte, interface{}) bool
@@ -45,7 +46,7 @@ type BasePeer interface {
 //BasePeerSet is the intergace for connection level peer manager
 type BasePeerSet interface {
        StopPeerGracefully(string)
-       IsBanned(peerID string, level byte, reason string) bool
+       IsBanned(ip string, level byte, reason string) bool
 }
 
 type BroadcastMsg interface {
@@ -73,15 +74,17 @@ type PeerInfo struct {
 
 type Peer struct {
        BasePeer
-       mtx             sync.RWMutex
-       services        consensus.ServiceFlag
-       height          uint64
-       hash            *bc.Hash
-       knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks     *set.Set // Set of block hashes known to be known by this peer
-       knownSignatures *set.Set // Set of block signatures known to be known by this peer
-       knownStatus     uint64   // Set of chain status known to be known by this peer
-       filterAdds      *set.Set // Set of addresses that the spv node cares about.
+       mtx                sync.RWMutex
+       services           consensus.ServiceFlag
+       bestHeight         uint64
+       bestHash           *bc.Hash
+       irreversibleHeight uint64
+       irreversibleHash   *bc.Hash
+       knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks        *set.Set // Set of block hashes known to be known by this peer
+       knownSignatures    *set.Set // Set of block signatures known to be known by this peer
+       knownStatus        uint64   // Set of chain status known to be known by this peer
+       filterAdds         *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(basePeer BasePeer) *Peer {
@@ -98,7 +101,15 @@ func newPeer(basePeer BasePeer) *Peer {
 func (p *Peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
-       return p.height
+
+       return p.bestHeight
+}
+
+func (p *Peer) IrreversibleHeight() uint64 {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+
+       return p.irreversibleHeight
 }
 
 func (p *Peer) AddFilterAddress(address []byte) {
@@ -140,8 +151,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
-func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
+       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
@@ -158,7 +169,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo {
        return &PeerInfo{
                ID:                  p.ID(),
                RemoteAddr:          p.Addr().String(),
-               Height:              p.height,
+               Height:              p.bestHeight,
                Ping:                ping.String(),
                Duration:            sentStatus.Duration.String(),
                TotalSent:           sentStatus.Bytes,
@@ -360,20 +371,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error {
        return nil
 }
 
-func (p *Peer) SendStatus(header *types.BlockHeader) error {
-       msg := msgs.NewStatusMessage(header)
+func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
        if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                return errSendStatusMsg
        }
-       p.markNewStatus(header.Height)
+       p.markNewStatus(bestHeader.Height)
        return nil
 }
 
-func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.bestHeight = bestHeight
+       p.bestHash = bestHash
+}
+
+func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
-       p.height = height
-       p.hash = hash
+
+       p.irreversibleHeight = irreversibleHeight
+       p.irreversibleHash = irreversibleHash
 }
 
 type PeerSet struct {
@@ -398,7 +418,8 @@ func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
        if peer == nil {
                return
        }
-       if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
+
+       if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
                ps.RemovePeer(peerID)
        }
        return
@@ -424,7 +445,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
                if !p.services.IsEnable(flag) {
                        continue
                }
-               if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+               if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
+                       bestPeer = p
+               }
+       }
+       return bestPeer
+}
+
+func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var bestPeer *Peer
+       for _, p := range ps.peers {
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
                        bestPeer = p
                }
        }
@@ -466,16 +503,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
        return nil
 }
 
-func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
-       msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
-       peers := ps.peersWithoutNewStatus(bestBlock.Height)
+func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
+       peers := ps.peersWithoutNewStatus(bestHeader.Height)
        for _, peer := range peers {
                if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                        ps.RemovePeer(peer.ID())
                        continue
                }
 
-               peer.markNewStatus(bestBlock.Height)
+               peer.markNewStatus(bestHeader.Height)
        }
        return nil
 }
@@ -619,5 +656,5 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
                return
        }
 
-       peer.SetStatus(height, hash)
+       peer.SetBestStatus(height, hash)
 }