OSDN Git Service

txpool: periodically sweep pool for stale Txs (#337)
[bytom/vapor.git] / protocol / txpool.go
index 957ab6a..09e32b1 100644 (file)
@@ -26,11 +26,14 @@ const (
 var (
        maxCachedErrTxs = 1000
        maxMsgChSize    = 1000
-       maxNewTxNum     = 10000
-       maxOrphanNum    = 2000
+       maxNewTxNum     = 65536
+       maxOrphanNum    = 32768
 
-       orphanTTL                = 10 * time.Minute
-       orphanExpireScanInterval = 3 * time.Minute
+       orphanTTL                = 60 * time.Second
+       orphanExpireScanInterval = 30 * time.Second
+
+       txTTL                = 1 * time.Hour
+       txExpireScanInterval = 20 * time.Minute
 
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
@@ -89,6 +92,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
                eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
+       go tp.txExpireWorker()
        return tp
 }
 
@@ -100,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        tp.errCache.Add(txHash, err)
 }
 
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
+// expireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) expireOrphan(now time.Time) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
@@ -129,19 +133,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
+       if txD := tp.removeTransaction(txHash); txD != nil {
+               atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+               tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+               log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       }
+}
+
+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
        txD, ok := tp.pool[*txHash]
        if !ok {
-               return
+               return nil
        }
 
        for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
        delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
-       log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       return txD
 }
 
 // GetTransaction return the TxDesc by hash
@@ -192,27 +201,18 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
        return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
 }
 
-func isTransactionNoBtmInput(tx *types.Tx) bool {
-       for _, input := range tx.TxData.Inputs {
-               if input.AssetID() == *consensus.BTMAssetID {
-                       return false
-               }
-       }
-       return true
-}
-
 func isTransactionZeroOutput(tx *types.Tx) bool {
        for _, output := range tx.TxData.Outputs {
-               value := output.AssetAmount()
-               if value.Amount == uint64(0) {
+               if value := output.AssetAmount(); value.Amount == uint64(0) {
                        return true
                }
        }
        return false
 }
 
+//IsDust checks if a tx has zero output
 func (tp *TxPool) IsDust(tx *types.Tx) bool {
-       return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx)
+       return isTransactionZeroOutput(tx)
 }
 
 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
@@ -277,11 +277,12 @@ func (tp *TxPool) addTransaction(txD *TxDesc) error {
        txD.Added = time.Now()
        tp.pool[tx.ID] = txD
        for _, id := range tx.ResultIds {
-               var assetID bc.AssetID
                outputEntry, err := tx.Entry(*id)
                if err != nil {
                        return err
                }
+
+               var assetID bc.AssetID
                switch output := outputEntry.(type) {
                case *bc.IntraChainOutput:
                        assetID = *output.Source.Value.AssetId
@@ -322,7 +323,7 @@ func (tp *TxPool) orphanExpireWorker() {
        defer ticker.Stop()
 
        for now := range ticker.C {
-               tp.ExpireOrphan(now)
+               tp.expireOrphan(now)
        }
 }
 
@@ -377,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) {
        }
        delete(tp.orphans, *hash)
 }
+
+func (tp *TxPool) txExpireWorker() {
+       ticker := time.NewTicker(txExpireScanInterval)
+       defer ticker.Stop()
+
+       for now := range ticker.C {
+               tp.expireTx(now)
+       }
+}
+
+// expireTx expires all the Txs that before the input time range
+func (tp *TxPool) expireTx(now time.Time) {
+       tp.mtx.Lock()
+       defer tp.mtx.Unlock()
+
+       cutOff := now.Add(-txTTL)
+       for hash, txD := range tp.pool {
+               if txD.Added.Before(cutOff) {
+                       tp.removeTransaction(&hash)
+               }
+       }
+}