"time"
"github.com/golang/groupcache/lru"
+ log "github.com/sirupsen/logrus"
"github.com/bytom/consensus"
- "github.com/bytom/database/storage"
+ "github.com/bytom/event"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/protocol/state"
- log "github.com/sirupsen/logrus"
+)
+
+// msg type
+const (
+ MsgNewTx = iota
+ MsgRemoveTx
+ logModule = "protocol"
)
var (
maxCachedErrTxs = 1000
- maxNewTxChSize = 1000
+ maxMsgChSize = 1000
maxNewTxNum = 10000
+ maxOrphanNum = 2000
+
+ orphanTTL = 10 * time.Minute
+ orphanExpireScanInterval = 3 * time.Minute
// ErrTransactionNotExist is the pre-defined error message
ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
// ErrPoolIsFull indicates the pool is full
ErrPoolIsFull = errors.New("transaction pool reach the max number")
+ // ErrDustTx indicates transaction is dust tx
+ ErrDustTx = errors.New("transaction is dust tx")
)
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
// TxDesc store tx and related info for mining strategy
type TxDesc struct {
- Tx *types.Tx
- Added time.Time
- Height uint64
- Weight uint64
- Fee uint64
- FeePerKB uint64
+ Tx *types.Tx `json:"transaction"`
+ Added time.Time `json:"-"`
+ StatusFail bool `json:"status_fail"`
+ Height uint64 `json:"-"`
+ Weight uint64 `json:"-"`
+ Fee uint64 `json:"-"`
+}
+
+// TxPoolMsg is use for notify pool changes
+type TxPoolMsg struct {
+ *TxDesc
+ MsgType int
+}
+
+type orphanTx struct {
+ *TxDesc
+ expiration time.Time
}
// TxPool is use for store the unconfirmed transaction
type TxPool struct {
- lastUpdated int64
- mtx sync.RWMutex
- pool map[bc.Hash]*TxDesc
- utxo map[bc.Hash]bc.Hash
- errCache *lru.Cache
- newTxCh chan *types.Tx
+ lastUpdated int64
+ mtx sync.RWMutex
+ store Store
+ pool map[bc.Hash]*TxDesc
+ utxo map[bc.Hash]*types.Tx
+ orphans map[bc.Hash]*orphanTx
+ orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
+ errCache *lru.Cache
+ eventDispatcher *event.Dispatcher
}
// NewTxPool init a new TxPool
-func NewTxPool() *TxPool {
- return &TxPool{
- lastUpdated: time.Now().Unix(),
- pool: make(map[bc.Hash]*TxDesc),
- utxo: make(map[bc.Hash]bc.Hash),
- errCache: lru.New(maxCachedErrTxs),
- newTxCh: make(chan *types.Tx, maxNewTxChSize),
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
+ tp := &TxPool{
+ lastUpdated: time.Now().Unix(),
+ store: store,
+ pool: make(map[bc.Hash]*TxDesc),
+ utxo: make(map[bc.Hash]*types.Tx),
+ orphans: make(map[bc.Hash]*orphanTx),
+ orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
+ errCache: lru.New(maxCachedErrTxs),
+ eventDispatcher: dispatcher,
}
+ go tp.orphanExpireWorker()
+ return tp
}
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetNewTxCh() chan *types.Tx {
- return tp.newTxCh
-}
-
-// AddTransaction add a verified transaction to pool
-func (tp *TxPool) AddTransaction(tx *types.Tx, gasOnlyTx bool, height, fee uint64) (*TxDesc, error) {
+// AddErrCache add a failed transaction record to lru cache
+func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
tp.mtx.Lock()
defer tp.mtx.Unlock()
- if len(tp.pool) >= maxNewTxNum {
- return nil, ErrPoolIsFull
- }
-
- txD := &TxDesc{
- Tx: tx,
- Added: time.Now(),
- Weight: tx.TxData.SerializedSize,
- Height: height,
- Fee: fee,
- FeePerKB: fee * 1000 / tx.TxHeader.SerializedSize,
- }
-
- tp.pool[tx.Tx.ID] = txD
- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-
- for _, id := range tx.TxHeader.ResultIds {
- output, err := tx.Output(*id)
- if err != nil {
- // error due to it's a retirement, utxo doesn't care this output type so skip it
- continue
- }
- if !gasOnlyTx || *output.Source.Value.AssetId == *consensus.BTMAssetID {
- tp.utxo[*id] = tx.Tx.ID
- }
- }
-
- tp.newTxCh <- tx
- log.WithField("tx_id", tx.Tx.ID.String()).Debug("Add tx to mempool")
- return txD, nil
+ tp.errCache.Add(txHash, err)
}
-// AddErrCache add a failed transaction record to lru cache
-func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
+// ExpireOrphan expire all the orphans that before the input time range
+func (tp *TxPool) ExpireOrphan(now time.Time) {
tp.mtx.Lock()
defer tp.mtx.Unlock()
- tp.errCache.Add(txHash, err)
+ for hash, orphan := range tp.orphans {
+ if orphan.expiration.Before(now) {
+ tp.removeOrphan(&hash)
+ }
+ }
}
// GetErrCache return the error of the transaction
return
}
- for _, output := range txD.Tx.TxHeader.ResultIds {
+ for _, output := range txD.Tx.ResultIds {
delete(tp.utxo, *output)
}
delete(tp.pool, *txHash)
- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- log.WithField("tx_id", txHash).Debug("remove tx from mempool")
+ atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+ log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
}
// GetTransaction return the TxDesc by hash
if txD, ok := tp.pool[*txHash]; ok {
return txD, nil
}
-
return nil, ErrTransactionNotExist
}
return txDs
}
-// GetTransactionUTXO return unconfirmed utxo
-func (tp *TxPool) GetTransactionUTXO(tx *bc.Tx) *state.UtxoViewpoint {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
-
- view := state.NewUtxoViewpoint()
- for _, prevout := range tx.SpentOutputIDs {
- if _, ok := tp.utxo[prevout]; ok {
- view.Entries[prevout] = storage.NewUtxoEntry(false, 0, false)
- }
- }
- return view
-}
-
// IsTransactionInPool check wheather a transaction in pool or not
func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
tp.mtx.RLock()
defer tp.mtx.RUnlock()
- if _, ok := tp.pool[*txHash]; ok {
- return true
- }
- return false
+ _, ok := tp.pool[*txHash]
+ return ok
}
// IsTransactionInErrCache check wheather a transaction in errCache or not
return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
}
-// Count return number of transcation in pool
-func (tp *TxPool) Count() int {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
+func isTransactionNoBtmInput(tx *types.Tx) bool {
+ for _, input := range tx.TxData.Inputs {
+ if input.AssetID() == *consensus.BTMAssetID {
+ return false
+ }
+ }
+ return true
+}
+
+func (tp *TxPool) IsDust(tx *types.Tx) bool {
+ return isTransactionNoBtmInput(tx)
+}
+
+func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+ tp.mtx.Lock()
+ defer tp.mtx.Unlock()
+
+ txD := &TxDesc{
+ Tx: tx,
+ StatusFail: statusFail,
+ Weight: tx.SerializedSize,
+ Height: height,
+ Fee: fee,
+ }
+ requireParents, err := tp.checkOrphanUtxos(tx)
+ if err != nil {
+ return false, err
+ }
+
+ if len(requireParents) > 0 {
+ return true, tp.addOrphan(txD, requireParents)
+ }
+
+ if err := tp.addTransaction(txD); err != nil {
+ return false, err
+ }
+
+ tp.processOrphans(txD)
+ return false, nil
+}
+
+// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
+func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+ if tp.IsDust(tx) {
+ log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
+ return false, nil
+ }
+ return tp.processTransaction(tx, statusFail, height, fee)
+}
+
+func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
+ if len(tp.orphans) >= maxOrphanNum {
+ return ErrPoolIsFull
+ }
+
+ orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
+ tp.orphans[txD.Tx.ID] = orphan
+ for _, hash := range requireParents {
+ if _, ok := tp.orphansByPrev[*hash]; !ok {
+ tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
+ }
+ tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
+ }
+ return nil
+}
+
+func (tp *TxPool) addTransaction(txD *TxDesc) error {
+ if len(tp.pool) >= maxNewTxNum {
+ return ErrPoolIsFull
+ }
+
+ tx := txD.Tx
+ txD.Added = time.Now()
+ tp.pool[tx.ID] = txD
+ for _, id := range tx.ResultIds {
+ output, err := tx.Output(*id)
+ if err != nil {
+ // error due to it's a retirement, utxo doesn't care this output type so skip it
+ continue
+ }
+ if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+ tp.utxo[*id] = tx
+ }
+ }
+
+ atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
+ log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
+ return nil
+}
- count := len(tp.pool)
- return count
+func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
+ view := state.NewUtxoViewpoint()
+ if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
+ return nil, err
+ }
+
+ hashes := []*bc.Hash{}
+ for _, hash := range tx.SpentOutputIDs {
+ if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
+ hashes = append(hashes, &hash)
+ }
+ }
+ return hashes, nil
+}
+
+func (tp *TxPool) orphanExpireWorker() {
+ ticker := time.NewTicker(orphanExpireScanInterval)
+ for now := range ticker.C {
+ tp.ExpireOrphan(now)
+ }
+}
+
+func (tp *TxPool) processOrphans(txD *TxDesc) {
+ processOrphans := []*orphanTx{}
+ addRely := func(tx *types.Tx) {
+ for _, outHash := range tx.ResultIds {
+ orphans, ok := tp.orphansByPrev[*outHash]
+ if !ok {
+ continue
+ }
+
+ for _, orphan := range orphans {
+ processOrphans = append(processOrphans, orphan)
+ }
+ delete(tp.orphansByPrev, *outHash)
+ }
+ }
+
+ addRely(txD.Tx)
+ for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
+ processOrphan := processOrphans[0]
+ requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
+ continue
+ }
+
+ if len(requireParents) == 0 {
+ addRely(processOrphan.Tx)
+ tp.removeOrphan(&processOrphan.Tx.ID)
+ tp.addTransaction(processOrphan.TxDesc)
+ }
+ }
+}
+
+func (tp *TxPool) removeOrphan(hash *bc.Hash) {
+ orphan, ok := tp.orphans[*hash]
+ if !ok {
+ return
+ }
+
+ for _, spend := range orphan.Tx.SpentOutputIDs {
+ orphans, ok := tp.orphansByPrev[spend]
+ if !ok {
+ continue
+ }
+
+ if delete(orphans, *hash); len(orphans) == 0 {
+ delete(tp.orphansByPrev, spend)
+ }
+ }
+ delete(tp.orphans, *hash)
}