12 log "github.com/sirupsen/logrus"
13 dbm "github.com/tendermint/tmlibs/db"
15 "github.com/bytom/config"
16 "github.com/bytom/errors"
17 "github.com/bytom/protocol"
18 "github.com/bytom/protocol/bc"
19 "github.com/bytom/sync/idempotency"
23 // ErrInsufficient indicates the account doesn't contain enough
24 // units of the requested asset to satisfy the reservation.
25 // New units must be deposited into the account in order to
26 // satisfy the request; change will not be sufficient.
27 ErrInsufficient = errors.New("reservation found insufficient funds")
29 // ErrReserved indicates that a reservation could not be
30 // satisfied because some of the outputs were already reserved.
31 // When those reservations are finalized into a transaction
32 // (and no other transaction spends funds from the account),
33 // new change outputs will be created
34 // in sufficient amounts to satisfy the request.
35 ErrReserved = errors.New("reservation found outputs already reserved")
38 // utxo describes an individual account utxo.
43 // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44 // AssetAmount with a nil AssetId.
53 ControlProgramIndex uint64
// source returns the selection criteria this utxo falls under, built from
// the utxo's own AssetID and AccountID.
56 func (u *utxo) source() source {
57 return source{AssetID: u.AssetID, AccountID: u.AccountID}
60 // source describes the criteria to use when selecting UTXOs.
66 // reservation describes a reservation of a set of UTXOs belonging
67 // to a particular account. Reservations are immutable.
68 type reservation struct {
// newReserver builds a reserver for the given chain and wallet database.
// The reservation and source maps start out empty.
77 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
81 reservations: make(map[uint64]*reservation),
82 sources: make(map[source]*sourceReserver),
86 // reserver implements a utxo reserver that stores reservations
87 // in-memory. It relies on the account_utxos table for the source of
88 // truth of valid UTXOs but tracks which of those UTXOs are reserved
91 // To reduce latency and prevent deadlock, no two mutexes (either on
92 // reserver or sourceReserver) should be held at the same time
94 // reserver ensures idempotency of reservations until the reservation
96 type reserver struct {
// nextReservationID is advanced with atomic.AddUint64 to mint the ID of
// each new reservation.
99 nextReservationID uint64
// idempotency is consulted through Once, keyed by the client token,
// whenever a caller supplies one.
100 idempotency idempotency.Group
// reservationsMu guards reservations.
102 reservationsMu sync.Mutex
// reservations maps a reservation ID to the reservation recorded under it.
103 reservations map[uint64]*reservation
// sources maps selection criteria to the sourceReserver handling them;
// it is accessed while sourcesMu is held.
106 sources map[source]*sourceReserver
109 // Reserve selects and reserves UTXOs according to the criteria provided
110 // in source. The resulting reservation expires at exp.
//
// When clientToken is nil the reservation is attempted directly. Otherwise
// the attempt is routed through re.idempotency.Once keyed by *clientToken,
// and the value Once returns is asserted back to *reservation.
111 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
// Without a client token there is no key to pass to Once.
113 if clientToken == nil {
114 return re.reserve(src, amount, clientToken, exp)
117 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
118 return re.reserve(src, amount, clientToken, exp)
// Once hands back an interface{}; convert it to the concrete type.
120 return untypedRes.(*reservation), err
// reserve performs a single reservation attempt for amount units from src.
// It draws a new reservation ID from re.nextReservationID, asks the
// sourceReserver for src to reserve UTXOs under that ID, and records the
// resulting reservation in re.reservations.
123 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
124 sourceReserver := re.source(src)
126 // Try to reserve the right amount.
127 rid := atomic.AddUint64(&re.nextReservationID, 1)
128 reserved, total, err := sourceReserver.reserve(rid, amount)
138 ClientToken: clientToken,
141 // Save the successful reservation.
// The deferred Unlock keeps reservationsMu held until the function returns.
142 re.reservationsMu.Lock()
143 defer re.reservationsMu.Unlock()
144 re.reservations[rid] = res
146 // Make change if necessary
// total is the amount the source reserver actually set aside; the part
// beyond amount is recorded on the reservation as change.
148 res.Change = total - amount
153 // ReserveUTXO reserves a specific utxo for spending. The resulting
154 // reservation expires at exp.
//
// When clientToken is nil the reservation is attempted directly. Otherwise
// the attempt is routed through re.idempotency.Once keyed by *clientToken,
// and the value Once returns is asserted back to *reservation.
155 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
// Without a client token there is no key to pass to Once.
156 if clientToken == nil {
157 return re.reserveUTXO(ctx, out, exp, nil)
160 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
161 return re.reserveUTXO(ctx, out, exp, clientToken)
// Once hands back an interface{}; convert it to the concrete type.
163 return untypedRes.(*reservation), err
// reserveUTXO reserves the single output identified by out. The utxo is
// loaded from re.db with findSpecificUTXO, must pass re.checkUTXO, is then
// reserved under a fresh reservation ID by the sourceReserver matching the
// utxo's own source, and finally the reservation is recorded in
// re.reservations.
166 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
167 u, err := findSpecificUTXO(re.db, out)
// A utxo that exists in the database but fails checkUTXO is rejected.
172 if !re.checkUTXO(u) {
173 return nil, errors.New("didn't find utxo")
176 rid := atomic.AddUint64(&re.nextReservationID, 1)
177 err = re.source(u.source()).reserveUTXO(rid, u)
187 ClientToken: clientToken,
// reservationsMu is held only for the map write.
189 re.reservationsMu.Lock()
190 re.reservations[rid] = res
191 re.reservationsMu.Unlock()
195 // Cancel makes a best-effort attempt at canceling the reservation with
197 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
// Look the reservation up and remove it from the map within a single
// critical section; delete is a no-op when rid is absent.
198 re.reservationsMu.Lock()
199 res, ok := re.reservations[rid]
200 delete(re.reservations, rid)
201 re.reservationsMu.Unlock()
203 return fmt.Errorf("couldn't find reservation %d", rid)
// Release the reservation's UTXOs in its source reserver. This happens
// after reservationsMu has been released.
205 re.source(res.Source).cancel(res)
206 /*if res.ClientToken != nil {
207 re.idempotency.Forget(*res.ClientToken)
212 // ExpireReservations cleans up all reservations that have expired,
213 // making their UTXOs available for reservation again.
//
// The work is split in two phases: expired entries are collected and
// removed from re.reservations while reservationsMu is held, and their
// UTXOs are released through the source reservers only after the lock has
// been dropped.
214 func (re *reserver) ExpireReservations(ctx context.Context) error {
215 // Remove records of any reservations that have expired.
217 var canceled []*reservation
218 re.reservationsMu.Lock()
// Deleting the current key while ranging over a map is permitted in Go.
219 for rid, res := range re.reservations {
220 if res.Expiry.Before(now) {
221 canceled = append(canceled, res)
222 delete(re.reservations, rid)
225 re.reservationsMu.Unlock()
227 // If we removed any expired reservations, update the corresponding
229 for _, res := range canceled {
230 re.source(res.Source).cancel(res)
231 /*if res.ClientToken != nil {
232 re.idempotency.Forget(*res.ClientToken)
236 // TODO(jackson): Cleanup any source reservers that don't have
237 // anything reserved. It'll be a little tricky because of our
// checkUTXO reports whether u's OutputID is contained in s.Tree.
// It is installed as validFn on every sourceReserver this reserver creates.
242 func (re *reserver) checkUTXO(u *utxo) bool {
244 return s.Tree.Contains(u.OutputID.Bytes())
// source returns the sourceReserver responsible for src, looking it up in
// re.sources. sourcesMu stays held until the function returns.
247 func (re *reserver) source(src source) *sourceReserver {
249 defer re.sourcesMu.Unlock()
251 sr, ok := re.sources[src]
// A newly built sourceReserver validates utxos with re.checkUTXO and
// starts with empty cached and reserved maps.
// NOTE(review): presumably this literal is only reached when the lookup
// above missed (ok == false) — confirm.
256 sr = &sourceReserver{
259 validFn: re.checkUTXO,
260 cached: make(map[bc.Hash]*utxo),
261 reserved: make(map[bc.Hash]uint64),
// sourceReserver tracks the cached and reserved UTXOs for a single source.
267 type sourceReserver struct {
// validFn reports whether a utxo passes validation; reserver.source sets
// it to reserver.checkUTXO.
270 validFn func(u *utxo) bool
// cached holds the UTXOs loaded by refillCache, keyed by output ID.
272 cached map[bc.Hash]*utxo
// reserved maps an output ID to the ID of the reservation holding it.
273 reserved map[bc.Hash]uint64
// reserve tries to reserve at least amount units under reservation rid.
// It returns the chosen UTXOs and their total amount. A first attempt is
// served from the cache alone; if that result is not returned, the cache is
// refilled from the database and the attempt is repeated once.
276 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*utxo, uint64, error) {
277 reservedUTXOs, reservedAmount, err := sr.reserveFromCache(rid, amount)
279 return reservedUTXOs, reservedAmount, nil
282 // Find the set of UTXOs that match this source.
283 err = sr.refillCache()
// Second and final attempt, now against the refreshed cache.
288 return sr.reserveFromCache(rid, amount)
// reserveFromCache tries to cover amount using only the UTXOs in sr.cached.
// On success it marks the chosen UTXOs as held by rid in sr.reserved and
// returns them with their total amount. It returns ErrInsufficient when
// the selected and already-reserved amounts together fall short of amount,
// and ErrReserved when the shortfall is due only to other reservations.
// Nothing is written to sr.reserved unless the full amount was found.
291 func (sr *sourceReserver) reserveFromCache(rid uint64, amount uint64) ([]*utxo, uint64, error) {
// reserved sums the UTXOs selected for this request; unavailable sums
// the ones skipped because another reservation already holds them.
293 reserved, unavailable uint64
294 reservedUTXOs []*utxo
299 for o, u := range sr.cached {
300 // If the UTXO is already reserved, skip it.
301 if _, ok := sr.reserved[u.OutputID]; ok {
302 unavailable += u.Amount
305 // Cached utxos aren't guaranteed to still be valid; they may
306 // have been spent. Verify that the outputs are still in
314 reservedUTXOs = append(reservedUTXOs, u)
315 if reserved >= amount {
319 if reserved+unavailable < amount {
320 // Even if everything was available, this account wouldn't have
321 // enough to satisfy the request.
322 return nil, 0, ErrInsufficient
324 if reserved < amount {
325 // The account has enough for the request, but some is tied up in
326 // other reservations.
327 return nil, 0, ErrReserved
330 // We've found enough to satisfy the request.
331 for _, u := range reservedUTXOs {
332 sr.reserved[u.OutputID] = rid
335 return reservedUTXOs, reserved, nil
// reserveUTXO marks the given utxo as held by reservation rid in
// sr.reserved.
// NOTE(review): isReserved is looked up first; presumably an already
// reserved output is rejected before the assignment — confirm.
338 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *utxo) error {
342 _, isReserved := sr.reserved[utxo.OutputID]
347 sr.reserved[utxo.OutputID] = rid
// cancel releases every UTXO listed in res.UTXOs by deleting its entry
// from sr.reserved. Deleting an absent key is a no-op.
351 func (sr *sourceReserver) cancel(res *reservation) {
354 for _, utxo := range res.UTXOs {
355 delete(sr.reserved, utxo.OutputID)
// refillCache loads the UTXOs matching sr.src from sr.db through
// findMatchingUTXOs and stores each one in sr.cached under its OutputID.
// A lookup failure is returned wrapped.
359 func (sr *sourceReserver) refillCache() error {
361 utxos, err := findMatchingUTXOs(sr.db, sr.src)
363 return errors.Wrap(err)
// Entries already in the cache under the same OutputID are overwritten.
367 for _, u := range utxos {
368 sr.cached[u.OutputID] = u
// findMatchingUTXOs scans every database entry stored under UTXOPreFix,
// JSON-decodes it, and collects the entries whose AccountID and AssetID
// equal those of src, converted to utxo values. A decode failure aborts the
// scan and is returned wrapped.
375 func findMatchingUTXOs(db dbm.DB, src source) ([]*utxo, error) {
385 accountUTXOIter := db.IteratorPrefix([]byte(UTXOPreFix))
386 defer accountUTXOIter.Release()
387 for accountUTXOIter.Next() {
389 if err := json.Unmarshal(accountUTXOIter.Value(), &accountUTXO); err != nil {
390 return nil, errors.Wrap(err)
393 if (accountUTXO.AccountID == src.AccountID) && (bytes.Equal(accountUTXO.AssetID, src.AssetID.Bytes())) {
// copy transfers at most len(dst) bytes.
// NOTE(review): if the raw* buffers are declared outside this loop, a
// stored value shorter than the buffer would leave bytes from a
// previous iteration in place — confirm.
394 copy(rawOutputID[:], accountUTXO.OutputID)
395 copy(rawSourceID[:], accountUTXO.SourceID)
396 copy(rawRefData[:], accountUTXO.RefData)
398 utxos = append(utxos, &utxo{
399 OutputID: bc.NewHash(rawOutputID),
400 SourceID: bc.NewHash(rawSourceID),
401 AssetID: src.AssetID,
402 Amount: accountUTXO.Amount,
403 SourcePos: accountUTXO.SourcePos,
404 ControlProgram: accountUTXO.Program,
405 RefDataHash: bc.NewHash(rawRefData),
406 AccountID: src.AccountID,
407 ControlProgramIndex: accountUTXO.ProgramIndex,
// NOTE(review): this failure is both logged and returned; presumably it
// is reached only when no entry matched — confirm.
415 log.WithFields(log.Fields{"AccountID": src.AccountID, "AssetID": src.AssetID.String()}).Error("can't match utxo")
416 return nil, errors.New("can't match utxo")
// findSpecificUTXO loads the utxo stored in db under UTXOKey(outHash) and
// converts it to a utxo value. The first result ID of the genesis
// transaction is special-cased and answered by genesisBlockUTXO without a
// database lookup. A missing key yields a "can't find utxo" error; a decode
// failure is returned wrapped.
422 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*utxo, error) {
424 accountUTXO := new(UTXO)
426 // Temporary fix: the coinbase UTXO isn't added to the accountUTXO db; to be removed later.
427 if outHash == *config.GenerateGenesisTx().ResultIds[0] {
428 return genesisBlockUTXO(), nil
431 // make sure accountUTXO existed in the db
432 accountUTXOValue := db.Get(UTXOKey(outHash))
433 if accountUTXOValue == nil {
434 return nil, fmt.Errorf("can't find utxo: %s", outHash.String())
// accountUTXO is already a pointer (new(UTXO)), so this passes a pointer
// to a pointer; json.Unmarshal follows the extra indirection.
436 if err := json.Unmarshal(accountUTXOValue, &accountUTXO); err != nil {
437 return nil, errors.Wrap(err)
// The stored IDs are byte slices; copy them into zeroed 32-byte arrays
// so they can be turned into bc hashes and asset IDs.
440 rawOutputID := new([32]byte)
441 rawAssetID := new([32]byte)
442 rawSourceID := new([32]byte)
443 rawRefData := new([32]byte)
445 copy(rawOutputID[:], accountUTXO.OutputID)
446 copy(rawAssetID[:], accountUTXO.AssetID)
447 copy(rawSourceID[:], accountUTXO.SourceID)
448 copy(rawRefData[:], accountUTXO.RefData)
450 u.OutputID = bc.NewHash(*rawOutputID)
451 u.AccountID = accountUTXO.AccountID
452 u.AssetID = bc.NewAssetID(*rawAssetID)
453 u.Amount = accountUTXO.Amount
454 u.ControlProgramIndex = accountUTXO.ProgramIndex
455 u.ControlProgram = accountUTXO.Program
456 u.SourceID = bc.NewHash(*rawSourceID)
457 u.SourcePos = accountUTXO.SourcePos
458 u.RefDataHash = bc.NewHash(*rawRefData)
463 // Temporary fix: the coinbase UTXO isn't added to the accountUTXO db; to be removed later.
//
// genesisBlockUTXO builds a utxo describing output 0 of the transaction
// returned by config.GenerateGenesisTx.
464 func genesisBlockUTXO() *utxo {
466 tx := config.GenerateGenesisTx()
469 resOutID := tx.ResultIds[0]
// NOTE(review): the ok result of the type assertion is discarded. If the
// entry is not a *bc.Output, resOut is nil and the resOut.Source and
// resOut.Data accesses below will panic.
470 resOut, _ := tx.Entries[*resOutID].(*bc.Output)
471 log.Infof("genesis Output:%v", resOut)
474 u.OutputID = *tx.OutputID(0)
475 u.AssetID = *out.AssetId
476 u.Amount = out.Amount
477 u.ControlProgramIndex = 0
478 u.ControlProgram = out.ControlProgram
479 u.SourceID = *resOut.Source.Ref
480 u.SourcePos = resOut.Source.Position
481 u.RefDataHash = *resOut.Data