OSDN Git Service

Thanos did someting
[bytom/vapor.git] / protocol / txpool.go
diff --git a/protocol/txpool.go b/protocol/txpool.go
deleted file mode 100644 (file)
index cd7ebf1..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-package protocol
-
-import (
-       "errors"
-       "sync"
-       "sync/atomic"
-       "time"
-
-       "github.com/golang/groupcache/lru"
-       log "github.com/sirupsen/logrus"
-
-       "github.com/vapor/consensus"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/protocol/bc/types"
-       "github.com/vapor/protocol/state"
-)
-
-// msg type
-const (
-       MsgNewTx = iota
-       MsgRemoveTx
-)
-
-var (
-       maxCachedErrTxs = 1000
-       maxMsgChSize    = 1000
-       maxNewTxNum     = 10000
-       maxOrphanNum    = 2000
-
-       orphanTTL                = 10 * time.Minute
-       orphanExpireScanInterval = 3 * time.Minute
-
-       // ErrTransactionNotExist is the pre-defined error message
-       ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
-       // ErrPoolIsFull indicates the pool is full
-       ErrPoolIsFull = errors.New("transaction pool reach the max number")
-)
-
-// TxDesc store tx and related info for mining strategy
-type TxDesc struct {
-       Tx         *types.Tx
-       Added      time.Time
-       StatusFail bool
-       Height     uint64
-       Weight     uint64
-       Fee        uint64
-}
-
-// TxPoolMsg is use for notify pool changes
-type TxPoolMsg struct {
-       *TxDesc
-       MsgType int
-}
-
-type orphanTx struct {
-       *TxDesc
-       expiration time.Time
-}
-
-// TxPool is use for store the unconfirmed transaction
-type TxPool struct {
-       lastUpdated   int64
-       mtx           sync.RWMutex
-       store         Store
-       pool          map[bc.Hash]*TxDesc
-       utxo          map[bc.Hash]*types.Tx
-       orphans       map[bc.Hash]*orphanTx
-       orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
-       claimTx       map[bc.Hash]bool
-       errCache      *lru.Cache
-       msgCh         chan *TxPoolMsg
-}
-
-// NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
-       tp := &TxPool{
-               lastUpdated:   time.Now().Unix(),
-               store:         store,
-               pool:          make(map[bc.Hash]*TxDesc),
-               utxo:          make(map[bc.Hash]*types.Tx),
-               orphans:       make(map[bc.Hash]*orphanTx),
-               orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
-               claimTx:       make(map[bc.Hash]bool),
-               errCache:      lru.New(maxCachedErrTxs),
-               msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
-       }
-       go tp.orphanExpireWorker()
-       return tp
-}
-
-// AddErrCache add a failed transaction record to lru cache
-func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       tp.errCache.Add(txHash, err)
-}
-
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       for hash, orphan := range tp.orphans {
-               if orphan.expiration.Before(now) {
-                       tp.removeOrphan(&hash)
-               }
-       }
-}
-
-// GetErrCache return the error of the transaction
-func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       v, ok := tp.errCache.Get(txHash)
-       if !ok {
-               return nil
-       }
-       return v.(error)
-}
-
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
-       return tp.msgCh
-}
-
-// RemoveTransaction remove a transaction from the pool
-func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       txD, ok := tp.pool[*txHash]
-       if !ok {
-               return
-       }
-
-       for _, output := range txD.Tx.ResultIds {
-               delete(tp.utxo, *output)
-       }
-       tp.removeClaimTx(txD.Tx)
-       delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
-       log.WithField("tx_id", txHash).Debug("remove tx from mempool")
-}
-
-// GetTransaction return the TxDesc by hash
-func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       if txD, ok := tp.pool[*txHash]; ok {
-               return txD, nil
-       }
-       return nil, ErrTransactionNotExist
-}
-
-// GetTransactions return all the transactions in the pool
-func (tp *TxPool) GetTransactions() []*TxDesc {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       txDs := make([]*TxDesc, len(tp.pool))
-       i := 0
-       for _, desc := range tp.pool {
-               txDs[i] = desc
-               i++
-       }
-       return txDs
-}
-
-// IsTransactionInPool check wheather a transaction in pool or not
-func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       _, ok := tp.pool[*txHash]
-       return ok
-}
-
-// IsTransactionInErrCache check wheather a transaction in errCache or not
-func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       _, ok := tp.errCache.Get(txHash)
-       return ok
-}
-
-// HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
-func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
-       return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
-}
-
-// ProcessTransaction is the main entry for txpool handle new tx
-func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       if tp.IsWithdrawSpent(tx) {
-               log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
-               return false, errors.New("pegin-already-claimed")
-       }
-       txD := &TxDesc{
-               Tx:         tx,
-               StatusFail: statusFail,
-               Weight:     tx.SerializedSize,
-               Height:     height,
-               Fee:        fee,
-       }
-       requireParents, err := tp.checkOrphanUtxos(tx)
-       if err != nil {
-               return false, err
-       }
-
-       if len(requireParents) > 0 {
-               return true, tp.addOrphan(txD, requireParents)
-       }
-       if err := tp.addTransaction(txD); err != nil {
-               return false, err
-       }
-
-       tp.processOrphans(txD)
-       return false, nil
-}
-
-func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       _, ok := tp.claimTx[key]
-                       return tp.store.IsWithdrawSpent(&key) || ok
-               default:
-                       continue
-               }
-       }
-
-       return false
-}
-
-func (tp *TxPool) addClaimTx(tx *types.Tx) {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       tp.claimTx[key] = true
-               default:
-                       continue
-               }
-       }
-}
-
-func (tp *TxPool) removeClaimTx(tx *types.Tx) {
-       for key, value := range tx.Entries {
-               switch value.(type) {
-               case *bc.Claim:
-                       delete(tp.claimTx, key)
-               default:
-                       continue
-               }
-       }
-}
-
-func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
-       if len(tp.orphans) >= maxOrphanNum {
-               return ErrPoolIsFull
-       }
-
-       orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
-       tp.orphans[txD.Tx.ID] = orphan
-       for _, hash := range requireParents {
-               if _, ok := tp.orphansByPrev[*hash]; !ok {
-                       tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
-               }
-               tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
-       }
-       return nil
-}
-
-func (tp *TxPool) addTransaction(txD *TxDesc) error {
-       if len(tp.pool) >= maxNewTxNum {
-               return ErrPoolIsFull
-       }
-
-       tx := txD.Tx
-       txD.Added = time.Now()
-       tp.pool[tx.ID] = txD
-       // 增加一个claim id 到到claim pool中
-       tp.addClaimTx(tx)
-       for _, id := range tx.ResultIds {
-               output, err := tx.Output(*id)
-               if err != nil {
-                       // error due to it's a retirement, utxo doesn't care this output type so skip it
-                       continue
-               }
-               if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
-                       tp.utxo[*id] = tx
-               }
-       }
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
-
-       log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
-       return nil
-}
-
-func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
-       view := state.NewUtxoViewpoint()
-       if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
-               return nil, err
-       }
-
-       hashes := []*bc.Hash{}
-       for _, hash := range tx.SpentOutputIDs {
-               if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
-                       hashes = append(hashes, &hash)
-               }
-       }
-       return hashes, nil
-}
-
-func (tp *TxPool) orphanExpireWorker() {
-       ticker := time.NewTicker(orphanExpireScanInterval)
-       for now := range ticker.C {
-               tp.ExpireOrphan(now)
-       }
-}
-
-func (tp *TxPool) processOrphans(txD *TxDesc) {
-       processOrphans := []*orphanTx{}
-       addRely := func(tx *types.Tx) {
-               for _, outHash := range tx.ResultIds {
-                       orphans, ok := tp.orphansByPrev[*outHash]
-                       if !ok {
-                               continue
-                       }
-
-                       for _, orphan := range orphans {
-                               processOrphans = append(processOrphans, orphan)
-                       }
-                       delete(tp.orphansByPrev, *outHash)
-               }
-       }
-
-       addRely(txD.Tx)
-       for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
-               processOrphan := processOrphans[0]
-               requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
-               if err != nil {
-                       log.WithField("err", err).Error("processOrphans got unexpect error")
-                       continue
-               }
-
-               if len(requireParents) == 0 {
-                       addRely(processOrphan.Tx)
-                       tp.removeOrphan(&processOrphan.Tx.ID)
-                       tp.addTransaction(processOrphan.TxDesc)
-               }
-       }
-}
-
-func (tp *TxPool) removeOrphan(hash *bc.Hash) {
-       orphan, ok := tp.orphans[*hash]
-       if !ok {
-               return
-       }
-
-       for _, spend := range orphan.Tx.SpentOutputIDs {
-               orphans, ok := tp.orphansByPrev[spend]
-               if !ok {
-                       continue
-               }
-
-               if delete(orphans, *hash); len(orphans) == 0 {
-                       delete(tp.orphansByPrev, spend)
-               }
-       }
-       delete(tp.orphans, *hash)
-}