9 "github.com/golang/groupcache/lru"
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/consensus"
13 "github.com/vapor/protocol/bc"
14 "github.com/vapor/protocol/bc/types"
15 "github.com/vapor/protocol/state"
// maxCachedErrTxs is the capacity passed to lru.New for the failed-transaction cache.
25 maxCachedErrTxs = 1000
// orphanTTL is added to the current time to compute an orphan's expiration.
30 orphanTTL = 10 * time.Minute
// orphanExpireScanInterval is the ticker period used by orphanExpireWorker.
31 orphanExpireScanInterval = 3 * time.Minute
33 // ErrTransactionNotExist is returned by GetTransaction when the hash is not in the pool.
34 ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
35 // ErrPoolIsFull indicates that the pool has reached its maximum number of transactions.
36 ErrPoolIsFull = errors.New("transaction pool reach the max number")
39 // TxDesc stores a tx and related info for the mining strategy
49 // TxPoolMsg is used to notify listeners of pool changes; the pool sends it on
// its message channel carrying a TxDesc and a MsgType.
50 type TxPoolMsg struct {
// orphanTx pairs a TxDesc with the time at which the orphan expires.
55 type orphanTx struct {
60 // TxPool is used to store unconfirmed transactions.
65 pool map[bc.Hash]*TxDesc // pooled transactions, looked up by tx hash
66 utxo map[bc.Hash]*types.Tx // looked up by output ID; a removed tx's ResultIds are deleted from it
67 orphans map[bc.Hash]*orphanTx // orphans keyed by their own tx ID
68 orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx // required parent hash -> (orphan tx ID -> orphan)
69 claimTx map[bc.Hash]bool // entry IDs recorded by addClaimTx
74 // NewTxPool creates a TxPool with empty indexes, an error LRU cache sized by
// maxCachedErrTxs and a buffered message channel, then starts
// orphanExpireWorker in its own goroutine.
// NOTE(review): no way to stop that goroutine is visible in this excerpt.
75 func NewTxPool(store Store) *TxPool {
77 lastUpdated: time.Now().Unix(),
79 pool: make(map[bc.Hash]*TxDesc),
80 utxo: make(map[bc.Hash]*types.Tx),
81 orphans: make(map[bc.Hash]*orphanTx),
82 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
83 claimTx: make(map[bc.Hash]bool),
84 errCache: lru.New(maxCachedErrTxs),
85 msgCh: make(chan *TxPoolMsg, maxMsgChSize),
87 go tp.orphanExpireWorker()
91 // AddErrCache adds a failed transaction record to the LRU error cache.
// NOTE(review): the cache key is the *bc.Hash pointer itself, so a later lookup
// would only hit when given the same pointer, not merely an equal hash —
// confirm this is intended.
92 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
96 tp.errCache.Add(txHash, err)
99 // ExpireOrphan removes every orphan whose expiration time is before now.
100 func (tp *TxPool) ExpireOrphan(now time.Time) {
102 defer tp.mtx.Unlock()
// removeOrphan deletes from tp.orphans; deleting during a range over a map is permitted in Go.
104 for hash, orphan := range tp.orphans {
105 if orphan.expiration.Before(now) {
106 tp.removeOrphan(&hash)
111 // GetErrCache looks up the error recorded for txHash in the error cache.
// How a cache miss is reported is not visible in this excerpt.
112 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
114 defer tp.mtx.Unlock()
116 v, ok := tp.errCache.Get(txHash)
123 // GetMsgCh returns a receive-only channel of TxPoolMsg notifications.
124 func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
128 // RemoveTransaction removes a transaction from the pool, together with the
// utxo entries for its ResultIds and its claim records, then publishes a
// MsgRemoveTx message.
129 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
131 defer tp.mtx.Unlock()
133 txD, ok := tp.pool[*txHash]
138 for _, output := range txD.Tx.ResultIds {
139 delete(tp.utxo, *output)
141 tp.removeClaimTx(txD.Tx)
142 delete(tp.pool, *txHash)
144 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
// NOTE(review): this send runs before the deferred Unlock, so it would block
// with the mutex held if msgCh cannot accept the message — confirm receivers keep up.
145 tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
146 log.WithField("tx_id", txHash).Debug("remove tx from mempool")
149 // GetTransaction looks up the TxDesc for txHash in the pool and returns
// ErrTransactionNotExist when the hash is absent.
150 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
152 defer tp.mtx.RUnlock()
154 if txD, ok := tp.pool[*txHash]; ok {
157 return nil, ErrTransactionNotExist
160 // GetTransactions returns all the transactions in the pool.
161 func (tp *TxPool) GetTransactions() []*TxDesc {
163 defer tp.mtx.RUnlock()
// The result slice is allocated with one slot per pooled transaction.
165 txDs := make([]*TxDesc, len(tp.pool))
167 for _, desc := range tp.pool {
174 // IsTransactionInPool checks whether a transaction is in the pool.
175 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
177 defer tp.mtx.RUnlock()
179 _, ok := tp.pool[*txHash]
183 // IsTransactionInErrCache checks whether a transaction is in errCache.
// NOTE(review): only RUnlock is visible here, suggesting a read lock, yet
// groupcache's lru.Cache.Get reorders entries and the cache is documented as
// not safe for concurrent access — confirm a read lock is sufficient.
184 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
186 defer tp.mtx.RUnlock()
188 _, ok := tp.errCache.Get(txHash)
192 // HaveTransaction reports whether the transaction is in the pool or in errCache.
193 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
194 return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
197 // ProcessTransaction is the main entry point for the txpool to handle a new tx.
// A tx for which IsWithdrawSpent is true is rejected with
// "pegin-already-claimed". A tx with missing parents is stored as an orphan and
// the call returns true plus addOrphan's error. Otherwise the tx is added to
// the pool and orphans depending on it are processed.
198 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
200 defer tp.mtx.Unlock()
202 if tp.IsWithdrawSpent(tx) {
203 log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
204 return false, errors.New("pegin-already-claimed")
208 StatusFail: statusFail,
209 Weight: tx.SerializedSize,
213 requireParents, err := tp.checkOrphanUtxos(tx)
// A non-empty list means some inputs are not yet spendable, so the tx is parked as an orphan.
218 if len(requireParents) > 0 {
219 return true, tp.addOrphan(txD, requireParents)
221 if err := tp.addTransaction(txD); err != nil {
225 tp.processOrphans(txD)
// IsWithdrawSpent reports, for an entry of tx matching the type switch (the
// case is not visible in this excerpt), whether its ID is already in claimTx or
// the store reports it as spent.
// NOTE(review): the return sits inside the loop, so only the first matching
// entry appears to be examined — confirm.
229 func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool {
230 for key, value := range tx.Entries {
231 switch value.(type) {
233 _, ok := tp.claimTx[key]
234 return tp.store.IsWithdrawSpent(&key) || ok
// addClaimTx records in claimTx the IDs of the tx entries matching the type
// switch (the case is not visible in this excerpt).
243 func (tp *TxPool) addClaimTx(tx *types.Tx) {
244 for key, value := range tx.Entries {
245 switch value.(type) {
247 tp.claimTx[key] = true
// removeClaimTx deletes from claimTx the IDs of the tx entries matching the
// type switch (the case is not visible in this excerpt).
254 func (tp *TxPool) removeClaimTx(tx *types.Tx) {
255 for key, value := range tx.Entries {
256 switch value.(type) {
258 delete(tp.claimTx, key)
// addOrphan stores txD as an orphan expiring orphanTTL from now and indexes it
// under each hash in requireParents. What happens once maxOrphanNum is reached
// is not visible in this excerpt.
265 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
266 if len(tp.orphans) >= maxOrphanNum {
270 orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
271 tp.orphans[txD.Tx.ID] = orphan
272 for _, hash := range requireParents {
// Create the per-parent bucket on first use.
273 if _, ok := tp.orphansByPrev[*hash]; !ok {
274 tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
276 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
// addTransaction stamps txD.Added, walks the tx's ResultIds, updates
// lastUpdated and publishes a MsgNewTx message. The branch taken when
// maxNewTxNum is reached and the actual map insertions are not visible in this
// excerpt.
281 func (tp *TxPool) addTransaction(txD *TxDesc) error {
282 if len(tp.pool) >= maxNewTxNum {
287 txD.Added = time.Now()
289 // add the claim id to the claim pool
291 for _, id := range tx.ResultIds {
292 output, err := tx.Output(*id)
294 // the error is due to the output being a retirement; utxo does not care about this output type, so skip it
// The guarded statement runs for every output of a non-failed tx, and only for BTM-asset outputs of a failed one.
297 if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
302 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
303 tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
305 log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
// checkOrphanUtxos collects the spent output IDs of tx that can be spent
// neither through the store's utxo view nor through the pool's own utxo index.
309 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
310 view := state.NewUtxoViewpoint()
311 if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
315 hashes := []*bc.Hash{}
316 for _, hash := range tx.SpentOutputIDs {
317 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
// NOTE(review): &hash takes the address of the range variable; before Go 1.22
// every appended pointer aliases that single variable, so all elements would
// end up equal — copy hash to a local first if the module targets an older Go.
318 hashes = append(hashes, &hash)
// orphanExpireWorker loops over a ticker that fires every orphanExpireScanInterval.
// NOTE(review): the loop body is not visible here; presumably it calls
// ExpireOrphan(now) — confirm.
324 func (tp *TxPool) orphanExpireWorker() {
325 ticker := time.NewTicker(orphanExpireScanInterval)
326 for now := range ticker.C {
// processOrphans works through a queue of orphans, re-checking each one's
// parents and moving those with none missing into the pool.
// NOTE(review): how the queue is first seeded from txD is not visible in this
// excerpt; presumably via addRely(txD.Tx) — confirm.
331 func (tp *TxPool) processOrphans(txD *TxDesc) {
332 processOrphans := []*orphanTx{}
// addRely queues every orphan waiting on one of tx's outputs and drops those buckets.
333 addRely := func(tx *types.Tx) {
334 for _, outHash := range tx.ResultIds {
335 orphans, ok := tp.orphansByPrev[*outHash]
340 for _, orphan := range orphans {
341 processOrphans = append(processOrphans, orphan)
343 delete(tp.orphansByPrev, *outHash)
// Pop from the front; addRely may append more work while the loop runs.
348 for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
349 processOrphan := processOrphans[0]
350 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
352 log.WithField("err", err).Error("processOrphans got unexpect error")
356 if len(requireParents) == 0 {
357 addRely(processOrphan.Tx)
358 tp.removeOrphan(&processOrphan.Tx.ID)
// NOTE(review): the error returned by addTransaction is discarded here.
359 tp.addTransaction(processOrphan.TxDesc)
// removeOrphan deletes the orphan with the given hash from tp.orphans and from
// every orphansByPrev bucket keyed by one of its SpentOutputIDs, dropping
// buckets that become empty.
364 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
365 orphan, ok := tp.orphans[*hash]
370 for _, spend := range orphan.Tx.SpentOutputIDs {
371 orphans, ok := tp.orphansByPrev[spend]
376 if delete(orphans, *hash); len(orphans) == 0 {
377 delete(tp.orphansByPrev, spend)
380 delete(tp.orphans, *hash)