OSDN Git Service

Mempool: add no btm input tx filter (#1605)
[bytom/bytom.git] / protocol / txpool.go
index c177cef..10fbbc5 100644 (file)
@@ -7,105 +7,109 @@ import (
        "time"
 
        "github.com/golang/groupcache/lru"
+       log "github.com/sirupsen/logrus"
 
        "github.com/bytom/consensus"
-       "github.com/bytom/database/storage"
+       "github.com/bytom/event"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/protocol/state"
-       log "github.com/sirupsen/logrus"
+)
+
+// msg type
+const (
+       MsgNewTx = iota
+       MsgRemoveTx
+       logModule = "protocol"
 )
 
 var (
        maxCachedErrTxs = 1000
-       maxNewTxChSize  = 1000
+       maxMsgChSize    = 1000
        maxNewTxNum     = 10000
+       maxOrphanNum    = 2000
+
+       orphanTTL                = 10 * time.Minute
+       orphanExpireScanInterval = 3 * time.Minute
 
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
        // ErrPoolIsFull indicates the pool is full
        ErrPoolIsFull = errors.New("transaction pool reach the max number")
+       // ErrDustTx indicates transaction is dust tx
+       ErrDustTx = errors.New("transaction is dust tx")
 )
 
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
 // TxDesc store tx and related info for mining strategy
 type TxDesc struct {
-       Tx       *types.Tx
-       Added    time.Time
-       Height   uint64
-       Weight   uint64
-       Fee      uint64
-       FeePerKB uint64
+       Tx         *types.Tx `json:"transaction"`
+       Added      time.Time `json:"-"`
+       StatusFail bool      `json:"status_fail"`
+       Height     uint64    `json:"-"`
+       Weight     uint64    `json:"-"`
+       Fee        uint64    `json:"-"`
+}
+
+// TxPoolMsg is use for notify pool changes
+type TxPoolMsg struct {
+       *TxDesc
+       MsgType int
+}
+
+type orphanTx struct {
+       *TxDesc
+       expiration time.Time
 }
 
 // TxPool is use for store the unconfirmed transaction
 type TxPool struct {
-       lastUpdated int64
-       mtx         sync.RWMutex
-       pool        map[bc.Hash]*TxDesc
-       utxo        map[bc.Hash]bc.Hash
-       errCache    *lru.Cache
-       newTxCh     chan *types.Tx
+       lastUpdated     int64
+       mtx             sync.RWMutex
+       store           Store
+       pool            map[bc.Hash]*TxDesc
+       utxo            map[bc.Hash]*types.Tx
+       orphans         map[bc.Hash]*orphanTx
+       orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
+       errCache        *lru.Cache
+       eventDispatcher *event.Dispatcher
 }
 
 // NewTxPool init a new TxPool
-func NewTxPool() *TxPool {
-       return &TxPool{
-               lastUpdated: time.Now().Unix(),
-               pool:        make(map[bc.Hash]*TxDesc),
-               utxo:        make(map[bc.Hash]bc.Hash),
-               errCache:    lru.New(maxCachedErrTxs),
-               newTxCh:     make(chan *types.Tx, maxNewTxChSize),
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
+       tp := &TxPool{
+               lastUpdated:     time.Now().Unix(),
+               store:           store,
+               pool:            make(map[bc.Hash]*TxDesc),
+               utxo:            make(map[bc.Hash]*types.Tx),
+               orphans:         make(map[bc.Hash]*orphanTx),
+               orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
+               errCache:        lru.New(maxCachedErrTxs),
+               eventDispatcher: dispatcher,
        }
+       go tp.orphanExpireWorker()
+       return tp
 }
 
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetNewTxCh() chan *types.Tx {
-       return tp.newTxCh
-}
-
-// AddTransaction add a verified transaction to pool
-func (tp *TxPool) AddTransaction(tx *types.Tx, gasOnlyTx bool, height, fee uint64) (*TxDesc, error) {
+// AddErrCache add a failed transaction record to lru cache
+func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
-       if len(tp.pool) >= maxNewTxNum {
-               return nil, ErrPoolIsFull
-       }
-
-       txD := &TxDesc{
-               Tx:       tx,
-               Added:    time.Now(),
-               Weight:   tx.TxData.SerializedSize,
-               Height:   height,
-               Fee:      fee,
-               FeePerKB: fee * 1000 / tx.TxHeader.SerializedSize,
-       }
-
-       tp.pool[tx.Tx.ID] = txD
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-
-       for _, id := range tx.TxHeader.ResultIds {
-               output, err := tx.Output(*id)
-               if err != nil {
-                       // error due to it's a retirement, utxo doesn't care this output type so skip it
-                       continue
-               }
-               if !gasOnlyTx || *output.Source.Value.AssetId == *consensus.BTMAssetID {
-                       tp.utxo[*id] = tx.Tx.ID
-               }
-       }
-
-       tp.newTxCh <- tx
-       log.WithField("tx_id", tx.Tx.ID.String()).Info("Add tx to mempool")
-       return txD, nil
+       tp.errCache.Add(txHash, err)
 }
 
-// AddErrCache add a failed transaction record to lru cache
-func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
+// ExpireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) ExpireOrphan(now time.Time) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
-       tp.errCache.Add(txHash, err)
+       for hash, orphan := range tp.orphans {
+               if orphan.expiration.Before(now) {
+                       tp.removeOrphan(&hash)
+               }
+       }
 }
 
 // GetErrCache return the error of the transaction
@@ -130,13 +134,14 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
                return
        }
 
-       for _, output := range txD.Tx.TxHeader.ResultIds {
+       for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
        delete(tp.pool, *txHash)
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
 
-       log.WithField("tx_id", txHash).Info("remove tx from mempool")
+       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+       log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
 }
 
 // GetTransaction return the TxDesc by hash
@@ -147,7 +152,6 @@ func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
        if txD, ok := tp.pool[*txHash]; ok {
                return txD, nil
        }
-
        return nil, ErrTransactionNotExist
 }
 
@@ -165,29 +169,13 @@ func (tp *TxPool) GetTransactions() []*TxDesc {
        return txDs
 }
 
-// GetTransactionUTXO return unconfirmed utxo
-func (tp *TxPool) GetTransactionUTXO(tx *bc.Tx) *state.UtxoViewpoint {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       view := state.NewUtxoViewpoint()
-       for _, prevout := range tx.SpentOutputIDs {
-               if _, ok := tp.utxo[prevout]; ok {
-                       view.Entries[prevout] = storage.NewUtxoEntry(false, 0, false)
-               }
-       }
-       return view
-}
-
 // IsTransactionInPool check wheather a transaction in pool or not
 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
        tp.mtx.RLock()
        defer tp.mtx.RUnlock()
 
-       if _, ok := tp.pool[*txHash]; ok {
-               return true
-       }
-       return false
+       _, ok := tp.pool[*txHash]
+       return ok
 }
 
 // IsTransactionInErrCache check wheather a transaction in errCache or not
@@ -204,11 +192,167 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
        return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
 }
 
-// Count return number of transcation in pool
-func (tp *TxPool) Count() int {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
+func isTransactionNoBtmInput(tx *types.Tx) bool {
+       for _, input := range tx.TxData.Inputs {
+               if input.AssetID() == *consensus.BTMAssetID {
+                       return false
+               }
+       }
+       return true
+}
+
+func (tp *TxPool) IsDust(tx *types.Tx) bool {
+       return isTransactionNoBtmInput(tx)
+}
+
+func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+       tp.mtx.Lock()
+       defer tp.mtx.Unlock()
+
+       txD := &TxDesc{
+               Tx:         tx,
+               StatusFail: statusFail,
+               Weight:     tx.SerializedSize,
+               Height:     height,
+               Fee:        fee,
+       }
+       requireParents, err := tp.checkOrphanUtxos(tx)
+       if err != nil {
+               return false, err
+       }
+
+       if len(requireParents) > 0 {
+               return true, tp.addOrphan(txD, requireParents)
+       }
+
+       if err := tp.addTransaction(txD); err != nil {
+               return false, err
+       }
+
+       tp.processOrphans(txD)
+       return false, nil
+}
+
+// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
+func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+       if tp.IsDust(tx) {
+               log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
+               return false, nil
+       }
+       return tp.processTransaction(tx, statusFail, height, fee)
+}
+
+func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
+       if len(tp.orphans) >= maxOrphanNum {
+               return ErrPoolIsFull
+       }
+
+       orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
+       tp.orphans[txD.Tx.ID] = orphan
+       for _, hash := range requireParents {
+               if _, ok := tp.orphansByPrev[*hash]; !ok {
+                       tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
+               }
+               tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
+       }
+       return nil
+}
+
+func (tp *TxPool) addTransaction(txD *TxDesc) error {
+       if len(tp.pool) >= maxNewTxNum {
+               return ErrPoolIsFull
+       }
+
+       tx := txD.Tx
+       txD.Added = time.Now()
+       tp.pool[tx.ID] = txD
+       for _, id := range tx.ResultIds {
+               output, err := tx.Output(*id)
+               if err != nil {
+                       // error due to it's a retirement, utxo doesn't care this output type so skip it
+                       continue
+               }
+               if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+                       tp.utxo[*id] = tx
+               }
+       }
+
+       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
+       log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
+       return nil
+}
 
-       count := len(tp.pool)
-       return count
+func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
+       view := state.NewUtxoViewpoint()
+       if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
+               return nil, err
+       }
+
+       hashes := []*bc.Hash{}
+       for _, hash := range tx.SpentOutputIDs {
+               if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
+                       hashes = append(hashes, &hash)
+               }
+       }
+       return hashes, nil
+}
+
+func (tp *TxPool) orphanExpireWorker() {
+       ticker := time.NewTicker(orphanExpireScanInterval)
+       for now := range ticker.C {
+               tp.ExpireOrphan(now)
+       }
+}
+
+func (tp *TxPool) processOrphans(txD *TxDesc) {
+       processOrphans := []*orphanTx{}
+       addRely := func(tx *types.Tx) {
+               for _, outHash := range tx.ResultIds {
+                       orphans, ok := tp.orphansByPrev[*outHash]
+                       if !ok {
+                               continue
+                       }
+
+                       for _, orphan := range orphans {
+                               processOrphans = append(processOrphans, orphan)
+                       }
+                       delete(tp.orphansByPrev, *outHash)
+               }
+       }
+
+       addRely(txD.Tx)
+       for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
+               processOrphan := processOrphans[0]
+               requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
+                       continue
+               }
+
+               if len(requireParents) == 0 {
+                       addRely(processOrphan.Tx)
+                       tp.removeOrphan(&processOrphan.Tx.ID)
+                       tp.addTransaction(processOrphan.TxDesc)
+               }
+       }
+}
+
+func (tp *TxPool) removeOrphan(hash *bc.Hash) {
+       orphan, ok := tp.orphans[*hash]
+       if !ok {
+               return
+       }
+
+       for _, spend := range orphan.Tx.SpentOutputIDs {
+               orphans, ok := tp.orphansByPrev[spend]
+               if !ok {
+                       continue
+               }
+
+               if delete(orphans, *hash); len(orphans) == 0 {
+                       delete(tp.orphansByPrev, spend)
+               }
+       }
+       delete(tp.orphans, *hash)
 }