OSDN Git Service

add maker taker (#537)
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/bytom/vapor/consensus"
13         "github.com/bytom/vapor/event"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16         "github.com/bytom/vapor/protocol/state"
17 )
18
19 // msg type
20 const (
21         MsgNewTx = iota
22         MsgRemoveTx
23         logModule = "protocol"
24 )
25
26 var (
27         maxCachedErrTxs = 1000
28         maxMsgChSize    = 1000
29         maxNewTxNum     = 65536
30         maxOrphanNum    = 32768
31
32         orphanTTL                = 60 * time.Second
33         orphanExpireScanInterval = 30 * time.Second
34
35         txTTL                = 1 * time.Hour
36         txExpireScanInterval = 20 * time.Minute
37
38         // ErrTransactionNotExist is the pre-defined error message
39         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
40         // ErrPoolIsFull indicates the pool is full
41         ErrPoolIsFull = errors.New("transaction pool reach the max number")
42         // ErrDustTx indicates transaction is dust tx
43         ErrDustTx = errors.New("transaction is dust tx")
44 )
45
46 // DustFilterer is inerface for dust transaction filter rule
47 type DustFilterer interface {
48         IsDust(tx *types.Tx) bool
49 }
50
51 // TxMsgEvent is message wrap for subscribe event
52 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
53
54 // TxDesc store tx and related info for mining strategy
55 type TxDesc struct {
56         Tx         *types.Tx `json:"transaction"`
57         Added      time.Time `json:"-"`
58         StatusFail bool      `json:"status_fail"`
59         Height     uint64    `json:"-"`
60         Weight     uint64    `json:"-"`
61         Fee        uint64    `json:"-"`
62 }
63
64 // TxPoolMsg is use for notify pool changes
65 type TxPoolMsg struct {
66         *TxDesc
67         MsgType int
68 }
69
70 type orphanTx struct {
71         *TxDesc
72         expiration time.Time
73 }
74
75 // TxPool is use for store the unconfirmed transaction
76 type TxPool struct {
77         lastUpdated     int64
78         mtx             sync.RWMutex
79         store           Store
80         pool            map[bc.Hash]*TxDesc
81         utxo            map[bc.Hash]*types.Tx
82         orphans         map[bc.Hash]*orphanTx
83         orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
84         errCache        *lru.Cache
85         filters         []DustFilterer
86         eventDispatcher *event.Dispatcher
87 }
88
89 // NewTxPool init a new TxPool
90 func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
91         tp := &TxPool{
92                 lastUpdated:     time.Now().Unix(),
93                 store:           store,
94                 pool:            make(map[bc.Hash]*TxDesc),
95                 utxo:            make(map[bc.Hash]*types.Tx),
96                 orphans:         make(map[bc.Hash]*orphanTx),
97                 orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
98                 errCache:        lru.New(maxCachedErrTxs),
99                 filters:         filters,
100                 eventDispatcher: dispatcher,
101         }
102         go tp.orphanExpireWorker()
103         go tp.txExpireWorker()
104         return tp
105 }
106
107 // AddErrCache add a failed transaction record to lru cache
108 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
109         tp.mtx.Lock()
110         defer tp.mtx.Unlock()
111
112         tp.errCache.Add(txHash, err)
113 }
114
115 // expireOrphan expire all the orphans that before the input time range
116 func (tp *TxPool) expireOrphan(now time.Time) {
117         tp.mtx.Lock()
118         defer tp.mtx.Unlock()
119
120         for hash, orphan := range tp.orphans {
121                 if orphan.expiration.Before(now) {
122                         tp.removeOrphan(&hash)
123                 }
124         }
125 }
126
127 // GetErrCache return the error of the transaction
128 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
129         tp.mtx.Lock()
130         defer tp.mtx.Unlock()
131
132         v, ok := tp.errCache.Get(txHash)
133         if !ok {
134                 return nil
135         }
136         return v.(error)
137 }
138
139 // RemoveTransaction remove a transaction from the pool
140 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
141         tp.mtx.Lock()
142         defer tp.mtx.Unlock()
143
144         if txD := tp.removeTransaction(txHash); txD != nil {
145                 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
146                 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
147                 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
148         }
149 }
150
151 func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
152         txD, ok := tp.pool[*txHash]
153         if !ok {
154                 return nil
155         }
156
157         for _, output := range txD.Tx.ResultIds {
158                 delete(tp.utxo, *output)
159         }
160         delete(tp.pool, *txHash)
161         return txD
162 }
163
164 // GetTransaction return the TxDesc by hash
165 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
166         tp.mtx.RLock()
167         defer tp.mtx.RUnlock()
168
169         if txD, ok := tp.pool[*txHash]; ok {
170                 return txD, nil
171         }
172         return nil, ErrTransactionNotExist
173 }
174
175 // GetTransactions return all the transactions in the pool
176 func (tp *TxPool) GetTransactions() []*TxDesc {
177         tp.mtx.RLock()
178         defer tp.mtx.RUnlock()
179
180         txDs := make([]*TxDesc, len(tp.pool))
181         i := 0
182         for _, desc := range tp.pool {
183                 txDs[i] = desc
184                 i++
185         }
186         return txDs
187 }
188
189 // IsTransactionInPool check wheather a transaction in pool or not
190 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
191         tp.mtx.RLock()
192         defer tp.mtx.RUnlock()
193
194         _, ok := tp.pool[*txHash]
195         return ok
196 }
197
198 // IsTransactionInErrCache check wheather a transaction in errCache or not
199 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
200         tp.mtx.RLock()
201         defer tp.mtx.RUnlock()
202
203         _, ok := tp.errCache.Get(txHash)
204         return ok
205 }
206
207 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
208 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
209         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
210 }
211
212 func isTransactionZeroOutput(tx *types.Tx) bool {
213         for _, output := range tx.TxData.Outputs {
214                 if value := output.AssetAmount(); value.Amount == uint64(0) {
215                         return true
216                 }
217         }
218         return false
219 }
220
221 func isNoGasStatusFail(tx *types.Tx, statusFail bool) bool {
222         if !statusFail {
223                 return false
224         }
225
226         for _, input := range tx.TxData.Inputs {
227                 if *consensus.BTMAssetID == input.AssetID() {
228                         return false
229                 }
230         }
231         return true
232 }
233
234 //IsDust checks if a tx has zero output
235 func (tp *TxPool) IsDust(tx *types.Tx) bool {
236         if ok := isTransactionZeroOutput(tx); ok {
237                 return ok
238         }
239
240         for _, filter := range tp.filters {
241                 if ok := filter.IsDust(tx); ok {
242                         return ok
243                 }
244         }
245         return false
246 }
247
248 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
249         tp.mtx.Lock()
250         defer tp.mtx.Unlock()
251
252         txD := &TxDesc{
253                 Tx:         tx,
254                 StatusFail: statusFail,
255                 Weight:     tx.SerializedSize,
256                 Height:     height,
257                 Fee:        fee,
258         }
259         requireParents, err := tp.checkOrphanUtxos(tx)
260         if err != nil {
261                 return false, err
262         }
263
264         if len(requireParents) > 0 {
265                 return true, tp.addOrphan(txD, requireParents)
266         }
267
268         if err := tp.addTransaction(txD); err != nil {
269                 return false, err
270         }
271
272         tp.processOrphans(txD)
273         return false, nil
274 }
275
276 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
277 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
278         if tp.IsDust(tx) {
279                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
280                 return false, nil
281         }
282
283         if isNoGasStatusFail(tx, statusFail) {
284                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("drop no gas status fail tx")
285                 return false, nil
286         }
287         return tp.processTransaction(tx, statusFail, height, fee)
288 }
289
290 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
291         if len(tp.orphans) >= maxOrphanNum {
292                 return ErrPoolIsFull
293         }
294
295         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
296         tp.orphans[txD.Tx.ID] = orphan
297         for _, hash := range requireParents {
298                 if _, ok := tp.orphansByPrev[*hash]; !ok {
299                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
300                 }
301                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
302         }
303         return nil
304 }
305
306 func (tp *TxPool) addTransaction(txD *TxDesc) error {
307         if len(tp.pool) >= maxNewTxNum {
308                 return ErrPoolIsFull
309         }
310
311         tx := txD.Tx
312         txD.Added = time.Now()
313         tp.pool[tx.ID] = txD
314         for _, id := range tx.ResultIds {
315                 outputEntry, err := tx.Entry(*id)
316                 if err != nil {
317                         return err
318                 }
319
320                 var assetID bc.AssetID
321                 switch output := outputEntry.(type) {
322                 case *bc.IntraChainOutput:
323                         assetID = *output.Source.Value.AssetId
324                 case *bc.VoteOutput:
325                         assetID = *output.Source.Value.AssetId
326                 default:
327                         continue
328                 }
329
330                 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
331                         tp.utxo[*id] = tx
332                 }
333         }
334
335         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
336         tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
337         log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
338         return nil
339 }
340
341 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
342         view := state.NewUtxoViewpoint()
343         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
344                 return nil, err
345         }
346
347         hashes := []*bc.Hash{}
348         for _, hash := range tx.SpentOutputIDs {
349                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
350                         hashes = append(hashes, &hash)
351                 }
352         }
353         return hashes, nil
354 }
355
356 func (tp *TxPool) orphanExpireWorker() {
357         ticker := time.NewTicker(orphanExpireScanInterval)
358         defer ticker.Stop()
359
360         for now := range ticker.C {
361                 tp.expireOrphan(now)
362         }
363 }
364
365 func (tp *TxPool) processOrphans(txD *TxDesc) {
366         processOrphans := []*orphanTx{}
367         addRely := func(tx *types.Tx) {
368                 for _, outHash := range tx.ResultIds {
369                         orphans, ok := tp.orphansByPrev[*outHash]
370                         if !ok {
371                                 continue
372                         }
373
374                         for _, orphan := range orphans {
375                                 processOrphans = append(processOrphans, orphan)
376                         }
377                         delete(tp.orphansByPrev, *outHash)
378                 }
379         }
380
381         addRely(txD.Tx)
382         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
383                 processOrphan := processOrphans[0]
384                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
385                 if err != nil {
386                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
387                         continue
388                 }
389
390                 if len(requireParents) == 0 {
391                         addRely(processOrphan.Tx)
392                         tp.removeOrphan(&processOrphan.Tx.ID)
393                         tp.addTransaction(processOrphan.TxDesc)
394                 }
395         }
396 }
397
398 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
399         orphan, ok := tp.orphans[*hash]
400         if !ok {
401                 return
402         }
403
404         for _, spend := range orphan.Tx.SpentOutputIDs {
405                 orphans, ok := tp.orphansByPrev[spend]
406                 if !ok {
407                         continue
408                 }
409
410                 if delete(orphans, *hash); len(orphans) == 0 {
411                         delete(tp.orphansByPrev, spend)
412                 }
413         }
414         delete(tp.orphans, *hash)
415 }
416
417 func (tp *TxPool) txExpireWorker() {
418         ticker := time.NewTicker(txExpireScanInterval)
419         defer ticker.Stop()
420
421         for now := range ticker.C {
422                 tp.expireTx(now)
423         }
424 }
425
426 // expireTx expires all the Txs that before the input time range
427 func (tp *TxPool) expireTx(now time.Time) {
428         tp.mtx.Lock()
429         defer tp.mtx.Unlock()
430
431         cutOff := now.Add(-txTTL)
432         for hash, txD := range tp.pool {
433                 if txD.Added.Before(cutOff) {
434                         tp.removeTransaction(&hash)
435                 }
436         }
437 }