X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=blobdiff_plain;f=protocol%2Ftxpool.go;h=09e32b16b1994a5a3d5aac788214f8ef06b3a9a7;hp=051117167000efadf7ac4b0664c2a2ff91f436b8;hb=3a6cd9640bdb0eed33451a5af7a0e12cfe423165;hpb=1accb05ca43992587ace7559732900098953980f diff --git a/protocol/txpool.go b/protocol/txpool.go index 05111716..09e32b16 100644 --- a/protocol/txpool.go +++ b/protocol/txpool.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vapor/consensus" + "github.com/vapor/event" "github.com/vapor/protocol/bc" "github.com/vapor/protocol/bc/types" "github.com/vapor/protocol/state" @@ -19,31 +20,39 @@ import ( const ( MsgNewTx = iota MsgRemoveTx + logModule = "protocol" ) var ( maxCachedErrTxs = 1000 maxMsgChSize = 1000 - maxNewTxNum = 10000 - maxOrphanNum = 2000 + maxNewTxNum = 65536 + maxOrphanNum = 32768 - orphanTTL = 10 * time.Minute - orphanExpireScanInterval = 3 * time.Minute + orphanTTL = 60 * time.Second + orphanExpireScanInterval = 30 * time.Second + + txTTL = 1 * time.Hour + txExpireScanInterval = 20 * time.Minute // ErrTransactionNotExist is the pre-defined error message ErrTransactionNotExist = errors.New("transaction are not existed in the mempool") // ErrPoolIsFull indicates the pool is full ErrPoolIsFull = errors.New("transaction pool reach the max number") + // ErrDustTx indicates transaction is dust tx + ErrDustTx = errors.New("transaction is dust tx") ) +type TxMsgEvent struct{ TxMsg *TxPoolMsg } + // TxDesc store tx and related info for mining strategy type TxDesc struct { - Tx *types.Tx - Added time.Time - StatusFail bool - Height uint64 - Weight uint64 - Fee uint64 + Tx *types.Tx `json:"transaction"` + Added time.Time `json:"-"` + StatusFail bool `json:"status_fail"` + Height uint64 `json:"-"` + Weight uint64 `json:"-"` + Fee uint64 `json:"-"` } // TxPoolMsg is use for notify pool changes @@ -59,32 +68,31 @@ type orphanTx struct { // TxPool is use for store the unconfirmed transaction type TxPool struct { - lastUpdated int64 - mtx sync.RWMutex - store Store - pool map[bc.Hash]*TxDesc - utxo map[bc.Hash]*types.Tx - orphans map[bc.Hash]*orphanTx - orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx - claimTx map[bc.Hash]bool - errCache *lru.Cache - msgCh chan *TxPoolMsg + lastUpdated int64 + mtx sync.RWMutex + store Store + pool map[bc.Hash]*TxDesc + utxo map[bc.Hash]*types.Tx + orphans map[bc.Hash]*orphanTx + orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx + errCache *lru.Cache + eventDispatcher *event.Dispatcher } // NewTxPool init a new TxPool -func NewTxPool(store Store) *TxPool { +func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool { tp := &TxPool{ - lastUpdated: time.Now().Unix(), - store: store, - pool: make(map[bc.Hash]*TxDesc), - utxo: make(map[bc.Hash]*types.Tx), - orphans: make(map[bc.Hash]*orphanTx), - orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx), - claimTx: make(map[bc.Hash]bool), - errCache: lru.New(maxCachedErrTxs), - msgCh: make(chan *TxPoolMsg, maxMsgChSize), + lastUpdated: time.Now().Unix(), + store: store, + pool: make(map[bc.Hash]*TxDesc), + utxo: make(map[bc.Hash]*types.Tx), + orphans: make(map[bc.Hash]*orphanTx), + orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx), + errCache: lru.New(maxCachedErrTxs), + eventDispatcher: dispatcher, } go tp.orphanExpireWorker() + go tp.txExpireWorker() return tp } @@ -96,8 +104,8 @@ func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) { tp.errCache.Add(txHash, err) } -// ExpireOrphan expire all the orphans that before the input time range -func (tp *TxPool) ExpireOrphan(now time.Time) { +// expireOrphan expire all the orphans that before the input time range +func (tp *TxPool) expireOrphan(now time.Time) { tp.mtx.Lock() defer tp.mtx.Unlock() @@ -120,30 +128,29 @@ func (tp *TxPool) GetErrCache(txHash *bc.Hash) error { return v.(error) } -// GetMsgCh return a unconfirmed transaction feed channel -func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg { - return tp.msgCh -} - // RemoveTransaction remove a transaction from the pool func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) { tp.mtx.Lock() defer tp.mtx.Unlock() + if txD := tp.removeTransaction(txHash); txD != nil { + atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool") + } +} + +func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc { txD, ok := tp.pool[*txHash] if !ok { - return + return nil } for _, output := range txD.Tx.ResultIds { delete(tp.utxo, *output) } - tp.removeClaimTx(txD.Tx) delete(tp.pool, *txHash) - - atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx} - log.WithField("tx_id", txHash).Debug("remove tx from mempool") + return txD } // GetTransaction return the TxDesc by hash @@ -194,16 +201,24 @@ func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool { return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash) } -// ProcessTransaction is the main entry for txpool handle new tx -func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { +func isTransactionZeroOutput(tx *types.Tx) bool { + for _, output := range tx.TxData.Outputs { + if value := output.AssetAmount(); value.Amount == uint64(0) { + return true + } + } + return false +} + +//IsDust checks if a tx has zero output +func (tp *TxPool) IsDust(tx *types.Tx) bool { + return isTransactionZeroOutput(tx) +} + +func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { tp.mtx.Lock() defer tp.mtx.Unlock() - if tp.IsWithdrawSpent(tx) { - log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error") - return false, errors.New("pegin-already-claimed") - } - txD := &TxDesc{ Tx: tx, StatusFail: statusFail, @@ -219,6 +234,7 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee if len(requireParents) > 0 { return true, tp.addOrphan(txD, requireParents) } + if err := tp.addTransaction(txD); err != nil { return false, err } @@ -227,40 +243,13 @@ func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee return false, nil } -func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - _, ok := tp.claimTx[key] - return tp.store.IsWithdrawSpent(&key) || ok - default: - continue - } - } - - return false -} - -func (tp *TxPool) addClaimTx(tx *types.Tx) { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - tp.claimTx[key] = true - default: - continue - } - } -} - -func (tp *TxPool) removeClaimTx(tx *types.Tx) { - for key, value := range tx.Entries { - switch value.(type) { - case *bc.Claim: - delete(tp.claimTx, key) - default: - continue - } +// ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx. +func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) { + if tp.IsDust(tx) { + log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx") + return false, nil } + return tp.processTransaction(tx, statusFail, height, fee) } func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error { @@ -287,22 +276,30 @@ func (tp *TxPool) addTransaction(txD *TxDesc) error { tx := txD.Tx txD.Added = time.Now() tp.pool[tx.ID] = txD - // 增加一个claim id 到到claim pool中 - tp.addClaimTx(tx) for _, id := range tx.ResultIds { - output, err := tx.Output(*id) + outputEntry, err := tx.Entry(*id) if err != nil { - // error due to it's a retirement, utxo doesn't care this output type so skip it + return err + } + + var assetID bc.AssetID + switch output := outputEntry.(type) { + case *bc.IntraChainOutput: + assetID = *output.Source.Value.AssetId + case *bc.VoteOutput: + assetID = *output.Source.Value.AssetId + default: continue } - if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID { + + if !txD.StatusFail || assetID == *consensus.BTMAssetID { tp.utxo[*id] = tx } } atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix()) - tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx} - log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool") + tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}}) + log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool") return nil } @@ -323,8 +320,10 @@ func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) { func (tp *TxPool) orphanExpireWorker() { ticker := time.NewTicker(orphanExpireScanInterval) + defer ticker.Stop() + for now := range ticker.C { - tp.ExpireOrphan(now) + tp.expireOrphan(now) } } @@ -349,7 +348,7 @@ func (tp *TxPool) processOrphans(txD *TxDesc) { processOrphan := processOrphans[0] requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx) if err != nil { - log.WithField("err", err).Error("processOrphans got unexpect error") + log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error") continue } @@ -379,3 +378,25 @@ func (tp *TxPool) removeOrphan(hash *bc.Hash) { } delete(tp.orphans, *hash) } + +func (tp *TxPool) txExpireWorker() { + ticker := time.NewTicker(txExpireScanInterval) + defer ticker.Stop() + + for now := range ticker.C { + tp.expireTx(now) + } +} + +// expireTx expires all the Txs that before the input time range +func (tp *TxPool) expireTx(now time.Time) { + tp.mtx.Lock() + defer tp.mtx.Unlock() + + cutOff := now.Add(-txTTL) + for hash, txD := range tp.pool { + if txD.Added.Before(cutOff) { + tp.removeTransaction(&hash) + } + } +}