OSDN Git Service

dispatch signature when proccess block (#85)
[bytom/vapor.git] / netsync / peer.go
index 92d7199..794501b 100644 (file)
@@ -23,6 +23,8 @@ const (
        defaultBanThreshold = uint32(100)
 )
 
+var errSendStatusMsg = errors.New("send status msg fail")
+
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
        Addr() net.Addr
@@ -63,15 +65,14 @@ type peer struct {
        banScore    trust.DynamicBanScore
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       knownStatus uint64   // Set of chain status known to be known by this peer
        filterAdds  *set.Set // Set of addresses that the spv node cares about.
 }
 
-func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
+func newPeer(basePeer BasePeer) *peer {
        return &peer{
                BasePeer:    basePeer,
                services:    basePeer.ServiceFlag(),
-               height:      height,
-               hash:        hash,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
                filterAdds:  set.New(),
@@ -195,7 +196,7 @@ func (p *peer) isRelatedTx(tx *types.Tx) bool {
                }
        }
        for _, output := range tx.Outputs {
-               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
                        return true
                }
        }
@@ -216,6 +217,13 @@ func (p *peer) markBlock(hash *bc.Hash) {
        p.knownBlocks.Add(hash.String())
 }
 
+func (p *peer) markNewStatus(height uint64) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.knownStatus = height
+}
+
 func (p *peer) markTransaction(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
@@ -289,25 +297,44 @@ func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionSta
        return ok, nil
 }
 
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
-       for _, tx := range txs {
-               if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+       validTxs := make([]*types.Tx, 0, len(txs))
+       for i, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
                        continue
                }
-               msg, err := NewTransactionMessage(tx)
-               if err != nil {
-                       return false, errors.Wrap(err, "failed to tx msg")
-               }
 
-               if p.knownTxs.Has(tx.ID.String()) {
+               validTxs = append(validTxs, tx)
+               if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
                        continue
                }
+
+               msg, err := NewTransactionsMessage(validTxs)
+               if err != nil {
+                       return err
+               }
+
                if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       return ok, nil
+                       return errors.New("failed to send txs msg")
+               }
+
+               for _, validTx := range validTxs {
+                       p.knownTxs.Add(validTx.ID.String())
                }
-               p.knownTxs.Add(tx.ID.String())
+
+               validTxs = make([]*types.Tx, 0, len(txs))
        }
-       return true, nil
+
+       return nil
+}
+
+func (p *peer) sendStatus(header *types.BlockHeader) error {
+       msg := NewStatusMessage(header)
+       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               return errSendStatusMsg
+       }
+       p.markNewStatus(header.Height)
+       return nil
 }
 
 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
@@ -348,12 +375,12 @@ func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reas
        ps.removePeer(peerID)
 }
 
-func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+func (ps *peerSet) addPeer(peer BasePeer) {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
 
        if _, ok := ps.peers[peer.ID()]; !ok {
-               ps.peers[peer.ID()] = newPeer(height, hash, peer)
+               ps.peers[peer.ID()] = newPeer(peer)
                return
        }
        log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
@@ -393,22 +420,21 @@ func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
                        continue
                }
                peer.markBlock(&hash)
+               peer.markNewStatus(block.Height)
        }
        return nil
 }
 
-func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
-       bestBlockHash := bestBlock.Hash()
-       peers := ps.peersWithoutBlock(&bestBlockHash)
-
-       genesisHash := genesisBlock.Hash()
-       msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
+       msg := NewStatusMessage(&bestBlock.BlockHeader)
+       peers := ps.peersWithoutNewStatus(bestBlock.Height)
        for _, peer := range peers {
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
                        ps.removePeer(peer.ID())
                        continue
                }
+
+               peer.markNewStatus(bestBlock.Height)
        }
        return nil
 }
@@ -465,6 +491,17 @@ func (ps *peerSet) getPeerInfos() []*PeerInfo {
        return result
 }
 
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
+
+       if peer == nil {
+               return
+       }
+       peer.markTransaction(&txHash)
+}
+
 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
@@ -478,6 +515,19 @@ func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        return peers
 }
 
+func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []*peer
+       for _, peer := range ps.peers {
+               if peer.knownStatus < height {
+                       peers = append(peers, peer)
+               }
+       }
+       return peers
+}
+
 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()