9 "github.com/golang/groupcache/lru"
10 log "github.com/sirupsen/logrus"
12 "github.com/bytom/vapor/consensus"
13 "github.com/bytom/vapor/event"
14 "github.com/bytom/vapor/protocol/bc"
15 "github.com/bytom/vapor/protocol/bc/types"
16 "github.com/bytom/vapor/protocol/state"
23 logModule = "protocol"
27 maxCachedErrTxs = 1000
32 orphanTTL = 60 * time.Second
33 orphanExpireScanInterval = 30 * time.Second
36 txExpireScanInterval = 20 * time.Minute
38 // ErrTransactionNotExist is the pre-defined error message
39 ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
40 // ErrPoolIsFull indicates the pool is full
41 ErrPoolIsFull = errors.New("transaction pool reach the max number")
42 // ErrDustTx indicates transaction is dust tx
43 ErrDustTx = errors.New("transaction is dust tx")
46 type DustFilterer interface {
47 IsDust(tx *types.Tx) bool
50 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
52 // TxDesc store tx and related info for mining strategy
54 Tx *types.Tx `json:"transaction"`
55 Added time.Time `json:"-"`
56 StatusFail bool `json:"status_fail"`
57 Height uint64 `json:"-"`
58 Weight uint64 `json:"-"`
62 // TxPoolMsg is use for notify pool changes
63 type TxPoolMsg struct {
68 type orphanTx struct {
73 // TxPool is use for store the unconfirmed transaction
78 pool map[bc.Hash]*TxDesc
79 utxo map[bc.Hash]*types.Tx
80 orphans map[bc.Hash]*orphanTx
81 orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
83 filters []DustFilterer
84 eventDispatcher *event.Dispatcher
87 // NewTxPool init a new TxPool
88 func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
90 lastUpdated: time.Now().Unix(),
92 pool: make(map[bc.Hash]*TxDesc),
93 utxo: make(map[bc.Hash]*types.Tx),
94 orphans: make(map[bc.Hash]*orphanTx),
95 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
96 errCache: lru.New(maxCachedErrTxs),
98 eventDispatcher: dispatcher,
100 go tp.orphanExpireWorker()
101 go tp.txExpireWorker()
105 // AddErrCache add a failed transaction record to lru cache
106 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
108 defer tp.mtx.Unlock()
110 tp.errCache.Add(txHash, err)
113 // expireOrphan expire all the orphans that before the input time range
114 func (tp *TxPool) expireOrphan(now time.Time) {
116 defer tp.mtx.Unlock()
118 for hash, orphan := range tp.orphans {
119 if orphan.expiration.Before(now) {
120 tp.removeOrphan(&hash)
125 // GetErrCache return the error of the transaction
126 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
128 defer tp.mtx.Unlock()
130 v, ok := tp.errCache.Get(txHash)
137 // RemoveTransaction remove a transaction from the pool
138 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
140 defer tp.mtx.Unlock()
142 if txD := tp.removeTransaction(txHash); txD != nil {
143 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
144 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
145 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
149 func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
150 txD, ok := tp.pool[*txHash]
155 for _, output := range txD.Tx.ResultIds {
156 delete(tp.utxo, *output)
158 delete(tp.pool, *txHash)
162 // GetTransaction return the TxDesc by hash
163 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
165 defer tp.mtx.RUnlock()
167 if txD, ok := tp.pool[*txHash]; ok {
170 return nil, ErrTransactionNotExist
173 // GetTransactions return all the transactions in the pool
174 func (tp *TxPool) GetTransactions() []*TxDesc {
176 defer tp.mtx.RUnlock()
178 txDs := make([]*TxDesc, len(tp.pool))
180 for _, desc := range tp.pool {
187 // IsTransactionInPool check wheather a transaction in pool or not
188 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
190 defer tp.mtx.RUnlock()
192 _, ok := tp.pool[*txHash]
196 // IsTransactionInErrCache check wheather a transaction in errCache or not
197 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
199 defer tp.mtx.RUnlock()
201 _, ok := tp.errCache.Get(txHash)
205 // HaveTransaction IsTransactionInErrCache check is transaction in errCache or pool
206 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
207 return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
210 func isTransactionZeroOutput(tx *types.Tx) bool {
211 for _, output := range tx.TxData.Outputs {
212 if value := output.AssetAmount(); value.Amount == uint64(0) {
219 func isNoGasStatusFail(tx *types.Tx, statusFail bool) bool {
224 for _, input := range tx.TxData.Inputs {
225 if *consensus.BTMAssetID == input.AssetID() {
232 //IsDust checks if a tx has zero output
233 func (tp *TxPool) IsDust(tx *types.Tx) bool {
234 if ok := isTransactionZeroOutput(tx); ok {
238 for _, filter := range tp.filters {
239 if ok := filter.IsDust(tx); ok {
246 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
248 defer tp.mtx.Unlock()
252 StatusFail: statusFail,
253 Weight: tx.SerializedSize,
257 requireParents, err := tp.checkOrphanUtxos(tx)
262 if len(requireParents) > 0 {
263 return true, tp.addOrphan(txD, requireParents)
266 if err := tp.addTransaction(txD); err != nil {
270 tp.processOrphans(txD)
274 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
275 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
277 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
281 if isNoGasStatusFail(tx, statusFail) {
282 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("drop no gas status fail tx")
285 return tp.processTransaction(tx, statusFail, height, fee)
288 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
289 if len(tp.orphans) >= maxOrphanNum {
293 orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
294 tp.orphans[txD.Tx.ID] = orphan
295 for _, hash := range requireParents {
296 if _, ok := tp.orphansByPrev[*hash]; !ok {
297 tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
299 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
304 func (tp *TxPool) addTransaction(txD *TxDesc) error {
305 if len(tp.pool) >= maxNewTxNum {
310 txD.Added = time.Now()
312 for _, id := range tx.ResultIds {
313 outputEntry, err := tx.Entry(*id)
318 var assetID bc.AssetID
319 switch output := outputEntry.(type) {
320 case *bc.IntraChainOutput:
321 assetID = *output.Source.Value.AssetId
323 assetID = *output.Source.Value.AssetId
328 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
333 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
334 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
335 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
339 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
340 view := state.NewUtxoViewpoint()
341 if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
345 hashes := []*bc.Hash{}
346 for _, hash := range tx.SpentOutputIDs {
347 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
348 hashes = append(hashes, &hash)
354 func (tp *TxPool) orphanExpireWorker() {
355 ticker := time.NewTicker(orphanExpireScanInterval)
358 for now := range ticker.C {
363 func (tp *TxPool) processOrphans(txD *TxDesc) {
364 processOrphans := []*orphanTx{}
365 addRely := func(tx *types.Tx) {
366 for _, outHash := range tx.ResultIds {
367 orphans, ok := tp.orphansByPrev[*outHash]
372 for _, orphan := range orphans {
373 processOrphans = append(processOrphans, orphan)
375 delete(tp.orphansByPrev, *outHash)
380 for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
381 processOrphan := processOrphans[0]
382 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
384 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
388 if len(requireParents) == 0 {
389 addRely(processOrphan.Tx)
390 tp.removeOrphan(&processOrphan.Tx.ID)
391 tp.addTransaction(processOrphan.TxDesc)
396 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
397 orphan, ok := tp.orphans[*hash]
402 for _, spend := range orphan.Tx.SpentOutputIDs {
403 orphans, ok := tp.orphansByPrev[spend]
408 if delete(orphans, *hash); len(orphans) == 0 {
409 delete(tp.orphansByPrev, spend)
412 delete(tp.orphans, *hash)
415 func (tp *TxPool) txExpireWorker() {
416 ticker := time.NewTicker(txExpireScanInterval)
419 for now := range ticker.C {
424 // expireTx expires all the Txs that before the input time range
425 func (tp *TxPool) expireTx(now time.Time) {
427 defer tp.mtx.Unlock()
429 cutOff := now.Add(-txTTL)
430 for hash, txD := range tp.pool {
431 if txD.Added.Before(cutOff) {
432 tp.removeTransaction(&hash)