OSDN Git Service

netsync add test case (#365)
[bytom/vapor.git] / netsync / peers / peer.go
index 1f0ac24..ef90812 100644 (file)
@@ -30,12 +30,14 @@ const (
 var (
        errSendStatusMsg = errors.New("send status msg fail")
        ErrPeerMisbehave = errors.New("peer is misbehave")
+       ErrNoValidPeer   = errors.New("Can't find valid fast sync peer")
 )
 
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
        Addr() net.Addr
        ID() string
+       RemoteAddrHost() string
        ServiceFlag() consensus.ServiceFlag
        TrafficStatus() (*flowrate.Status, *flowrate.Status)
        TrySend(byte, interface{}) bool
@@ -45,7 +47,7 @@ type BasePeer interface {
 //BasePeerSet is the intergace for connection level peer manager
 type BasePeerSet interface {
        StopPeerGracefully(string)
-       IsBanned(peerID string, level byte, reason string) bool
+       IsBanned(ip string, level byte, reason string) bool
 }
 
 type BroadcastMsg interface {
@@ -73,15 +75,17 @@ type PeerInfo struct {
 
 type Peer struct {
        BasePeer
-       mtx             sync.RWMutex
-       services        consensus.ServiceFlag
-       height          uint64
-       hash            *bc.Hash
-       knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks     *set.Set // Set of block hashes known to be known by this peer
-       knownSignatures *set.Set // Set of block signatures known to be known by this peer
-       knownStatus     uint64   // Set of chain status known to be known by this peer
-       filterAdds      *set.Set // Set of addresses that the spv node cares about.
+       mtx                sync.RWMutex
+       services           consensus.ServiceFlag
+       bestHeight         uint64
+       bestHash           *bc.Hash
+       irreversibleHeight uint64
+       irreversibleHash   *bc.Hash
+       knownTxs           *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks        *set.Set // Set of block hashes known to be known by this peer
+       knownSignatures    *set.Set // Set of block signatures known to be known by this peer
+       knownStatus        uint64   // Set of chain status known to be known by this peer
+       filterAdds         *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(basePeer BasePeer) *Peer {
@@ -98,7 +102,15 @@ func newPeer(basePeer BasePeer) *Peer {
 func (p *Peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
-       return p.height
+
+       return p.bestHeight
+}
+
+func (p *Peer) IrreversibleHeight() uint64 {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+
+       return p.irreversibleHeight
 }
 
 func (p *Peer) AddFilterAddress(address []byte) {
@@ -140,8 +152,8 @@ func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
-func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64) bool {
+       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash, skip)}
        return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
@@ -158,7 +170,7 @@ func (p *Peer) GetPeerInfo() *PeerInfo {
        return &PeerInfo{
                ID:                  p.ID(),
                RemoteAddr:          p.Addr().String(),
-               Height:              p.height,
+               Height:              p.bestHeight,
                Ping:                ping.String(),
                Duration:            sentStatus.Duration.String(),
                TotalSent:           sentStatus.Bytes,
@@ -360,20 +372,29 @@ func (p *Peer) SendTransactions(txs []*types.Tx) error {
        return nil
 }
 
-func (p *Peer) SendStatus(header *types.BlockHeader) error {
-       msg := msgs.NewStatusMessage(header)
+func (p *Peer) SendStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
        if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                return errSendStatusMsg
        }
-       p.markNewStatus(header.Height)
+       p.markNewStatus(bestHeader.Height)
        return nil
 }
 
-func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetBestStatus(bestHeight uint64, bestHash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
-       p.height = height
-       p.hash = hash
+
+       p.bestHeight = bestHeight
+       p.bestHash = bestHash
+}
+
+func (p *Peer) SetIrreversibleStatus(irreversibleHeight uint64, irreversibleHash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.irreversibleHeight = irreversibleHeight
+       p.irreversibleHash = irreversibleHash
 }
 
 type PeerSet struct {
@@ -398,7 +419,8 @@ func (ps *PeerSet) ProcessIllegal(peerID string, level byte, reason string) {
        if peer == nil {
                return
        }
-       if banned := ps.IsBanned(peer.Addr().String(), level, reason); banned {
+
+       if banned := ps.IsBanned(peer.RemoteAddrHost(), level, reason); banned {
                ps.RemovePeer(peerID)
        }
        return
@@ -424,7 +446,23 @@ func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
                if !p.services.IsEnable(flag) {
                        continue
                }
-               if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+               if bestPeer == nil || p.bestHeight > bestPeer.bestHeight || (p.bestHeight == bestPeer.bestHeight && p.IsLAN()) {
+                       bestPeer = p
+               }
+       }
+       return bestPeer
+}
+
+func (ps *PeerSet) BestIrreversiblePeer(flag consensus.ServiceFlag) *Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var bestPeer *Peer
+       for _, p := range ps.peers {
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.irreversibleHeight > bestPeer.irreversibleHeight || (p.irreversibleHeight == bestPeer.irreversibleHeight && p.IsLAN()) {
                        bestPeer = p
                }
        }
@@ -466,16 +504,16 @@ func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
        return nil
 }
 
-func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
-       msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
-       peers := ps.peersWithoutNewStatus(bestBlock.Height)
+func (ps *PeerSet) BroadcastNewStatus(bestHeader, irreversibleHeader *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(bestHeader, irreversibleHeader)
+       peers := ps.peersWithoutNewStatus(bestHeader.Height)
        for _, peer := range peers {
                if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                        ps.RemovePeer(peer.ID())
                        continue
                }
 
-               peer.markNewStatus(bestBlock.Height)
+               peer.markNewStatus(bestHeader.Height)
        }
        return nil
 }
@@ -521,6 +559,19 @@ func (ps *PeerSet) GetPeer(id string) *Peer {
        return ps.peers[id]
 }
 
+func (ps *PeerSet) GetPeersByHeight(height uint64) []*Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       peers := []*Peer{}
+       for _, peer := range ps.peers {
+               if peer.Height() >= height {
+                       peers = append(peers, peer)
+               }
+       }
+       return peers
+}
+
 func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
@@ -619,5 +670,21 @@ func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
                return
        }
 
-       peer.SetStatus(height, hash)
+       peer.SetBestStatus(height, hash)
+}
+
+func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       peer.SetIrreversibleStatus(height, hash)
+}
+
+func (ps *PeerSet) Size() int {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       return len(ps.peers)
 }