X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=protocol%2Ftxpool.go;h=09e32b16b1994a5a3d5aac788214f8ef06b3a9a7;hp=912091cdf1378f3e8e055959857b2ec68f408e3c;hb=3a6cd9640bdb0eed33451a5af7a0e12cfe423165;hpb=2e4f5f11ae3bec54e4918a7aac5c8b3e2381de44 diff --git a/protocol/txpool.go b/protocol/txpool.go index 912091cd..09e32b16 100644 --- a/protocol/txpool.go +++ b/protocol/txpool.go @@ -26,11 +26,14 @@ const ( var ( maxCachedErrTxs = 1000 maxMsgChSize = 1000 - maxNewTxNum = 10000 - maxOrphanNum = 2000 + maxNewTxNum = 65536 + maxOrphanNum = 32768 - orphanTTL = 10 * time.Minute - orphanExpireScanInterval = 3 * time.Minute + orphanTTL = 60 * time.Second + orphanExpireScanInterval = 30 * time.Second + + txTTL = 1 * time.Hour + txExpireScanInterval = 20 * time.Minute // ErrTransactionNotExist is the pre-defined error message ErrTransactionNotExist = errors.New("transaction are not existed in the mempool") @@ -89,6 +92,7 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool { eventDispatcher: dispatcher, } go tp.orphanExpireWorker() + go tp.txExpireWorker() return tp } @@ -100,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) { tp.errCache.Add(txHash, err) } -// ExpireOrphan expire all the orphans that before the input time range -func (tp *TxPool) ExpireOrphan(now time.Time) { +// expireOrphan expire all the orphans that before the input time range +func (tp *TxPool) expireOrphan(now time.Time) { tp.mtx.Lock() defer tp.mtx.Unlock() @@ -129,19 +133,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { tp.mtx.Lock() defer tp.mtx.Unlock() + if txD := tp.removeTransaction(txHash); txD != nil { + atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") + } +} + +func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc { txD, ok := tp.pool[*txHash] if !ok { - return + return nil } for _, output := range txD.Tx.ResultIds { delete(tp.utxo, *output) } delete(tp.pool, *txHash) - - atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) - log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") + return txD } // GetTransaction return the TxDesc by hash @@ -192,20 +201,6 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool { return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash) } -func isTransactionNoBtmInput(tx *types.Tx) bool { - for _, input := range tx.TxData.Inputs { - switch input.InputType() { - case types.CrossChainInputType: - return false - } - if input.AssetID() == *consensus.BTMAssetID { - return false - } - } - - return true -} - func isTransactionZeroOutput(tx *types.Tx) bool { for _, output := range tx.TxData.Outputs { if value := output.AssetAmount(); value.Amount == uint64(0) { @@ -215,8 +210,9 @@ func isTransactionZeroOutput(tx *types.Tx) bool { return false } +//IsDust checks if a tx has zero output func (tp *TxPool) IsDust(tx *types.Tx) bool { - return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx) + return isTransactionZeroOutput(tx) } func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { @@ -327,7 +323,7 @@ func (tp *TxPool) orphanExpireWorker() { defer ticker.Stop() for now := range ticker.C { - tp.ExpireOrphan(now) + tp.expireOrphan(now) } } @@ -382,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) { } delete(tp.orphans, *hash) } + +func (tp *TxPool) txExpireWorker() { + ticker := time.NewTicker(txExpireScanInterval) + defer ticker.Stop() + + for now := range ticker.C { + tp.expireTx(now) + } +} + +// expireTx expires all the Txs that before the input time range +func (tp *TxPool) expireTx(now time.Time) { + tp.mtx.Lock() + defer tp.mtx.Unlock() + + cutOff := now.Add(-txTTL) + for hash, txD := range tp.pool { + if txD.Added.Before(cutOff) { + tp.removeTransaction(&hash) + } + } +}