1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
20 "github.com/syndtr/goleveldb/leveldb/errors"
21 "github.com/syndtr/goleveldb/leveldb/iterator"
22 "github.com/syndtr/goleveldb/leveldb/journal"
23 "github.com/syndtr/goleveldb/leveldb/memdb"
24 "github.com/syndtr/goleveldb/leveldb/opt"
25 "github.com/syndtr/goleveldb/leveldb/storage"
26 "github.com/syndtr/goleveldb/leveldb/table"
27 "github.com/syndtr/goleveldb/leveldb/util"
30 // DB is a LevelDB database.
32 // Need 64-bit alignment.
40 memPool chan *memdb.DB
42 journal *journal.Writer
43 journalWriter storage.Writer
44 journalFd storage.FileDesc
45 frozenJournalFd storage.FileDesc
53 aliveSnaps, aliveIters int32
57 writeMergeC chan writeMerge
58 writeMergedC chan bool
59 writeLockC chan struct{}
61 writeDelay time.Duration
66 compCommitLk sync.Mutex
68 tcompPauseC chan chan<- struct{}
71 compPerErrC chan error
72 compErrSetC chan error
75 memdbMaxLevel int // For testing.
// openDB builds a *DB around a recovered session: it wires up the write,
// compaction, and lifecycle channels, recovers journals (read-only or
// read-write depending on options), removes obsolete files, and starts the
// background goroutines.
// NOTE(review): many interior lines of this function are elided in this
// chunk; comments below describe only the visible code.
84 func openDB(s *session) (*DB, error) {
85 s.log("db@open opening")
92 memPool: make(chan *memdb.DB, 1),
94 snapsList: list.New(),
96 batchPool: sync.Pool{New: newBatch},
97 writeMergeC: make(chan writeMerge),
98 writeMergedC: make(chan bool),
99 writeLockC: make(chan struct{}, 1),
100 writeAckC: make(chan error),
102 tcompCmdC: make(chan cCmd),
103 tcompPauseC: make(chan chan<- struct{}),
104 mcompCmdC: make(chan cCmd),
105 compErrC: make(chan error),
106 compPerErrC: make(chan error),
107 compErrSetC: make(chan error),
109 closeC: make(chan struct{}),
113 readOnly := s.o.GetReadOnly()
116 // Recover journals (read-only mode).
117 if err := db.recoverJournalRO(); err != nil {
122 if err := db.recoverJournal(); err != nil {
126 // Remove any obsolete files.
127 if err := db.checkAndCleanFiles(); err != nil {
// On cleanup failure the open journal writer is closed before bailing out.
129 if db.journal != nil {
131 db.journalWriter.Close()
138 // Doesn't need to be included in the wait group.
139 go db.compactionError()
151 s.logf("db@open done T·%v", time.Since(start))
// Finalizer closes the DB if the caller forgets to; Close clears it again.
153 runtime.SetFinalizer(db, (*DB).Close)
157 // Open opens or creates a DB for the given storage.
158 // The DB will be created if not exist, unless ErrorIfMissing is true.
159 // Also, if ErrorIfExist is true and the DB exists, Open will return an
160 // os.ErrExist error.
162 // Open will return an error with type of ErrCorrupted if corruption
163 // detected in the DB. Use errors.IsCorrupted to test whether an error is
164 // due to corruption. Corrupted DB can be recovered with Recover function.
166 // The returned DB instance is safe for concurrent use.
167 // The DB must be closed after use, by calling Close method.
// NOTE(review): interior lines are elided in this chunk.
168 func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
169 s, err := newSession(stor, o)
// A missing manifest is tolerated (DB will be created) unless
// ErrorIfMissing is set; any other recovery error is fatal.
182 if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
189 } else if s.o.GetErrorIfExist() {
197 // OpenFile opens or creates a DB for the given path.
198 // The DB will be created if not exist, unless ErrorIfMissing is true.
199 // Also, if ErrorIfExist is true and the DB exists, OpenFile will return an
200 // os.ErrExist error.
202 // OpenFile uses standard file-system backed storage implementation as
203 // described in the leveldb/storage package.
205 // OpenFile will return an error with type of ErrCorrupted if corruption
206 // detected in the DB. Use errors.IsCorrupted to test whether an error is
207 // due to corruption. Corrupted DB can be recovered with Recover function.
209 // The returned DB instance is safe for concurrent use.
210 // The DB must be closed after use, by calling Close method.
211 func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// File-system storage honors the read-only option directly.
212 stor, err := storage.OpenFile(path, o.GetReadOnly())
213 db, err = Open(stor, o)
225 // Recover recovers and opens a DB with missing or corrupted manifest files
226 // for the given storage. It will ignore any manifest files, valid or not.
227 // The DB must already exist or it will return an error.
228 // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
230 // The returned DB instance is safe for concurrent use.
231 // The DB must be closed after use, by calling Close method.
// NOTE(review): interior lines are elided in this chunk.
232 func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
233 s, err := newSession(stor, o)
// Rebuild the manifest from whatever table files exist in storage.
244 err = recoverTable(s, o)
251 // RecoverFile recovers and opens a DB with missing or corrupted manifest files
252 // for the given path. It will ignore any manifest files, valid or not.
253 // The DB must already exist or it will return an error.
254 // Also, RecoverFile will ignore ErrorIfMissing and ErrorIfExist options.
256 // RecoverFile uses standard file-system backed storage implementation as described
257 // in the leveldb/storage package.
259 // The returned DB instance is safe for concurrent use.
260 // The DB must be closed after use, by calling Close method.
261 func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
// Recovery must write a new manifest, so read-only is forced off here.
262 stor, err := storage.OpenFile(path, false)
266 db, err = Recover(stor, o)
// recoverTable scans every table file in storage, validates each one (and,
// when StrictRecovery is off, rebuilds corrupted tables in place), registers
// the survivors at level 0 of a fresh session record, and creates a new
// manifest from it.
// NOTE(review): many interior lines are elided in this chunk; comments
// below describe only the visible code.
275 func recoverTable(s *session, o *opt.Options) error {
277 // Mask StrictReader, let StrictRecovery do its job.
278 o.Strict &= ^opt.StrictReader
280 // Get all tables and sort it by file number.
281 fds, err := s.stor.List(storage.TypeTable)
289 recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int
291 // We will drop corrupted table.
292 strict = o.GetStrict(opt.StrictRecovery)
293 noSync = o.GetNoSync()
295 rec = &sessionRecord{}
296 bpool = util.NewBufferPool(o.GetBlockSize() + 5)
// buildTable copies the valid entries of iter into a new temporary table
// file, returning its descriptor and byte size.
298 buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
300 writer, err := s.stor.Create(tmpFd)
// Zeroing tmpFd signals to the caller that no usable file was produced.
308 tmpFd = storage.FileDesc{}
313 tw := table.NewWriter(writer, o)
// Only well-formed internal keys are carried over to the rebuilt table.
316 if validInternalKey(key) {
317 err = tw.Append(key, iter.Value())
337 size = int64(tw.BytesLen())
// recoverTable (local closure) validates one table file, tallying good
// and corrupted keys/blocks, then keeps, rebuilds, or drops it.
340 recoverTable := func(fd storage.FileDesc) error {
341 s.logf("table@recovery recovering @%d", fd.Num)
342 reader, err := s.stor.Open(fd)
// Seek to the end to learn the file size (whence 2 == io.SeekEnd).
354 size, err := reader.Seek(0, 2)
361 tgoodKey, tcorruptedKey, tcorruptedBlock int
364 tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
368 iter := tr.NewIterator(nil, nil)
// Count block-level corruption via the iterator's error callback instead
// of aborting the scan.
369 if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
370 itererr.SetErrorCallback(func(err error) {
371 if errors.IsCorrupted(err) {
372 s.logf("table@recovery block corruption @%d %q", fd.Num, err)
381 _, seq, _, kerr := parseInternalKey(key)
// Track the smallest and largest internal keys for the table's bounds.
391 imin = append([]byte{}, key...)
393 imax = append(imax[:0], key...)
395 if err := iter.Error(); err != nil {
402 corruptedKey += tcorruptedKey
403 corruptedBlock += tcorruptedBlock
// In strict-recovery mode any corruption drops the whole table.
405 if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
407 s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
412 if tcorruptedKey > 0 || tcorruptedBlock > 0 {
413 // Rebuild the table.
414 s.logf("table@recovery rebuilding @%d", fd.Num)
415 iter := tr.NewIterator(nil, nil)
416 tmpFd, newSize, err := buildTable(iter)
// The rebuilt temporary file replaces the corrupted original in place.
423 if err := s.stor.Rename(tmpFd, fd); err != nil {
431 recoveredKey += tgoodKey
432 // Add table to level 0.
433 rec.addTable(0, fd.Num, size, imin, imax)
434 s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
437 s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
443 // Recover all tables.
445 s.logf("table@recovery F·%d", len(fds))
447 // Mark file number as used.
448 s.markFileNum(fds[len(fds)-1].Num)
450 for _, fd := range fds {
451 if err := recoverTable(fd); err != nil {
456 s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
459 // Set sequence number.
460 rec.setSeqNum(maxSeq)
462 // Create new manifest.
463 if err := s.create(); err != nil {
// recoverJournal replays every journal file at or after the session's
// current journal number into a memdb, flushing to level-0 tables whenever
// the write buffer fills, then commits the result and switches the DB to a
// fresh journal.
// NOTE(review): many interior lines are elided in this chunk; comments
// below describe only the visible code.
471 func (db *DB) recoverJournal() error {
472 // Get all journals and sort it by file number.
473 rawFds, err := db.s.stor.List(storage.TypeJournal)
479 // Journals that will be recovered.
480 var fds []storage.FileDesc
481 for _, fd := range rawFds {
482 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
483 fds = append(fds, fd)
488 ofd storage.FileDesc // Obsolete file.
489 rec = &sessionRecord{}
494 db.logf("journal@recovery F·%d", len(fds))
496 // Mark file number as used.
497 db.s.markFileNum(fds[len(fds)-1].Num)
501 strict = db.s.o.GetStrict(opt.StrictJournal)
502 checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
503 writeBuffer = db.s.o.GetWriteBuffer()
506 mdb = memdb.New(db.s.icmp, writeBuffer)
512 for _, fd := range fds {
513 db.logf("journal@recovery recovering @%d", fd.Num)
515 fr, err := db.s.stor.Open(fd)
520 // Create or reset journal reader instance.
522 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
524 jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
527 // Flush memdb and remove obsolete journal file.
530 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
536 rec.setJournalNum(fd.Num)
537 rec.setSeqNum(db.seq)
538 if err := db.s.commit(rec); err != nil {
// Reset added tables so a later commit does not re-add them.
542 rec.resetAddedTables()
544 db.s.stor.Remove(ofd)
545 ofd = storage.FileDesc{}
548 // Replay journal to memdb.
558 return errors.SetFd(err, fd)
562 if _, err := buf.ReadFrom(r); err != nil {
563 if err == io.ErrUnexpectedEOF {
564 // This is error returned due to corruption, with strict == false.
569 return errors.SetFd(err, fd)
571 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
// In lenient mode a corrupted batch is logged and skipped.
573 if !strict && errors.IsCorrupted(err) {
574 db.s.logf("journal error: %v (skipped)", err)
575 // We won't apply sequence number as it might be corrupted.
580 return errors.SetFd(err, fd)
583 // Save sequence number.
584 db.seq = batchSeq + uint64(batchLen)
586 // Flush it if large enough.
587 if mdb.Size() >= writeBuffer {
588 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
601 // Flush the last memdb.
603 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
609 // Create a new journal.
610 if _, err := db.newMem(0); err != nil {
615 rec.setJournalNum(db.journalFd.Num)
616 rec.setSeqNum(db.seq)
617 if err := db.s.commit(rec); err != nil {
618 // Close journal on error.
619 if db.journal != nil {
621 db.journalWriter.Close()
626 // Remove the last obsolete journal file.
628 db.s.stor.Remove(ofd)
// recoverJournalRO replays journals into a memdb without writing anything
// back to storage (read-only open); the populated memdb is installed
// directly as the live memtable.
// NOTE(review): many interior lines are elided in this chunk; comments
// below describe only the visible code.
634 func (db *DB) recoverJournalRO() error {
635 // Get all journals and sort it by file number.
636 rawFds, err := db.s.stor.List(storage.TypeJournal)
642 // Journals that will be recovered.
643 var fds []storage.FileDesc
644 for _, fd := range rawFds {
645 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
646 fds = append(fds, fd)
652 strict = db.s.o.GetStrict(opt.StrictJournal)
653 checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
654 writeBuffer = db.s.o.GetWriteBuffer()
656 mdb = memdb.New(db.s.icmp, writeBuffer)
661 db.logf("journal@recovery RO·Mode F·%d", len(fds))
670 for _, fd := range fds {
671 db.logf("journal@recovery recovering @%d", fd.Num)
673 fr, err := db.s.stor.Open(fd)
678 // Create or reset journal reader instance.
680 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
682 jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
685 // Replay journal to memdb.
694 return errors.SetFd(err, fd)
698 if _, err := buf.ReadFrom(r); err != nil {
699 if err == io.ErrUnexpectedEOF {
700 // This is error returned due to corruption, with strict == false.
705 return errors.SetFd(err, fd)
707 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
// In lenient mode a corrupted batch is logged and skipped.
709 if !strict && errors.IsCorrupted(err) {
710 db.s.logf("journal error: %v (skipped)", err)
711 // We won't apply sequence number as it might be corrupted.
716 return errors.SetFd(err, fd)
719 // Save sequence number.
720 db.seq = batchSeq + uint64(batchLen)
// The recovered memdb becomes the effective memtable (ref count 1).
728 db.mem = &memDB{db: db, DB: mdb, ref: 1}
// memGet looks ikey up in mdb. ok reports whether the lookup is conclusive
// (value found, deletion tombstone, or a real error); when ok is false the
// caller should fall through to the next memdb or the table files.
// NOTE(review): interior lines are elided in this chunk.
733 func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
734 mk, mv, err := mdb.Find(ikey)
736 ukey, _, kt, kerr := parseInternalKey(mk)
738 // Shouldn't have happened.
// Only a match on the user-key portion counts; Find may land on a
// neighboring key.
741 if icmp.uCompare(ukey, ikey.ukey()) == 0 {
// A deletion tombstone is conclusive: report ErrNotFound.
742 if kt == keyTypeDel {
743 return true, nil, ErrNotFound
748 } else if err != ErrNotFound {
749 return true, nil, err
// get reads the value for key at snapshot sequence seq, consulting in
// order: the auxiliary memdb (auxm), the effective and frozen memtables,
// then the table files (including auxt); it may trigger a table compaction
// based on the seek result.
// NOTE(review): interior lines are elided in this chunk.
754 func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
755 ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
758 if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
// Copy out of the memdb so the caller owns the returned slice.
759 return append([]byte{}, mv...), me
763 em, fm := db.getMems()
764 for _, m := range [...]*memDB{em, fm} {
770 if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
771 return append([]byte{}, mv...), me
776 value, cSched, err := v.get(auxt, ikey, ro, false)
779 // Trigger table compaction.
780 db.compTrigger(db.tcompCmdC)
// nilIfNotFound maps ErrNotFound to nil, leaving any other error intact.
785 func nilIfNotFound(err error) error {
786 if err == ErrNotFound {
// has reports whether key exists at snapshot sequence seq, using the same
// lookup order as get (auxiliary memdb, memtables, then tables) but without
// copying the value out.
// NOTE(review): interior lines are elided in this chunk.
792 func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
793 ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
796 if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
// A tombstone (ErrNotFound) means "definitely absent", reported as
// (false, nil) via nilIfNotFound.
797 return me == nil, nilIfNotFound(me)
801 em, fm := db.getMems()
802 for _, m := range [...]*memDB{em, fm} {
808 if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
809 return me == nil, nilIfNotFound(me)
// The final argument requests a presence check only (no value copy).
814 _, cSched, err := v.get(auxt, ikey, ro, true)
817 // Trigger table compaction.
818 db.compTrigger(db.tcompCmdC)
822 } else if err == ErrNotFound {
828 // Get gets the value for the given key. It returns ErrNotFound if the
829 // DB does not contain the key.
831 // The returned slice is its own copy, it is safe to modify the contents
832 // of the returned slice.
833 // It is safe to modify the contents of the argument after Get returns.
834 func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
// Acquire/release a snapshot element around the read; presumably this
// pins the sequence number for the duration of the lookup — confirm
// against acquireSnapshot.
840 se := db.acquireSnapshot()
841 defer db.releaseSnapshot(se)
842 return db.get(nil, nil, key, se.seq, ro)
845 // Has returns true if the DB does contain the given key.
847 // It is safe to modify the contents of the argument after Has returns.
848 func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// Same snapshot discipline as Get: pin a sequence number for the lookup.
854 se := db.acquireSnapshot()
855 defer db.releaseSnapshot(se)
856 return db.has(nil, nil, key, se.seq, ro)
859 // NewIterator returns an iterator for the latest snapshot of the
861 // The returned iterator is not safe for concurrent use, but it is safe to use
862 // multiple iterators concurrently, with each in a dedicated goroutine.
863 // It is also safe to use an iterator concurrently with modifying its
864 // underlying DB. The resultant key/value pairs are guaranteed to be
867 // Slice allows slicing the iterator to only contains keys in the given
868 // range. A nil Range.Start is treated as a key before all keys in the
869 // DB. And a nil Range.Limit is treated as a key after all keys in
872 // The iterator must be released after use, by calling Release method.
874 // Also read Iterator documentation of the leveldb/iterator package.
875 func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
// A closed/errored DB yields an empty iterator carrying the error
// rather than a nil iterator.
876 if err := db.ok(); err != nil {
877 return iterator.NewEmptyIterator(err)
880 se := db.acquireSnapshot()
881 defer db.releaseSnapshot(se)
882 // Iterator holds 'version' lock, 'version' is immutable so snapshot
883 // can be released after iterator created.
884 return db.newIterator(nil, nil, se.seq, slice, ro)
887 // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
888 // is a frozen snapshot of a DB state at a particular point in time. The
889 // content of snapshot are guaranteed to be consistent.
891 // The snapshot must be released after use, by calling Release method.
892 func (db *DB) GetSnapshot() (*Snapshot, error) {
// Refuse to create snapshots on a closed or errored DB.
893 if err := db.ok(); err != nil {
897 return db.newSnapshot(), nil
900 // GetProperty returns value of the given property name.
903 // leveldb.num-files-at-level{n}
904 // Returns the number of files at level 'n'.
906 // Returns statistics of the underlying DB.
908 // Returns sstables list for each level.
910 // Returns block pool stats.
911 // leveldb.cachedblock
912 // Returns size of cached block.
913 // leveldb.openedtables
914 // Returns number of opened tables.
915 // leveldb.alivesnaps
916 // Returns number of alive snapshots.
917 // leveldb.aliveiters
918 // Returns number of alive iterators.
// NOTE(review): interior lines are elided in this chunk; comments below
// describe only the visible code.
919 func (db *DB) GetProperty(name string) (value string, err error) {
// All property names share the "leveldb." prefix; anything else is
// reported as ErrNotFound.
925 const prefix = "leveldb."
926 if !strings.HasPrefix(name, prefix) {
927 return "", ErrNotFound
929 p := name[len(prefix):]
934 numFilesPrefix := "num-files-at-level"
936 case strings.HasPrefix(p, numFilesPrefix):
// Parse the level number; "%d%s" also captures any trailing junk so a
// malformed suffix can be rejected.
939 n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
943 value = fmt.Sprint(v.tLen(int(level)))
946 value = "Compactions\n" +
947 " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
948 "-------+------------+---------------+---------------+---------------+---------------\n"
949 for level, tables := range v.levels {
950 duration, read, write := db.compStats.getStat(level)
// Skip levels that are empty and have never compacted.
951 if len(tables) == 0 && duration == 0 {
954 value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
955 level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
956 float64(read)/1048576.0, float64(write)/1048576.0)
958 case p == "sstables":
959 for level, tables := range v.levels {
960 value += fmt.Sprintf("--- level %d ---\n", level)
961 for _, t := range tables {
962 value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
965 case p == "blockpool":
966 value = fmt.Sprintf("%v", db.s.tops.bpool)
967 case p == "cachedblock":
// Block cache is optional; report only when configured.
968 if db.s.tops.bcache != nil {
969 value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
973 case p == "openedtables":
974 value = fmt.Sprintf("%d", db.s.tops.cache.Size())
975 case p == "alivesnaps":
976 value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
977 case p == "aliveiters":
978 value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
986 // SizeOf calculates approximate sizes of the given key ranges.
987 // The length of the returned sizes are equal with the length of the given
988 // ranges. The returned sizes measure storage space usage, so if the user
989 // data compresses by a factor of ten, the returned sizes will be one-tenth
990 // the size of the corresponding user data size.
991 // The results may not include the sizes of recently written data.
// NOTE(review): interior lines are elided in this chunk.
992 func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
993 if err := db.ok(); err != nil {
1000 sizes := make(Sizes, 0, len(ranges))
1001 for _, r := range ranges {
// keyMaxSeq makes the internal keys sort before every version of the
// user key, so offsetOf sees the full range.
1002 imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
1003 imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
1004 start, err := v.offsetOf(imin)
1008 limit, err := v.offsetOf(imax)
// Approximate range size is the difference of the two file offsets.
1014 size = limit - start
1016 sizes = append(sizes, size)
1022 // Close closes the DB. This will also release any outstanding snapshot,
1023 // abort any in-flight compaction and discard open transaction.
1025 // It is not safe to close a DB until all outstanding iterators are released.
1026 // It is valid to call Close multiple times. Other methods should not be
1027 // called after the DB has been closed.
// NOTE(review): interior lines are elided in this chunk; comments below
// describe only the visible code.
1028 func (db *DB) Close() error {
// setClosed is the idempotency gate: only the first Close proceeds.
1029 if !db.setClosed() {
1034 db.log("db@close closing")
1036 // Clear the finalizer.
1037 runtime.SetFinalizer(db, nil)
1039 // Get compaction error.
1042 case err = <-db.compErrC:
// ErrReadOnly from the compactor is expected in read-only mode and is
// handled specially here.
1043 if err == ErrReadOnly {
1049 // Signal all goroutines.
1052 // Discard open transaction.
1057 // Acquire writer lock.
1058 db.writeLockC <- struct{}{}
1060 // Wait for all goroutines to exit.
1064 if db.journal != nil {
1066 db.journalWriter.Close()
1068 db.journalWriter = nil
// Report accumulated write-delay statistics before shutting down.
1071 if db.writeDelayN > 0 {
1072 db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
1077 db.logf("db@close done T·%v", time.Since(start))
// An optional user-supplied closer runs last; its error is returned only
// if no earlier error occurred.
1080 if db.closer != nil {
1081 if err1 := db.closer.Close(); err == nil {