1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
12 "github.com/syndtr/goleveldb/leveldb/memdb"
13 "github.com/syndtr/goleveldb/leveldb/opt"
14 "github.com/syndtr/goleveldb/leveldb/util"
// writeJournal encodes the given batches — sharing a single header carrying
// the starting sequence number seq — into the current journal, then flushes
// the journal writer. The final statement syncs the underlying journal file;
// presumably it is gated on the sync flag by lines elided from this excerpt
// — TODO confirm against the full file.
// NOTE(review): the interior error-return lines are missing from this fragment.
17 func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
18 wr, err := db.journal.Next()
// Encode all merged batches behind one journal record header.
22 if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
25 if err := db.journal.Flush(); err != nil {
29 return db.journalWriter.Sync()
// rotateMem freezes the current memdb, creates a fresh memdb (sized so it can
// hold at least n additional bytes) together with a new journal, and triggers
// a memdb compaction. The wait flag apparently selects between the blocking
// (compTriggerWait) and non-blocking (compTrigger) trigger — the branch lines
// are elided from this excerpt, so confirm against the full file.
34 func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
37 // Wait for pending memdb compaction.
38 err = db.compTriggerWait(db.mcompCmdC)
44 // Create new memdb and journal.
45 mem, err = db.newMem(n)
47 if err == errHasFrozenMem {
// A frozen memdb still exists even after waiting for the pending
// compaction above — an internal invariant violation, hence panic.
49 panic("BUG: still has frozen memdb")
56 // Schedule memdb compaction.
58 err = db.compTriggerWait(db.mcompCmdC)
60 db.compTrigger(db.mcompCmdC)
// flush makes room for a write of n bytes in the effective memdb, throttling
// the writer when the number of level-0 tables passes the configured slowdown
// trigger, pausing it at the pause trigger, and rotating the memdb when it
// cannot fit n more bytes. It returns the effective memdb and its remaining
// free space. NOTE(review): several lines are elided from this fragment.
65 func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
67 slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
68 pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
69 flush := func() (retry bool) {
70 mdb = db.getEffectiveMem()
// Once level-0 reaches the slowdown threshold, delay this writer by 1ms;
// the delayed flag presumably limits this to one sleep per flush — confirm.
84 case tLen >= slowdownTrigger && !delayed:
86 time.Sleep(time.Millisecond)
// At the pause threshold, block until table compaction makes progress.
89 case tLen >= pauseTrigger:
91 err = db.compTriggerWait(db.tcompCmdC)
96 // Allow memdb to grow if it has no entry.
// Memdb cannot fit this write: rotate to a fresh memdb (non-blocking).
101 mdb, err = db.rotateMem(n, false)
// Accumulate total write delay for the periodic log line below.
116 db.writeDelay += time.Since(start)
118 } else if db.writeDelayN > 0 {
119 db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
// writeMerge describes a pending write that asks to be merged into the
// current lock holder's in-flight write: either a whole batch, or a single
// key/value record with its keyType, plus a sync flag (fields elided from
// this excerpt).
126 type writeMerge struct {
// unlockWrite releases the write lock and reports the outcome to the writers
// whose writes were merged (presumably acknowledged with overflow/err via the
// ack channel on lines elided here — TODO confirm), then hands the lock to
// the next writer whose merge attempt was rejected, if any.
133 func (db *DB) unlockWrite(overflow bool, merged int, err error) {
// One notification per merged writer.
134 for i := 0; i < merged; i++ {
138 // Pass lock to the next write (that failed to merge).
139 db.writeMergedC <- false
// writeLocked performs a write while holding the write lock: it flushes the
// memdb (throttling if needed), optionally absorbs concurrent writes that fit
// within the merge limit, journals all batches, applies them to the memdb,
// advances the sequence number, and releases the write lock via unlockWrite.
146 // ourBatch, if defined, should be the same batch as batch.
147 func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
148 // Try to flush memdb. This method will also try to throttle writes
149 // if they are too fast and compaction cannot catch up.
150 mdb, mdbFree, err := db.flush(batch.internalLen)
152 db.unlockWrite(false, 0, err)
160 batches = []*Batch{batch}
// Merge budget: a large batch (>128KiB) may absorb up to 1MiB total,
// a small one up to 128KiB — and never more than the memdb's free space.
166 if batch.internalLen > 128<<10 {
167 mergeLimit = (1 << 20) - batch.internalLen
169 mergeLimit = 128 << 10
171 mergeCap := mdbFree - batch.internalLen
172 if mergeLimit > mergeCap {
173 mergeLimit = mergeCap
// Accept merge requests from concurrent writers while they fit the budget.
179 case incoming := <-db.writeMergeC:
180 if incoming.batch != nil {
// Whole-batch merge: reject if it would blow the merge limit.
182 if incoming.batch.internalLen > mergeLimit {
186 batches = append(batches, incoming.batch)
187 mergeLimit -= incoming.batch.internalLen
// Single-record merge: 8 extra bytes per record, mirroring the
// internal-length accounting used elsewhere — TODO confirm constant.
190 internalLen := len(incoming.key) + len(incoming.value) + 8
191 if internalLen > mergeLimit {
// Lazily take a pooled batch to collect merged single records.
196 ourBatch = db.batchPool.Get().(*Batch)
198 batches = append(batches, ourBatch)
200 // We can use same batch since concurrent write doesn't
201 // guarantee write order.
202 ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
203 mergeLimit -= internalLen
// A single sync write upgrades the whole merged group to sync.
205 sync = sync || incoming.sync
// Tell the merged writer its write was accepted.
207 db.writeMergedC <- true
// Journal first, then apply: the journal is the durability record.
219 if err := db.writeJournal(batches, seq, sync); err != nil {
220 db.unlockWrite(overflow, merged, err)
// Apply batches in order; each one consumes Len() sequence numbers.
225 for _, batch := range batches {
226 if err := batch.putMem(seq, mdb.DB); err != nil {
229 seq += uint64(batch.Len())
233 db.addSeq(uint64(batchesLen(batches)))
235 // Rotate memdb if it reaches the threshold.
236 if batch.internalLen >= mdbFree {
// Rotation error is deliberately ignored here — presumably the next
// write retries; confirm against the full file.
237 db.rotateMem(0, false)
240 db.unlockWrite(overflow, merged, nil)
244 // Write applies the given batch to the DB. The batch records will be applied
245 // sequentially. Write might be used concurrently; when used concurrently and
246 // the batch is small enough, Write will try to merge the batches. Set the
247 // NoWriteMerge option to true to disable write merge.
249 // It is safe to modify the contents of the arguments after Write returns but
250 // not before. Write will not modify content of the batch.
251 func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
// Nil or empty batches are a no-op (err is nil in that case).
252 if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
256 // If the batch size is larger than the write buffer, it may be justified
257 // to write using a transaction instead. Using a transaction the batch will
258 // be written into tables directly, skipping the journaling.
259 if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
260 tr, err := db.OpenTransaction()
264 if err := tr.Write(batch, wo); err != nil {
// Per-write options are ANDed/ORed with the DB-wide option defaults.
271 merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
272 sync := wo.GetSync() && !db.s.o.GetNoSync()
274 // Acquire write lock.
// Merging path: either hand the batch to the current lock holder and wait
// for its acknowledgement, or win the write lock ourselves.
277 case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
278 if <-db.writeMergedC {
// Write was merged; the lock holder journals and applies it for us.
280 return <-db.writeAckC
282 // Write is not merged, the write lock is handed to us. Continue.
283 case db.writeLockC <- struct{}{}:
284 // Write lock acquired.
285 case err := <-db.compPerErrC:
// Non-merging path: plain write-lock acquisition (select lines elided).
294 case db.writeLockC <- struct{}{}:
295 // Write lock acquired.
296 case err := <-db.compPerErrC:
305 return db.writeLocked(batch, nil, merge, sync)
// putRec writes a single record of the given key type (value or deletion)
// using the same merge / write-lock protocol as Write.
308 func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
309 if err := db.ok(); err != nil {
// Per-write options are combined with the DB-wide option defaults.
313 merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
314 sync := wo.GetSync() && !db.s.o.GetNoSync()
316 // Acquire write lock.
// Merging path: offer the single record to the current lock holder, or
// win the write lock ourselves.
319 case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
320 if <-db.writeMergedC {
// Record was merged; the lock holder reports the result via writeAckC.
322 return <-db.writeAckC
324 // Write is not merged, the write lock is handed to us. Continue.
325 case db.writeLockC <- struct{}{}:
326 // Write lock acquired.
327 case err := <-db.compPerErrC:
// Non-merging path: plain write-lock acquisition (select lines elided).
336 case db.writeLockC <- struct{}{}:
337 // Write lock acquired.
338 case err := <-db.compPerErrC:
// Build a one-record batch from the pool; passing it as ourBatch lets
// writeLocked recycle it — presumably back into the pool, TODO confirm.
347 batch := db.batchPool.Get().(*Batch)
349 batch.appendRec(kt, key, value)
350 return db.writeLocked(batch, batch, merge, sync)
353 // Put sets the value for the given key. It overwrites any previous value
354 // for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
357 // It is safe to modify the contents of the arguments after Put returns but not
// before.
359 func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
360 return db.putRec(keyTypeVal, key, value, wo)
363 // Delete deletes the value for the given key. Delete will not return an error
364 // if the key doesn't exist. Write merge also applies for Delete, see Write.
366 // It is safe to modify the contents of the arguments after Delete returns but
// not before.
368 func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
// A deletion is just a record with the delete key type and a nil value.
369 return db.putRec(keyTypeDel, key, nil, wo)
// isMemOverlaps reports whether the memdb's key range overlaps [min, max] in
// user-key order; a nil min or max means unbounded on that side.
372 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
373 iter := mem.NewIterator(nil)
// Overlap iff max is not below the memdb's first key AND min is not above
// its last key. (Iterator release presumably happens on an elided line.)
375 return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
376 (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
379 // CompactRange compacts the underlying DB for the given key range.
380 // In particular, deleted and overwritten versions are discarded,
381 // and the data is rearranged to reduce the cost of operations
382 // needed to access the data. This operation should typically only
383 // be invoked by users who understand the underlying implementation.
385 // A nil Range.Start is treated as a key before all keys in the DB.
386 // And a nil Range.Limit is treated as a key after all keys in the DB.
387 // Therefore if both are nil then it will compact the entire DB.
388 func (db *DB) CompactRange(r util.Range) error {
389 if err := db.ok(); err != nil {
// Take the write lock so no writes race with the compaction setup.
395 case db.writeLockC <- struct{}{}:
396 case err := <-db.compPerErrC:
402 // Check for overlaps in memdb.
403 mdb := db.getEffectiveMem()
// If the range overlaps the memdb, rotate it out and wait for the memdb
// compaction so the range lives entirely in tables before compacting them.
408 if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
410 if _, err := db.rotateMem(0, false); err != nil {
415 if err := db.compTriggerWait(db.mcompCmdC); err != nil {
// Level -1: presumably compacts the range across all levels — TODO confirm.
423 return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
426 // SetReadOnly makes DB read-only. It will stay read-only until reopened.
427 func (db *DB) SetReadOnly() error {
428 if err := db.ok(); err != nil {
434 case db.writeLockC <- struct{}{}:
435 db.compWriteLocking = true
436 case err := <-db.compPerErrC:
442 // Set compaction read-only.
444 case db.compErrSetC <- ErrReadOnly:
445 case perr := <-db.compPerErrC: