OSDN Git Service

Optimize status message process (#66)
[bytom/vapor.git] / netsync / peer.go
index b024989..246f1bc 100644 (file)
@@ -23,6 +23,8 @@ const (
        defaultBanThreshold = uint32(100)
 )
 
+var errSendStatusMsg = errors.New("send status msg fail")
+
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
        Addr() net.Addr
@@ -63,15 +65,14 @@ type peer struct {
        banScore    trust.DynamicBanScore
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       knownStatus uint64   // Set of chain status known to be known by this peer
        filterAdds  *set.Set // Set of addresses that the spv node cares about.
 }
 
-func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
+func newPeer(basePeer BasePeer) *peer {
        return &peer{
                BasePeer:    basePeer,
                services:    basePeer.ServiceFlag(),
-               height:      height,
-               hash:        hash,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
                filterAdds:  set.New(),
@@ -216,6 +217,13 @@ func (p *peer) markBlock(hash *bc.Hash) {
        p.knownBlocks.Add(hash.String())
 }
 
+func (p *peer) markNewStatus(height uint64) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.knownStatus = height
+}
+
 func (p *peer) markTransaction(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
@@ -310,6 +318,15 @@ func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
        return true, nil
 }
 
+func (p *peer) sendStatus(header *types.BlockHeader) error {
+       msg := NewStatusMessage(header)
+       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               return errSendStatusMsg
+       }
+       p.markNewStatus(header.Height)
+       return nil
+}
+
 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
@@ -348,12 +365,12 @@ func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reas
        ps.removePeer(peerID)
 }
 
-func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+func (ps *peerSet) addPeer(peer BasePeer) {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
 
        if _, ok := ps.peers[peer.ID()]; !ok {
-               ps.peers[peer.ID()] = newPeer(height, hash, peer)
+               ps.peers[peer.ID()] = newPeer(peer)
                return
        }
        log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
@@ -393,22 +410,21 @@ func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
                        continue
                }
                peer.markBlock(&hash)
+               peer.markNewStatus(block.Height)
        }
        return nil
 }
 
-func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
-       bestBlockHash := bestBlock.Hash()
-       peers := ps.peersWithoutBlock(&bestBlockHash)
-
-       genesisHash := genesisBlock.Hash()
-       msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
+       msg := NewStatusMessage(&bestBlock.BlockHeader)
+       peers := ps.peersWithoutNewStatus(bestBlock.Height)
        for _, peer := range peers {
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
                        ps.removePeer(peer.ID())
                        continue
                }
+
+               peer.markNewStatus(bestBlock.Height)
        }
        return nil
 }
@@ -478,6 +494,19 @@ func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        return peers
 }
 
+func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []*peer
+       for _, peer := range ps.peers {
+               if peer.knownStatus < height {
+                       peers = append(peers, peer)
+               }
+       }
+       return peers
+}
+
 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()