OSDN Git Service

79ef71fa1da97e53759e1d54033f6906961b8169
[bytom/bytom.git] / blockchain / account / reserve.go
1 package account
2
3 import (
4         "bytes"
5         "context"
6         "encoding/json"
7         "fmt"
8         "sync"
9         "sync/atomic"
10         "time"
11
12         log "github.com/sirupsen/logrus"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/protocol"
18         "github.com/bytom/protocol/bc"
19         "github.com/bytom/sync/idempotency"
20 )
21
22 var (
23         // ErrInsufficient indicates the account doesn't contain enough
24         // units of the requested asset to satisfy the reservation.
25         // New units must be deposited into the account in order to
26         // satisfy the request; change will not be sufficient.
27         ErrInsufficient = errors.New("reservation found insufficient funds")
28
29         // ErrReserved indicates that a reservation could not be
30         // satisfied because some of the outputs were already reserved.
31         // When those reservations are finalized into a transaction
32         // (and no other transaction spends funds from the account),
33         // new change outputs will be created
34         // in sufficient amounts to satisfy the request.
35         ErrReserved = errors.New("reservation found outputs already reserved")
36 )
37
38 // utxo describes an individual account utxo.
39 type utxo struct {
40         OutputID bc.Hash
41         SourceID bc.Hash
42
43         // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44         // AssetAmount with a nil AssetId.
45         AssetID bc.AssetID
46         Amount  uint64
47
48         SourcePos      uint64
49         ControlProgram []byte
50         RefDataHash    bc.Hash
51
52         AccountID           string
53         ControlProgramIndex uint64
54 }
55
56 func (u *utxo) source() source {
57         return source{AssetID: u.AssetID, AccountID: u.AccountID}
58 }
59
60 // source describes the criteria to use when selecting UTXOs.
61 type source struct {
62         AssetID   bc.AssetID
63         AccountID string
64 }
65
66 // reservation describes a reservation of a set of UTXOs belonging
67 // to a particular account. Reservations are immutable.
68 type reservation struct {
69         ID          uint64
70         Source      source
71         UTXOs       []*utxo
72         Change      uint64
73         Expiry      time.Time
74         ClientToken *string
75 }
76
77 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
78         return &reserver{
79                 c:            c,
80                 db:           walletdb,
81                 reservations: make(map[uint64]*reservation),
82                 sources:      make(map[source]*sourceReserver),
83         }
84 }
85
86 // reserver implements a utxo reserver that stores reservations
87 // in-memory. It relies on the account_utxos table for the source of
88 // truth of valid UTXOs but tracks which of those UTXOs are reserved
89 // in-memory.
90 //
91 // To reduce latency and prevent deadlock, no two mutexes (either on
92 // reserver or sourceReserver) should be held at the same time
93 //
94 // reserver ensures idempotency of reservations until the reservation
95 // expiration.
96 type reserver struct {
97         c                 *protocol.Chain
98         db                dbm.DB
99         nextReservationID uint64
100         idempotency       idempotency.Group
101
102         reservationsMu sync.Mutex
103         reservations   map[uint64]*reservation
104
105         sourcesMu sync.Mutex
106         sources   map[source]*sourceReserver
107 }
108
109 // Reserve selects and reserves UTXOs according to the criteria provided
110 // in source. The resulting reservation expires at exp.
111 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
112
113         if clientToken == nil {
114                 return re.reserve(src, amount, clientToken, exp)
115         }
116
117         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
118                 return re.reserve(src, amount, clientToken, exp)
119         })
120         return untypedRes.(*reservation), err
121 }
122
123 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
124         sourceReserver := re.source(src)
125
126         // Try to reserve the right amount.
127         rid := atomic.AddUint64(&re.nextReservationID, 1)
128         reserved, total, err := sourceReserver.reserve(rid, amount)
129         if err != nil {
130                 return nil, err
131         }
132
133         res = &reservation{
134                 ID:          rid,
135                 Source:      src,
136                 UTXOs:       reserved,
137                 Expiry:      exp,
138                 ClientToken: clientToken,
139         }
140
141         // Save the successful reservation.
142         re.reservationsMu.Lock()
143         defer re.reservationsMu.Unlock()
144         re.reservations[rid] = res
145
146         // Make change if necessary
147         if total > amount {
148                 res.Change = total - amount
149         }
150         return res, nil
151 }
152
153 // ReserveUTXO reserves a specific utxo for spending. The resulting
154 // reservation expires at exp.
155 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
156         if clientToken == nil {
157                 return re.reserveUTXO(ctx, out, exp, nil)
158         }
159
160         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
161                 return re.reserveUTXO(ctx, out, exp, clientToken)
162         })
163         return untypedRes.(*reservation), err
164 }
165
166 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
167         u, err := findSpecificUTXO(re.db, out)
168         if err != nil {
169                 return nil, err
170         }
171
172         if !re.checkUTXO(u) {
173                 return nil, errors.New("didn't find utxo")
174         }
175
176         rid := atomic.AddUint64(&re.nextReservationID, 1)
177         err = re.source(u.source()).reserveUTXO(rid, u)
178         if err != nil {
179                 return nil, err
180         }
181
182         res := &reservation{
183                 ID:          rid,
184                 Source:      u.source(),
185                 UTXOs:       []*utxo{u},
186                 Expiry:      exp,
187                 ClientToken: clientToken,
188         }
189         re.reservationsMu.Lock()
190         re.reservations[rid] = res
191         re.reservationsMu.Unlock()
192         return res, nil
193 }
194
195 // Cancel makes a best-effort attempt at canceling the reservation with
196 // the provided ID.
197 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
198         re.reservationsMu.Lock()
199         res, ok := re.reservations[rid]
200         delete(re.reservations, rid)
201         re.reservationsMu.Unlock()
202         if !ok {
203                 return fmt.Errorf("couldn't find reservation %d", rid)
204         }
205         re.source(res.Source).cancel(res)
206         /*if res.ClientToken != nil {
207                 re.idempotency.Forget(*res.ClientToken)
208         }*/
209         return nil
210 }
211
212 // ExpireReservations cleans up all reservations that have expired,
213 // making their UTXOs available for reservation again.
214 func (re *reserver) ExpireReservations(ctx context.Context) error {
215         // Remove records of any reservations that have expired.
216         now := time.Now()
217         var canceled []*reservation
218         re.reservationsMu.Lock()
219         for rid, res := range re.reservations {
220                 if res.Expiry.Before(now) {
221                         canceled = append(canceled, res)
222                         delete(re.reservations, rid)
223                 }
224         }
225         re.reservationsMu.Unlock()
226
227         // If we removed any expired reservations, update the corresponding
228         // source reservers.
229         for _, res := range canceled {
230                 re.source(res.Source).cancel(res)
231                 /*if res.ClientToken != nil {
232                         re.idempotency.Forget(*res.ClientToken)
233                 }*/
234         }
235
236         // TODO(jackson): Cleanup any source reservers that don't have
237         // anything reserved. It'll be a little tricky because of our
238         // locking scheme.
239         return nil
240 }
241
242 func (re *reserver) checkUTXO(u *utxo) bool {
243         _, s := re.c.State()
244         return s.Tree.Contains(u.OutputID.Bytes())
245 }
246
247 func (re *reserver) source(src source) *sourceReserver {
248         re.sourcesMu.Lock()
249         defer re.sourcesMu.Unlock()
250
251         sr, ok := re.sources[src]
252         if ok {
253                 return sr
254         }
255
256         sr = &sourceReserver{
257                 db:       re.db,
258                 src:      src,
259                 validFn:  re.checkUTXO,
260                 cached:   make(map[bc.Hash]*utxo),
261                 reserved: make(map[bc.Hash]uint64),
262         }
263         re.sources[src] = sr
264         return sr
265 }
266
267 type sourceReserver struct {
268         db       dbm.DB
269         src      source
270         validFn  func(u *utxo) bool
271         mu       sync.Mutex
272         cached   map[bc.Hash]*utxo
273         reserved map[bc.Hash]uint64
274 }
275
276 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*utxo, uint64, error) {
277         reservedUTXOs, reservedAmount, err := sr.reserveFromCache(rid, amount)
278         if err == nil {
279                 return reservedUTXOs, reservedAmount, nil
280         }
281
282         // Find the set of UTXOs that match this source.
283         err = sr.refillCache()
284         if err != nil {
285                 return nil, 0, err
286         }
287
288         return sr.reserveFromCache(rid, amount)
289 }
290
291 func (sr *sourceReserver) reserveFromCache(rid uint64, amount uint64) ([]*utxo, uint64, error) {
292         var (
293                 reserved, unavailable uint64
294                 reservedUTXOs         []*utxo
295         )
296         sr.mu.Lock()
297         defer sr.mu.Unlock()
298
299         for o, u := range sr.cached {
300                 // If the UTXO is already reserved, skip it.
301                 if _, ok := sr.reserved[u.OutputID]; ok {
302                         unavailable += u.Amount
303                         continue
304                 }
305                 // Cached utxos aren't guaranteed to still be valid; they may
306                 // have been spent. Verify that that the outputs are still in
307                 // the state tree.
308                 if !sr.validFn(u) {
309                         delete(sr.cached, o)
310                         continue
311                 }
312
313                 reserved += u.Amount
314                 reservedUTXOs = append(reservedUTXOs, u)
315                 if reserved >= amount {
316                         break
317                 }
318         }
319         if reserved+unavailable < amount {
320                 // Even if everything was available, this account wouldn't have
321                 // enough to satisfy the request.
322                 return nil, 0, ErrInsufficient
323         }
324         if reserved < amount {
325                 // The account has enough for the request, but some is tied up in
326                 // other reservations.
327                 return nil, 0, ErrReserved
328         }
329
330         // We've found enough to satisfy the request.
331         for _, u := range reservedUTXOs {
332                 sr.reserved[u.OutputID] = rid
333         }
334
335         return reservedUTXOs, reserved, nil
336 }
337
338 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *utxo) error {
339         sr.mu.Lock()
340         defer sr.mu.Unlock()
341
342         _, isReserved := sr.reserved[utxo.OutputID]
343         if isReserved {
344                 return ErrReserved
345         }
346
347         sr.reserved[utxo.OutputID] = rid
348         return nil
349 }
350
351 func (sr *sourceReserver) cancel(res *reservation) {
352         sr.mu.Lock()
353         defer sr.mu.Unlock()
354         for _, utxo := range res.UTXOs {
355                 delete(sr.reserved, utxo.OutputID)
356         }
357 }
358
359 func (sr *sourceReserver) refillCache() error {
360
361         utxos, err := findMatchingUTXOs(sr.db, sr.src)
362         if err != nil {
363                 return errors.Wrap(err)
364         }
365
366         sr.mu.Lock()
367         for _, u := range utxos {
368                 sr.cached[u.OutputID] = u
369         }
370         sr.mu.Unlock()
371
372         return nil
373 }
374
375 func findMatchingUTXOs(db dbm.DB, src source) ([]*utxo, error) {
376
377         var (
378                 utxos       []*utxo
379                 accountUTXO UTXO
380                 rawOutputID [32]byte
381                 rawSourceID [32]byte
382                 rawRefData  [32]byte
383         )
384
385         accountUTXOIter := db.IteratorPrefix([]byte(UTXOPreFix))
386         defer accountUTXOIter.Release()
387         for accountUTXOIter.Next() {
388
389                 if err := json.Unmarshal(accountUTXOIter.Value(), &accountUTXO); err != nil {
390                         return nil, errors.Wrap(err)
391                 }
392
393                 if (accountUTXO.AccountID == src.AccountID) && (bytes.Equal(accountUTXO.AssetID, src.AssetID.Bytes())) {
394                         copy(rawOutputID[:], accountUTXO.OutputID)
395                         copy(rawSourceID[:], accountUTXO.SourceID)
396                         copy(rawRefData[:], accountUTXO.RefData)
397
398                         utxos = append(utxos, &utxo{
399                                 OutputID:            bc.NewHash(rawOutputID),
400                                 SourceID:            bc.NewHash(rawSourceID),
401                                 AssetID:             src.AssetID,
402                                 Amount:              accountUTXO.Amount,
403                                 SourcePos:           accountUTXO.SourcePos,
404                                 ControlProgram:      accountUTXO.Program,
405                                 RefDataHash:         bc.NewHash(rawRefData),
406                                 AccountID:           src.AccountID,
407                                 ControlProgramIndex: accountUTXO.ProgramIndex,
408                         })
409
410                 }
411
412         }
413
414         if len(utxos) == 0 {
415                 log.WithFields(log.Fields{"AccountID": src.AccountID, "AssetID": src.AssetID.String()}).Error("can't match utxo")
416                 return nil, errors.New("can't match utxo")
417         }
418
419         return utxos, nil
420 }
421
422 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*utxo, error) {
423         u := new(utxo)
424         accountUTXO := new(UTXO)
425
426         //temp fix for coinbase UTXO isn't add to accountUTXO db, will be remove later
427         if outHash == *config.GenerateGenesisTx().ResultIds[0] {
428                 return genesisBlockUTXO(), nil
429         }
430
431         // make sure accountUTXO existed in the db
432         accountUTXOValue := db.Get(UTXOKey(outHash))
433         if accountUTXOValue == nil {
434                 return nil, fmt.Errorf("can't find utxo: %s", outHash.String())
435         }
436         if err := json.Unmarshal(accountUTXOValue, &accountUTXO); err != nil {
437                 return nil, errors.Wrap(err)
438         }
439
440         rawOutputID := new([32]byte)
441         rawAssetID := new([32]byte)
442         rawSourceID := new([32]byte)
443         rawRefData := new([32]byte)
444
445         copy(rawOutputID[:], accountUTXO.OutputID)
446         copy(rawAssetID[:], accountUTXO.AssetID)
447         copy(rawSourceID[:], accountUTXO.SourceID)
448         copy(rawRefData[:], accountUTXO.RefData)
449
450         u.OutputID = bc.NewHash(*rawOutputID)
451         u.AccountID = accountUTXO.AccountID
452         u.AssetID = bc.NewAssetID(*rawAssetID)
453         u.Amount = accountUTXO.Amount
454         u.ControlProgramIndex = accountUTXO.ProgramIndex
455         u.ControlProgram = accountUTXO.Program
456         u.SourceID = bc.NewHash(*rawSourceID)
457         u.SourcePos = accountUTXO.SourcePos
458         u.RefDataHash = bc.NewHash(*rawRefData)
459
460         return u, nil
461 }
462
463 //temp fix for coinbase UTXO isn't add to accountUTXO db, will be remove later
464 func genesisBlockUTXO() *utxo {
465         u := new(utxo)
466         tx := config.GenerateGenesisTx()
467
468         out := tx.Outputs[0]
469         resOutID := tx.ResultIds[0]
470         resOut, _ := tx.Entries[*resOutID].(*bc.Output)
471         log.Infof("genesis Output:%v", resOut)
472
473         //u.AccountID =
474         u.OutputID = *tx.OutputID(0)
475         u.AssetID = *out.AssetId
476         u.Amount = out.Amount
477         u.ControlProgramIndex = 0
478         u.ControlProgram = out.ControlProgram
479         u.SourceID = *resOut.Source.Ref
480         u.SourcePos = resOut.Source.Position
481         u.RefDataHash = *resOut.Data
482         return u
483 }