9 "github.com/golang/groupcache/lru"
10 log "github.com/sirupsen/logrus"
12 "github.com/bytom/vapor/consensus"
13 "github.com/bytom/vapor/event"
14 "github.com/bytom/vapor/protocol/bc"
15 "github.com/bytom/vapor/protocol/bc/types"
16 "github.com/bytom/vapor/protocol/state"
23 logModule = "protocol"
27 maxCachedErrTxs = 1000
32 orphanTTL = 60 * time.Second
33 orphanExpireScanInterval = 30 * time.Second
36 txExpireScanInterval = 20 * time.Minute
38 // ErrTransactionNotExist is the pre-defined error message
39 ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
40 // ErrPoolIsFull indicates the pool is full
41 ErrPoolIsFull = errors.New("transaction pool reach the max number")
42 // ErrDustTx indicates transaction is dust tx
43 ErrDustTx = errors.New("transaction is dust tx")
46 // DustFilterer is inerface for dust transaction filter rule
47 type DustFilterer interface {
48 IsDust(tx *types.Tx) bool
51 // TxMsgEvent is message wrap for subscribe event
52 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
54 // TxDesc store tx and related info for mining strategy
56 Tx *types.Tx `json:"transaction"`
57 Added time.Time `json:"-"`
58 StatusFail bool `json:"status_fail"`
59 Height uint64 `json:"-"`
60 Weight uint64 `json:"-"`
64 // TxPoolMsg is use for notify pool changes
65 type TxPoolMsg struct {
70 type orphanTx struct {
75 // TxPool is use for store the unconfirmed transaction
80 pool map[bc.Hash]*TxDesc
81 utxo map[bc.Hash]*types.Tx
82 orphans map[bc.Hash]*orphanTx
83 orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
85 filters []DustFilterer
86 eventDispatcher *event.Dispatcher
89 // NewTxPool init a new TxPool
90 func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
92 lastUpdated: time.Now().Unix(),
94 pool: make(map[bc.Hash]*TxDesc),
95 utxo: make(map[bc.Hash]*types.Tx),
96 orphans: make(map[bc.Hash]*orphanTx),
97 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
98 errCache: lru.New(maxCachedErrTxs),
100 eventDispatcher: dispatcher,
102 go tp.orphanExpireWorker()
103 go tp.txExpireWorker()
107 // AddErrCache add a failed transaction record to lru cache
108 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
110 defer tp.mtx.Unlock()
112 tp.errCache.Add(txHash, err)
115 // expireOrphan expire all the orphans that before the input time range
116 func (tp *TxPool) expireOrphan(now time.Time) {
118 defer tp.mtx.Unlock()
120 for hash, orphan := range tp.orphans {
121 if orphan.expiration.Before(now) {
122 tp.removeOrphan(&hash)
127 // GetErrCache return the error of the transaction
128 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
130 defer tp.mtx.Unlock()
132 v, ok := tp.errCache.Get(txHash)
139 // RemoveTransaction remove a transaction from the pool
140 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
142 defer tp.mtx.Unlock()
144 if txD := tp.removeTransaction(txHash); txD != nil {
145 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
146 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
147 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
151 func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
152 txD, ok := tp.pool[*txHash]
157 for _, output := range txD.Tx.ResultIds {
158 delete(tp.utxo, *output)
160 delete(tp.pool, *txHash)
164 // GetTransaction return the TxDesc by hash
165 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
167 defer tp.mtx.RUnlock()
169 if txD, ok := tp.pool[*txHash]; ok {
172 return nil, ErrTransactionNotExist
175 // GetTransactions return all the transactions in the pool
176 func (tp *TxPool) GetTransactions() []*TxDesc {
178 defer tp.mtx.RUnlock()
180 txDs := make([]*TxDesc, len(tp.pool))
182 for _, desc := range tp.pool {
189 // IsTransactionInPool check wheather a transaction in pool or not
190 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
192 defer tp.mtx.RUnlock()
194 _, ok := tp.pool[*txHash]
198 // IsTransactionInErrCache check wheather a transaction in errCache or not
199 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
201 defer tp.mtx.RUnlock()
203 _, ok := tp.errCache.Get(txHash)
207 // HaveTransaction IsTransactionInErrCache check is transaction in errCache or pool
208 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
209 return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
212 func isTransactionZeroOutput(tx *types.Tx) bool {
213 for _, output := range tx.TxData.Outputs {
214 if value := output.AssetAmount(); value.Amount == uint64(0) {
221 func isNoGasStatusFail(tx *types.Tx, statusFail bool) bool {
226 for _, input := range tx.TxData.Inputs {
227 if *consensus.BTMAssetID == input.AssetID() {
234 //IsDust checks if a tx has zero output
235 func (tp *TxPool) IsDust(tx *types.Tx) bool {
236 if ok := isTransactionZeroOutput(tx); ok {
240 for _, filter := range tp.filters {
241 if ok := filter.IsDust(tx); ok {
248 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
250 defer tp.mtx.Unlock()
254 StatusFail: statusFail,
255 Weight: tx.SerializedSize,
259 requireParents, err := tp.checkOrphanUtxos(tx)
264 if len(requireParents) > 0 {
265 return true, tp.addOrphan(txD, requireParents)
268 if err := tp.addTransaction(txD); err != nil {
272 tp.processOrphans(txD)
276 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
277 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
279 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
283 if isNoGasStatusFail(tx, statusFail) {
284 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("drop no gas status fail tx")
287 return tp.processTransaction(tx, statusFail, height, fee)
290 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
291 if len(tp.orphans) >= maxOrphanNum {
295 orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
296 tp.orphans[txD.Tx.ID] = orphan
297 for _, hash := range requireParents {
298 if _, ok := tp.orphansByPrev[*hash]; !ok {
299 tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
301 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
306 func (tp *TxPool) addTransaction(txD *TxDesc) error {
307 if len(tp.pool) >= maxNewTxNum {
312 txD.Added = time.Now()
314 for _, id := range tx.ResultIds {
315 outputEntry, err := tx.Entry(*id)
320 var assetID bc.AssetID
321 switch output := outputEntry.(type) {
322 case *bc.IntraChainOutput:
323 assetID = *output.Source.Value.AssetId
325 assetID = *output.Source.Value.AssetId
330 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
335 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
336 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
337 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
341 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
342 view := state.NewUtxoViewpoint()
343 if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
347 hashes := []*bc.Hash{}
348 for _, hash := range tx.SpentOutputIDs {
349 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
350 hashes = append(hashes, &hash)
356 func (tp *TxPool) orphanExpireWorker() {
357 ticker := time.NewTicker(orphanExpireScanInterval)
360 for now := range ticker.C {
365 func (tp *TxPool) processOrphans(txD *TxDesc) {
366 processOrphans := []*orphanTx{}
367 addRely := func(tx *types.Tx) {
368 for _, outHash := range tx.ResultIds {
369 orphans, ok := tp.orphansByPrev[*outHash]
374 for _, orphan := range orphans {
375 processOrphans = append(processOrphans, orphan)
377 delete(tp.orphansByPrev, *outHash)
382 for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
383 processOrphan := processOrphans[0]
384 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
386 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
390 if len(requireParents) == 0 {
391 addRely(processOrphan.Tx)
392 tp.removeOrphan(&processOrphan.Tx.ID)
393 tp.addTransaction(processOrphan.TxDesc)
398 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
399 orphan, ok := tp.orphans[*hash]
404 for _, spend := range orphan.Tx.SpentOutputIDs {
405 orphans, ok := tp.orphansByPrev[spend]
410 if delete(orphans, *hash); len(orphans) == 0 {
411 delete(tp.orphansByPrev, spend)
414 delete(tp.orphans, *hash)
417 func (tp *TxPool) txExpireWorker() {
418 ticker := time.NewTicker(txExpireScanInterval)
421 for now := range ticker.C {
426 // expireTx expires all the Txs that before the input time range
427 func (tp *TxPool) expireTx(now time.Time) {
429 defer tp.mtx.Unlock()
431 cutOff := now.Add(-txTTL)
432 for hash, txD := range tp.pool {
433 if txD.Added.Before(cutOff) {
434 tp.removeTransaction(&hash)