OSDN Git Service

Merge branch 'dev' into dev-gas
[bytom/bytom.git] / account / reserve.go
1 package account
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "sync/atomic"
8         "time"
9
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/sync/idempotency"
16 )
17
18 var (
19         // ErrInsufficient indicates the account doesn't contain enough
20         // units of the requested asset to satisfy the reservation.
21         // New units must be deposited into the account in order to
22         // satisfy the request; change will not be sufficient.
23         ErrInsufficient = errors.New("reservation found insufficient funds")
24
25         // ErrReserved indicates that a reservation could not be
26         // satisfied because some of the outputs were already reserved.
27         // When those reservations are finalized into a transaction
28         // (and no other transaction spends funds from the account),
29         // new change outputs will be created
30         // in sufficient amounts to satisfy the request.
31         ErrReserved = errors.New("reservation found outputs already reserved")
32         // ErrMatchUTXO indicates the account doesn't contain enough utxo to satisfy the reservation.
33         ErrMatchUTXO = errors.New("can't match enough valid utxos")
34         // ErrReservation indicates the reserver doesn't found the reservation with the provided ID.
35         ErrReservation = errors.New("couldn't find reservation")
36 )
37
38 // UTXO describes an individual account utxo.
39 type UTXO struct {
40         OutputID bc.Hash
41         SourceID bc.Hash
42
43         // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44         // AssetAmount with a nil AssetId.
45         AssetID bc.AssetID
46         Amount  uint64
47
48         SourcePos      uint64
49         ControlProgram []byte
50
51         AccountID           string
52         Address             string
53         ControlProgramIndex uint64
54         ValidHeight         uint64
55         Change              bool
56 }
57
58 func (u *UTXO) source() source {
59         return source{AssetID: u.AssetID, AccountID: u.AccountID}
60 }
61
62 // source describes the criteria to use when selecting UTXOs.
63 type source struct {
64         AssetID   bc.AssetID
65         AccountID string
66 }
67
68 // reservation describes a reservation of a set of UTXOs belonging
69 // to a particular account. Reservations are immutable.
70 type reservation struct {
71         ID          uint64
72         Source      source
73         UTXOs       []*UTXO
74         Change      uint64
75         Expiry      time.Time
76         ClientToken *string
77 }
78
79 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
80         return &reserver{
81                 c:            c,
82                 db:           walletdb,
83                 reservations: make(map[uint64]*reservation),
84                 sources:      make(map[source]*sourceReserver),
85         }
86 }
87
88 // reserver implements a utxo reserver that stores reservations
89 // in-memory. It relies on the account_utxos table for the source of
90 // truth of valid UTXOs but tracks which of those UTXOs are reserved
91 // in-memory.
92 //
93 // To reduce latency and prevent deadlock, no two mutexes (either on
94 // reserver or sourceReserver) should be held at the same time
95 //
96 // reserver ensures idempotency of reservations until the reservation
97 // expiration.
98 type reserver struct {
99         c                 *protocol.Chain
100         db                dbm.DB
101         nextReservationID uint64
102         idempotency       idempotency.Group
103
104         reservationsMu sync.Mutex
105         reservations   map[uint64]*reservation
106
107         sourcesMu sync.Mutex
108         sources   map[source]*sourceReserver
109 }
110
111 // Reserve selects and reserves UTXOs according to the criteria provided
112 // in source. The resulting reservation expires at exp.
113 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
114
115         if clientToken == nil {
116                 return re.reserve(src, amount, clientToken, exp)
117         }
118
119         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
120                 return re.reserve(src, amount, clientToken, exp)
121         })
122         return untypedRes.(*reservation), err
123 }
124
125 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
126         sourceReserver := re.source(src)
127
128         // Try to reserve the right amount.
129         rid := atomic.AddUint64(&re.nextReservationID, 1)
130         reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
131         if err != nil {
132                 if isImmature {
133                         return nil, errors.WithDetail(err, "some coinbase utxos are immature")
134                 }
135                 return nil, err
136         }
137
138         res = &reservation{
139                 ID:          rid,
140                 Source:      src,
141                 UTXOs:       reserved,
142                 Expiry:      exp,
143                 ClientToken: clientToken,
144         }
145
146         // Save the successful reservation.
147         re.reservationsMu.Lock()
148         defer re.reservationsMu.Unlock()
149         re.reservations[rid] = res
150
151         // Make change if necessary
152         if total > amount {
153                 res.Change = total - amount
154         }
155         return res, nil
156 }
157
158 // ReserveUTXO reserves a specific utxo for spending. The resulting
159 // reservation expires at exp.
160 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
161         if clientToken == nil {
162                 return re.reserveUTXO(ctx, out, exp, nil)
163         }
164
165         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
166                 return re.reserveUTXO(ctx, out, exp, clientToken)
167         })
168         return untypedRes.(*reservation), err
169 }
170
171 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
172         u, err := findSpecificUTXO(re.db, out)
173         if err != nil {
174                 return nil, err
175         }
176
177         //u.ValidHeight > 0 means coinbase utxo
178         if u.ValidHeight > 0 && u.ValidHeight > re.c.BestBlockHeight() {
179                 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
180         }
181
182         rid := atomic.AddUint64(&re.nextReservationID, 1)
183         err = re.source(u.source()).reserveUTXO(rid, u)
184         if err != nil {
185                 return nil, err
186         }
187
188         res := &reservation{
189                 ID:          rid,
190                 Source:      u.source(),
191                 UTXOs:       []*UTXO{u},
192                 Expiry:      exp,
193                 ClientToken: clientToken,
194         }
195         re.reservationsMu.Lock()
196         re.reservations[rid] = res
197         re.reservationsMu.Unlock()
198         return res, nil
199 }
200
201 // Cancel makes a best-effort attempt at canceling the reservation with
202 // the provided ID.
203 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
204         re.reservationsMu.Lock()
205         res, ok := re.reservations[rid]
206         delete(re.reservations, rid)
207         re.reservationsMu.Unlock()
208         if !ok {
209                 return errors.Wrapf(ErrReservation, "rid=%d", rid)
210         }
211         re.source(res.Source).cancel(res)
212         /*if res.ClientToken != nil {
213                 re.idempotency.Forget(*res.ClientToken)
214         }*/
215         return nil
216 }
217
218 // ExpireReservations cleans up all reservations that have expired,
219 // making their UTXOs available for reservation again.
220 func (re *reserver) ExpireReservations(ctx context.Context) error {
221         // Remove records of any reservations that have expired.
222         now := time.Now()
223         var canceled []*reservation
224         re.reservationsMu.Lock()
225         for rid, res := range re.reservations {
226                 if res.Expiry.Before(now) {
227                         canceled = append(canceled, res)
228                         delete(re.reservations, rid)
229                 }
230         }
231         re.reservationsMu.Unlock()
232
233         // If we removed any expired reservations, update the corresponding
234         // source reservers.
235         for _, res := range canceled {
236                 re.source(res.Source).cancel(res)
237                 /*if res.ClientToken != nil {
238                         re.idempotency.Forget(*res.ClientToken)
239                 }*/
240         }
241
242         // TODO(jackson): Cleanup any source reservers that don't have
243         // anything reserved. It'll be a little tricky because of our
244         // locking scheme.
245         return nil
246 }
247
248 func (re *reserver) source(src source) *sourceReserver {
249         re.sourcesMu.Lock()
250         defer re.sourcesMu.Unlock()
251
252         sr, ok := re.sources[src]
253         if ok {
254                 return sr
255         }
256
257         sr = &sourceReserver{
258                 db:            re.db,
259                 src:           src,
260                 reserved:      make(map[bc.Hash]uint64),
261                 currentHeight: re.c.BestBlockHeight,
262         }
263         re.sources[src] = sr
264         return sr
265 }
266
267 type sourceReserver struct {
268         db            dbm.DB
269         src           source
270         currentHeight func() uint64
271         mu            sync.Mutex
272         reserved      map[bc.Hash]uint64
273 }
274
275 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
276         var (
277                 reserved, unavailable uint64
278                 reservedUTXOs         []*UTXO
279         )
280
281         utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
282         if err != nil {
283                 return nil, 0, isImmature, errors.Wrap(err)
284         }
285
286         sr.mu.Lock()
287         defer sr.mu.Unlock()
288         for _, u := range utxos {
289                 // If the UTXO is already reserved, skip it.
290                 if _, ok := sr.reserved[u.OutputID]; ok {
291                         unavailable += u.Amount
292                         continue
293                 }
294
295                 reserved += u.Amount
296                 reservedUTXOs = append(reservedUTXOs, u)
297                 if reserved >= amount {
298                         break
299                 }
300         }
301         if reserved+unavailable < amount {
302                 // Even if everything was available, this account wouldn't have
303                 // enough to satisfy the request.
304                 return nil, 0, isImmature, ErrInsufficient
305         }
306         if reserved < amount {
307                 // The account has enough for the request, but some is tied up in
308                 // other reservations.
309                 return nil, 0, isImmature, ErrReserved
310         }
311
312         // We've found enough to satisfy the request.
313         for _, u := range reservedUTXOs {
314                 sr.reserved[u.OutputID] = rid
315         }
316
317         return reservedUTXOs, reserved, isImmature, nil
318 }
319
320 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
321         sr.mu.Lock()
322         defer sr.mu.Unlock()
323
324         _, isReserved := sr.reserved[utxo.OutputID]
325         if isReserved {
326                 return ErrReserved
327         }
328
329         sr.reserved[utxo.OutputID] = rid
330         return nil
331 }
332
333 func (sr *sourceReserver) cancel(res *reservation) {
334         sr.mu.Lock()
335         defer sr.mu.Unlock()
336         for _, utxo := range res.UTXOs {
337                 delete(sr.reserved, utxo.OutputID)
338         }
339 }
340
341 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
342         utxos := []*UTXO{}
343         isImmature := false
344         utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
345         defer utxoIter.Release()
346
347         for utxoIter.Next() {
348                 u := &UTXO{}
349                 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
350                         return nil, false, errors.Wrap(err)
351                 }
352
353                 //u.ValidHeight > 0 means coinbase utxo
354                 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
355                         isImmature = true
356                         continue
357                 }
358
359                 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
360                         utxos = append(utxos, u)
361                 }
362         }
363
364         if len(utxos) == 0 {
365                 return nil, isImmature, ErrMatchUTXO
366         }
367         return utxos, isImmature, nil
368 }
369
370 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
371         u := &UTXO{}
372
373         data := db.Get(StandardUTXOKey(outHash))
374         if data == nil {
375                 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
376                         return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
377                 }
378         }
379         return u, json.Unmarshal(data, u)
380 }