OSDN Git Service

Test crossin (#213)
[bytom/vapor.git] / protocol / txpool.go
1 package protocol
2
3 import (
4         "errors"
5         "sync"
6         "sync/atomic"
7         "time"
8
9         "github.com/golang/groupcache/lru"
10         log "github.com/sirupsen/logrus"
11
12         "github.com/vapor/consensus"
13         "github.com/vapor/event"
14         "github.com/vapor/protocol/bc"
15         "github.com/vapor/protocol/bc/types"
16         "github.com/vapor/protocol/state"
17 )
18
19 // msg type
20 const (
21         MsgNewTx = iota
22         MsgRemoveTx
23         logModule = "protocol"
24 )
25
26 var (
27         maxCachedErrTxs = 1000
28         maxMsgChSize    = 1000
29         maxNewTxNum     = 10000
30         maxOrphanNum    = 2000
31
32         orphanTTL                = 10 * time.Minute
33         orphanExpireScanInterval = 3 * time.Minute
34
35         // ErrTransactionNotExist is the pre-defined error message
36         ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
37         // ErrPoolIsFull indicates the pool is full
38         ErrPoolIsFull = errors.New("transaction pool reach the max number")
39         // ErrDustTx indicates transaction is dust tx
40         ErrDustTx = errors.New("transaction is dust tx")
41 )
42
43 type TxMsgEvent struct{ TxMsg *TxPoolMsg }
44
45 // TxDesc store tx and related info for mining strategy
46 type TxDesc struct {
47         Tx         *types.Tx `json:"transaction"`
48         Added      time.Time `json:"-"`
49         StatusFail bool      `json:"status_fail"`
50         Height     uint64    `json:"-"`
51         Weight     uint64    `json:"-"`
52         Fee        uint64    `json:"-"`
53 }
54
55 // TxPoolMsg is use for notify pool changes
56 type TxPoolMsg struct {
57         *TxDesc
58         MsgType int
59 }
60
61 type orphanTx struct {
62         *TxDesc
63         expiration time.Time
64 }
65
66 // TxPool is use for store the unconfirmed transaction
67 type TxPool struct {
68         lastUpdated     int64
69         mtx             sync.RWMutex
70         store           Store
71         pool            map[bc.Hash]*TxDesc
72         utxo            map[bc.Hash]*types.Tx
73         orphans         map[bc.Hash]*orphanTx
74         orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
75         errCache        *lru.Cache
76         eventDispatcher *event.Dispatcher
77 }
78
79 // NewTxPool init a new TxPool
80 func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
81         tp := &TxPool{
82                 lastUpdated:     time.Now().Unix(),
83                 store:           store,
84                 pool:            make(map[bc.Hash]*TxDesc),
85                 utxo:            make(map[bc.Hash]*types.Tx),
86                 orphans:         make(map[bc.Hash]*orphanTx),
87                 orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
88                 errCache:        lru.New(maxCachedErrTxs),
89                 eventDispatcher: dispatcher,
90         }
91         go tp.orphanExpireWorker()
92         return tp
93 }
94
95 // AddErrCache add a failed transaction record to lru cache
96 func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
97         tp.mtx.Lock()
98         defer tp.mtx.Unlock()
99
100         tp.errCache.Add(txHash, err)
101 }
102
103 // ExpireOrphan expire all the orphans that before the input time range
104 func (tp *TxPool) ExpireOrphan(now time.Time) {
105         tp.mtx.Lock()
106         defer tp.mtx.Unlock()
107
108         for hash, orphan := range tp.orphans {
109                 if orphan.expiration.Before(now) {
110                         tp.removeOrphan(&hash)
111                 }
112         }
113 }
114
115 // GetErrCache return the error of the transaction
116 func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
117         tp.mtx.Lock()
118         defer tp.mtx.Unlock()
119
120         v, ok := tp.errCache.Get(txHash)
121         if !ok {
122                 return nil
123         }
124         return v.(error)
125 }
126
127 // RemoveTransaction remove a transaction from the pool
128 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
129         tp.mtx.Lock()
130         defer tp.mtx.Unlock()
131
132         txD, ok := tp.pool[*txHash]
133         if !ok {
134                 return
135         }
136
137         for _, output := range txD.Tx.ResultIds {
138                 delete(tp.utxo, *output)
139         }
140         delete(tp.pool, *txHash)
141
142         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
143         tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
144         log.WithFields(log.Fields{"module": logModule, "tx_id": txHash}).Debug("remove tx from mempool")
145 }
146
147 // GetTransaction return the TxDesc by hash
148 func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
149         tp.mtx.RLock()
150         defer tp.mtx.RUnlock()
151
152         if txD, ok := tp.pool[*txHash]; ok {
153                 return txD, nil
154         }
155         return nil, ErrTransactionNotExist
156 }
157
158 // GetTransactions return all the transactions in the pool
159 func (tp *TxPool) GetTransactions() []*TxDesc {
160         tp.mtx.RLock()
161         defer tp.mtx.RUnlock()
162
163         txDs := make([]*TxDesc, len(tp.pool))
164         i := 0
165         for _, desc := range tp.pool {
166                 txDs[i] = desc
167                 i++
168         }
169         return txDs
170 }
171
172 // IsTransactionInPool check wheather a transaction in pool or not
173 func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
174         tp.mtx.RLock()
175         defer tp.mtx.RUnlock()
176
177         _, ok := tp.pool[*txHash]
178         return ok
179 }
180
181 // IsTransactionInErrCache check wheather a transaction in errCache or not
182 func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
183         tp.mtx.RLock()
184         defer tp.mtx.RUnlock()
185
186         _, ok := tp.errCache.Get(txHash)
187         return ok
188 }
189
190 // HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
191 func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
192         return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
193 }
194
195 func isTransactionNoBtmInput(tx *types.Tx) bool {
196         for _, input := range tx.TxData.Inputs {
197                 switch input.InputType() {
198                 case types.CrossChainInputType:
199                         return false
200                 }
201                 if input.AssetID() == *consensus.BTMAssetID {
202                         return false
203                 }
204         }
205
206         return true
207 }
208
209 func isTransactionZeroOutput(tx *types.Tx) bool {
210         for _, output := range tx.TxData.Outputs {
211                 value := output.AssetAmount()
212                 if value.Amount == uint64(0) {
213                         return true
214                 }
215         }
216         return false
217 }
218
219 func (tp *TxPool) IsDust(tx *types.Tx) bool {
220         return isTransactionNoBtmInput(tx) || isTransactionZeroOutput(tx)
221 }
222
223 func (tp *TxPool) processTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
224         tp.mtx.Lock()
225         defer tp.mtx.Unlock()
226
227         txD := &TxDesc{
228                 Tx:         tx,
229                 StatusFail: statusFail,
230                 Weight:     tx.SerializedSize,
231                 Height:     height,
232                 Fee:        fee,
233         }
234         requireParents, err := tp.checkOrphanUtxos(tx)
235         if err != nil {
236                 return false, err
237         }
238
239         if len(requireParents) > 0 {
240                 return true, tp.addOrphan(txD, requireParents)
241         }
242
243         if err := tp.addTransaction(txD); err != nil {
244                 return false, err
245         }
246
247         tp.processOrphans(txD)
248         return false, nil
249 }
250
251 // ProcessTransaction is the main entry for txpool handle new tx, ignore dust tx.
252 func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
253         if tp.IsDust(tx) {
254                 log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Warn("dust tx")
255                 return false, nil
256         }
257         return tp.processTransaction(tx, statusFail, height, fee)
258 }
259
260 func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
261         if len(tp.orphans) >= maxOrphanNum {
262                 return ErrPoolIsFull
263         }
264
265         orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
266         tp.orphans[txD.Tx.ID] = orphan
267         for _, hash := range requireParents {
268                 if _, ok := tp.orphansByPrev[*hash]; !ok {
269                         tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
270                 }
271                 tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
272         }
273         return nil
274 }
275
276 func (tp *TxPool) addTransaction(txD *TxDesc) error {
277         if len(tp.pool) >= maxNewTxNum {
278                 return ErrPoolIsFull
279         }
280
281         tx := txD.Tx
282         txD.Added = time.Now()
283         tp.pool[tx.ID] = txD
284         for _, id := range tx.ResultIds {
285                 var assetID bc.AssetID
286                 outputEntry, err := tx.Entry(*id)
287                 if err != nil {
288                         return err
289                 }
290                 switch output := outputEntry.(type) {
291                 case *bc.IntraChainOutput:
292                         assetID = *output.Source.Value.AssetId
293                 case *bc.VoteOutput:
294                         assetID = *output.Source.Value.AssetId
295                 default:
296                         continue
297                 }
298
299                 if !txD.StatusFail || assetID == *consensus.BTMAssetID {
300                         tp.utxo[*id] = tx
301                 }
302         }
303
304         atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
305         tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
306         log.WithFields(log.Fields{"module": logModule, "tx_id": tx.ID.String()}).Debug("Add tx to mempool")
307         return nil
308 }
309
310 func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
311         view := state.NewUtxoViewpoint()
312         if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
313                 return nil, err
314         }
315
316         hashes := []*bc.Hash{}
317         for _, hash := range tx.SpentOutputIDs {
318                 if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
319                         hashes = append(hashes, &hash)
320                 }
321         }
322         return hashes, nil
323 }
324
325 func (tp *TxPool) orphanExpireWorker() {
326         ticker := time.NewTicker(orphanExpireScanInterval)
327         defer ticker.Stop()
328
329         for now := range ticker.C {
330                 tp.ExpireOrphan(now)
331         }
332 }
333
334 func (tp *TxPool) processOrphans(txD *TxDesc) {
335         processOrphans := []*orphanTx{}
336         addRely := func(tx *types.Tx) {
337                 for _, outHash := range tx.ResultIds {
338                         orphans, ok := tp.orphansByPrev[*outHash]
339                         if !ok {
340                                 continue
341                         }
342
343                         for _, orphan := range orphans {
344                                 processOrphans = append(processOrphans, orphan)
345                         }
346                         delete(tp.orphansByPrev, *outHash)
347                 }
348         }
349
350         addRely(txD.Tx)
351         for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
352                 processOrphan := processOrphans[0]
353                 requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
354                 if err != nil {
355                         log.WithFields(log.Fields{"module": logModule, "err": err}).Error("processOrphans got unexpect error")
356                         continue
357                 }
358
359                 if len(requireParents) == 0 {
360                         addRely(processOrphan.Tx)
361                         tp.removeOrphan(&processOrphan.Tx.ID)
362                         tp.addTransaction(processOrphan.TxDesc)
363                 }
364         }
365 }
366
367 func (tp *TxPool) removeOrphan(hash *bc.Hash) {
368         orphan, ok := tp.orphans[*hash]
369         if !ok {
370                 return
371         }
372
373         for _, spend := range orphan.Tx.SpentOutputIDs {
374                 orphans, ok := tp.orphansByPrev[spend]
375                 if !ok {
376                         continue
377                 }
378
379                 if delete(orphans, *hash); len(orphans) == 0 {
380                         delete(tp.orphansByPrev, spend)
381                 }
382         }
383         delete(tp.orphans, *hash)
384 }