1 // Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
14 "github.com/syndtr/goleveldb/leveldb/iterator"
15 "github.com/syndtr/goleveldb/leveldb/opt"
16 "github.com/syndtr/goleveldb/leveldb/util"
// errTransactionDone is the error Transaction methods return once the
// transaction is, per its message, already closed. NewIterator does not return
// it directly but hands it to an empty iterator.
19 var errTransactionDone = errors.New("leveldb: transaction already closed")
21 // Transaction is the transaction handle.
22 type Transaction struct {
34 // Get gets the value for the given key. It returns ErrNotFound if the
35 // DB does not contain the key.
37 // The returned slice is its own copy, so it is safe to modify the contents
38 // of the returned slice.
39 // It is safe to modify the contents of the argument after Get returns.
40 func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
// Refuse the read: the error reports that the transaction is already closed.
44 return nil, errTransactionDone
// Look the key up in the transaction's own memdb and tables, passing tr.seq
// as the sequence number.
46 return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
49 // Has returns true if the DB contains the given key.
51 // It is safe to modify the contents of the argument after Has returns.
52 func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
// Refuse the lookup: the error reports that the transaction is already closed.
56 return false, errTransactionDone
// Check the transaction's own memdb and tables, passing tr.seq as the
// sequence number.
58 return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
61 // NewIterator returns an iterator for the latest snapshot of the transaction.
62 // The returned iterator is not safe for concurrent use, but it is safe to use
63 // multiple iterators concurrently, with each in a dedicated goroutine.
64 // It is also safe to use an iterator concurrently with writes to the
65 // transaction. The resultant key/value pairs are guaranteed to be consistent.
67 // Slice allows slicing the iterator to only contain keys in the given
68 // range. A nil Range.Start is treated as a key before all keys in the
69 // DB. And a nil Range.Limit is treated as a key after all keys in
72 // The iterator must be released after use, by calling its Release method.
74 // Also read the Iterator documentation of the leveldb/iterator package.
75 func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
// Hand back an empty iterator that carries errTransactionDone.
79 return iterator.NewEmptyIterator(errTransactionDone)
// Iterate over the transaction's memdb and tables, passing tr.seq and the
// requested slice.
82 return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
// flush writes the transaction's memdb, when it holds any entries, out as a
// new table built by tops.createFrom. The table is appended to tr.tables,
// added to tr.rec via addTableFile(0, t), and its size is added to
// tr.stats.write.
85 func (tr *Transaction) flush() error {
// Nothing to write out when the memdb is empty.
87 if tr.mem.Len() != 0 {
89 iter := tr.mem.NewIterator(nil)
90 t, n, err := tr.db.s.tops.createFrom(iter)
// NOTE(review): getref() == 1 presumably means the transaction holds the only
// reference to the memdb; confirm against getref's definition.
96 if tr.mem.getref() == 1 {
// Switch the transaction to a memdb obtained from mpoolGet.
100 tr.mem = tr.db.mpoolGet(0)
// Track the new table: reads consult tr.tables and Commit submits tr.rec.
103 tr.tables = append(tr.tables, t)
104 tr.rec.addTableFile(0, t)
105 tr.stats.write += t.size
106 tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
// put stores one record in the transaction's memdb. The user key is encoded by
// makeInternalKey with sequence tr.seq+1 and key type kt, and the result is
// kept in tr.ikScratch. If the memdb reports less free space than the encoded
// key plus value need, flush is called first.
111 func (tr *Transaction) put(kt keyType, key, value []byte) error {
112 tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
// Not enough room left in the memdb for this record: flush first.
113 if tr.mem.Free() < len(tr.ikScratch)+len(value) {
114 if err := tr.flush(); err != nil {
118 if err := tr.mem.Put(tr.ikScratch, value); err != nil {
125 // Put sets the value for the given key. It overwrites any previous value
126 // for that key; a DB is not a multi-map.
127 // Please note that the transaction is not compacted until committed, so if you
128 // write the same key 10 times, all 10 entries are kept in the transaction.
130 // It is safe to modify the contents of the arguments after Put returns.
131 func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
// Refuse the write: the error reports that the transaction is already closed.
135 return errTransactionDone
// Store the pair as a value record (keyTypeVal).
137 return tr.put(keyTypeVal, key, value)
140 // Delete deletes the value for the given key.
141 // Please note that the transaction is not compacted until committed, so if you
142 // write the same key 10 times, all 10 entries are kept in the transaction.
144 // It is safe to modify the contents of the arguments after Delete returns.
145 func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
// Refuse the write: the error reports that the transaction is already closed.
149 return errTransactionDone
// Store a deletion record (keyTypeDel) for the key, with a nil value.
151 return tr.put(keyTypeDel, key, nil)
154 // Write applies the given batch to the transaction. The batch will be applied
156 // Please note that the transaction is not compacted until committed, so if you
157 // write the same key 10 times, all 10 entries are kept in the transaction.
159 // It is safe to modify the contents of the arguments after Write returns.
160 func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
// Special-case a nil or empty batch.
161 if b == nil || b.Len() == 0 {
// Refuse the write: the error reports that the transaction is already closed.
168 return errTransactionDone
// Replay the batch, storing each record through tr.put.
170 return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
171 return tr.put(kt, k, v)
175 func (tr *Transaction) setDone() {
182 // Commit commits the transaction. If the returned error is not nil, the
183 // transaction is not committed; it can then either be retried or discarded.
185 // Other methods should not be called after the transaction has been committed.
186 func (tr *Transaction) Commit() error {
187 if err := tr.db.ok(); err != nil {
194 return errTransactionDone
// Write any entries still held in the memdb out to a table before committing.
196 if err := tr.flush(); err != nil {
197 // Return the error and let the user decide whether to retry or discard.
// Only commit when the transaction produced at least one table.
201 if len(tr.tables) != 0 {
202 // Committing transaction.
203 tr.rec.setSeqNum(tr.seq)
204 tr.db.compCommitLk.Lock()
205 tr.stats.startTimer()
// Attempt the session commit up to three times.
207 for retry := 0; retry < 3; retry++ {
208 cerr = tr.db.s.commit(&tr.rec)
210 tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
// NOTE(review): one-second timer, presumably the pause before the next
// retry; confirm against the enclosing select.
212 case <-time.After(time.Second):
214 tr.db.logf("transaction@commit exiting")
215 tr.db.compCommitLk.Unlock()
219 // Success. Set db.seq.
226 // Return the error and let the user decide whether to retry or discard.
231 // Update compaction stats. This is safe as long as we hold compCommitLk.
232 tr.db.compStats.addStat(0, &tr.stats)
234 // Trigger table auto-compaction.
235 tr.db.compTrigger(tr.db.tcompCmdC)
236 tr.db.compCommitLk.Unlock()
238 // Additionally, wait for compaction when a certain threshold is reached.
239 // The error is ignored; Commit returns an error only if the transaction can't be committed.
240 tr.db.waitCompaction()
242 // Only mark as done if the transaction committed successfully.
// discard walks tr.tables, logs each table's file number and removes the
// table file from storage. When a removal succeeds the file number is passed
// to reuseFileNum.
247 func (tr *Transaction) discard() {
248 // Discard transaction.
249 for _, t := range tr.tables {
250 tr.db.logf("transaction@discard @%d", t.fd.Num)
// Hand the file number back only when the removal succeeded.
251 if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
252 tr.db.s.reuseFileNum(t.fd.Num)
257 // Discard discards the transaction.
259 // Other methods should not be called after the transaction has been discarded.
260 func (tr *Transaction) Discard() {
// waitCompaction calls compTriggerWait on the table-compaction command channel
// (tcompCmdC) and returns its error when tLen(0) has reached the configured
// WriteL0PauseTrigger.
// NOTE(review): tLen(0) is presumably the number of level-0 tables; confirm.
269 func (db *DB) waitCompaction() error {
270 if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
271 return db.compTriggerWait(db.tcompCmdC)
276 // OpenTransaction opens an atomic DB transaction. Only one transaction can be
277 // opened at a time. Subsequent calls to Write and OpenTransaction will be blocked
278 // until the in-flight transaction is committed or discarded.
279 // The returned transaction handle is safe for concurrent use.
281 // Transaction is expensive and can overwhelm compaction, especially if
282 // transaction size is small. Use with caution.
284 // The transaction must be closed once done, either by committing or discarding
286 // Closing the DB will discard any open transaction.
287 func (db *DB) OpenTransaction() (*Transaction, error) {
288 if err := db.ok(); err != nil {
292 // The write happens synchronously.
294 case db.writeLockC <- struct{}{}:
295 case err := <-db.compPerErrC:
298 return nil, ErrClosed
302 panic("leveldb: has open transaction")
305 // Flush current memdb.
306 if db.mem != nil && db.mem.Len() != 0 {
307 if _, err := db.rotateMem(0, true); err != nil {
312 // Wait compaction when certain threshold reached.
313 if err := db.waitCompaction(); err != nil {