OSDN Git Service

Merge pull request #21 from Bytom/dev_modify_pegin_address_to_contract
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/consensus"
13         "github.com/vapor/protocol/bc"
14         "github.com/vapor/protocol/bc/types"
15         "github.com/vapor/protocol/state"
16 )
17
18 // msg type
19 const (
20         MsgNewTx = iota
21         MsgRemoveTx
22 )
23
24 var (
25         maxCachedErrTxs = 1000
26         maxMsgChSize    = 1000
27         maxNewTxNum     = 10000
28         maxOrphanNum    = 2000
29
30         orphanTTL                = 10 * time.Minute
31         orphanExpireScanInterval = 3 * time.Minute
32
33         // ErrTransactionNotExist is the pre-defined error message
34         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
35         // ErrPoolIsFull indicates the pool is full
36         ErrPoolIsFull = errors.New("transaction pool reach the max number")
37 )
38
39 // TxDesc store tx and related info for mining strategy
40 type TxDesc struct {
41         Tx         *types.Tx
42         Added      time.Time
43         StatusFail bool
44         Height     uint64
45         Weight     uint64
46         Fee        uint64
47 }
48
49 // TxPoolMsg is use for notify pool changes
50 type TxPoolMsg struct {
51         *TxDesc
52         MsgType int
53 }
54
55 type orphanTx struct {
56         *TxDesc
57         expiration time.Time
58 }
59
60 // TxPool is use for store the unconfirmed transaction
61 type TxPool struct {
62         lastUpdated   int64
63         mtx           sync.RWMutex
64         store         Store
65         pool          map[bc.Hash]*TxDesc
66         utxo          map[bc.Hash]*types.Tx
67         orphans       map[bc.Hash]*orphanTx
68         orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
69         errCache      *lru.Cache
70         msgCh         chan *TxPoolMsg
71 }
72
73 // NewTxPool init a new TxPool
74 func NewTxPool(store Store) *TxPool {
75         tp := &TxPool{
76                 lastUpdated:   time.Now().Unix(),
77                 store:         store,
78                 pool:          make(map[bc.Hash]*TxDesc),
79                 utxo:          make(map[bc.Hash]*types.Tx),
80                 orphans:       make(map[bc.Hash]*orphanTx),
81                 orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
82                 errCache:      lru.New(maxCachedErrTxs),
83                 msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
84         }
85         go tp.orphanExpireWorker()
86         return tp
87 }
88
89 // AddErrCache add a failed transaction record to lru cache
90 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
91         tp.mtx.Lock()
92         defer tp.mtx.Unlock()
93
94         tp.errCache.Add(txHash, err)
95 }
96
97 // ExpireOrphan expire all the orphans that before the input time range
98 func (tp *TxPool) ExpireOrphan(now time.Time) {
99         tp.mtx.Lock()
100         defer tp.mtx.Unlock()
101
102         for hash, orphan := range tp.orphans {
103                 if orphan.expiration.Before(now) {
104                         tp.removeOrphan(&hash)
105                 }
106         }
107 }
108
109 // GetErrCache return the error of the transaction
110 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
111         tp.mtx.Lock()
112         defer tp.mtx.Unlock()
113
114         v, ok := tp.errCache.Get(txHash)
115         if !ok {
116                 return nil
117         }
118         return v.(error)
119 }
120
121 // GetMsgCh return a unconfirmed transaction feed channel
122 func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
123         return tp.msgCh
124 }
125
126 // RemoveTransaction remove a transaction from the pool
127 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
128         tp.mtx.Lock()
129         defer tp.mtx.Unlock()
130
131         txD, ok := tp.pool[*txHash]
132         if !ok {
133                 return
134         }
135
136         for _, output := range txD.Tx.ResultIds {
137                 delete(tp.utxo, *output)
138         }
139         delete(tp.pool, *txHash)
140
141         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
142         tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
143         log.WithField("tx_id", txHash).Debug("remove tx from mempool")
144 }
145
146 // GetTransaction return the TxDesc by hash
147 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
148         tp.mtx.RLock()
149         defer tp.mtx.RUnlock()
150
151         if txD, ok := tp.pool[*txHash]; ok {
152                 return txD, nil
153         }
154         return nil, ErrTransactionNotExist
155 }
156
157 // GetTransactions return all the transactions in the pool
158 func (tp *TxPool) GetTransactions() []*TxDesc {
159         tp.mtx.RLock()
160         defer tp.mtx.RUnlock()
161
162         txDs := make([]*TxDesc, len(tp.pool))
163         i := 0
164         for _, desc := range tp.pool {
165                 txDs[i] = desc
166                 i++
167         }
168         return txDs
169 }
170
171 // IsTransactionInPool check wheather a transaction in pool or not
172 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
173         tp.mtx.RLock()
174         defer tp.mtx.RUnlock()
175
176         _, ok := tp.pool[*txHash]
177         return ok
178 }
179
180 // IsTransactionInErrCache check wheather a transaction in errCache or not
181 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
182         tp.mtx.RLock()
183         defer tp.mtx.RUnlock()
184
185         _, ok := tp.errCache.Get(txHash)
186         return ok
187 }
188
189 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
190 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
191         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
192 }
193
194 // ProcessTransaction is the main entry for txpool handle new tx
195 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
196         tp.mtx.Lock()
197         defer tp.mtx.Unlock()
198
199         if tp.isWithdrawSpent(tx) {
200                 log.WithFields(log.Fields{"module": "ProcessTransaction", "error": "pegin-already-claimed"}).Error("ProcessTransaction error")
201                 return false, errors.New("pegin-already-claimed")
202         }
203
204         txD := &TxDesc{
205                 Tx:         tx,
206                 StatusFail: statusFail,
207                 Weight:     tx.SerializedSize,
208                 Height:     height,
209                 Fee:        fee,
210         }
211         requireParents, err := tp.checkOrphanUtxos(tx)
212         if err != nil {
213                 return false, err
214         }
215
216         if len(requireParents) > 0 {
217                 return true, tp.addOrphan(txD, requireParents)
218         }
219         if err := tp.addTransaction(txD); err != nil {
220                 return false, err
221         }
222
223         tp.processOrphans(txD)
224         return false, nil
225 }
226
227 func (tp *TxPool) isWithdrawSpent(tx *types.Tx) bool {
228         for key, value := range tx.Entries {
229                 switch value.(type) {
230                 case *bc.Claim:
231                         return tp.store.IsWithdrawSpent(&key)
232                 default:
233                         continue
234                 }
235         }
236
237         return false
238 }
239
240 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
241         if len(tp.orphans) >= maxOrphanNum {
242                 return ErrPoolIsFull
243         }
244
245         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
246         tp.orphans[txD.Tx.ID] = orphan
247         for _, hash := range requireParents {
248                 if _, ok := tp.orphansByPrev[*hash]; !ok {
249                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
250                 }
251                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
252         }
253         return nil
254 }
255
256 func (tp *TxPool) addTransaction(txD *TxDesc) error {
257         if len(tp.pool) >= maxNewTxNum {
258                 return ErrPoolIsFull
259         }
260
261         tx := txD.Tx
262         txD.Added = time.Now()
263         tp.pool[tx.ID] = txD
264         for _, id := range tx.ResultIds {
265                 output, err := tx.Output(*id)
266                 if err != nil {
267                         // error due to it's a retirement, utxo doesn't care this output type so skip it
268                         continue
269                 }
270                 if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
271                         tp.utxo[*id] = tx
272                 }
273         }
274
275         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
276         tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
277         log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
278         return nil
279 }
280
281 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
282         view := state.NewUtxoViewpoint()
283         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
284                 return nil, err
285         }
286
287         hashes := []*bc.Hash{}
288         for _, hash := range tx.SpentOutputIDs {
289                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
290                         hashes = append(hashes, &hash)
291                 }
292         }
293         return hashes, nil
294 }
295
296 func (tp *TxPool) orphanExpireWorker() {
297         ticker := time.NewTicker(orphanExpireScanInterval)
298         for now := range ticker.C {
299                 tp.ExpireOrphan(now)
300         }
301 }
302
303 func (tp *TxPool) processOrphans(txD *TxDesc) {
304         processOrphans := []*orphanTx{}
305         addRely := func(tx *types.Tx) {
306                 for _, outHash := range tx.ResultIds {
307                         orphans, ok := tp.orphansByPrev[*outHash]
308                         if !ok {
309                                 continue
310                         }
311
312                         for _, orphan := range orphans {
313                                 processOrphans = append(processOrphans, orphan)
314                         }
315                         delete(tp.orphansByPrev, *outHash)
316                 }
317         }
318
319         addRely(txD.Tx)
320         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
321                 processOrphan := processOrphans[0]
322                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
323                 if err != nil {
324                         log.WithField("err", err).Error("processOrphans got unexpect error")
325                         continue
326                 }
327
328                 if len(requireParents) == 0 {
329                         addRely(processOrphan.Tx)
330                         tp.removeOrphan(&processOrphan.Tx.ID)
331                         tp.addTransaction(processOrphan.TxDesc)
332                 }
333         }
334 }
335
336 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
337         orphan, ok := tp.orphans[*hash]
338         if !ok {
339                 return
340         }
341
342         for _, spend := range orphan.Tx.SpentOutputIDs {
343                 orphans, ok := tp.orphansByPrev[spend]
344                 if !ok {
345                         continue
346                 }
347
348                 if delete(orphans, *hash); len(orphans) == 0 {
349                         delete(tp.orphansByPrev, spend)
350                 }
351         }
352         delete(tp.orphans, *hash)
353 }