9 "github.com/golang/groupcache/lru"
10 log "github.com/sirupsen/logrus"
12 "github.com/vapor/consensus"
13 "github.com/vapor/event"
14 "github.com/vapor/protocol/bc"
15 "github.com/vapor/protocol/bc/types"
16 "github.com/vapor/protocol/state"
23 logModule = "protocol"
27 maxCachedErrTxs = 1000
32 orphanTTL = 10 * time.Minute
33 orphanExpireScanInterval = 3 * time.Minute
35 // ErrTransactionNotExist is the pre-defined error message
36 ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
37 // ErrPoolIsFull indicates the pool is full
38 ErrPoolIsFull = errors.New("transaction pool reach the max number")
39 // ErrDustTx indicates transaction is dust tx
40 ErrDustTx = errors.New("transaction is dust tx")
43 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
45 // TxDesc store tx and related info for mining strategy
47 Tx *types.Tx `json:"transaction"`
48 Added time.Time `json:"-"`
49 StatusFail bool `json:"status_fail"`
50 Height uint64 `json:"-"`
51 Weight uint64 `json:"-"`
55 // TxPoolMsg is use for notify pool changes
56 type TxPoolMsg struct {
61 type orphanTx struct {
66 // TxPool is use for store the unconfirmed transaction
71 pool map[bc.Hash]*TxDesc
72 utxo map[bc.Hash]*types.Tx
73 orphans map[bc.Hash]*orphanTx
74 orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
76 eventDispatcher *event.Dispatcher
79 // NewTxPool init a new TxPool
80 func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
82 lastUpdated: time.Now().Unix(),
84 pool: make(map[bc.Hash]*TxDesc),
85 utxo: make(map[bc.Hash]*types.Tx),
86 orphans: make(map[bc.Hash]*orphanTx),
87 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
88 errCache: lru.New(maxCachedErrTxs),
89 eventDispatcher: dispatcher,
91 go tp.orphanExpireWorker()
95 // AddErrCache add a failed transaction record to lru cache
96 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
100 tp.errCache.Add(txHash, err)
103 // ExpireOrphan expire all the orphans that before the input time range
104 func (tp *TxPool) ExpireOrphan(now time.Time) {
106 defer tp.mtx.Unlock()
108 for hash, orphan := range tp.orphans {
109 if orphan.expiration.Before(now) {
110 tp.removeOrphan(&hash)
115 // GetErrCache return the error of the transaction
116 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
118 defer tp.mtx.Unlock()
120 v, ok := tp.errCache.Get(txHash)
127 // RemoveTransaction remove a transaction from the pool
128 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
130 defer tp.mtx.Unlock()
132 txD, ok := tp.pool[*txHash]
137 for _, output := range txD.Tx.ResultIds {
138 delete(tp.utxo, *output)
140 delete(tp.pool, *txHash)
142 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
143 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
144 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
147 // GetTransaction return the TxDesc by hash
148 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
150 defer tp.mtx.RUnlock()
152 if txD, ok := tp.pool[*txHash]; ok {
155 return nil, ErrTransactionNotExist
158 // GetTransactions return all the transactions in the pool
159 func (tp *TxPool) GetTransactions() []*TxDesc {
161 defer tp.mtx.RUnlock()
163 txDs := make([]*TxDesc, len(tp.pool))
165 for _, desc := range tp.pool {
172 // IsTransactionInPool check wheather a transaction in pool or not
173 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
175 defer tp.mtx.RUnlock()
177 _, ok := tp.pool[*txHash]
181 // IsTransactionInErrCache check wheather a transaction in errCache or not
182 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
184 defer tp.mtx.RUnlock()
186 _, ok := tp.errCache.Get(txHash)
190 // HaveTransaction IsTransactionInErrCache check is transaction in errCache or pool
191 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
192 return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
195 func isTransactionNoBtmInput(tx *types.Tx) bool {
196 for _, input := range tx.TxData.Inputs {
197 if input.AssetID() == *consensus.BTMAssetID {
204 func isTransactionZeroOutput(tx *types.Tx) bool {
205 for _, output := range tx.TxData.Outputs {
206 value := output.AssetAmount()
207 if value.Amount == uint64(0) {
214 func (tp *TxPool) IsDust(tx *types.Tx) bool {
215 return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx)
218 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
220 defer tp.mtx.Unlock()
224 StatusFail: statusFail,
225 Weight: tx.SerializedSize,
229 requireParents, err := tp.checkOrphanUtxos(tx)
234 if len(requireParents) > 0 {
235 return true, tp.addOrphan(txD, requireParents)
238 if err := tp.addTransaction(txD); err != nil {
242 tp.processOrphans(txD)
246 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
247 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
249 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
252 return tp.processTransaction(tx, statusFail, height, fee)
255 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
256 if len(tp.orphans) >= maxOrphanNum {
260 orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
261 tp.orphans[txD.Tx.ID] = orphan
262 for _, hash := range requireParents {
263 if _, ok := tp.orphansByPrev[*hash]; !ok {
264 tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
266 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
271 func (tp *TxPool) addTransaction(txD *TxDesc) error {
272 if len(tp.pool) >= maxNewTxNum {
277 txD.Added = time.Now()
279 for _, id := range tx.ResultIds {
280 var assetID bc.AssetID
281 outputEntry, err := tx.Entry(*id)
285 switch output := outputEntry.(type) {
286 case *bc.IntraChainOutput:
287 assetID = *output.Source.Value.AssetId
289 assetID = *output.Source.Value.AssetId
294 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
299 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
300 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
301 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
305 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
306 view := state.NewUtxoViewpoint()
307 if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
311 hashes := []*bc.Hash{}
312 for _, hash := range tx.SpentOutputIDs {
313 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
314 hashes = append(hashes, &hash)
320 func (tp *TxPool) orphanExpireWorker() {
321 ticker := time.NewTicker(orphanExpireScanInterval)
324 for now := range ticker.C {
329 func (tp *TxPool) processOrphans(txD *TxDesc) {
330 processOrphans := []*orphanTx{}
331 addRely := func(tx *types.Tx) {
332 for _, outHash := range tx.ResultIds {
333 orphans, ok := tp.orphansByPrev[*outHash]
338 for _, orphan := range orphans {
339 processOrphans = append(processOrphans, orphan)
341 delete(tp.orphansByPrev, *outHash)
346 for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
347 processOrphan := processOrphans[0]
348 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
350 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
354 if len(requireParents) == 0 {
355 addRely(processOrphan.Tx)
356 tp.removeOrphan(&processOrphan.Tx.ID)
357 tp.addTransaction(processOrphan.TxDesc)
362 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
363 orphan, ok := tp.orphans[*hash]
368 for _, spend := range orphan.Tx.SpentOutputIDs {
369 orphans, ok := tp.orphansByPrev[spend]
374 if delete(orphans, *hash); len(orphans) == 0 {
375 delete(tp.orphansByPrev, spend)
378 delete(tp.orphans, *hash)