OSDN Git Service

prod config for mov (#524)
[bytom/vapor.git] / protocol / txpool.go
index 5a06c67..9413fe4 100644 (file)
@@ -9,11 +9,11 @@ import (
        "github.com/golang/groupcache/lru"
        log "github.com/sirupsen/logrus"
 
-       "github.com/vapor/consensus"
-       "github.com/vapor/event"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/protocol/bc/types"
-       "github.com/vapor/protocol/state"
+       "github.com/bytom/vapor/consensus"
+       "github.com/bytom/vapor/event"
+       "github.com/bytom/vapor/protocol/bc"
+       "github.com/bytom/vapor/protocol/bc/types"
+       "github.com/bytom/vapor/protocol/state"
 )
 
 // msg type
@@ -26,11 +26,14 @@ const (
 var (
        maxCachedErrTxs = 1000
        maxMsgChSize    = 1000
-       maxNewTxNum     = 10000
-       maxOrphanNum    = 2000
+       maxNewTxNum     = 65536
+       maxOrphanNum    = 32768
 
-       orphanTTL                = 10 * time.Minute
-       orphanExpireScanInterval = 3 * time.Minute
+       orphanTTL                = 60 * time.Second
+       orphanExpireScanInterval = 30 * time.Second
+
+       txTTL                = 1 * time.Hour
+       txExpireScanInterval = 20 * time.Minute
 
        // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
@@ -40,6 +43,12 @@ var (
        ErrDustTx = errors.New("transaction is dust tx")
 )
 
+// DustFilterer is inerface for dust transaction filter rule
+type DustFilterer interface {
+       IsDust(tx *types.Tx) bool
+}
+
+// TxMsgEvent is message wrap for subscribe event
 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
 
 // TxDesc store tx and related info for mining strategy
@@ -73,11 +82,12 @@ type TxPool struct {
        orphans         map[bc.Hash]*orphanTx
        orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
        errCache        *lru.Cache
+       filters         []DustFilterer
        eventDispatcher *event.Dispatcher
 }
 
 // NewTxPool init a new TxPool
-func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
+func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
        tp := &TxPool{
                lastUpdated:     time.Now().Unix(),
                store:           store,
@@ -86,9 +96,11 @@ func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
                orphans:         make(map[bc.Hash]*orphanTx),
                orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
                errCache:        lru.New(maxCachedErrTxs),
+               filters:         filters,
                eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
+       go tp.txExpireWorker()
        return tp
 }
 
@@ -100,8 +112,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        tp.errCache.Add(txHash, err)
 }
 
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
+// expireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) expireOrphan(now time.Time) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
@@ -129,19 +141,24 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        tp.mtx.Lock()
        defer tp.mtx.Unlock()
 
+       if txD := tp.removeTransaction(txHash); txD != nil {
+               atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+               tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+               log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       }
+}
+
+func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
        txD, ok := tp.pool[*txHash]
        if !ok {
-               return
+               return nil
        }
 
        for _, output := range txD.Tx.ResultIds {
                delete(tp.utxo, *output)
        }
        delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
-       log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
+       return txD
 }
 
 // GetTransaction return the TxDesc by hash
@@ -201,8 +218,31 @@ func isTransactionZeroOutput(tx *types.Tx) bool {
        return false
 }
 
+func isNoGasStatusFail(tx *types.Tx, statusFail bool) bool {
+       if !statusFail {
+               return false
+       }
+
+       for _, input := range tx.TxData.Inputs {
+               if *consensus.BTMAssetID == input.AssetID() {
+                       return false
+               }
+       }
+       return true
+}
+
+//IsDust checks if a tx has zero output
 func (tp *TxPool) IsDust(tx *types.Tx) bool {
-       return isTransactionZeroOutput(tx)
+       if ok := isTransactionZeroOutput(tx); ok {
+               return ok
+       }
+
+       for _, filter := range tp.filters {
+               if ok := filter.IsDust(tx); ok {
+                       return ok
+               }
+       }
+       return false
 }
 
 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
@@ -239,6 +279,11 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee
                log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
                return false, nil
        }
+
+       if isNoGasStatusFail(tx, statusFail) {
+               log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("drop no gas status fail tx")
+               return false, nil
+       }
        return tp.processTransaction(tx, statusFail, height, fee)
 }
 
@@ -313,7 +358,7 @@ func (tp *TxPool) orphanExpireWorker() {
        defer ticker.Stop()
 
        for now := range ticker.C {
-               tp.ExpireOrphan(now)
+               tp.expireOrphan(now)
        }
 }
 
@@ -368,3 +413,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) {
        }
        delete(tp.orphans, *hash)
 }
+
+func (tp *TxPool) txExpireWorker() {
+       ticker := time.NewTicker(txExpireScanInterval)
+       defer ticker.Stop()
+
+       for now := range ticker.C {
+               tp.expireTx(now)
+       }
+}
+
+// expireTx expires all the Txs that before the input time range
+func (tp *TxPool) expireTx(now time.Time) {
+       tp.mtx.Lock()
+       defer tp.mtx.Unlock()
+
+       cutOff := now.Add(-txTTL)
+       for hash, txD := range tp.pool {
+               if txD.Added.Before(cutOff) {
+                       tp.removeTransaction(&hash)
+               }
+       }
+}