return ok, nil
}
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
- for _, tx := range txs {
- if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+ validTxs := make([]*types.Tx, 0, len(txs))
+ for i, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
continue
}
- msg, err := NewTransactionMessage(tx)
- if err != nil {
- return false, errors.Wrap(err, "failed to tx msg")
- }
- if p.knownTxs.Has(tx.ID.String()) {
+ validTxs = append(validTxs, tx)
+ if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
continue
}
+
+ msg, err := NewTransactionsMessage(validTxs)
+ if err != nil {
+ return err
+ }
+
if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return ok, nil
+ return errors.New("failed to send txs msg")
+ }
+
+ for _, validTx := range validTxs {
+ p.knownTxs.Add(validTx.ID.String())
}
- p.knownTxs.Add(tx.ID.String())
+
+ validTxs = make([]*types.Tx, 0, len(txs))
}
- return true, nil
+
+ return nil
}
func (p *peer) sendStatus(header *types.BlockHeader) error {
return result
}
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
+
+ if peer == nil {
+ return
+ }
+ peer.markTransaction(&txHash)
+}
+
func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()