OSDN Git Service

Merge pull request #27 from Bytom/dev_modify_code
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/consensus"
13         "github.com/vapor/protocol/bc"
14         "github.com/vapor/protocol/bc/types"
15         "github.com/vapor/protocol/state"
16 )
17
18 // msg type
19 const (
20         MsgNewTx = iota
21         MsgRemoveTx
22 )
23
24 var (
25         maxCachedErrTxs = 1000
26         maxMsgChSize    = 1000
27         maxNewTxNum     = 10000
28         maxOrphanNum    = 2000
29
30         orphanTTL                = 10 * time.Minute
31         orphanExpireScanInterval = 3 * time.Minute
32
33         // ErrTransactionNotExist is the pre-defined error message
34         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
35         // ErrPoolIsFull indicates the pool is full
36         ErrPoolIsFull = errors.New("transaction pool reach the max number")
37 )
38
39 // TxDesc store tx and related info for mining strategy
40 type TxDesc struct {
41         Tx         *types.Tx
42         Added      time.Time
43         StatusFail bool
44         Height     uint64
45         Weight     uint64
46         Fee        uint64
47 }
48
49 // TxPoolMsg is use for notify pool changes
50 type TxPoolMsg struct {
51         *TxDesc
52         MsgType int
53 }
54
55 type orphanTx struct {
56         *TxDesc
57         expiration time.Time
58 }
59
60 // TxPool is use for store the unconfirmed transaction
61 type TxPool struct {
62         lastUpdated   int64
63         mtx           sync.RWMutex
64         store         Store
65         pool          map[bc.Hash]*TxDesc
66         utxo          map[bc.Hash]*types.Tx
67         orphans       map[bc.Hash]*orphanTx
68         orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
69         claimTx       map[bc.Hash]bool
70         errCache      *lru.Cache
71         msgCh         chan *TxPoolMsg
72 }
73
74 // NewTxPool init a new TxPool
75 func NewTxPool(store Store) *TxPool {
76         tp := &TxPool{
77                 lastUpdated:   time.Now().Unix(),
78                 store:         store,
79                 pool:          make(map[bc.Hash]*TxDesc),
80                 utxo:          make(map[bc.Hash]*types.Tx),
81                 orphans:       make(map[bc.Hash]*orphanTx),
82                 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
83                 claimTx:       make(map[bc.Hash]bool),
84                 errCache:      lru.New(maxCachedErrTxs),
85                 msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
86         }
87         go tp.orphanExpireWorker()
88         return tp
89 }
90
91 // AddErrCache add a failed transaction record to lru cache
92 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
93         tp.mtx.Lock()
94         defer tp.mtx.Unlock()
95
96         tp.errCache.Add(txHash, err)
97 }
98
99 // ExpireOrphan expire all the orphans that before the input time range
100 func (tp *TxPool) ExpireOrphan(now time.Time) {
101         tp.mtx.Lock()
102         defer tp.mtx.Unlock()
103
104         for hash, orphan := range tp.orphans {
105                 if orphan.expiration.Before(now) {
106                         tp.removeOrphan(&hash)
107                 }
108         }
109 }
110
111 // GetErrCache return the error of the transaction
112 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
113         tp.mtx.Lock()
114         defer tp.mtx.Unlock()
115
116         v, ok := tp.errCache.Get(txHash)
117         if !ok {
118                 return nil
119         }
120         return v.(error)
121 }
122
123 // GetMsgCh return a unconfirmed transaction feed channel
124 func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
125         return tp.msgCh
126 }
127
128 // RemoveTransaction remove a transaction from the pool
129 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
130         tp.mtx.Lock()
131         defer tp.mtx.Unlock()
132
133         txD, ok := tp.pool[*txHash]
134         if !ok {
135                 return
136         }
137
138         for _, output := range txD.Tx.ResultIds {
139                 delete(tp.utxo, *output)
140         }
141         tp.removeClaimTx(txD.Tx)
142         delete(tp.pool, *txHash)
143
144         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
145         tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
146         log.WithField("tx_id", txHash).Debug("remove tx from mempool")
147 }
148
149 // GetTransaction return the TxDesc by hash
150 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
151         tp.mtx.RLock()
152         defer tp.mtx.RUnlock()
153
154         if txD, ok := tp.pool[*txHash]; ok {
155                 return txD, nil
156         }
157         return nil, ErrTransactionNotExist
158 }
159
160 // GetTransactions return all the transactions in the pool
161 func (tp *TxPool) GetTransactions() []*TxDesc {
162         tp.mtx.RLock()
163         defer tp.mtx.RUnlock()
164
165         txDs := make([]*TxDesc, len(tp.pool))
166         i := 0
167         for _, desc := range tp.pool {
168                 txDs[i] = desc
169                 i++
170         }
171         return txDs
172 }
173
174 // IsTransactionInPool check wheather a transaction in pool or not
175 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
176         tp.mtx.RLock()
177         defer tp.mtx.RUnlock()
178
179         _, ok := tp.pool[*txHash]
180         return ok
181 }
182
183 // IsTransactionInErrCache check wheather a transaction in errCache or not
184 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
185         tp.mtx.RLock()
186         defer tp.mtx.RUnlock()
187
188         _, ok := tp.errCache.Get(txHash)
189         return ok
190 }
191
192 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
193 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
194         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
195 }
196
197 // ProcessTransaction is the main entry for txpool handle new tx
198 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
199         tp.mtx.Lock()
200         defer tp.mtx.Unlock()
201
202         if tp.IsWithdrawSpent(tx) {
203                 log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
204                 return false, errors.New("pegin-already-claimed")
205         }
206         txD := &TxDesc{
207                 Tx:         tx,
208                 StatusFail: statusFail,
209                 Weight:     tx.SerializedSize,
210                 Height:     height,
211                 Fee:        fee,
212         }
213         requireParents, err := tp.checkOrphanUtxos(tx)
214         if err != nil {
215                 return false, err
216         }
217
218         if len(requireParents) > 0 {
219                 return true, tp.addOrphan(txD, requireParents)
220         }
221         if err := tp.addTransaction(txD); err != nil {
222                 return false, err
223         }
224
225         tp.processOrphans(txD)
226         return false, nil
227 }
228
229 func (tp *TxPool) IsWithdrawSpent(tx *types.Tx) bool {
230         for key, value := range tx.Entries {
231                 switch value.(type) {
232                 case *bc.Claim:
233                         _, ok := tp.claimTx[key]
234                         return tp.store.IsWithdrawSpent(&key) || ok
235                 default:
236                         continue
237                 }
238         }
239
240         return false
241 }
242
243 func (tp *TxPool) addClaimTx(tx *types.Tx) {
244         for key, value := range tx.Entries {
245                 switch value.(type) {
246                 case *bc.Claim:
247                         tp.claimTx[key] = true
248                 default:
249                         continue
250                 }
251         }
252 }
253
254 func (tp *TxPool) removeClaimTx(tx *types.Tx) {
255         for key, value := range tx.Entries {
256                 switch value.(type) {
257                 case *bc.Claim:
258                         delete(tp.claimTx, key)
259                 default:
260                         continue
261                 }
262         }
263 }
264
265 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
266         if len(tp.orphans) >= maxOrphanNum {
267                 return ErrPoolIsFull
268         }
269
270         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
271         tp.orphans[txD.Tx.ID] = orphan
272         for _, hash := range requireParents {
273                 if _, ok := tp.orphansByPrev[*hash]; !ok {
274                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
275                 }
276                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
277         }
278         return nil
279 }
280
281 func (tp *TxPool) addTransaction(txD *TxDesc) error {
282         if len(tp.pool) >= maxNewTxNum {
283                 return ErrPoolIsFull
284         }
285
286         tx := txD.Tx
287         txD.Added = time.Now()
288         tp.pool[tx.ID] = txD
289         // 增加一个claim id 到到claim pool中
290         tp.addClaimTx(tx)
291         for _, id := range tx.ResultIds {
292                 output, err := tx.Output(*id)
293                 if err != nil {
294                         // error due to it's a retirement, utxo doesn't care this output type so skip it
295                         continue
296                 }
297                 if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
298                         tp.utxo[*id] = tx
299                 }
300         }
301
302         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
303         tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
304
305         log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
306         return nil
307 }
308
309 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
310         view := state.NewUtxoViewpoint()
311         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
312                 return nil, err
313         }
314
315         hashes := []*bc.Hash{}
316         for _, hash := range tx.SpentOutputIDs {
317                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
318                         hashes = append(hashes, &hash)
319                 }
320         }
321         return hashes, nil
322 }
323
324 func (tp *TxPool) orphanExpireWorker() {
325         ticker := time.NewTicker(orphanExpireScanInterval)
326         for now := range ticker.C {
327                 tp.ExpireOrphan(now)
328         }
329 }
330
331 func (tp *TxPool) processOrphans(txD *TxDesc) {
332         processOrphans := []*orphanTx{}
333         addRely := func(tx *types.Tx) {
334                 for _, outHash := range tx.ResultIds {
335                         orphans, ok := tp.orphansByPrev[*outHash]
336                         if !ok {
337                                 continue
338                         }
339
340                         for _, orphan := range orphans {
341                                 processOrphans = append(processOrphans, orphan)
342                         }
343                         delete(tp.orphansByPrev, *outHash)
344                 }
345         }
346
347         addRely(txD.Tx)
348         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
349                 processOrphan := processOrphans[0]
350                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
351                 if err != nil {
352                         log.WithField("err", err).Error("processOrphans got unexpect error")
353                         continue
354                 }
355
356                 if len(requireParents) == 0 {
357                         addRely(processOrphan.Tx)
358                         tp.removeOrphan(&processOrphan.Tx.ID)
359                         tp.addTransaction(processOrphan.TxDesc)
360                 }
361         }
362 }
363
364 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
365         orphan, ok := tp.orphans[*hash]
366         if !ok {
367                 return
368         }
369
370         for _, spend := range orphan.Tx.SpentOutputIDs {
371                 orphans, ok := tp.orphansByPrev[spend]
372                 if !ok {
373                         continue
374                 }
375
376                 if delete(orphans, *hash); len(orphans) == 0 {
377                         delete(tp.orphansByPrev, spend)
378                 }
379         }
380         delete(tp.orphans, *hash)
381 }