OSDN Git Service

txpool: periodically sweep pool for stale Txs (#337) v1.0.0
authorapolloww <32606824+apolloww@users.noreply.github.com>
Sat, 20 Jul 2019 02:58:14 +0000 (10:58 +0800)
committerPaladz <yzhu101@uottawa.ca>
Sat, 20 Jul 2019 02:58:14 +0000 (10:58 +0800)
protocol/txpool.go
protocol/txpool_test.go

index bbcee18..09e32b1 100644 (file)
@@ -32,6 +32,9 @@ var (
        orphanTTL                = 60 * time.Second
        orphanExpireScanInterval = 30 * time.Second
 
        orphanTTL                = 60 * time.Second
        orphanExpireScanInterval = 30 * time.Second
 
+       txTTL                = 1 * time.Hour
+       txExpireScanInterval = 20 * time.Minute
+
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
        // ErrPoolIsFull indicates the pool is full
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
        // ErrPoolIsFull indicates the pool is full
@@ -89,6 +92,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
                eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
                eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
+       go tp.txExpireWorker()
        return tp
 }
 
        return tp
 }
 
@@ -100,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        tp.errCache.Add(txHash, err)
 }
 
        tp.errCache.Add(txHash, err)
 }
 
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
+// expireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) expireOrphan(now time.Time) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
@@ -129,19 +133,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
+       if txD := tp.removeTransaction(txHash); txD != nil {
+               atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+               tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+               log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       }
+}
+
+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
        txD, ok := tp.pool[*txHash]
        if !ok {
        txD, ok := tp.pool[*txHash]
        if !ok {
-               return
+               return nil
        }
 
        for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
        delete(tp.pool, *txHash)
        }
 
        for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
        delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
-       log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       return txD
 }
 
 // GetTransaction return the TxDesc by hash
 }
 
 // GetTransaction return the TxDesc by hash
@@ -201,6 +210,7 @@ func isTransactionZeroOutput(tx *types.Tx) bool {
        return false
 }
 
        return false
 }
 
+//IsDust checks if a tx has zero output
 func (tp *TxPool) IsDust(tx *types.Tx) bool {
        return isTransactionZeroOutput(tx)
 }
 func (tp *TxPool) IsDust(tx *types.Tx) bool {
        return isTransactionZeroOutput(tx)
 }
@@ -313,7 +323,7 @@ func (tp *TxPool) orphanExpireWorker() {
        defer ticker.Stop()
 
        for now := range ticker.C {
        defer ticker.Stop()
 
        for now := range ticker.C {
-               tp.ExpireOrphan(now)
+               tp.expireOrphan(now)
        }
 }
 
        }
 }
 
@@ -368,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) {
        }
        delete(tp.orphans, *hash)
 }
        }
        delete(tp.orphans, *hash)
 }
+
+func (tp *TxPool) txExpireWorker() {
+       ticker := time.NewTicker(txExpireScanInterval)
+       defer ticker.Stop()
+
+       for now := range ticker.C {
+               tp.expireTx(now)
+       }
+}
+
+// expireTx expires all the Txs that before the input time range
+func (tp *TxPool) expireTx(now time.Time) {
+       tp.mtx.Lock()
+       defer tp.mtx.Unlock()
+
+       cutOff := now.Add(-txTTL)
+       for hash, txD := range tp.pool {
+               if txD.Added.Before(cutOff) {
+                       tp.removeTransaction(&hash)
+               }
+       }
+}
index 9af97ef..a2e63a2 100644 (file)
@@ -422,7 +422,7 @@ func TestExpireOrphan(t *testing.T) {
                },
        }
 
                },
        }
 
-       before.ExpireOrphan(time.Unix(1633479701, 0))
+       before.expireOrphan(time.Unix(1633479701, 0))
        if !testutil.DeepEqual(before, want) {
                t.Errorf("got %v want %v", before, want)
        }
        if !testutil.DeepEqual(before, want) {
                t.Errorf("got %v want %v", before, want)
        }