X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=protocol%2Ftxpool.go;h=09e32b16b1994a5a3d5aac788214f8ef06b3a9a7;hp=bbcee181a3b31abbd6a70098faef0ce09784c4db;hb=3a6cd9640bdb0eed33451a5af7a0e12cfe423165;hpb=6deb17f8c788a61804614bb25837b9bc3980c2aa diff --git a/protocol/txpool.go b/protocol/txpool.go index bbcee181..09e32b16 100644 --- a/protocol/txpool.go +++ b/protocol/txpool.go @@ -32,6 +32,9 @@ var ( orphanTTL = 60 * time.Second orphanExpireScanInterval = 30 * time.Second + txTTL = 1 * time.Hour + txExpireScanInterval = 20 * time.Minute + // ErrTransactionNotExist is the pre-defined error message ErrTransactionNotExist = errors.New("transaction are not existed in the mempool") // ErrPoolIsFull indicates the pool is full @@ -89,6 +92,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool { eventDispatcher: dispatcher, } go tp.orphanExpireWorker() + go tp.txExpireWorker() return tp } @@ -100,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) { tp.errCache.Add(txHash, err) } -// ExpireOrphan expire all the orphans that before the input time range -func (tp *TxPool) ExpireOrphan(now time.Time) { +// expireOrphan expire all the orphans that before the input time range +func (tp *TxPool) expireOrphan(now time.Time) { tp.mtx.Lock() defer tp.mtx.Unlock() @@ -129,19 +133,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { tp.mtx.Lock() defer tp.mtx.Unlock() + if txD := tp.removeTransaction(txHash); txD != nil { + atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") + } +} + +func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc { txD, ok := tp.pool[*txHash] if !ok { - return + return nil } for _, output := range txD.Tx.ResultIds { delete(tp.utxo, *output) } delete(tp.pool, *txHash) - - atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) - log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") + return txD } // GetTransaction return the TxDesc by hash @@ -201,6 +210,7 @@ func isTransactionZeroOutput(tx *types.Tx) bool { return false } +//IsDust checks if a tx has zero output func (tp *TxPool) IsDust(tx *types.Tx) bool { return isTransactionZeroOutput(tx) } @@ -313,7 +323,7 @@ func (tp *TxPool) orphanExpireWorker() { defer ticker.Stop() for now := range ticker.C { - tp.ExpireOrphan(now) + tp.expireOrphan(now) } } @@ -368,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) { } delete(tp.orphans, *hash) } + +func (tp *TxPool) txExpireWorker() { + ticker := time.NewTicker(txExpireScanInterval) + defer ticker.Stop() + + for now := range ticker.C { + tp.expireTx(now) + } +} + +// expireTx expires all the Txs that before the input time range +func (tp *TxPool) expireTx(now time.Time) { + tp.mtx.Lock() + defer tp.mtx.Unlock() + + cutOff := now.Add(-txTTL) + for hash, txD := range tp.pool { + if txD.Added.Before(cutOff) { + tp.removeTransaction(&hash) + } + } +}