log "github.com/sirupsen/logrus"
"github.com/vapor/consensus"
+ "github.com/vapor/event"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
"github.com/vapor/protocol/state"
const (
MsgNewTx = iota
MsgRemoveTx
+ logModule = "protocol"
)
var (
ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
// ErrPoolIsFull indicates the pool is full
ErrPoolIsFull = errors.New("transaction pool reach the max number")
+ // ErrDustTx indicates transaction is dust tx
+ ErrDustTx = errors.New("transaction is dust tx")
)
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
// TxDesc store tx and related info for mining strategy
type TxDesc struct {
- Tx *types.Tx
- Added time.Time
- StatusFail bool
- Height uint64
- Weight uint64
- Fee uint64
+ Tx *types.Tx `json:"transaction"`
+ Added time.Time `json:"-"`
+ StatusFail bool `json:"status_fail"`
+ Height uint64 `json:"-"`
+ Weight uint64 `json:"-"`
+ Fee uint64 `json:"-"`
}
// TxPoolMsg is use for notify pool changes
// TxPool is use for store the unconfirmed transaction
type TxPool struct {
- lastUpdated int64
- mtx sync.RWMutex
- store Store
- pool map[bc.Hash]*TxDesc
- utxo map[bc.Hash]*types.Tx
- orphans map[bc.Hash]*orphanTx
- orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
- claimTx map[bc.Hash]bool
- errCache *lru.Cache
- msgCh chan *TxPoolMsg
+ lastUpdated int64
+ mtx sync.RWMutex
+ store Store
+ pool map[bc.Hash]*TxDesc
+ utxo map[bc.Hash]*types.Tx
+ orphans map[bc.Hash]*orphanTx
+ orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
+ errCache *lru.Cache
+ eventDispatcher *event.Dispatcher
}
// NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
tp := &TxPool{
- lastUpdated: time.Now().Unix(),
- store: store,
- pool: make(map[bc.Hash]*TxDesc),
- utxo: make(map[bc.Hash]*types.Tx),
- orphans: make(map[bc.Hash]*orphanTx),
- orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
- claimTx: make(map[bc.Hash]bool),
- errCache: lru.New(maxCachedErrTxs),
- msgCh: make(chan *TxPoolMsg, maxMsgChSize),
+ lastUpdated: time.Now().Unix(),
+ store: store,
+ pool: make(map[bc.Hash]*TxDesc),
+ utxo: make(map[bc.Hash]*types.Tx),
+ orphans: make(map[bc.Hash]*orphanTx),
+ orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
+ errCache: lru.New(maxCachedErrTxs),
+ eventDispatcher: dispatcher,
}
go tp.orphanExpireWorker()
return tp
return v.(error)
}
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
- return tp.msgCh
-}
-
// RemoveTransaction remove a transaction from the pool
func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
tp.mtx.Lock()
for _, output := range txD.Tx.ResultIds {
delete(tp.utxo, *output)
}
- tp.removeClaimTx(txD.Tx)
delete(tp.pool, *txHash)
atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
- log.WithField("tx_id", txHash).Debug("remove tx from mempool")
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
+ log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
}
// GetTransaction return the TxDesc by hash
return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
}
-// ProcessTransaction is the main entry for txpool handle new tx
-func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+func isTransactionNoBtmInput(tx *types.Tx) bool {
+ for _, input := range tx.TxData.Inputs {
+ switch input.InputType() {
+ case types.CrossChainInputType:
+ return false
+ }
+ if input.AssetID() == *consensus.BTMAssetID {
+ return false
+ }
+ }
+
+ return true
+}
+
+func isTransactionZeroOutput(tx *types.Tx) bool {
+ for _, output := range tx.TxData.Outputs {
+ if value := output.AssetAmount(); value.Amount == uint64(0) {
+ return true
+ }
+ }
+ return false
+}
+
+func (tp *TxPool) IsDust(tx *types.Tx) bool {
+ return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx)
+}
+
+func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
tp.mtx.Lock()
defer tp.mtx.Unlock()
- if tp.IsWithdrawSpent(tx) {
- log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
- return false, errors.New("pegin-already-claimed")
- }
txD := &TxDesc{
Tx: tx,
StatusFail: statusFail,
if len(requireParents) > 0 {
return true, tp.addOrphan(txD, requireParents)
}
+
if err := tp.addTransaction(txD); err != nil {
return false, err
}
return false, nil
}
-func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool {
- for key, value := range tx.Entries {
- switch value.(type) {
- case *bc.Claim:
- _, ok := tp.claimTx[key]
- return tp.store.IsWithdrawSpent(&key) || ok
- default:
- continue
- }
- }
-
- return false
-}
-
-func (tp *TxPool) addClaimTx(tx *types.Tx) {
- for key, value := range tx.Entries {
- switch value.(type) {
- case *bc.Claim:
- tp.claimTx[key] = true
- default:
- continue
- }
- }
-}
-
-func (tp *TxPool) removeClaimTx(tx *types.Tx) {
- for key, value := range tx.Entries {
- switch value.(type) {
- case *bc.Claim:
- delete(tp.claimTx, key)
- default:
- continue
- }
+// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
+func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
+ if tp.IsDust(tx) {
+ log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
+ return false, nil
}
+ return tp.processTransaction(tx, statusFail, height, fee)
}
func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
tx := txD.Tx
txD.Added = time.Now()
tp.pool[tx.ID] = txD
- // 增加一个claim id 到到claim pool中
- tp.addClaimTx(tx)
for _, id := range tx.ResultIds {
- output, err := tx.Output(*id)
+ outputEntry, err := tx.Entry(*id)
if err != nil {
- // error due to it's a retirement, utxo doesn't care this output type so skip it
+ return err
+ }
+
+ var assetID bc.AssetID
+ switch output := outputEntry.(type) {
+ case *bc.IntraChainOutput:
+ assetID = *output.Source.Value.AssetId
+ case *bc.VoteOutput:
+ assetID = *output.Source.Value.AssetId
+ default:
continue
}
- if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
+
+ if !txD.StatusFail || assetID == *consensus.BTMAssetID {
tp.utxo[*id] = tx
}
}
atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
-
- log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
+ log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
return nil
}
func (tp *TxPool) orphanExpireWorker() {
ticker := time.NewTicker(orphanExpireScanInterval)
+ defer ticker.Stop()
+
for now := range ticker.C {
tp.ExpireOrphan(now)
}
processOrphan := processOrphans[0]
requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
if err != nil {
- log.WithField("err", err).Error("processOrphans got unexpect error")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
continue
}