OSDN Git Service

txpool: periodically sweep pool for stale Txs (#337)
[bytom/vapor.git] / protocol / txpool.go
index cd7ebf1..09e32b1 100644 (file)
@@ -10,6 +10,7 @@ import (
        log "github.com/sirupsen/logrus"
 
        "github.com/vapor/consensus"
+       "github.com/vapor/event"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
        "github.com/vapor/protocol/state"
@@ -19,31 +20,39 @@ import (
 const (
        MsgNewTx = iota
        MsgRemoveTx
+       logModule = "protocol"
 )
 
 var (
        maxCachedErrTxs = 1000
        maxMsgChSize    = 1000
-       maxNewTxNum     = 10000
-       maxOrphanNum    = 2000
+       maxNewTxNum     = 65536
+       maxOrphanNum    = 32768
 
-       orphanTTL                = 10 * time.Minute
-       orphanExpireScanInterval = 3 * time.Minute
+       orphanTTL                = 60 * time.Second
+       orphanExpireScanInterval = 30 * time.Second
+
+       txTTL                = 1 * time.Hour
+       txExpireScanInterval = 20 * time.Minute
 
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
        // ErrPoolIsFull indicates the pool is full
        ErrPoolIsFull = errors.New("transaction pool reach the max number")
+       // ErrDustTx indicates transaction is dust tx
+       ErrDustTx = errors.New("transaction is dust tx")
 )
 
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
 // TxDesc store tx and related info for mining strategy
 type TxDesc struct {
-       Tx         *types.Tx
-       Added      time.Time
-       StatusFail bool
-       Height     uint64
-       Weight     uint64
-       Fee        uint64
+       Tx         *types.Tx `json:"transaction"`
+       Added      time.Time `json:"-"`
+       StatusFail bool      `json:"status_fail"`
+       Height     uint64    `json:"-"`
+       Weight     uint64    `json:"-"`
+       Fee        uint64    `json:"-"`
 }
 
 // TxPoolMsg is use for notify pool changes
@@ -59,32 +68,31 @@ type orphanTx struct {
 
 // TxPool is use for store the unconfirmed transaction
 type TxPool struct {
-       lastUpdated   int64
-       mtx           sync.RWMutex
-       store         Store
-       pool          map[bc.Hash]*TxDesc
-       utxo          map[bc.Hash]*types.Tx
-       orphans       map[bc.Hash]*orphanTx
-       orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
-       claimTx       map[bc.Hash]bool
-       errCache      *lru.Cache
-       msgCh         chan *TxPoolMsg
+       lastUpdated     int64
+       mtx             sync.RWMutex
+       store           Store
+       pool            map[bc.Hash]*TxDesc
+       utxo            map[bc.Hash]*types.Tx
+       orphans         map[bc.Hash]*orphanTx
+       orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
+       errCache        *lru.Cache
+       eventDispatcher *event.Dispatcher
 }
 
 // NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
        tp := &TxPool{
-               lastUpdated:   time.Now().Unix(),
-               store:         store,
-               pool:          make(map[bc.Hash]*TxDesc),
-               utxo:          make(map[bc.Hash]*types.Tx),
-               orphans:       make(map[bc.Hash]*orphanTx),
-               orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
-               claimTx:       make(map[bc.Hash]bool),
-               errCache:      lru.New(maxCachedErrTxs),
-               msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
+               lastUpdated:     time.Now().Unix(),
+               store:           store,
+               pool:            make(map[bc.Hash]*TxDesc),
+               utxo:            make(map[bc.Hash]*types.Tx),
+               orphans:         make(map[bc.Hash]*orphanTx),
+               orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
+               errCache:        lru.New(maxCachedErrTxs),
+               eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
+       go tp.txExpireWorker()
        return tp
 }
 
@@ -96,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        tp.errCache.Add(txHash, err)
 }
 
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
+// expireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) expireOrphan(now time.Time) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
@@ -120,30 +128,29 @@ func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
        return v.(error)
 }
 
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
-       return tp.msgCh
-}
-
 // RemoveTransaction remove a transaction from the pool
 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
+       if txD := tp.removeTransaction(txHash); txD != nil {
+               atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+               tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+               log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       }
+}
+
+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
        txD, ok := tp.pool[*txHash]
        if !ok {
-               return
+               return nil
        }
 
        for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
-       tp.removeClaimTx(txD.Tx)
        delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
-       log.WithField("tx_id", txHash).Debug("remove tx from mempool")
+       return txD
 }
 
 // GetTransaction return the TxDesc by hash
@@ -194,15 +201,24 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
        return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
 }
 
-// ProcessTransaction is the main entry for txpool handle new tx
-func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+func isTransactionZeroOutput(tx *types.Tx) bool {
+       for _, output := range tx.TxData.Outputs {
+               if value := output.AssetAmount(); value.Amount == uint64(0) {
+                       return true
+               }
+       }
+       return false
+}
+
+//IsDust checks if a tx has zero output
+func (tp *TxPool) IsDust(tx *types.Tx) bool {
+       return isTransactionZeroOutput(tx)
+}
+
+func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
-       if tp.IsWithdrawSpent(tx) {
-               log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
-               return false, errors.New("pegin-already-claimed")
-       }
        txD := &TxDesc{
                Tx:         tx,
                StatusFail: statusFail,
@@ -218,6 +234,7 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee
        if len(requireParents) > 0 {
                return true, tp.addOrphan(txD, requireParents)
        }
+
        if err := tp.addTransaction(txD); err != nil {
                return false, err
        }
@@ -226,40 +243,13 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee
        return false, nil
 }
 
-func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       _, ok := tp.claimTx[key]
-                       return tp.store.IsWithdrawSpent(&key) || ok
-               default:
-                       continue
-               }
-       }
-
-       return false
-}
-
-func (tp *TxPool) addClaimTx(tx *types.Tx) {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       tp.claimTx[key] = true
-               default:
-                       continue
-               }
-       }
-}
-
-func (tp *TxPool) removeClaimTx(tx *types.Tx) {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       delete(tp.claimTx, key)
-               default:
-                       continue
-               }
+// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
+func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+       if tp.IsDust(tx) {
+               log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
+               return false, nil
        }
+       return tp.processTransaction(tx, statusFail, height, fee)
 }
 
 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
@@ -286,23 +276,30 @@ func (tp *TxPool) addTransaction(txD *TxDesc) error {
        tx := txD.Tx
        txD.Added = time.Now()
        tp.pool[tx.ID] = txD
-       // 增加一个claim id 到到claim pool中
-       tp.addClaimTx(tx)
        for _, id := range tx.ResultIds {
-               output, err := tx.Output(*id)
+               outputEntry, err := tx.Entry(*id)
                if err != nil {
-                       // error due to it's a retirement, utxo doesn't care this output type so skip it
+                       return err
+               }
+
+               var assetID bc.AssetID
+               switch output := outputEntry.(type) {
+               case *bc.IntraChainOutput:
+                       assetID = *output.Source.Value.AssetId
+               case *bc.VoteOutput:
+                       assetID = *output.Source.Value.AssetId
+               default:
                        continue
                }
-               if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+
+               if !txD.StatusFail || assetID == *consensus.BTMAssetID {
                        tp.utxo[*id] = tx
                }
        }
 
        atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
-
-       log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
+       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
+       log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
        return nil
 }
 
@@ -323,8 +320,10 @@ func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
 
 func (tp *TxPool) orphanExpireWorker() {
        ticker := time.NewTicker(orphanExpireScanInterval)
+       defer ticker.Stop()
+
        for now := range ticker.C {
-               tp.ExpireOrphan(now)
+               tp.expireOrphan(now)
        }
 }
 
@@ -349,7 +348,7 @@ func (tp *TxPool) processOrphans(txD *TxDesc) {
                processOrphan := processOrphans[0]
                requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
                if err != nil {
-                       log.WithField("err", err).Error("processOrphans got unexpect error")
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
                        continue
                }
 
@@ -379,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) {
        }
        delete(tp.orphans, *hash)
 }
+
+func (tp *TxPool) txExpireWorker() {
+       ticker := time.NewTicker(txExpireScanInterval)
+       defer ticker.Stop()
+
+       for now := range ticker.C {
+               tp.expireTx(now)
+       }
+}
+
+// expireTx expires all the Txs that before the input time range
+func (tp *TxPool) expireTx(now time.Time) {
+       tp.mtx.Lock()
+       defer tp.mtx.Unlock()
+
+       cutOff := now.Add(-txTTL)
+       for hash, txD := range tp.pool {
+               if txD.Added.Before(cutOff) {
+                       tp.removeTransaction(&hash)
+               }
+       }
+}