OSDN Git Service

fix log (#388)
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/consensus"
13         "github.com/vapor/event"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16         "github.com/vapor/protocol/state"
17 )
18
19 // msg type
20 const (
21         MsgNewTx = iota
22         MsgRemoveTx
23         logModule = "protocol"
24 )
25
26 var (
27         maxCachedErrTxs = 1000
28         maxMsgChSize    = 1000
29         maxNewTxNum     = 65536
30         maxOrphanNum    = 32768
31
32         orphanTTL                = 60 * time.Second
33         orphanExpireScanInterval = 30 * time.Second
34
35         txTTL                = 1 * time.Hour
36         txExpireScanInterval = 20 * time.Minute
37
38         // ErrTransactionNotExist is the pre-defined error message
39         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
40         // ErrPoolIsFull indicates the pool is full
41         ErrPoolIsFull = errors.New("transaction pool reach the max number")
42         // ErrDustTx indicates transaction is dust tx
43         ErrDustTx = errors.New("transaction is dust tx")
44 )
45
46 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
47
48 // TxDesc store tx and related info for mining strategy
49 type TxDesc struct {
50         Tx         *types.Tx `json:"transaction"`
51         Added      time.Time `json:"-"`
52         StatusFail bool      `json:"status_fail"`
53         Height     uint64    `json:"-"`
54         Weight     uint64    `json:"-"`
55         Fee        uint64    `json:"-"`
56 }
57
58 // TxPoolMsg is use for notify pool changes
59 type TxPoolMsg struct {
60         *TxDesc
61         MsgType int
62 }
63
64 type orphanTx struct {
65         *TxDesc
66         expiration time.Time
67 }
68
69 // TxPool is use for store the unconfirmed transaction
70 type TxPool struct {
71         lastUpdated     int64
72         mtx             sync.RWMutex
73         store           Store
74         pool            map[bc.Hash]*TxDesc
75         utxo            map[bc.Hash]*types.Tx
76         orphans         map[bc.Hash]*orphanTx
77         orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
78         errCache        *lru.Cache
79         eventDispatcher *event.Dispatcher
80 }
81
82 // NewTxPool init a new TxPool
83 func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
84         tp := &TxPool{
85                 lastUpdated:     time.Now().Unix(),
86                 store:           store,
87                 pool:            make(map[bc.Hash]*TxDesc),
88                 utxo:            make(map[bc.Hash]*types.Tx),
89                 orphans:         make(map[bc.Hash]*orphanTx),
90                 orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
91                 errCache:        lru.New(maxCachedErrTxs),
92                 eventDispatcher: dispatcher,
93         }
94         go tp.orphanExpireWorker()
95         go tp.txExpireWorker()
96         return tp
97 }
98
99 // AddErrCache add a failed transaction record to lru cache
100 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
101         tp.mtx.Lock()
102         defer tp.mtx.Unlock()
103
104         tp.errCache.Add(txHash, err)
105 }
106
107 // expireOrphan expire all the orphans that before the input time range
108 func (tp *TxPool) expireOrphan(now time.Time) {
109         tp.mtx.Lock()
110         defer tp.mtx.Unlock()
111
112         for hash, orphan := range tp.orphans {
113                 if orphan.expiration.Before(now) {
114                         tp.removeOrphan(&hash)
115                 }
116         }
117 }
118
119 // GetErrCache return the error of the transaction
120 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
121         tp.mtx.Lock()
122         defer tp.mtx.Unlock()
123
124         v, ok := tp.errCache.Get(txHash)
125         if !ok {
126                 return nil
127         }
128         return v.(error)
129 }
130
131 // RemoveTransaction remove a transaction from the pool
132 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
133         tp.mtx.Lock()
134         defer tp.mtx.Unlock()
135
136         if txD := tp.removeTransaction(txHash); txD != nil {
137                 atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
138                 tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
139                 log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
140         }
141 }
142
143 func (tp *TxPool) removeTransaction(txHash *bc.Hash) *TxDesc {
144         txD, ok := tp.pool[*txHash]
145         if !ok {
146                 return nil
147         }
148
149         for _, output := range txD.Tx.ResultIds {
150                 delete(tp.utxo, *output)
151         }
152         delete(tp.pool, *txHash)
153         return txD
154 }
155
156 // GetTransaction return the TxDesc by hash
157 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
158         tp.mtx.RLock()
159         defer tp.mtx.RUnlock()
160
161         if txD, ok := tp.pool[*txHash]; ok {
162                 return txD, nil
163         }
164         return nil, ErrTransactionNotExist
165 }
166
167 // GetTransactions return all the transactions in the pool
168 func (tp *TxPool) GetTransactions() []*TxDesc {
169         tp.mtx.RLock()
170         defer tp.mtx.RUnlock()
171
172         txDs := make([]*TxDesc, len(tp.pool))
173         i := 0
174         for _, desc := range tp.pool {
175                 txDs[i] = desc
176                 i++
177         }
178         return txDs
179 }
180
181 // IsTransactionInPool check wheather a transaction in pool or not
182 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
183         tp.mtx.RLock()
184         defer tp.mtx.RUnlock()
185
186         _, ok := tp.pool[*txHash]
187         return ok
188 }
189
190 // IsTransactionInErrCache check wheather a transaction in errCache or not
191 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
192         tp.mtx.RLock()
193         defer tp.mtx.RUnlock()
194
195         _, ok := tp.errCache.Get(txHash)
196         return ok
197 }
198
199 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
200 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
201         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
202 }
203
204 func isTransactionZeroOutput(tx *types.Tx) bool {
205         for _, output := range tx.TxData.Outputs {
206                 if value := output.AssetAmount(); value.Amount == uint64(0) {
207                         return true
208                 }
209         }
210         return false
211 }
212
213 //IsDust checks if a tx has zero output
214 func (tp *TxPool) IsDust(tx *types.Tx) bool {
215         return isTransactionZeroOutput(tx)
216 }
217
218 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
219         tp.mtx.Lock()
220         defer tp.mtx.Unlock()
221
222         txD := &TxDesc{
223                 Tx:         tx,
224                 StatusFail: statusFail,
225                 Weight:     tx.SerializedSize,
226                 Height:     height,
227                 Fee:        fee,
228         }
229         requireParents, err := tp.checkOrphanUtxos(tx)
230         if err != nil {
231                 return false, err
232         }
233
234         if len(requireParents) > 0 {
235                 return true, tp.addOrphan(txD, requireParents)
236         }
237
238         if err := tp.addTransaction(txD); err != nil {
239                 return false, err
240         }
241
242         tp.processOrphans(txD)
243         return false, nil
244 }
245
246 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
247 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
248         if tp.IsDust(tx) {
249                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
250                 return false, nil
251         }
252         return tp.processTransaction(tx, statusFail, height, fee)
253 }
254
255 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
256         if len(tp.orphans) >= maxOrphanNum {
257                 return ErrPoolIsFull
258         }
259
260         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
261         tp.orphans[txD.Tx.ID] = orphan
262         for _, hash := range requireParents {
263                 if _, ok := tp.orphansByPrev[*hash]; !ok {
264                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
265                 }
266                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
267         }
268         return nil
269 }
270
271 func (tp *TxPool) addTransaction(txD *TxDesc) error {
272         if len(tp.pool) >= maxNewTxNum {
273                 return ErrPoolIsFull
274         }
275
276         tx := txD.Tx
277         txD.Added = time.Now()
278         tp.pool[tx.ID] = txD
279         for _, id := range tx.ResultIds {
280                 outputEntry, err := tx.Entry(*id)
281                 if err != nil {
282                         return err
283                 }
284
285                 var assetID bc.AssetID
286                 switch output := outputEntry.(type) {
287                 case *bc.IntraChainOutput:
288                         assetID = *output.Source.Value.AssetId
289                 case *bc.VoteOutput:
290                         assetID = *output.Source.Value.AssetId
291                 default:
292                         continue
293                 }
294
295                 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
296                         tp.utxo[*id] = tx
297                 }
298         }
299
300         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
301         tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
302         log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
303         return nil
304 }
305
306 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
307         view := state.NewUtxoViewpoint()
308         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
309                 return nil, err
310         }
311
312         hashes := []*bc.Hash{}
313         for _, hash := range tx.SpentOutputIDs {
314                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
315                         hashes = append(hashes, &hash)
316                 }
317         }
318         return hashes, nil
319 }
320
321 func (tp *TxPool) orphanExpireWorker() {
322         ticker := time.NewTicker(orphanExpireScanInterval)
323         defer ticker.Stop()
324
325         for now := range ticker.C {
326                 tp.expireOrphan(now)
327         }
328 }
329
330 func (tp *TxPool) processOrphans(txD *TxDesc) {
331         processOrphans := []*orphanTx{}
332         addRely := func(tx *types.Tx) {
333                 for _, outHash := range tx.ResultIds {
334                         orphans, ok := tp.orphansByPrev[*outHash]
335                         if !ok {
336                                 continue
337                         }
338
339                         for _, orphan := range orphans {
340                                 processOrphans = append(processOrphans, orphan)
341                         }
342                         delete(tp.orphansByPrev, *outHash)
343                 }
344         }
345
346         addRely(txD.Tx)
347         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
348                 processOrphan := processOrphans[0]
349                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
350                 if err != nil {
351                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
352                         continue
353                 }
354
355                 if len(requireParents) == 0 {
356                         addRely(processOrphan.Tx)
357                         tp.removeOrphan(&processOrphan.Tx.ID)
358                         tp.addTransaction(processOrphan.TxDesc)
359                 }
360         }
361 }
362
363 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
364         orphan, ok := tp.orphans[*hash]
365         if !ok {
366                 return
367         }
368
369         for _, spend := range orphan.Tx.SpentOutputIDs {
370                 orphans, ok := tp.orphansByPrev[spend]
371                 if !ok {
372                         continue
373                 }
374
375                 if delete(orphans, *hash); len(orphans) == 0 {
376                         delete(tp.orphansByPrev, spend)
377                 }
378         }
379         delete(tp.orphans, *hash)
380 }
381
382 func (tp *TxPool) txExpireWorker() {
383         ticker := time.NewTicker(txExpireScanInterval)
384         defer ticker.Stop()
385
386         for now := range ticker.C {
387                 tp.expireTx(now)
388         }
389 }
390
391 // expireTx expires all the Txs that before the input time range
392 func (tp *TxPool) expireTx(now time.Time) {
393         tp.mtx.Lock()
394         defer tp.mtx.Unlock()
395
396         cutOff := now.Add(-txTTL)
397         for hash, txD := range tp.pool {
398                 if txD.Added.Before(cutOff) {
399                         tp.removeTransaction(&hash)
400                 }
401         }
402 }