X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=protocol%2Ftxpool.go;h=912091cdf1378f3e8e055959857b2ec68f408e3c;hp=cd7ebf14f7051f2ce24af93966493953ca81ab62;hb=2e4f5f11ae3bec54e4918a7aac5c8b3e2381de44;hpb=cc968002ceac2dfd7665c2ac2b4c32ab6017b525 diff --git a/protocol/txpool.go b/protocol/txpool.go index cd7ebf14..912091cd 100644 --- a/protocol/txpool.go +++ b/protocol/txpool.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vapor/consensus" + "github.com/vapor/event" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/protocol/state" @@ -19,6 +20,7 @@ import ( const ( MsgNewTx = iota MsgRemoveTx + logModule = "protocol" ) var ( @@ -34,16 +36,20 @@ var ( ErrTransactionNotExist = errors.New("transaction are not existed in the mempool") // ErrPoolIsFull indicates the pool is full ErrPoolIsFull = errors.New("transaction pool reach the max number") + // ErrDustTx indicates transaction is dust tx + ErrDustTx = errors.New("transaction is dust tx") ) +type TxMsgEvent struct{ TxMsg *TxPoolMsg } + // TxDesc store tx and related info for mining strategy type TxDesc struct { - Tx *types.Tx - Added time.Time - StatusFail bool - Height uint64 - Weight uint64 - Fee uint64 + Tx *types.Tx `json:"transaction"` + Added time.Time `json:"-"` + StatusFail bool `json:"status_fail"` + Height uint64 `json:"-"` + Weight uint64 `json:"-"` + Fee uint64 `json:"-"` } // TxPoolMsg is use for notify pool changes @@ -59,30 +65,28 @@ type orphanTx struct { // TxPool is use for store the unconfirmed transaction type TxPool struct { - lastUpdated int64 - mtx sync.RWMutex - store Store - pool map[bc.Hash]*TxDesc - utxo map[bc.Hash]*types.Tx - orphans map[bc.Hash]*orphanTx - orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx - claimTx map[bc.Hash]bool - errCache *lru.Cache - msgCh chan *TxPoolMsg + lastUpdated int64 + mtx sync.RWMutex + store Store + pool map[bc.Hash]*TxDesc + utxo map[bc.Hash]*types.Tx + orphans map[bc.Hash]*orphanTx + orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx + errCache *lru.Cache + eventDispatcher *event.Dispatcher } // NewTxPool init a new TxPool -func NewTxPool(store Store) *TxPool { +func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool { tp := &TxPool{ - lastUpdated: time.Now().Unix(), - store: store, - pool: make(map[bc.Hash]*TxDesc), - utxo: make(map[bc.Hash]*types.Tx), - orphans: make(map[bc.Hash]*orphanTx), - orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx), - claimTx: make(map[bc.Hash]bool), - errCache: lru.New(maxCachedErrTxs), - msgCh: make(chan *TxPoolMsg, maxMsgChSize), + lastUpdated: time.Now().Unix(), + store: store, + pool: make(map[bc.Hash]*TxDesc), + utxo: make(map[bc.Hash]*types.Tx), + orphans: make(map[bc.Hash]*orphanTx), + orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx), + errCache: lru.New(maxCachedErrTxs), + eventDispatcher: dispatcher, } go tp.orphanExpireWorker() return tp @@ -120,11 +124,6 @@ func (tp *TxPool) GetErrCache(txHash *bc.Hash) error { return v.(error) } -// GetMsgCh return a unconfirmed transaction feed channel -func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg { - return tp.msgCh -} - // RemoveTransaction remove a transaction from the pool func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { tp.mtx.Lock() @@ -138,12 +137,11 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { for _, output := range txD.Tx.ResultIds { delete(tp.utxo, *output) } - tp.removeClaimTx(txD.Tx) delete(tp.pool, *txHash) atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx} - log.WithField("tx_id", txHash).Debug("remove tx from mempool") + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") } // GetTransaction return the TxDesc by hash @@ -194,15 +192,37 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool { return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash) } -// ProcessTransaction is the main entry for txpool handle new tx -func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { +func isTransactionNoBtmInput(tx *types.Tx) bool { + for _, input := range tx.TxData.Inputs { + switch input.InputType() { + case types.CrossChainInputType: + return false + } + if input.AssetID() == *consensus.BTMAssetID { + return false + } + } + + return true +} + +func isTransactionZeroOutput(tx *types.Tx) bool { + for _, output := range tx.TxData.Outputs { + if value := output.AssetAmount(); value.Amount == uint64(0) { + return true + } + } + return false +} + +func (tp *TxPool) IsDust(tx *types.Tx) bool { + return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx) +} + +func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { tp.mtx.Lock() defer tp.mtx.Unlock() - if tp.IsWithdrawSpent(tx) { - log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error") - return false, errors.New("pegin-already-claimed") - } txD := &TxDesc{ Tx: tx, StatusFail: statusFail, @@ -218,6 +238,7 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee if len(requireParents) > 0 { return true, tp.addOrphan(txD, requireParents) } + if err := tp.addTransaction(txD); err != nil { return false, err } @@ -226,40 +247,13 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee return false, nil } -func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - _, ok := tp.claimTx[key] - return tp.store.IsWithdrawSpent(&key) || ok - default: - continue - } - } - - return false -} - -func (tp *TxPool) addClaimTx(tx *types.Tx) { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - tp.claimTx[key] = true - default: - continue - } - } -} - -func (tp *TxPool) removeClaimTx(tx *types.Tx) { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - delete(tp.claimTx, key) - default: - continue - } +// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx. +func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { + if tp.IsDust(tx) { + log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx") + return false, nil } + return tp.processTransaction(tx, statusFail, height, fee) } func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error { @@ -286,23 +280,30 @@ func (tp *TxPool) addTransaction(txD *TxDesc) error { tx := txD.Tx txD.Added = time.Now() tp.pool[tx.ID] = txD - // 增加一个claim id 到到claim pool中 - tp.addClaimTx(tx) for _, id := range tx.ResultIds { - output, err := tx.Output(*id) + outputEntry, err := tx.Entry(*id) if err != nil { - // error due to it's a retirement, utxo doesn't care this output type so skip it + return err + } + + var assetID bc.AssetID + switch output := outputEntry.(type) { + case *bc.IntraChainOutput: + assetID = *output.Source.Value.AssetId + case *bc.VoteOutput: + assetID = *output.Source.Value.AssetId + default: continue } - if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID { + + if !txD.StatusFail || assetID == *consensus.BTMAssetID { tp.utxo[*id] = tx } } atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx} - - log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool") + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool") return nil } @@ -323,6 +324,8 @@ func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) { func (tp *TxPool) orphanExpireWorker() { ticker := time.NewTicker(orphanExpireScanInterval) + defer ticker.Stop() + for now := range ticker.C { tp.ExpireOrphan(now) } @@ -349,7 +352,7 @@ func (tp *TxPool) processOrphans(txD *TxDesc) { processOrphan := processOrphans[0] requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx) if err != nil { - log.WithField("err", err).Error("processOrphans got unexpect error") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error") continue }