var (
maxCachedErrTxs = 1000
maxMsgChSize = 1000
- maxNewTxNum = 10000
- maxOrphanNum = 2000
+ maxNewTxNum = 65536
+ maxOrphanNum = 32768
- orphanTTL = 10 * time.Minute
- orphanExpireScanInterval = 3 * time.Minute
+ orphanTTL = 60 * time.Second
+ orphanExpireScanInterval = 30 * time.Second
+
+ txTTL = 1 * time.Hour
+ txExpireScanInterval = 20 * time.Minute
// ErrTransactionNotExist is the pre-defined error message
ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
eventDispatcher: dispatcher,
}
go tp.orphanExpireWorker()
+ go tp.txExpireWorker()
return tp
}
tp.errCache.Add(txHash, err)
}
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
+// expireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) expireOrphan(now time.Time) {
tp.mtx.Lock()
defer tp.mtx.Unlock()
tp.mtx.Lock()
defer tp.mtx.Unlock()
+ if txD := tp.removeTransaction(txHash); txD != nil {
+ atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+ log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+ }
+}
+
+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
txD, ok := tp.pool[*txHash]
if !ok {
- return
+ return nil
}
for _, output := range txD.Tx.ResultIds {
delete(tp.utxo, *output)
}
delete(tp.pool, *txHash)
-
- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
- log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+ return txD
}
// GetTransaction return the TxDesc by hash
return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
}
-func isTransactionNoBtmInput(tx *types.Tx) bool {
- for _, input := range tx.TxData.Inputs {
- if input.AssetID() == *consensus.BTMAssetID {
- return false
- }
- }
- return true
-}
-
func isTransactionZeroOutput(tx *types.Tx) bool {
for _, output := range tx.TxData.Outputs {
- if output.Amount == uint64(0) {
+ if value := output.AssetAmount(); value.Amount == uint64(0) {
return true
}
}
return false
}
+//IsDust checks if a tx has zero output
func (tp *TxPool) IsDust(tx *types.Tx) bool {
- return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx)
+ return isTransactionZeroOutput(tx)
}
func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
txD.Added = time.Now()
tp.pool[tx.ID] = txD
for _, id := range tx.ResultIds {
- output, err := tx.Output(*id)
+ outputEntry, err := tx.Entry(*id)
if err != nil {
- // error due to it's a retirement, utxo doesn't care this output type so skip it
+ return err
+ }
+
+ var assetID bc.AssetID
+ switch output := outputEntry.(type) {
+ case *bc.IntraChainOutput:
+ assetID = *output.Source.Value.AssetId
+ case *bc.VoteOutput:
+ assetID = *output.Source.Value.AssetId
+ default:
continue
}
- if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+
+ if !txD.StatusFail || assetID == *consensus.BTMAssetID {
tp.utxo[*id] = tx
}
}
defer ticker.Stop()
for now := range ticker.C {
- tp.ExpireOrphan(now)
+ tp.expireOrphan(now)
}
}
}
delete(tp.orphans, *hash)
}
+
+func (tp *TxPool) txExpireWorker() {
+ ticker := time.NewTicker(txExpireScanInterval)
+ defer ticker.Stop()
+
+ for now := range ticker.C {
+ tp.expireTx(now)
+ }
+}
+
+// expireTx expires all the Txs that before the input time range
+func (tp *TxPool) expireTx(now time.Time) {
+ tp.mtx.Lock()
+ defer tp.mtx.Unlock()
+
+ cutOff := now.Add(-txTTL)
+ for hash, txD := range tp.pool {
+ if txD.Added.Before(cutOff) {
+ tp.removeTransaction(&hash)
+ }
+ }
+}