OSDN Git Service

netsync: add txs msg (#78)
[bytom/vapor.git] / netsync / peer.go
index 246f1bc..794501b 100644 (file)
@@ -297,25 +297,35 @@ func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionSta
        return ok, nil
 }
 
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
-       for _, tx := range txs {
-               if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+       validTxs := make([]*types.Tx, 0, len(txs))
+       for i, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
                        continue
                }
-               msg, err := NewTransactionMessage(tx)
-               if err != nil {
-                       return false, errors.Wrap(err, "failed to tx msg")
-               }
 
-               if p.knownTxs.Has(tx.ID.String()) {
+               validTxs = append(validTxs, tx)
+               if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
                        continue
                }
+
+               msg, err := NewTransactionsMessage(validTxs)
+               if err != nil {
+                       return err
+               }
+
                if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       return ok, nil
+                       return errors.New("failed to send txs msg")
+               }
+
+               for _, validTx := range validTxs {
+                       p.knownTxs.Add(validTx.ID.String())
                }
-               p.knownTxs.Add(tx.ID.String())
+
+               validTxs = make([]*types.Tx, 0, len(txs))
        }
-       return true, nil
+
+       return nil
 }
 
 func (p *peer) sendStatus(header *types.BlockHeader) error {
@@ -481,6 +491,17 @@ func (ps *peerSet) getPeerInfos() []*PeerInfo {
        return result
 }
 
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
+
+       if peer == nil {
+               return
+       }
+       peer.markTransaction(&txHash)
+}
+
 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()