OSDN Git Service

update repo
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/bytom/vapor/consensus"
13         "github.com/bytom/vapor/event"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16         "github.com/bytom/vapor/protocol/state"
17 )
18
19 // msg type
20 const (
21         MsgNewTx = iota
22         MsgRemoveTx
23         logModule = "protocol"
24 )
25
26 var (
27         maxCachedErrTxs = 1000
28         maxMsgChSize    = 1000
29         maxNewTxNum     = 65536
30         maxOrphanNum    = 32768
31
32         orphanTTL                = 60 * time.Second
33         orphanExpireScanInterval = 30 * time.Second
34
35         txTTL                = 1 * time.Hour
36         txExpireScanInterval = 20 * time.Minute
37
38         // ErrTransactionNotExist is the pre-defined error message
39         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
40         // ErrPoolIsFull indicates the pool is full
41         ErrPoolIsFull = errors.New("transaction pool reach the max number")
42         // ErrDustTx indicates transaction is dust tx
43         ErrDustTx = errors.New("transaction is dust tx")
44 )
45
46 type DustFilterer interface {
47         IsDust(tx *types.Tx) bool
48 }
49
50 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
51
52 // TxDesc store tx and related info for mining strategy
53 type TxDesc struct {
54         Tx         *types.Tx `json:"transaction"`
55         Added      time.Time `json:"-"`
56         StatusFail bool      `json:"status_fail"`
57         Height     uint64    `json:"-"`
58         Weight     uint64    `json:"-"`
59         Fee        uint64    `json:"-"`
60 }
61
62 // TxPoolMsg is use for notify pool changes
63 type TxPoolMsg struct {
64         *TxDesc
65         MsgType int
66 }
67
68 type orphanTx struct {
69         *TxDesc
70         expiration time.Time
71 }
72
73 // TxPool is use for store the unconfirmed transaction
74 type TxPool struct {
75         lastUpdated     int64
76         mtx             sync.RWMutex
77         store           Store
78         pool            map[bc.Hash]*TxDesc
79         utxo            map[bc.Hash]*types.Tx
80         orphans         map[bc.Hash]*orphanTx
81         orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
82         errCache        *lru.Cache
83         filters         []DustFilterer
84         eventDispatcher *event.Dispatcher
85 }
86
87 // NewTxPool init a new TxPool
88 func NewTxPool(store Store, filters []DustFilterer, dispatcher *event.Dispatcher) *TxPool {
89         tp := &TxPool{
90                 lastUpdated:     time.Now().Unix(),
91                 store:           store,
92                 pool:            make(map[bc.Hash]*TxDesc),
93                 utxo:            make(map[bc.Hash]*types.Tx),
94                 orphans:         make(map[bc.Hash]*orphanTx),
95                 orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
96                 errCache:        lru.New(maxCachedErrTxs),
97                 filters:         filters,
98                 eventDispatcher: dispatcher,
99         }
100         go tp.orphanExpireWorker()
101         go tp.txExpireWorker()
102         return tp
103 }
104
105 // AddErrCache add a failed transaction record to lru cache
106 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
107         tp.mtx.Lock()
108         defer tp.mtx.Unlock()
109
110         tp.errCache.Add(txHash, err)
111 }
112
113 // expireOrphan expire all the orphans that before the input time range
114 func (tp *TxPool) expireOrphan(now time.Time) {
115         tp.mtx.Lock()
116         defer tp.mtx.Unlock()
117
118         for hash, orphan := range tp.orphans {
119                 if orphan.expiration.Before(now) {
120                         tp.removeOrphan(&hash)
121                 }
122         }
123 }
124
125 // GetErrCache return the error of the transaction
126 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
127         tp.mtx.Lock()
128         defer tp.mtx.Unlock()
129
130         v, ok := tp.errCache.Get(txHash)
131         if !ok {
132                 return nil
133         }
134         return v.(error)
135 }
136
137 // RemoveTransaction remove a transaction from the pool
138 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
139         tp.mtx.Lock()
140         defer tp.mtx.Unlock()
141
142         if txD := tp.removeTransaction(txHash); txD != nil {
143                 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
144                 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
145                 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
146         }
147 }
148
149 func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
150         txD, ok := tp.pool[*txHash]
151         if !ok {
152                 return nil
153         }
154
155         for _, output := range txD.Tx.ResultIds {
156                 delete(tp.utxo, *output)
157         }
158         delete(tp.pool, *txHash)
159         return txD
160 }
161
162 // GetTransaction return the TxDesc by hash
163 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
164         tp.mtx.RLock()
165         defer tp.mtx.RUnlock()
166
167         if txD, ok := tp.pool[*txHash]; ok {
168                 return txD, nil
169         }
170         return nil, ErrTransactionNotExist
171 }
172
173 // GetTransactions return all the transactions in the pool
174 func (tp *TxPool) GetTransactions() []*TxDesc {
175         tp.mtx.RLock()
176         defer tp.mtx.RUnlock()
177
178         txDs := make([]*TxDesc, len(tp.pool))
179         i := 0
180         for _, desc := range tp.pool {
181                 txDs[i] = desc
182                 i++
183         }
184         return txDs
185 }
186
187 // IsTransactionInPool check wheather a transaction in pool or not
188 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
189         tp.mtx.RLock()
190         defer tp.mtx.RUnlock()
191
192         _, ok := tp.pool[*txHash]
193         return ok
194 }
195
196 // IsTransactionInErrCache check wheather a transaction in errCache or not
197 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
198         tp.mtx.RLock()
199         defer tp.mtx.RUnlock()
200
201         _, ok := tp.errCache.Get(txHash)
202         return ok
203 }
204
205 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
206 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
207         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
208 }
209
210 func isTransactionZeroOutput(tx *types.Tx) bool {
211         for _, output := range tx.TxData.Outputs {
212                 if value := output.AssetAmount(); value.Amount == uint64(0) {
213                         return true
214                 }
215         }
216         return false
217 }
218
219 func isNoGasStatusFail(tx *types.Tx, statusFail bool) bool {
220         if !statusFail {
221                 return false
222         }
223
224         for _, input := range tx.TxData.Inputs {
225                 if *consensus.BTMAssetID == input.AssetID() {
226                         return false
227                 }
228         }
229         return true
230 }
231
232 //IsDust checks if a tx has zero output
233 func (tp *TxPool) IsDust(tx *types.Tx) bool {
234         if ok := isTransactionZeroOutput(tx); ok {
235                 return ok
236         }
237
238         for _, filter := range tp.filters {
239                 if ok := filter.IsDust(tx); ok {
240                         return ok
241                 }
242         }
243         return false
244 }
245
246 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
247         tp.mtx.Lock()
248         defer tp.mtx.Unlock()
249
250         txD := &TxDesc{
251                 Tx:         tx,
252                 StatusFail: statusFail,
253                 Weight:     tx.SerializedSize,
254                 Height:     height,
255                 Fee:        fee,
256         }
257         requireParents, err := tp.checkOrphanUtxos(tx)
258         if err != nil {
259                 return false, err
260         }
261
262         if len(requireParents) > 0 {
263                 return true, tp.addOrphan(txD, requireParents)
264         }
265
266         if err := tp.addTransaction(txD); err != nil {
267                 return false, err
268         }
269
270         tp.processOrphans(txD)
271         return false, nil
272 }
273
274 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
275 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
276         if tp.IsDust(tx) {
277                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
278                 return false, nil
279         }
280
281         if isNoGasStatusFail(tx, statusFail) {
282                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("drop no gas status fail tx")
283                 return false, nil
284         }
285         return tp.processTransaction(tx, statusFail, height, fee)
286 }
287
288 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
289         if len(tp.orphans) >= maxOrphanNum {
290                 return ErrPoolIsFull
291         }
292
293         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
294         tp.orphans[txD.Tx.ID] = orphan
295         for _, hash := range requireParents {
296                 if _, ok := tp.orphansByPrev[*hash]; !ok {
297                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
298                 }
299                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
300         }
301         return nil
302 }
303
304 func (tp *TxPool) addTransaction(txD *TxDesc) error {
305         if len(tp.pool) >= maxNewTxNum {
306                 return ErrPoolIsFull
307         }
308
309         tx := txD.Tx
310         txD.Added = time.Now()
311         tp.pool[tx.ID] = txD
312         for _, id := range tx.ResultIds {
313                 outputEntry, err := tx.Entry(*id)
314                 if err != nil {
315                         return err
316                 }
317
318                 var assetID bc.AssetID
319                 switch output := outputEntry.(type) {
320                 case *bc.IntraChainOutput:
321                         assetID = *output.Source.Value.AssetId
322                 case *bc.VoteOutput:
323                         assetID = *output.Source.Value.AssetId
324                 default:
325                         continue
326                 }
327
328                 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
329                         tp.utxo[*id] = tx
330                 }
331         }
332
333         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
334         tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
335         log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
336         return nil
337 }
338
339 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
340         view := state.NewUtxoViewpoint()
341         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
342                 return nil, err
343         }
344
345         hashes := []*bc.Hash{}
346         for _, hash := range tx.SpentOutputIDs {
347                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
348                         hashes = append(hashes, &hash)
349                 }
350         }
351         return hashes, nil
352 }
353
354 func (tp *TxPool) orphanExpireWorker() {
355         ticker := time.NewTicker(orphanExpireScanInterval)
356         defer ticker.Stop()
357
358         for now := range ticker.C {
359                 tp.expireOrphan(now)
360         }
361 }
362
363 func (tp *TxPool) processOrphans(txD *TxDesc) {
364         processOrphans := []*orphanTx{}
365         addRely := func(tx *types.Tx) {
366                 for _, outHash := range tx.ResultIds {
367                         orphans, ok := tp.orphansByPrev[*outHash]
368                         if !ok {
369                                 continue
370                         }
371
372                         for _, orphan := range orphans {
373                                 processOrphans = append(processOrphans, orphan)
374                         }
375                         delete(tp.orphansByPrev, *outHash)
376                 }
377         }
378
379         addRely(txD.Tx)
380         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
381                 processOrphan := processOrphans[0]
382                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
383                 if err != nil {
384                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
385                         continue
386                 }
387
388                 if len(requireParents) == 0 {
389                         addRely(processOrphan.Tx)
390                         tp.removeOrphan(&processOrphan.Tx.ID)
391                         tp.addTransaction(processOrphan.TxDesc)
392                 }
393         }
394 }
395
396 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
397         orphan, ok := tp.orphans[*hash]
398         if !ok {
399                 return
400         }
401
402         for _, spend := range orphan.Tx.SpentOutputIDs {
403                 orphans, ok := tp.orphansByPrev[spend]
404                 if !ok {
405                         continue
406                 }
407
408                 if delete(orphans, *hash); len(orphans) == 0 {
409                         delete(tp.orphansByPrev, spend)
410                 }
411         }
412         delete(tp.orphans, *hash)
413 }
414
415 func (tp *TxPool) txExpireWorker() {
416         ticker := time.NewTicker(txExpireScanInterval)
417         defer ticker.Stop()
418
419         for now := range ticker.C {
420                 tp.expireTx(now)
421         }
422 }
423
424 // expireTx expires all the Txs that before the input time range
425 func (tp *TxPool) expireTx(now time.Time) {
426         tp.mtx.Lock()
427         defer tp.mtx.Unlock()
428
429         cutOff := now.Add(-txTTL)
430         for hash, txD := range tp.pool {
431                 if txD.Added.Before(cutOff) {
432                         tp.removeTransaction(&hash)
433                 }
434         }
435 }