OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_transaction.go
1 // Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "errors"
11         "sync"
12         "time"
13
14         "github.com/syndtr/goleveldb/leveldb/iterator"
15         "github.com/syndtr/goleveldb/leveldb/opt"
16         "github.com/syndtr/goleveldb/leveldb/util"
17 )
18
19 var errTransactionDone = errors.New("leveldb: transaction already closed")
20
21 // Transaction is the transaction handle.
22 type Transaction struct {
23         db        *DB
24         lk        sync.RWMutex
25         seq       uint64
26         mem       *memDB
27         tables    tFiles
28         ikScratch []byte
29         rec       sessionRecord
30         stats     cStatStaging
31         closed    bool
32 }
33
34 // Get gets the value for the given key. It returns ErrNotFound if the
35 // DB does not contains the key.
36 //
37 // The returned slice is its own copy, it is safe to modify the contents
38 // of the returned slice.
39 // It is safe to modify the contents of the argument after Get returns.
40 func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
41         tr.lk.RLock()
42         defer tr.lk.RUnlock()
43         if tr.closed {
44                 return nil, errTransactionDone
45         }
46         return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
47 }
48
49 // Has returns true if the DB does contains the given key.
50 //
51 // It is safe to modify the contents of the argument after Has returns.
52 func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
53         tr.lk.RLock()
54         defer tr.lk.RUnlock()
55         if tr.closed {
56                 return false, errTransactionDone
57         }
58         return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
59 }
60
61 // NewIterator returns an iterator for the latest snapshot of the transaction.
62 // The returned iterator is not safe for concurrent use, but it is safe to use
63 // multiple iterators concurrently, with each in a dedicated goroutine.
64 // It is also safe to use an iterator concurrently while writes to the
65 // transaction. The resultant key/value pairs are guaranteed to be consistent.
66 //
67 // Slice allows slicing the iterator to only contains keys in the given
68 // range. A nil Range.Start is treated as a key before all keys in the
69 // DB. And a nil Range.Limit is treated as a key after all keys in
70 // the DB.
71 //
72 // The iterator must be released after use, by calling Release method.
73 //
74 // Also read Iterator documentation of the leveldb/iterator package.
75 func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
76         tr.lk.RLock()
77         defer tr.lk.RUnlock()
78         if tr.closed {
79                 return iterator.NewEmptyIterator(errTransactionDone)
80         }
81         tr.mem.incref()
82         return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
83 }
84
85 func (tr *Transaction) flush() error {
86         // Flush memdb.
87         if tr.mem.Len() != 0 {
88                 tr.stats.startTimer()
89                 iter := tr.mem.NewIterator(nil)
90                 t, n, err := tr.db.s.tops.createFrom(iter)
91                 iter.Release()
92                 tr.stats.stopTimer()
93                 if err != nil {
94                         return err
95                 }
96                 if tr.mem.getref() == 1 {
97                         tr.mem.Reset()
98                 } else {
99                         tr.mem.decref()
100                         tr.mem = tr.db.mpoolGet(0)
101                         tr.mem.incref()
102                 }
103                 tr.tables = append(tr.tables, t)
104                 tr.rec.addTableFile(0, t)
105                 tr.stats.write += t.size
106                 tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
107         }
108         return nil
109 }
110
111 func (tr *Transaction) put(kt keyType, key, value []byte) error {
112         tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
113         if tr.mem.Free() < len(tr.ikScratch)+len(value) {
114                 if err := tr.flush(); err != nil {
115                         return err
116                 }
117         }
118         if err := tr.mem.Put(tr.ikScratch, value); err != nil {
119                 return err
120         }
121         tr.seq++
122         return nil
123 }
124
125 // Put sets the value for the given key. It overwrites any previous value
126 // for that key; a DB is not a multi-map.
127 // Please note that the transaction is not compacted until committed, so if you
128 // writes 10 same keys, then those 10 same keys are in the transaction.
129 //
130 // It is safe to modify the contents of the arguments after Put returns.
131 func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
132         tr.lk.Lock()
133         defer tr.lk.Unlock()
134         if tr.closed {
135                 return errTransactionDone
136         }
137         return tr.put(keyTypeVal, key, value)
138 }
139
140 // Delete deletes the value for the given key.
141 // Please note that the transaction is not compacted until committed, so if you
142 // writes 10 same keys, then those 10 same keys are in the transaction.
143 //
144 // It is safe to modify the contents of the arguments after Delete returns.
145 func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
146         tr.lk.Lock()
147         defer tr.lk.Unlock()
148         if tr.closed {
149                 return errTransactionDone
150         }
151         return tr.put(keyTypeDel, key, nil)
152 }
153
154 // Write apply the given batch to the transaction. The batch will be applied
155 // sequentially.
156 // Please note that the transaction is not compacted until committed, so if you
157 // writes 10 same keys, then those 10 same keys are in the transaction.
158 //
159 // It is safe to modify the contents of the arguments after Write returns.
160 func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
161         if b == nil || b.Len() == 0 {
162                 return nil
163         }
164
165         tr.lk.Lock()
166         defer tr.lk.Unlock()
167         if tr.closed {
168                 return errTransactionDone
169         }
170         return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
171                 return tr.put(kt, k, v)
172         })
173 }
174
175 func (tr *Transaction) setDone() {
176         tr.closed = true
177         tr.db.tr = nil
178         tr.mem.decref()
179         <-tr.db.writeLockC
180 }
181
182 // Commit commits the transaction. If error is not nil, then the transaction is
183 // not committed, it can then either be retried or discarded.
184 //
185 // Other methods should not be called after transaction has been committed.
186 func (tr *Transaction) Commit() error {
187         if err := tr.db.ok(); err != nil {
188                 return err
189         }
190
191         tr.lk.Lock()
192         defer tr.lk.Unlock()
193         if tr.closed {
194                 return errTransactionDone
195         }
196         if err := tr.flush(); err != nil {
197                 // Return error, lets user decide either to retry or discard
198                 // transaction.
199                 return err
200         }
201         if len(tr.tables) != 0 {
202                 // Committing transaction.
203                 tr.rec.setSeqNum(tr.seq)
204                 tr.db.compCommitLk.Lock()
205                 tr.stats.startTimer()
206                 var cerr error
207                 for retry := 0; retry < 3; retry++ {
208                         cerr = tr.db.s.commit(&tr.rec)
209                         if cerr != nil {
210                                 tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
211                                 select {
212                                 case <-time.After(time.Second):
213                                 case <-tr.db.closeC:
214                                         tr.db.logf("transaction@commit exiting")
215                                         tr.db.compCommitLk.Unlock()
216                                         return cerr
217                                 }
218                         } else {
219                                 // Success. Set db.seq.
220                                 tr.db.setSeq(tr.seq)
221                                 break
222                         }
223                 }
224                 tr.stats.stopTimer()
225                 if cerr != nil {
226                         // Return error, lets user decide either to retry or discard
227                         // transaction.
228                         return cerr
229                 }
230
231                 // Update compaction stats. This is safe as long as we hold compCommitLk.
232                 tr.db.compStats.addStat(0, &tr.stats)
233
234                 // Trigger table auto-compaction.
235                 tr.db.compTrigger(tr.db.tcompCmdC)
236                 tr.db.compCommitLk.Unlock()
237
238                 // Additionally, wait compaction when certain threshold reached.
239                 // Ignore error, returns error only if transaction can't be committed.
240                 tr.db.waitCompaction()
241         }
242         // Only mark as done if transaction committed successfully.
243         tr.setDone()
244         return nil
245 }
246
247 func (tr *Transaction) discard() {
248         // Discard transaction.
249         for _, t := range tr.tables {
250                 tr.db.logf("transaction@discard @%d", t.fd.Num)
251                 if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
252                         tr.db.s.reuseFileNum(t.fd.Num)
253                 }
254         }
255 }
256
257 // Discard discards the transaction.
258 //
259 // Other methods should not be called after transaction has been discarded.
260 func (tr *Transaction) Discard() {
261         tr.lk.Lock()
262         if !tr.closed {
263                 tr.discard()
264                 tr.setDone()
265         }
266         tr.lk.Unlock()
267 }
268
269 func (db *DB) waitCompaction() error {
270         if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
271                 return db.compTriggerWait(db.tcompCmdC)
272         }
273         return nil
274 }
275
276 // OpenTransaction opens an atomic DB transaction. Only one transaction can be
277 // opened at a time. Subsequent call to Write and OpenTransaction will be blocked
278 // until in-flight transaction is committed or discarded.
279 // The returned transaction handle is safe for concurrent use.
280 //
281 // Transaction is expensive and can overwhelm compaction, especially if
282 // transaction size is small. Use with caution.
283 //
284 // The transaction must be closed once done, either by committing or discarding
285 // the transaction.
286 // Closing the DB will discard open transaction.
287 func (db *DB) OpenTransaction() (*Transaction, error) {
288         if err := db.ok(); err != nil {
289                 return nil, err
290         }
291
292         // The write happen synchronously.
293         select {
294         case db.writeLockC <- struct{}{}:
295         case err := <-db.compPerErrC:
296                 return nil, err
297         case <-db.closeC:
298                 return nil, ErrClosed
299         }
300
301         if db.tr != nil {
302                 panic("leveldb: has open transaction")
303         }
304
305         // Flush current memdb.
306         if db.mem != nil && db.mem.Len() != 0 {
307                 if _, err := db.rotateMem(0, true); err != nil {
308                         return nil, err
309                 }
310         }
311
312         // Wait compaction when certain threshold reached.
313         if err := db.waitCompaction(); err != nil {
314                 return nil, err
315         }
316
317         tr := &Transaction{
318                 db:  db,
319                 seq: db.seq,
320                 mem: db.mpoolGet(0),
321         }
322         tr.mem.incref()
323         db.tr = tr
324         return tr, nil
325 }