OSDN Git Service

Merge pull request #572 from Bytom/list-addresses
[bytom/bytom.git] / account / reserve.go
1 package account
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "sync/atomic"
8         "time"
9
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/sync/idempotency"
16 )
17
18 var (
19         // ErrInsufficient indicates the account doesn't contain enough
20         // units of the requested asset to satisfy the reservation.
21         // New units must be deposited into the account in order to
22         // satisfy the request; change will not be sufficient.
23         ErrInsufficient = errors.New("reservation found insufficient funds")
24
25         // ErrReserved indicates that a reservation could not be
26         // satisfied because some of the outputs were already reserved.
27         // When those reservations are finalized into a transaction
28         // (and no other transaction spends funds from the account),
29         // new change outputs will be created
30         // in sufficient amounts to satisfy the request.
31         ErrReserved = errors.New("reservation found outputs already reserved")
32         // ErrMatchUTXO indicates the account doesn't contain enough utxo to satisfy the reservation.
33         ErrMatchUTXO = errors.New("can't match enough valid utxos")
34         // ErrReservation indicates the reserver doesn't found the reservation with the provided ID.
35         ErrReservation = errors.New("couldn't find reservation")
36 )
37
38 // UTXO describes an individual account utxo.
39 type UTXO struct {
40         OutputID bc.Hash
41         SourceID bc.Hash
42
43         // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44         // AssetAmount with a nil AssetId.
45         AssetID bc.AssetID
46         Amount  uint64
47
48         SourcePos      uint64
49         ControlProgram []byte
50
51         AccountID           string
52         Address             string
53         ControlProgramIndex uint64
54         ValidHeight         uint64
55 }
56
57 func (u *UTXO) source() source {
58         return source{AssetID: u.AssetID, AccountID: u.AccountID}
59 }
60
61 // source describes the criteria to use when selecting UTXOs.
62 type source struct {
63         AssetID   bc.AssetID
64         AccountID string
65 }
66
67 // reservation describes a reservation of a set of UTXOs belonging
68 // to a particular account. Reservations are immutable.
69 type reservation struct {
70         ID          uint64
71         Source      source
72         UTXOs       []*UTXO
73         Change      uint64
74         Expiry      time.Time
75         ClientToken *string
76 }
77
78 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
79         return &reserver{
80                 c:            c,
81                 db:           walletdb,
82                 reservations: make(map[uint64]*reservation),
83                 sources:      make(map[source]*sourceReserver),
84         }
85 }
86
87 // reserver implements a utxo reserver that stores reservations
88 // in-memory. It relies on the account_utxos table for the source of
89 // truth of valid UTXOs but tracks which of those UTXOs are reserved
90 // in-memory.
91 //
92 // To reduce latency and prevent deadlock, no two mutexes (either on
93 // reserver or sourceReserver) should be held at the same time
94 //
95 // reserver ensures idempotency of reservations until the reservation
96 // expiration.
97 type reserver struct {
98         c                 *protocol.Chain
99         db                dbm.DB
100         nextReservationID uint64
101         idempotency       idempotency.Group
102
103         reservationsMu sync.Mutex
104         reservations   map[uint64]*reservation
105
106         sourcesMu sync.Mutex
107         sources   map[source]*sourceReserver
108 }
109
110 // Reserve selects and reserves UTXOs according to the criteria provided
111 // in source. The resulting reservation expires at exp.
112 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
113
114         if clientToken == nil {
115                 return re.reserve(src, amount, clientToken, exp)
116         }
117
118         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
119                 return re.reserve(src, amount, clientToken, exp)
120         })
121         return untypedRes.(*reservation), err
122 }
123
124 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
125         sourceReserver := re.source(src)
126
127         // Try to reserve the right amount.
128         rid := atomic.AddUint64(&re.nextReservationID, 1)
129         reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
130         if err != nil {
131                 if isImmature {
132                         return nil, errors.WithDetail(err, "some coinbase utxos are immature")
133                 }
134                 return nil, err
135         }
136
137         res = &reservation{
138                 ID:          rid,
139                 Source:      src,
140                 UTXOs:       reserved,
141                 Expiry:      exp,
142                 ClientToken: clientToken,
143         }
144
145         // Save the successful reservation.
146         re.reservationsMu.Lock()
147         defer re.reservationsMu.Unlock()
148         re.reservations[rid] = res
149
150         // Make change if necessary
151         if total > amount {
152                 res.Change = total - amount
153         }
154         return res, nil
155 }
156
157 // ReserveUTXO reserves a specific utxo for spending. The resulting
158 // reservation expires at exp.
159 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
160         if clientToken == nil {
161                 return re.reserveUTXO(ctx, out, exp, nil)
162         }
163
164         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
165                 return re.reserveUTXO(ctx, out, exp, clientToken)
166         })
167         return untypedRes.(*reservation), err
168 }
169
170 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
171         u, err := findSpecificUTXO(re.db, out)
172         if err != nil {
173                 return nil, err
174         }
175
176         //u.ValidHeight > 0 means coinbase utxo
177         if u.ValidHeight > 0 && u.ValidHeight > re.c.Height() {
178                 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
179         }
180
181         rid := atomic.AddUint64(&re.nextReservationID, 1)
182         err = re.source(u.source()).reserveUTXO(rid, u)
183         if err != nil {
184                 return nil, err
185         }
186
187         res := &reservation{
188                 ID:          rid,
189                 Source:      u.source(),
190                 UTXOs:       []*UTXO{u},
191                 Expiry:      exp,
192                 ClientToken: clientToken,
193         }
194         re.reservationsMu.Lock()
195         re.reservations[rid] = res
196         re.reservationsMu.Unlock()
197         return res, nil
198 }
199
200 // Cancel makes a best-effort attempt at canceling the reservation with
201 // the provided ID.
202 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
203         re.reservationsMu.Lock()
204         res, ok := re.reservations[rid]
205         delete(re.reservations, rid)
206         re.reservationsMu.Unlock()
207         if !ok {
208                 return errors.Wrapf(ErrReservation, "rid=%d", rid)
209         }
210         re.source(res.Source).cancel(res)
211         /*if res.ClientToken != nil {
212                 re.idempotency.Forget(*res.ClientToken)
213         }*/
214         return nil
215 }
216
217 // ExpireReservations cleans up all reservations that have expired,
218 // making their UTXOs available for reservation again.
219 func (re *reserver) ExpireReservations(ctx context.Context) error {
220         // Remove records of any reservations that have expired.
221         now := time.Now()
222         var canceled []*reservation
223         re.reservationsMu.Lock()
224         for rid, res := range re.reservations {
225                 if res.Expiry.Before(now) {
226                         canceled = append(canceled, res)
227                         delete(re.reservations, rid)
228                 }
229         }
230         re.reservationsMu.Unlock()
231
232         // If we removed any expired reservations, update the corresponding
233         // source reservers.
234         for _, res := range canceled {
235                 re.source(res.Source).cancel(res)
236                 /*if res.ClientToken != nil {
237                         re.idempotency.Forget(*res.ClientToken)
238                 }*/
239         }
240
241         // TODO(jackson): Cleanup any source reservers that don't have
242         // anything reserved. It'll be a little tricky because of our
243         // locking scheme.
244         return nil
245 }
246
247 func (re *reserver) source(src source) *sourceReserver {
248         re.sourcesMu.Lock()
249         defer re.sourcesMu.Unlock()
250
251         sr, ok := re.sources[src]
252         if ok {
253                 return sr
254         }
255
256         sr = &sourceReserver{
257                 db:            re.db,
258                 src:           src,
259                 reserved:      make(map[bc.Hash]uint64),
260                 currentHeight: re.c.Height,
261         }
262         re.sources[src] = sr
263         return sr
264 }
265
266 type sourceReserver struct {
267         db            dbm.DB
268         src           source
269         currentHeight func() uint64
270         mu            sync.Mutex
271         reserved      map[bc.Hash]uint64
272 }
273
274 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
275         var (
276                 reserved, unavailable uint64
277                 reservedUTXOs         []*UTXO
278         )
279
280         utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
281         if err != nil {
282                 return nil, 0, isImmature, errors.Wrap(err)
283         }
284
285         sr.mu.Lock()
286         defer sr.mu.Unlock()
287         for _, u := range utxos {
288                 // If the UTXO is already reserved, skip it.
289                 if _, ok := sr.reserved[u.OutputID]; ok {
290                         unavailable += u.Amount
291                         continue
292                 }
293
294                 reserved += u.Amount
295                 reservedUTXOs = append(reservedUTXOs, u)
296                 if reserved >= amount {
297                         break
298                 }
299         }
300         if reserved+unavailable < amount {
301                 // Even if everything was available, this account wouldn't have
302                 // enough to satisfy the request.
303                 return nil, 0, isImmature, ErrInsufficient
304         }
305         if reserved < amount {
306                 // The account has enough for the request, but some is tied up in
307                 // other reservations.
308                 return nil, 0, isImmature, ErrReserved
309         }
310
311         // We've found enough to satisfy the request.
312         for _, u := range reservedUTXOs {
313                 sr.reserved[u.OutputID] = rid
314         }
315
316         return reservedUTXOs, reserved, isImmature, nil
317 }
318
319 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
320         sr.mu.Lock()
321         defer sr.mu.Unlock()
322
323         _, isReserved := sr.reserved[utxo.OutputID]
324         if isReserved {
325                 return ErrReserved
326         }
327
328         sr.reserved[utxo.OutputID] = rid
329         return nil
330 }
331
332 func (sr *sourceReserver) cancel(res *reservation) {
333         sr.mu.Lock()
334         defer sr.mu.Unlock()
335         for _, utxo := range res.UTXOs {
336                 delete(sr.reserved, utxo.OutputID)
337         }
338 }
339
340 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
341         utxos := []*UTXO{}
342         isImmature := false
343         utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
344         defer utxoIter.Release()
345
346         for utxoIter.Next() {
347                 u := &UTXO{}
348                 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
349                         return nil, false, errors.Wrap(err)
350                 }
351
352                 //u.ValidHeight > 0 means coinbase utxo
353                 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
354                         isImmature = true
355                         continue
356                 }
357
358                 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
359                         utxos = append(utxos, u)
360                 }
361         }
362
363         if len(utxos) == 0 {
364                 return nil, isImmature, ErrMatchUTXO
365         }
366         return utxos, isImmature, nil
367 }
368
369 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
370         u := &UTXO{}
371
372         data := db.Get(StandardUTXOKey(outHash))
373         if data == nil {
374                 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
375                         return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
376                 }
377         }
378         return u, json.Unmarshal(data, u)
379 }