OSDN Git Service

Txpool upgrade (#327)
[bytom/bytom.git] / protocol / mempool.go
index 4833e02..0c8d788 100644 (file)
@@ -6,17 +6,26 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/golang/groupcache/lru"
+
+       "github.com/bytom/blockchain/txdb/storage"
+       "github.com/bytom/consensus"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/legacy"
-       "github.com/golang/groupcache/lru"
+       "github.com/bytom/protocol/state"
+       log "github.com/sirupsen/logrus"
 )
 
 var (
        maxCachedErrTxs = 1000
-
+       maxNewTxChSize  = 1000
+       maxNewTxNum     = 10000
+       // ErrTransactionNotExist is the pre-defined error message
        ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
+       ErrPoolIsFull          = errors.New("transaction pool reach the max number")
 )
 
+// TxDesc store tx and related info for mining strategy
 type TxDesc struct {
        Tx       *legacy.Tx
        Added    time.Time
@@ -26,56 +35,108 @@ type TxDesc struct {
        FeePerKB uint64
 }
 
+// TxPool is use for store the unconfirmed transaction
 type TxPool struct {
        lastUpdated int64
        mtx         sync.RWMutex
        pool        map[bc.Hash]*TxDesc
+       utxo        map[bc.Hash]bc.Hash
        errCache    *lru.Cache
+       newTxCh     chan *legacy.Tx
 }
 
+// NewTxPool init a new TxPool
 func NewTxPool() *TxPool {
        return &TxPool{
                lastUpdated: time.Now().Unix(),
                pool:        make(map[bc.Hash]*TxDesc),
+               utxo:        make(map[bc.Hash]bc.Hash),
                errCache:    lru.New(maxCachedErrTxs),
+               newTxCh:     make(chan *legacy.Tx, maxNewTxChSize),
        }
 }
 
-func (mp *TxPool) AddTransaction(tx *legacy.Tx, weight, height, fee uint64) *TxDesc {
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (mp *TxPool) GetNewTxCh() chan *legacy.Tx {
+       return mp.newTxCh
+}
+
+// AddTransaction add a verified transaction to pool
+func (mp *TxPool) AddTransaction(tx *legacy.Tx, gasOnlyTx bool, height, fee uint64) (*TxDesc, error) {
+       mp.mtx.Lock()
+       defer mp.mtx.Unlock()
+
+       if len(mp.pool) >= maxNewTxNum {
+               return nil, ErrPoolIsFull
+       }
+
        txD := &TxDesc{
                Tx:       tx,
                Added:    time.Now(),
-               Weight:   weight,
+               Weight:   tx.TxData.SerializedSize,
                Height:   height,
                Fee:      fee,
                FeePerKB: fee * 1000 / tx.TxHeader.SerializedSize,
        }
 
+       mp.pool[tx.Tx.ID] = txD
+       atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
+
+       for _, id := range tx.TxHeader.ResultIds {
+               output, err := tx.Output(*id)
+               if err != nil {
+                       return nil, err
+               }
+               if !gasOnlyTx || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+                       mp.utxo[*id] = tx.Tx.ID
+               }
+       }
+
+       mp.newTxCh <- tx
+       log.WithField("tx_id", tx.Tx.ID).Info("Add tx to mempool")
+       return txD, nil
+}
+
+// AddErrCache add a failed transaction record to lru cache
+func (mp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
        mp.mtx.Lock()
        defer mp.mtx.Unlock()
 
-       mp.pool[tx.Tx.ID] = txD
-       atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
-       return txD
+       mp.errCache.Add(txHash, err)
 }
 
-func (mp *TxPool) AddErrCache(txHash *bc.Hash) {
+// GetErrCache return the error of the transaction
+func (mp *TxPool) GetErrCache(txHash *bc.Hash) error {
        mp.mtx.Lock()
        defer mp.mtx.Unlock()
 
-       mp.errCache.Add(txHash, nil)
+       v, ok := mp.errCache.Get(txHash)
+       if !ok {
+               return nil
+       }
+       return v.(error)
 }
 
+// RemoveTransaction remove a transaction from the pool
 func (mp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        mp.mtx.Lock()
        defer mp.mtx.Unlock()
 
-       if _, ok := mp.pool[*txHash]; ok {
-               delete(mp.pool, *txHash)
-               atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
+       txD, ok := mp.pool[*txHash]
+       if !ok {
+               return
+       }
+
+       for _, output := range txD.Tx.TxHeader.ResultIds {
+               delete(mp.utxo, *output)
        }
+       delete(mp.pool, *txHash)
+       atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix())
+
+       log.WithField("tx_id", txHash).Info("remove tx from mempool")
 }
 
+// GetTransaction return the TxDesc by hash
 func (mp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
        mp.mtx.RLock()
        defer mp.mtx.RUnlock()
@@ -87,6 +148,7 @@ func (mp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
        return nil, ErrTransactionNotExist
 }
 
+// GetTransactions return all the transactions in the pool
 func (mp *TxPool) GetTransactions() []*TxDesc {
        mp.mtx.RLock()
        defer mp.mtx.RUnlock()
@@ -100,6 +162,21 @@ func (mp *TxPool) GetTransactions() []*TxDesc {
        return txDs
 }
 
+// GetTransactionUTXO return unconfirmed utxo
+func (mp *TxPool) GetTransactionUTXO(tx *bc.Tx) *state.UtxoViewpoint {
+       mp.mtx.RLock()
+       defer mp.mtx.RUnlock()
+
+       view := state.NewUtxoViewpoint()
+       for _, prevout := range tx.SpentOutputIDs {
+               if _, ok := mp.utxo[prevout]; ok {
+                       view.Entries[prevout] = storage.NewUtxoEntry(false, 0, false)
+               }
+       }
+       return view
+}
+
+// IsTransactionInPool check wheather a transaction in pool or not
 func (mp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
        mp.mtx.RLock()
        defer mp.mtx.RUnlock()
@@ -110,6 +187,7 @@ func (mp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
        return false
 }
 
+// IsTransactionInErrCache check wheather a transaction in errCache or not
 func (mp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
        mp.mtx.RLock()
        defer mp.mtx.RUnlock()
@@ -118,10 +196,12 @@ func (mp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
        return ok
 }
 
+// HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
 func (mp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
        return mp.IsTransactionInPool(txHash) || mp.IsTransactionInErrCache(txHash)
 }
 
+// Count return number of transcation in pool
 func (mp *TxPool) Count() int {
        mp.mtx.RLock()
        defer mp.mtx.RUnlock()