OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_write.go
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go
new file mode 100644 (file)
index 0000000..5b6cb48
--- /dev/null
@@ -0,0 +1,452 @@
+// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+       "time"
+
+       "github.com/syndtr/goleveldb/leveldb/memdb"
+       "github.com/syndtr/goleveldb/leveldb/opt"
+       "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
+       wr, err := db.journal.Next()
+       if err != nil {
+               return err
+       }
+       if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
+               return err
+       }
+       if err := db.journal.Flush(); err != nil {
+               return err
+       }
+       if sync {
+               return db.journalWriter.Sync()
+       }
+       return nil
+}
+
+func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
+       retryLimit := 3
+retry:
+       // Wait for pending memdb compaction.
+       err = db.compTriggerWait(db.mcompCmdC)
+       if err != nil {
+               return
+       }
+       retryLimit--
+
+       // Create new memdb and journal.
+       mem, err = db.newMem(n)
+       if err != nil {
+               if err == errHasFrozenMem {
+                       if retryLimit <= 0 {
+                               panic("BUG: still has frozen memdb")
+                       }
+                       goto retry
+               }
+               return
+       }
+
+       // Schedule memdb compaction.
+       if wait {
+               err = db.compTriggerWait(db.mcompCmdC)
+       } else {
+               db.compTrigger(db.mcompCmdC)
+       }
+       return
+}
+
+func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
+       delayed := false
+       slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
+       pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
+       flush := func() (retry bool) {
+               mdb = db.getEffectiveMem()
+               if mdb == nil {
+                       err = ErrClosed
+                       return false
+               }
+               defer func() {
+                       if retry {
+                               mdb.decref()
+                               mdb = nil
+                       }
+               }()
+               tLen := db.s.tLen(0)
+               mdbFree = mdb.Free()
+               switch {
+               case tLen >= slowdownTrigger && !delayed:
+                       delayed = true
+                       time.Sleep(time.Millisecond)
+               case mdbFree >= n:
+                       return false
+               case tLen >= pauseTrigger:
+                       delayed = true
+                       err = db.compTriggerWait(db.tcompCmdC)
+                       if err != nil {
+                               return false
+                       }
+               default:
+                       // Allow memdb to grow if it has no entry.
+                       if mdb.Len() == 0 {
+                               mdbFree = n
+                       } else {
+                               mdb.decref()
+                               mdb, err = db.rotateMem(n, false)
+                               if err == nil {
+                                       mdbFree = mdb.Free()
+                               } else {
+                                       mdbFree = 0
+                               }
+                       }
+                       return false
+               }
+               return true
+       }
+       start := time.Now()
+       for flush() {
+       }
+       if delayed {
+               db.writeDelay += time.Since(start)
+               db.writeDelayN++
+       } else if db.writeDelayN > 0 {
+               db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
+               db.writeDelay = 0
+               db.writeDelayN = 0
+       }
+       return
+}
+
+type writeMerge struct {
+       sync       bool
+       batch      *Batch
+       keyType    keyType
+       key, value []byte
+}
+
+func (db *DB) unlockWrite(overflow bool, merged int, err error) {
+       for i := 0; i < merged; i++ {
+               db.writeAckC <- err
+       }
+       if overflow {
+               // Pass lock to the next write (that failed to merge).
+               db.writeMergedC <- false
+       } else {
+               // Release lock.
+               <-db.writeLockC
+       }
+}
+
+// ourBatch if defined should equal with batch.
+func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
+       // Try to flush memdb. This method would also trying to throttle writes
+       // if it is too fast and compaction cannot catch-up.
+       mdb, mdbFree, err := db.flush(batch.internalLen)
+       if err != nil {
+               db.unlockWrite(false, 0, err)
+               return err
+       }
+       defer mdb.decref()
+
+       var (
+               overflow bool
+               merged   int
+               batches  = []*Batch{batch}
+       )
+
+       if merge {
+               // Merge limit.
+               var mergeLimit int
+               if batch.internalLen > 128<<10 {
+                       mergeLimit = (1 << 20) - batch.internalLen
+               } else {
+                       mergeLimit = 128 << 10
+               }
+               mergeCap := mdbFree - batch.internalLen
+               if mergeLimit > mergeCap {
+                       mergeLimit = mergeCap
+               }
+
+       merge:
+               for mergeLimit > 0 {
+                       select {
+                       case incoming := <-db.writeMergeC:
+                               if incoming.batch != nil {
+                                       // Merge batch.
+                                       if incoming.batch.internalLen > mergeLimit {
+                                               overflow = true
+                                               break merge
+                                       }
+                                       batches = append(batches, incoming.batch)
+                                       mergeLimit -= incoming.batch.internalLen
+                               } else {
+                                       // Merge put.
+                                       internalLen := len(incoming.key) + len(incoming.value) + 8
+                                       if internalLen > mergeLimit {
+                                               overflow = true
+                                               break merge
+                                       }
+                                       if ourBatch == nil {
+                                               ourBatch = db.batchPool.Get().(*Batch)
+                                               ourBatch.Reset()
+                                               batches = append(batches, ourBatch)
+                                       }
+                                       // We can use same batch since concurrent write doesn't
+                                       // guarantee write order.
+                                       ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
+                                       mergeLimit -= internalLen
+                               }
+                               sync = sync || incoming.sync
+                               merged++
+                               db.writeMergedC <- true
+
+                       default:
+                               break merge
+                       }
+               }
+       }
+
+       // Seq number.
+       seq := db.seq + 1
+
+       // Write journal.
+       if err := db.writeJournal(batches, seq, sync); err != nil {
+               db.unlockWrite(overflow, merged, err)
+               return err
+       }
+
+       // Put batches.
+       for _, batch := range batches {
+               if err := batch.putMem(seq, mdb.DB); err != nil {
+                       panic(err)
+               }
+               seq += uint64(batch.Len())
+       }
+
+       // Incr seq number.
+       db.addSeq(uint64(batchesLen(batches)))
+
+       // Rotate memdb if it's reach the threshold.
+       if batch.internalLen >= mdbFree {
+               db.rotateMem(0, false)
+       }
+
+       db.unlockWrite(overflow, merged, nil)
+       return nil
+}
+
+// Write apply the given batch to the DB. The batch records will be applied
+// sequentially. Write might be used concurrently, when used concurrently and
+// batch is small enough, write will try to merge the batches. Set NoWriteMerge
+// option to true to disable write merge.
+//
+// It is safe to modify the contents of the arguments after Write returns but
+// not before. Write will not modify content of the batch.
+func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
+       if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
+               return err
+       }
+
+       // If the batch size is larger than write buffer, it may justified to write
+       // using transaction instead. Using transaction the batch will be written
+       // into tables directly, skipping the journaling.
+       if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+               tr, err := db.OpenTransaction()
+               if err != nil {
+                       return err
+               }
+               if err := tr.Write(batch, wo); err != nil {
+                       tr.Discard()
+                       return err
+               }
+               return tr.Commit()
+       }
+
+       merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+       sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+       // Acquire write lock.
+       if merge {
+               select {
+               case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
+                       if <-db.writeMergedC {
+                               // Write is merged.
+                               return <-db.writeAckC
+                       }
+                       // Write is not merged, the write lock is handed to us. Continue.
+               case db.writeLockC <- struct{}{}:
+                       // Write lock acquired.
+               case err := <-db.compPerErrC:
+                       // Compaction error.
+                       return err
+               case <-db.closeC:
+                       // Closed
+                       return ErrClosed
+               }
+       } else {
+               select {
+               case db.writeLockC <- struct{}{}:
+                       // Write lock acquired.
+               case err := <-db.compPerErrC:
+                       // Compaction error.
+                       return err
+               case <-db.closeC:
+                       // Closed
+                       return ErrClosed
+               }
+       }
+
+       return db.writeLocked(batch, nil, merge, sync)
+}
+
+func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
+       if err := db.ok(); err != nil {
+               return err
+       }
+
+       merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+       sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+       // Acquire write lock.
+       if merge {
+               select {
+               case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
+                       if <-db.writeMergedC {
+                               // Write is merged.
+                               return <-db.writeAckC
+                       }
+                       // Write is not merged, the write lock is handed to us. Continue.
+               case db.writeLockC <- struct{}{}:
+                       // Write lock acquired.
+               case err := <-db.compPerErrC:
+                       // Compaction error.
+                       return err
+               case <-db.closeC:
+                       // Closed
+                       return ErrClosed
+               }
+       } else {
+               select {
+               case db.writeLockC <- struct{}{}:
+                       // Write lock acquired.
+               case err := <-db.compPerErrC:
+                       // Compaction error.
+                       return err
+               case <-db.closeC:
+                       // Closed
+                       return ErrClosed
+               }
+       }
+
+       batch := db.batchPool.Get().(*Batch)
+       batch.Reset()
+       batch.appendRec(kt, key, value)
+       return db.writeLocked(batch, batch, merge, sync)
+}
+
+// Put sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map. Write merge also applies for Put, see
+// Write.
+//
+// It is safe to modify the contents of the arguments after Put returns but not
+// before.
+func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
+       return db.putRec(keyTypeVal, key, value, wo)
+}
+
+// Delete deletes the value for the given key. Delete will not returns error if
+// key doesn't exist. Write merge also applies for Delete, see Write.
+//
+// It is safe to modify the contents of the arguments after Delete returns but
+// not before.
+func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
+       return db.putRec(keyTypeDel, key, nil, wo)
+}
+
+func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
+       iter := mem.NewIterator(nil)
+       defer iter.Release()
+       return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
+               (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
+}
+
+// CompactRange compacts the underlying DB for the given key range.
+// In particular, deleted and overwritten versions are discarded,
+// and the data is rearranged to reduce the cost of operations
+// needed to access the data. This operation should typically only
+// be invoked by users who understand the underlying implementation.
+//
+// A nil Range.Start is treated as a key before all keys in the DB.
+// And a nil Range.Limit is treated as a key after all keys in the DB.
+// Therefore if both is nil then it will compact entire DB.
+func (db *DB) CompactRange(r util.Range) error {
+       if err := db.ok(); err != nil {
+               return err
+       }
+
+       // Lock writer.
+       select {
+       case db.writeLockC <- struct{}{}:
+       case err := <-db.compPerErrC:
+               return err
+       case <-db.closeC:
+               return ErrClosed
+       }
+
+       // Check for overlaps in memdb.
+       mdb := db.getEffectiveMem()
+       if mdb == nil {
+               return ErrClosed
+       }
+       defer mdb.decref()
+       if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
+               // Memdb compaction.
+               if _, err := db.rotateMem(0, false); err != nil {
+                       <-db.writeLockC
+                       return err
+               }
+               <-db.writeLockC
+               if err := db.compTriggerWait(db.mcompCmdC); err != nil {
+                       return err
+               }
+       } else {
+               <-db.writeLockC
+       }
+
+       // Table compaction.
+       return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
+}
+
+// SetReadOnly makes DB read-only. It will stay read-only until reopened.
+func (db *DB) SetReadOnly() error {
+       if err := db.ok(); err != nil {
+               return err
+       }
+
+       // Lock writer.
+       select {
+       case db.writeLockC <- struct{}{}:
+               db.compWriteLocking = true
+       case err := <-db.compPerErrC:
+               return err
+       case <-db.closeC:
+               return ErrClosed
+       }
+
+       // Set compaction read-only.
+       select {
+       case db.compErrSetC <- ErrReadOnly:
+       case perr := <-db.compPerErrC:
+               return perr
+       case <-db.closeC:
+               return ErrClosed
+       }
+
+       return nil
+}