10 dbm "github.com/tendermint/tmlibs/db"
12 "github.com/bytom/errors"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/sync/idempotency"
19 // ErrInsufficient indicates the account doesn't contain enough
20 // units of the requested asset to satisfy the reservation.
21 // New units must be deposited into the account in order to
22 // satisfy the request; change will not be sufficient.
23 ErrInsufficient = errors.New("reservation found insufficient funds")
25 // ErrReserved indicates that a reservation could not be
26 // satisfied because some of the outputs were already reserved.
27 // When those reservations are finalized into a transaction
28 // (and no other transaction spends funds from the account),
29 // new change outputs will be created
30 // in sufficient amounts to satisfy the request.
31 ErrReserved = errors.New("reservation found outputs already reserved")
32 // ErrMatchUTXO indicates that no valid UTXO could be matched for the reservation (e.g. the output is unknown or is an immature coinbase output).
33 ErrMatchUTXO = errors.New("can't match enough valid utxos")
34 // ErrReservation indicates the reserver couldn't find a reservation with the provided ID.
35 ErrReservation = errors.New("couldn't find reservation")
38 // UTXO describes an individual account utxo.
43 // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44 // AssetAmount with a nil AssetId.
53 ControlProgramIndex uint64
// source returns the selection criteria this UTXO falls under: a source
// value carrying the UTXO's own AssetID and AccountID.
58 func (u *UTXO) source() source {
59 return source{AssetID: u.AssetID, AccountID: u.AccountID}
62 // source describes the criteria to use when selecting UTXOs.
68 // reservation describes a reservation of a set of UTXOs belonging
69 // to a particular account. Reservations are immutable.
70 type reservation struct {
// newReserver builds a reserver whose reservations and sources maps start
// out empty.
// NOTE(review): the lines that store c and walletdb on the struct are not
// visible in this excerpt; presumably they become re.c and re.db, which the
// other methods read — confirm.
79 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
83 reservations: make(map[uint64]*reservation),
84 sources: make(map[source]*sourceReserver),
88 // reserver implements a utxo reserver that stores reservations
89 // in-memory. It relies on the account_utxos table for the source of
90 // truth of valid UTXOs but tracks which of those UTXOs are reserved
93 // To reduce latency and prevent deadlock, no two mutexes (either on
94 // reserver or sourceReserver) should be held at the same time
96 // reserver ensures idempotency of reservations until the reservation
//
// NOTE(review): the lookups in this file read UTXOs from a dbm.DB by key and
// by key prefix; "account_utxos table" looks like wording inherited from a
// different storage backend — confirm and update.
98 type reserver struct {
// nextReservationID is advanced with atomic.AddUint64 to hand out
// reservation IDs; it is not guarded by either mutex.
101 nextReservationID uint64
// idempotency runs token-bearing reservation requests through Once, keyed
// by the caller's client token.
102 idempotency idempotency.Group
// reservationsMu guards the reservations map.
104 reservationsMu sync.Mutex
// reservations holds the live reservations, keyed by reservation ID.
105 reservations map[uint64]*reservation
// sources holds one sourceReserver per source value; it is accessed under
// re.sourcesMu.
108 sources map[source]*sourceReserver
111 // Reserve selects and reserves UTXOs according to the criteria provided
112 // in src. The resulting reservation expires at exp.
//
// A nil clientToken makes the reservation directly. A non-nil clientToken
// routes the work through re.idempotency.Once, keyed by *clientToken.
// NOTE(review): presumably Once hands repeated calls with the same token the
// first call's result — confirm against the idempotency package.
113 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
115 if clientToken == nil {
116 return re.reserve(src, amount, clientToken, exp)
119 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
120 return re.reserve(src, amount, clientToken, exp)
// NOTE(review): this single-value type assertion panics if Once ever returns
// something other than a *reservation, including an untyped nil interface —
// confirm Once always passes the closure's own result through.
122 return untypedRes.(*reservation), err
// reserve makes one attempt to reserve amount units from src and, on
// success, records the resulting reservation in re.reservations under a
// freshly allocated reservation ID.
125 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
126 sourceReserver := re.source(src)
128 // Try to reserve the right amount.
// The ID is taken from the atomic counter before the attempt, so an ID is
// consumed even when the attempt fails.
129 rid := atomic.AddUint64(&re.nextReservationID, 1)
130 reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
// NOTE(review): the conditions guarding this return are not visible in this
// excerpt; presumably it applies when err is non-nil and isImmature is set.
133 return nil, errors.WithDetail(err, "some coinbase utxos are immature")
143 ClientToken: clientToken,
146 // Save the successful reservation.
147 re.reservationsMu.Lock()
148 defer re.reservationsMu.Unlock()
149 re.reservations[rid] = res
151 // Make change if necessary
// Change is the surplus of the selected total over the requested amount.
// NOTE(review): the guard around this assignment is not visible here.
153 res.Change = total - amount
158 // ReserveUTXO reserves a specific utxo for spending. The resulting
159 // reservation expires at exp.
//
// A nil clientToken reserves directly and records no token on the
// reservation. A non-nil clientToken routes the work through
// re.idempotency.Once, keyed by *clientToken.
160 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
161 if clientToken == nil {
162 return re.reserveUTXO(ctx, out, exp, nil)
165 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
166 return re.reserveUTXO(ctx, out, exp, clientToken)
// NOTE(review): this single-value type assertion panics if Once ever returns
// something other than a *reservation, including an untyped nil interface —
// confirm Once always passes the closure's own result through.
168 return untypedRes.(*reservation), err
// reserveUTXO looks up the output out in re.db, rejects it when it is a
// coinbase output that has not yet reached its ValidHeight, and otherwise
// reserves it on the sourceReserver for the UTXO's own asset/account pair and
// records the reservation in re.reservations.
// Note that the parameter order (exp before clientToken) differs from
// ReserveUTXO.
171 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
172 u, err := findSpecificUTXO(re.db, out)
177 // u.ValidHeight > 0 means coinbase utxo
178 if u.ValidHeight > 0 && u.ValidHeight > re.c.BestBlockHeight() {
179 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
// The ID is taken from the atomic counter before the attempt below.
182 rid := atomic.AddUint64(&re.nextReservationID, 1)
183 err = re.source(u.source()).reserveUTXO(rid, u)
193 ClientToken: clientToken,
// Publish the reservation; the lock is held only for the map write.
195 re.reservationsMu.Lock()
196 re.reservations[rid] = res
197 re.reservationsMu.Unlock()
201 // Cancel makes a best-effort attempt at canceling the reservation with
203 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
// Look up and drop the reservation in a single critical section. The delete
// runs before ok is examined; deleting an absent key from a Go map is a
// no-op.
204 re.reservationsMu.Lock()
205 res, ok := re.reservations[rid]
206 delete(re.reservations, rid)
207 re.reservationsMu.Unlock()
// NOTE(review): the guard for this return is not visible in this excerpt;
// presumably it fires when ok is false.
209 return errors.Wrapf(ErrReservation, "rid=%d", rid)
// Release the reservation's UTXOs on its sourceReserver; reservationsMu was
// already released above.
211 re.source(res.Source).cancel(res)
// What follows is commented-out code that would forget the reservation's
// client token in the idempotency group.
212 /*if res.ClientToken != nil {
213 re.idempotency.Forget(*res.ClientToken)
218 // ExpireReservations cleans up all reservations that have expired,
219 // making their UTXOs available for reservation again.
220 func (re *reserver) ExpireReservations(ctx context.Context) error {
221 // Remove records of any reservations that have expired.
// NOTE(review): now is assigned on a line not visible in this excerpt;
// presumably it is the current time.
223 var canceled []*reservation
224 re.reservationsMu.Lock()
// Deleting the current key while ranging over a map is permitted in Go.
// Expiry.Before(now) is strict, so a reservation whose Expiry equals now is
// kept.
225 for rid, res := range re.reservations {
226 if res.Expiry.Before(now) {
227 canceled = append(canceled, res)
228 delete(re.reservations, rid)
231 re.reservationsMu.Unlock()
// reservationsMu is released before the source reservers are touched below.
233 // If we removed any expired reservations, update the corresponding
235 for _, res := range canceled {
236 re.source(res.Source).cancel(res)
// What follows is commented-out code that would forget each expired
// reservation's client token in the idempotency group.
237 /*if res.ClientToken != nil {
238 re.idempotency.Forget(*res.ClientToken)
242 // TODO(jackson): Cleanup any source reservers that don't have
243 // anything reserved. It'll be a little tricky because of our
// source returns the sourceReserver registered for src in re.sources.
// NOTE(review): the matching sourcesMu.Lock and the branch taken when the
// lookup misses are not visible in this excerpt; presumably a new
// sourceReserver is built and cached only when ok is false.
248 func (re *reserver) source(src source) *sourceReserver {
250 defer re.sourcesMu.Unlock()
252 sr, ok := re.sources[src]
257 sr = &sourceReserver{
// A new sourceReserver starts with nothing reserved.
260 reserved: make(map[bc.Hash]uint64),
// The method value is stored, not its result, so the chain height is read
// again each time currentHeight is called.
261 currentHeight: re.c.BestBlockHeight,
// sourceReserver records which UTXOs are currently reserved for one source
// (see the sr.src and sr.reserved uses in its methods).
267 type sourceReserver struct {
// currentHeight reports the height used to decide whether a coinbase UTXO
// is mature; reserver.source sets it to the chain's BestBlockHeight method.
270 currentHeight func() uint64
// reserved maps a reserved UTXO's OutputID to the ID of the reservation
// holding it.
272 reserved map[bc.Hash]uint64
// reserve tries to set aside unreserved UTXOs of sr.src worth at least
// amount and marks them as held by reservation rid.
//
// On success it returns the chosen UTXOs, their running total, the
// isImmature flag reported by findMatchingUTXOs, and a nil error. On failure
// it returns no UTXOs, a zero total, the same flag, and either the wrapped
// lookup error, ErrInsufficient or ErrReserved; sr.reserved is only written
// on the success path.
275 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
277 reserved, unavailable uint64
278 reservedUTXOs []*UTXO
281 utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
283 return nil, 0, isImmature, errors.Wrap(err)
288 for _, u := range utxos {
289 // If the UTXO is already reserved, skip it.
// Its amount is still counted as unavailable so the two failure cases below
// can be told apart.
290 if _, ok := sr.reserved[u.OutputID]; ok {
291 unavailable += u.Amount
// NOTE(review): the line that adds u.Amount to reserved is not visible in
// this excerpt; presumably it sits next to this append.
296 reservedUTXOs = append(reservedUTXOs, u)
297 if reserved >= amount {
// NOTE(review): reserved+unavailable is uint64 arithmetic and wraps on
// overflow — confirm the amounts involved cannot get that large.
301 if reserved+unavailable < amount {
302 // Even if everything was available, this account wouldn't have
303 // enough to satisfy the request.
304 return nil, 0, isImmature, ErrInsufficient
306 if reserved < amount {
307 // The account has enough for the request, but some is tied up in
308 // other reservations.
309 return nil, 0, isImmature, ErrReserved
312 // We've found enough to satisfy the request.
313 for _, u := range reservedUTXOs {
314 sr.reserved[u.OutputID] = rid
317 return reservedUTXOs, reserved, isImmature, nil
// reserveUTXO marks the single given utxo as held by reservation rid.
// NOTE(review): the branch taken when isReserved is true is not visible in
// this excerpt; presumably an already-reserved output is rejected with an
// error instead of being overwritten — confirm.
320 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
324 _, isReserved := sr.reserved[utxo.OutputID]
329 sr.reserved[utxo.OutputID] = rid
// cancel releases every UTXO listed in res by removing its OutputID from
// sr.reserved. The entry is removed without checking which reservation ID
// it currently maps to.
333 func (sr *sourceReserver) cancel(res *reservation) {
336 for _, utxo := range res.UTXOs {
337 delete(sr.reserved, utxo.OutputID)
// findMatchingUTXOs scans every database entry stored under UTXOPreFix,
// JSON-decodes each one, and collects the UTXOs whose AccountID and AssetID
// both equal those of src. It returns the matches, an isImmature flag, and
// an error (a wrapped decode error, or ErrMatchUTXO).
// The scan covers the whole prefix on every call; it is not narrowed to src.
341 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
344 utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
345 defer utxoIter.Release()
347 for utxoIter.Next() {
349 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
350 return nil, false, errors.Wrap(err)
353 // u.ValidHeight > 0 means coinbase utxo
// currentHeight is only called for entries with a non-zero ValidHeight.
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably it sets isImmature and skips the entry.
354 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
359 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
360 utxos = append(utxos, u)
// NOTE(review): the guard for this return is not visible in this excerpt;
// presumably it fires when no UTXO matched.
365 return nil, isImmature, ErrMatchUTXO
367 return utxos, isImmature, nil
// findSpecificUTXO loads the UTXO stored for outHash, looking under
// StandardUTXOKey(outHash) and under ContractUTXOKey(outHash), and
// JSON-decodes it. It returns ErrMatchUTXO, wrapped with the output ID, when
// the contract-key lookup also yields nil.
370 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
373 data := db.Get(StandardUTXOKey(outHash))
// NOTE(review): the condition that leads to this fallback lookup is not
// visible in this excerpt; presumably it runs only when the standard-key
// lookup returned nil.
375 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
376 return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
// u is returned together with Unmarshal's error, so on a decode failure the
// caller receives a non-nil *UTXO alongside a non-nil error.
379 return u, json.Unmarshal(data, u)