1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
26 "github.com/onsi/gomega"
28 "github.com/syndtr/goleveldb/leveldb/comparer"
29 "github.com/syndtr/goleveldb/leveldb/errors"
30 "github.com/syndtr/goleveldb/leveldb/filter"
31 "github.com/syndtr/goleveldb/leveldb/iterator"
32 "github.com/syndtr/goleveldb/leveldb/opt"
33 "github.com/syndtr/goleveldb/leveldb/storage"
34 "github.com/syndtr/goleveldb/leveldb/testutil"
35 "github.com/syndtr/goleveldb/leveldb/util"
38 func tkey(i int) []byte {
39 return []byte(fmt.Sprintf("%016d", i))
42 func tval(seed, n int) []byte {
43 r := rand.New(rand.NewSource(int64(seed)))
44 return randomString(r, n)
47 func testingLogger(t *testing.T) func(log string) {
48 return func(log string) {
53 func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
54 return func() (preserve bool, err error) {
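// dbHarness couples a test DB with its in-memory test storage and the
// options it was opened with, and provides the put/get/compaction
// assertion helpers used by the tests below.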
60 type dbHarness struct {
63 stor *testutil.Storage
70 func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness {
76 func newDbHarness(t *testing.T) *dbHarness {
77 return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
80 func (h *dbHarness) init(t *testing.T, o *opt.Options) {
81 gomega.RegisterTestingT(t)
83 h.stor = testutil.NewStorage()
84 h.stor.OnLog(testingLogger(t))
85 h.stor.OnClose(testingPreserveOnFailed(t))
90 if err := h.openDB0(); err != nil {
91 // So that it will come after the fatal message.
93 h.t.Fatal("Open (init): got error: ", err)
97 func (h *dbHarness) openDB0() (err error) {
99 h.db, err = Open(h.stor, h.o)
103 func (h *dbHarness) openDB() {
104 if err := h.openDB0(); err != nil {
105 h.t.Fatal("Open: got error: ", err)
109 func (h *dbHarness) closeDB0() error {
110 h.t.Log("closing DB")
114 func (h *dbHarness) closeDB() {
116 if err := h.closeDB0(); err != nil {
117 h.t.Error("Close: got error: ", err)
124 func (h *dbHarness) reopenDB() {
131 func (h *dbHarness) close() {
141 func (h *dbHarness) openAssert(want bool) {
142 db, err := Open(h.stor, h.o)
145 h.t.Error("Open: assert: got error: ", err)
147 h.t.Log("Open: assert: got error (expected): ", err)
151 h.t.Error("Open: assert: expect error")
157 func (h *dbHarness) write(batch *Batch) {
158 if err := h.db.Write(batch, h.wo); err != nil {
159 h.t.Error("Write: got error: ", err)
163 func (h *dbHarness) put(key, value string) {
164 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil {
165 h.t.Error("Put: got error: ", err)
169 func (h *dbHarness) putMulti(n int, low, hi string) {
170 for i := 0; i < n; i++ {
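// maxNextLevelOverlappingBytes asserts that no single table overlaps more
// than want bytes worth of tables in the next level.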
177 func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
186 if len(v.levels) > 2 {
187 for i, tt := range v.levels[1 : len(v.levels)-1] {
189 next := v.levels[level+1]
190 for _, t := range tt {
191 r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
193 if sum > maxOverlaps {
202 if maxOverlaps > want {
203 t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
205 t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
209 func (h *dbHarness) delete(key string) {
213 err := db.Delete([]byte(key), h.wo)
215 t.Error("Delete: got error: ", err)
219 func (h *dbHarness) assertNumKeys(want int) {
220 iter := h.db.NewIterator(nil, h.ro)
226 if err := iter.Error(); err != nil {
227 h.t.Error("assertNumKeys: ", err)
230 h.t.Errorf("assertNumKeys: want=%d got=%d", want, got)
234 func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) {
236 v, err := db.Get([]byte(key), h.ro)
240 t.Errorf("Get: key '%s' not found, want found", key)
245 t.Errorf("Get: key '%s' found, want not found", key)
248 t.Error("Get: got error: ", err)
253 func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) {
254 return h.getr(h.db, key, expectFound)
257 func (h *dbHarness) getValr(db Reader, key, value string) {
259 found, r := h.getr(db, key, true)
265 t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value)
269 func (h *dbHarness) getVal(key, value string) {
270 h.getValr(h.db, key, value)
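// allEntriesFor walks the raw internal-key iterator and collects every
// entry recorded for key (values and deletion markers), then compares the
// result, formatted like "[ v2, DEL, v1 ]", against want.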
273 func (h *dbHarness) allEntriesFor(key, want string) {
278 ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal)
279 iter := db.newRawIterator(nil, nil, nil, nil)
280 if !iter.Seek(ikey) && iter.Error() != nil {
281 t.Error("AllEntries: error during seek, err: ", iter.Error())
287 if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil {
288 if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
297 res += string(iter.Value())
315 t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want)
319 // getKeyVal asserts that a snapshot of the DB contains exactly the
320 // key/value pairs given in want, formatted like "(k1->v1)(k2->v2)".
321 func (h *dbHarness) getKeyVal(want string) {
325 s, err := db.GetSnapshot()
327 t.Fatal("GetSnapshot: got error: ", err)
330 iter := s.NewIterator(nil, nil)
332 res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value()))
337 t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want)
342 func (h *dbHarness) waitCompaction() {
345 if err := db.compTriggerWait(db.tcompCmdC); err != nil {
346 t.Error("compaction error: ", err)
350 func (h *dbHarness) waitMemCompaction() {
354 if err := db.compTriggerWait(db.mcompCmdC); err != nil {
355 t.Error("compaction error: ", err)
359 func (h *dbHarness) compactMem() {
363 t.Log("starting memdb compaction")
365 db.writeLockC <- struct{}{}
370 if _, err := db.rotateMem(0, true); err != nil {
371 t.Error("compaction error: ", err)
374 if h.totalTables() == 0 {
375 t.Error("zero tables after mem compaction")
378 t.Log("memdb compaction done")
381 func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
385 var _min, _max []byte
393 t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)
395 if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil {
397 t.Log("CompactRangeAt: got error (expected): ", err)
399 t.Error("CompactRangeAt: got error: ", err)
402 t.Error("CompactRangeAt: expect error")
405 t.Log("table range compaction done")
408 func (h *dbHarness) compactRangeAt(level int, min, max string) {
409 h.compactRangeAtErr(level, min, max, false)
412 func (h *dbHarness) compactRange(min, max string) {
416 t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
420 r.Start = []byte(min)
423 r.Limit = []byte(max)
425 if err := db.CompactRange(r); err != nil {
426 t.Error("CompactRange: got error: ", err)
429 t.Log("DB range compaction done")
432 func (h *dbHarness) sizeOf(start, limit string) int64 {
433 sz, err := h.db.SizeOf([]util.Range{
434 {Start: []byte(start), Limit: []byte(limit)},
437 h.t.Error("SizeOf: got error: ", err)
442 func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) {
443 sz := h.sizeOf(start, limit)
444 if sz < low || sz > hi {
445 h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
446 shorten(start), shorten(limit), low, hi, sz)
450 func (h *dbHarness) getSnapshot() (s *Snapshot) {
451 s, err := h.db.GetSnapshot()
453 h.t.Fatal("GetSnapshot: got error: ", err)
458 func (h *dbHarness) getTablesPerLevel() string {
461 v := h.db.s.version()
462 for level, tables := range v.levels {
466 res += fmt.Sprint(len(tables))
475 func (h *dbHarness) tablesPerLevel(want string) {
476 res := h.getTablesPerLevel()
478 h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
482 func (h *dbHarness) totalTables() (n int) {
483 v := h.db.s.version()
484 for _, tables := range v.levels {
491 type keyValue interface {
496 func testKeyVal(t *testing.T, kv keyValue, want string) {
497 res := string(kv.Key()) + "->" + string(kv.Value())
499 t.Errorf("invalid key/value, want=%q, got=%q", want, res)
503 func numKey(num int) string {
504 return fmt.Sprintf("key%06d", num)
507 var testingBloomFilter = filter.NewBloomFilter(10)
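// truno runs f several times under varying options (adding a bloom
// filter, disabling compression, and so on) so the same scenario is
// exercised against different DB configurations.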
509 func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
510 for i := 0; i < 4; i++ {
517 DisableLargeBatchTransaction: true,
518 Filter: testingBloomFilter,
524 o.Filter = testingBloomFilter
529 DisableLargeBatchTransaction: true,
530 Compression: opt.NoCompression,
536 o.Compression = opt.NoCompression
539 h := newDbHarnessWopt(t, o)
550 func trun(t *testing.T, f func(h *dbHarness)) {
554 func testAligned(t *testing.T, name string, offset uintptr) {
556 t.Errorf("field %s offset is not 64-bit aligned", name)
560 func Test_FieldsAligned(t *testing.T) {
562 testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
564 testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
565 testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
566 testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
567 testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
570 func TestDB_Locking(t *testing.T) {
578 func TestDB_Empty(t *testing.T) {
579 trun(t, func(h *dbHarness) {
587 func TestDB_ReadWrite(t *testing.T) {
588 trun(t, func(h *dbHarness) {
590 h.getVal("foo", "v1")
593 h.getVal("foo", "v3")
594 h.getVal("bar", "v2")
597 h.getVal("foo", "v3")
598 h.getVal("bar", "v2")
602 func TestDB_PutDeleteGet(t *testing.T) {
603 trun(t, func(h *dbHarness) {
605 h.getVal("foo", "v1")
607 h.getVal("foo", "v2")
616 func TestDB_EmptyBatch(t *testing.T) {
621 err := h.db.Write(new(Batch), h.wo)
623 t.Error("writing empty batch yield error: ", err)
628 func TestDB_GetFromFrozen(t *testing.T) {
629 h := newDbHarnessWopt(t, &opt.Options{
630 DisableLargeBatchTransaction: true,
636 h.getVal("foo", "v1")
638 h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls
639 h.put("k1", strings.Repeat("x", 100000)) // Fill memtable
640 h.put("k2", strings.Repeat("y", 100000)) // Trigger compaction
641 for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
642 time.Sleep(10 * time.Microsecond)
644 if h.db.getFrozenMem() == nil {
645 h.stor.Release(testutil.ModeSync, storage.TypeTable)
646 t.Fatal("No frozen mem")
648 h.getVal("foo", "v1")
649 h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls
652 h.getVal("foo", "v1")
657 func TestDB_GetFromTable(t *testing.T) {
658 trun(t, func(h *dbHarness) {
661 h.getVal("foo", "v1")
665 func TestDB_GetSnapshot(t *testing.T) {
666 trun(t, func(h *dbHarness) {
667 bar := strings.Repeat("b", 200)
671 snap, err := h.db.GetSnapshot()
673 t.Fatal("GetSnapshot: got error: ", err)
679 h.getVal("foo", "v2")
681 h.getValr(snap, "foo", "v1")
682 h.getValr(snap, bar, "v1")
686 h.getVal("foo", "v2")
688 h.getValr(snap, "foo", "v1")
689 h.getValr(snap, bar, "v1")
694 h.getVal("foo", "v2")
699 func TestDB_GetLevel0Ordering(t *testing.T) {
700 trun(t, func(h *dbHarness) {
701 h.db.memdbMaxLevel = 2
703 for i := 0; i < 4; i++ {
704 h.put("bar", fmt.Sprintf("b%d", i))
705 h.put("foo", fmt.Sprintf("v%d", i))
708 h.getVal("foo", "v3")
709 h.getVal("bar", "b3")
711 v := h.db.s.version()
715 t.Errorf("level-0 tables is less than 2, got %d", t0len)
719 h.getVal("foo", "v3")
720 h.getVal("bar", "b3")
724 func TestDB_GetOrderedByLevels(t *testing.T) {
725 trun(t, func(h *dbHarness) {
728 h.compactRange("a", "z")
729 h.getVal("foo", "v1")
732 h.getVal("foo", "v2")
736 func TestDB_GetPicksCorrectFile(t *testing.T) {
737 trun(t, func(h *dbHarness) {
738 // Arrange to have multiple files in a non-level-0 level.
741 h.compactRange("a", "b")
744 h.compactRange("x", "y")
747 h.compactRange("f", "g")
753 h.compactRange("", "")
760 func TestDB_GetEncountersEmptyLevel(t *testing.T) {
761 trun(t, func(h *dbHarness) {
762 h.db.memdbMaxLevel = 2
764 // Arrange for the following to happen:
765 // * sstable A in level 0
766 // * nothing in level 1
767 // * sstable B in level 2
768 // Then do enough Get() calls to arrange for an automatic compaction
769 // of sstable A. A bug would cause the compaction to be marked as
770 // occurring at level 1 (instead of the correct level 0).
772 // Step 1: First place sstables in levels 0 and 2
775 t.Fatal("could not fill levels-0 and level-2")
777 v := h.db.s.version()
778 if v.tLen(0) > 0 && v.tLen(2) > 0 {
787 h.getVal("a", "begin")
791 // Step 2: clear level 1 if necessary.
792 h.compactRangeAt(1, "", "")
793 h.tablesPerLevel("1,0,1")
795 h.getVal("a", "begin")
798 // Step 3: read a bunch of times
799 for i := 0; i < 200; i++ {
800 h.get("missing", false)
803 // Step 4: Wait for compaction to finish
806 v := h.db.s.version()
808 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
812 h.getVal("a", "begin")
817 func TestDB_IterMultiWithDelete(t *testing.T) {
818 trun(t, func(h *dbHarness) {
825 iter := h.db.NewIterator(nil, nil)
826 iter.Seek([]byte("c"))
827 testKeyVal(t, iter, "c->vc")
829 testKeyVal(t, iter, "a->va")
834 iter = h.db.NewIterator(nil, nil)
835 iter.Seek([]byte("c"))
836 testKeyVal(t, iter, "c->vc")
838 testKeyVal(t, iter, "a->va")
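// TestDB_IteratorPinsRef checks that an open iterator keeps pinning the
// version it was created from, so it still sees the old contents after
// later writes and compactions.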
843 func TestDB_IteratorPinsRef(t *testing.T) {
847 h.put("foo", "hello")
849 // Get iterator that will yield the current contents of the DB.
850 iter := h.db.NewIterator(nil, nil)
852 // Write to force compactions
853 h.put("foo", "newvalue1")
854 for i := 0; i < 100; i++ {
855 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
857 h.put("foo", "newvalue2")
860 testKeyVal(t, iter, "foo->hello")
862 t.Errorf("expect eof")
867 func TestDB_Recover(t *testing.T) {
868 trun(t, func(h *dbHarness) {
873 h.getVal("foo", "v1")
875 h.getVal("foo", "v1")
876 h.getVal("baz", "v5")
881 h.getVal("foo", "v3")
883 h.getVal("foo", "v4")
884 h.getVal("bar", "v2")
885 h.getVal("baz", "v5")
889 func TestDB_RecoverWithEmptyJournal(t *testing.T) {
890 trun(t, func(h *dbHarness) {
899 h.getVal("foo", "v3")
903 func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
904 truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {
906 h.stor.Stall(testutil.ModeSync, storage.TypeTable)
907 h.put("big1", strings.Repeat("x", 10000000))
908 h.put("big2", strings.Repeat("y", 1000))
910 h.stor.Release(testutil.ModeSync, storage.TypeTable)
913 h.getVal("bar", "v2")
914 h.getVal("big1", strings.Repeat("x", 10000000))
915 h.getVal("big2", strings.Repeat("y", 1000))
919 func TestDB_MinorCompactionsHappen(t *testing.T) {
920 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
925 key := func(i int) string {
926 return fmt.Sprintf("key%06d", i)
929 for i := 0; i < n; i++ {
930 h.put(key(i), key(i)+strings.Repeat("v", 1000))
933 for i := 0; i < n; i++ {
934 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
938 for i := 0; i < n; i++ {
939 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
943 func TestDB_RecoverWithLargeJournal(t *testing.T) {
947 h.put("big1", strings.Repeat("1", 200000))
948 h.put("big2", strings.Repeat("2", 200000))
949 h.put("small3", strings.Repeat("3", 10))
950 h.put("small4", strings.Repeat("4", 10))
953 // Make sure that if we re-open with a small write buffer size that
954 // we flush table files in the middle of a large journal file.
955 h.o.WriteBuffer = 100000
957 h.getVal("big1", strings.Repeat("1", 200000))
958 h.getVal("big2", strings.Repeat("2", 200000))
959 h.getVal("small3", strings.Repeat("3", 10))
960 h.getVal("small4", strings.Repeat("4", 10))
961 v := h.db.s.version()
963 t.Errorf("tables-0 less than one")
968 func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
969 h := newDbHarnessWopt(t, &opt.Options{
970 DisableLargeBatchTransaction: true,
971 WriteBuffer: 10000000,
972 Compression: opt.NoCompression,
976 v := h.db.s.version()
978 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
984 // Write 8MB (80 values, each 100K)
985 for i := 0; i < n; i++ {
986 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
989 // Reopening moves updates to level-0
991 h.compactRangeAt(0, "", "")
995 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
998 t.Errorf("level-1 tables less than 1, got %d", v.tLen(1))
1002 for i := 0; i < n; i++ {
1003 h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
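// TestDB_RepeatedWritesToSameKey repeatedly overwrites a single key with
// values larger than the write buffer and checks that the total number of
// tables stays within the allowed bound.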
1007 func TestDB_RepeatedWritesToSameKey(t *testing.T) {
1008 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
1011 maxTables := h.o.GetWriteL0PauseTrigger() + 7
1013 value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1014 for i := 0; i < 5*maxTables; i++ {
1016 n := h.totalTables()
1018 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1023 func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
1024 h := newDbHarnessWopt(t, &opt.Options{
1025 DisableLargeBatchTransaction: true,
1026 WriteBuffer: 100000,
1032 maxTables := h.o.GetWriteL0PauseTrigger() + 7
1034 value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1035 for i := 0; i < 5*maxTables; i++ {
1037 n := h.totalTables()
1039 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1044 func TestDB_SparseMerge(t *testing.T) {
1045 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
1048 h.putMulti(7, "A", "Z")
1050 // Suppose there is:
1051 // small amount of data with prefix A
1052 // large amount of data with prefix B
1053 // small amount of data with prefix C
1054 // and that recent updates have made small changes to all three prefixes.
1055 // Check that we do not do a compaction that merges all of B in one shot.
1057 value := strings.Repeat("x", 1000)
1058 for i := 0; i < 100000; i++ {
1059 h.put(fmt.Sprintf("B%010d", i), value)
1063 h.compactRangeAt(0, "", "")
1066 // Make sparse update
1068 h.put("B100", "bvalue2")
1073 h.maxNextLevelOverlappingBytes(20 * 1048576)
1074 h.compactRangeAt(0, "", "")
1076 h.maxNextLevelOverlappingBytes(20 * 1048576)
1077 h.compactRangeAt(1, "", "")
1079 h.maxNextLevelOverlappingBytes(20 * 1048576)
1082 func TestDB_SizeOf(t *testing.T) {
1083 h := newDbHarnessWopt(t, &opt.Options{
1084 DisableLargeBatchTransaction: true,
1085 Compression: opt.NoCompression,
1086 WriteBuffer: 10000000,
1090 h.sizeAssert("", "xyz", 0, 0)
1092 h.sizeAssert("", "xyz", 0, 0)
1094 // Write 8MB (80 values, each 100K)
1099 for i := 0; i < n; i++ {
1100 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10))
1103 // 0 because SizeOf() does not account for memtable space
1104 h.sizeAssert("", numKey(50), 0, 0)
1106 for r := 0; r < 3; r++ {
1109 for cs := 0; cs < n; cs += 10 {
1110 for i := 0; i < n; i += 10 {
1111 h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i))
1112 h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1)))
1113 h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10))
1116 h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50))
1117 h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50))
1119 h.compactRangeAt(0, numKey(cs), numKey(cs+9))
1122 v := h.db.s.version()
1124 t.Errorf("level-0 tables was not zero, got %d", v.tLen(0))
1127 t.Error("level-1 tables was zero")
1133 func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
1134 h := newDbHarnessWopt(t, &opt.Options{
1135 DisableLargeBatchTransaction: true,
1136 Compression: opt.NoCompression,
1151 for i, n := range sizes {
1152 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10))
1155 for r := 0; r < 3; r++ {
1159 for i, n := range sizes {
1164 h.sizeAssert("", numKey(i), x, y)
1168 h.sizeAssert(numKey(3), numKey(5), 110000, 111000)
1170 h.compactRangeAt(0, "", "")
1174 func TestDB_Snapshot(t *testing.T) {
1175 trun(t, func(h *dbHarness) {
1177 s1 := h.getSnapshot()
1179 s2 := h.getSnapshot()
1181 s3 := h.getSnapshot()
1184 h.getValr(s1, "foo", "v1")
1185 h.getValr(s2, "foo", "v2")
1186 h.getValr(s3, "foo", "v3")
1187 h.getVal("foo", "v4")
1190 h.getValr(s1, "foo", "v1")
1191 h.getValr(s2, "foo", "v2")
1192 h.getVal("foo", "v4")
1195 h.getValr(s2, "foo", "v2")
1196 h.getVal("foo", "v4")
1199 h.getVal("foo", "v4")
1203 func TestDB_SnapshotList(t *testing.T) {
1204 db := &DB{snapsList: list.New()}
1205 e0a := db.acquireSnapshot()
1206 e0b := db.acquireSnapshot()
1208 e1 := db.acquireSnapshot()
1210 e2 := db.acquireSnapshot()
1212 if db.minSeq() != 0 {
1213 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1215 db.releaseSnapshot(e0a)
1216 if db.minSeq() != 0 {
1217 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1219 db.releaseSnapshot(e2)
1220 if db.minSeq() != 0 {
1221 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1223 db.releaseSnapshot(e0b)
1224 if db.minSeq() != 1 {
1225 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1227 e2 = db.acquireSnapshot()
1228 if db.minSeq() != 1 {
1229 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1231 db.releaseSnapshot(e1)
1232 if db.minSeq() != 2 {
1233 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1235 db.releaseSnapshot(e2)
1236 if db.minSeq() != 2 {
1237 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1241 func TestDB_HiddenValuesAreRemoved(t *testing.T) {
1242 trun(t, func(h *dbHarness) {
1246 h.db.memdbMaxLevel = m
1254 t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
1257 // Place a table at level last-1 to prevent merging with preceding mutation
1263 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
1265 if v.tLen(m-1) != 1 {
1266 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
1272 h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
1274 h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
1275 h.compactRangeAt(m-2, "", "z")
1276 // DEL eliminated, but v1 remains because we aren't compacting that level
1277 // (DEL can be eliminated because v2 hides v1).
1278 h.allEntriesFor("foo", "[ v2, v1 ]")
1279 h.compactRangeAt(m-1, "", "")
1280 // Merging last-1 with last, so this is the base level for "foo" and
1281 // the DEL marker is removed (as is v1).
1282 h.allEntriesFor("foo", "[ v2 ]")
1286 func TestDB_DeletionMarkers2(t *testing.T) {
1287 h := newDbHarness(t)
1292 h.db.memdbMaxLevel = m
1300 t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
1303 // Place a table at level last-1 to prevent merging with preceding mutation
1309 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
1311 if v.tLen(m-1) != 1 {
1312 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
1317 h.allEntriesFor("foo", "[ DEL, v1 ]")
1318 h.compactMem() // Moves to level last-2
1319 h.allEntriesFor("foo", "[ DEL, v1 ]")
1320 h.compactRangeAt(m-2, "", "")
1321 // DEL kept: "last" file overlaps
1322 h.allEntriesFor("foo", "[ DEL, v1 ]")
1323 h.compactRangeAt(m-1, "", "")
1324 // Merging last-1 with last, so this is the base level for "foo" and
1325 // the DEL marker is removed (as is v1).
1326 h.allEntriesFor("foo", "[ ]")
1329 func TestDB_CompactionTableOpenError(t *testing.T) {
1330 h := newDbHarnessWopt(t, &opt.Options{
1331 DisableLargeBatchTransaction: true,
1332 OpenFilesCacheCapacity: -1,
1336 h.db.memdbMaxLevel = 2
1340 for r := 0; r < 2; r++ {
1341 for i := 0; i < im; i++ {
1342 for j := 0; j < jm; j++ {
1343 h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
1349 if n := h.totalTables(); n != im*2 {
1350 t.Errorf("total tables is %d, want %d", n, im*2)
1353 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
1354 go h.db.CompactRange(util.Range{})
1355 if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
1356 t.Log("compaction error: ", err)
1360 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)
1362 for i := 0; i < im; i++ {
1363 for j := 0; j < jm; j++ {
1364 h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
1369 func TestDB_OverlapInLevel0(t *testing.T) {
1370 trun(t, func(h *dbHarness) {
1371 h.db.memdbMaxLevel = 2
1373 // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
1374 h.put("100", "v100")
1375 h.put("999", "v999")
1380 h.tablesPerLevel("0,1,1")
1382 // Make files spanning the following ranges in level-0:
1383 // files[0] 200 .. 900
1384 // files[1] 300 .. 500
1385 // Note that files are sorted by min key.
1386 h.put("300", "v300")
1387 h.put("500", "v500")
1389 h.put("200", "v200")
1390 h.put("600", "v600")
1391 h.put("900", "v900")
1393 h.tablesPerLevel("2,1,1")
1395 // Compact away the placeholder files we created initially
1396 h.compactRangeAt(1, "", "")
1397 h.compactRangeAt(2, "", "")
1398 h.tablesPerLevel("2")
1400 // Do a memtable compaction. Before bug-fix, the compaction would
1401 // not detect the overlap with level-0 files and would incorrectly place
1402 // the deletion in a deeper level.
1405 h.tablesPerLevel("3")
1410 func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) {
1411 h := newDbHarness(t)
1425 h.getKeyVal("(a->v)")
1427 h.getKeyVal("(a->v)")
1430 func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) {
1431 h := newDbHarness(t)
1454 h.getKeyVal("(->)(c->cv)")
1456 h.getKeyVal("(->)(c->cv)")
1459 func TestDB_SingleEntryMemCompaction(t *testing.T) {
1460 trun(t, func(h *dbHarness) {
1461 for i := 0; i < 10; i++ {
1462 h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer))
1464 h.put("key", strings.Repeat("v", opt.DefaultBlockSize))
1470 h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2))
1476 func TestDB_ManifestWriteError(t *testing.T) {
1477 for i := 0; i < 2; i++ {
1479 h := newDbHarness(t)
1483 h.getVal("foo", "bar")
1485 // Mem compaction (will succeed)
1487 h.getVal("foo", "bar")
1488 v := h.db.s.version()
1489 if n := v.tLen(0); n != 1 {
1490 t.Errorf("invalid total tables, want=1 got=%d", n)
1495 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
1497 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
1500 // Merging compaction (will fail)
1501 h.compactRangeAtErr(0, "", "", true)
1504 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
1505 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)
1507 // Should not lose data
1509 h.getVal("foo", "bar")
1514 func assertErr(t *testing.T, err error, wanterr bool) {
1517 t.Log("AssertErr: got error (expected): ", err)
1519 t.Error("AssertErr: got error: ", err)
1522 t.Error("AssertErr: expect error")
1526 func TestDB_ClosedIsClosed(t *testing.T) {
1527 h := newDbHarness(t)
1530 var iter, iter2 iterator.Iterator
1538 iter = db.NewIterator(nil, h.ro)
1539 iter.Seek([]byte("k"))
1540 testKeyVal(t, iter, "k->v")
1543 snap, err = db.GetSnapshot()
1545 t.Fatal("GetSnapshot: got error: ", err)
1548 h.getValr(snap, "k", "v")
1550 iter2 = snap.NewIterator(nil, h.ro)
1551 iter2.Seek([]byte("k"))
1552 testKeyVal(t, iter2, "k->v")
1562 assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true)
1563 _, err := db.Get([]byte("k"), h.ro)
1564 assertErr(t, err, true)
1567 t.Errorf("iter.Valid should false")
1569 assertErr(t, iter.Error(), false)
1570 testKeyVal(t, iter, "->")
1571 if iter.Seek([]byte("k")) {
1572 t.Errorf("iter.Seek should false")
1574 assertErr(t, iter.Error(), true)
1576 assertErr(t, iter2.Error(), false)
1578 _, err = snap.Get([]byte("k"), h.ro)
1579 assertErr(t, err, true)
1581 _, err = db.GetSnapshot()
1582 assertErr(t, err, true)
1584 iter3 := db.NewIterator(nil, h.ro)
1585 assertErr(t, iter3.Error(), true)
1587 iter3 = snap.NewIterator(nil, h.ro)
1588 assertErr(t, iter3.Error(), true)
1590 assertErr(t, db.Delete([]byte("k"), h.wo), true)
1592 _, err = db.GetProperty("leveldb.stats")
1593 assertErr(t, err, true)
1595 _, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
1596 assertErr(t, err, true)
1598 assertErr(t, db.CompactRange(util.Range{}), true)
1600 assertErr(t, db.Close(), true)
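// numberComparer orders keys of the form "[<number>]" by their numeric
// value; it is used by TestDB_CustomComparer below.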
1603 type numberComparer struct{}
1605 func (numberComparer) num(x []byte) (n int) {
1606 fmt.Sscan(string(x[1:len(x)-1]), &n)
1610 func (numberComparer) Name() string {
1611 return "test.NumberComparer"
1614 func (p numberComparer) Compare(a, b []byte) int {
1615 return p.num(a) - p.num(b)
1618 func (numberComparer) Separator(dst, a, b []byte) []byte { return nil }
1619 func (numberComparer) Successor(dst, b []byte) []byte { return nil }
1621 func TestDB_CustomComparer(t *testing.T) {
1622 h := newDbHarnessWopt(t, &opt.Options{
1623 DisableLargeBatchTransaction: true,
1624 Comparer: numberComparer{},
1629 h.put("[10]", "ten")
1630 h.put("[0x14]", "twenty")
1631 for i := 0; i < 2; i++ {
1632 h.getVal("[10]", "ten")
1633 h.getVal("[0xa]", "ten")
1634 h.getVal("[20]", "twenty")
1635 h.getVal("[0x14]", "twenty")
1636 h.get("[15]", false)
1637 h.get("[0xf]", false)
1639 h.compactRange("[0]", "[9999]")
1642 for n := 0; n < 2; n++ {
1643 for i := 0; i < 100; i++ {
1644 v := fmt.Sprintf("[%d]", i*10)
1648 h.compactRange("[0]", "[1000000]")
1652 func TestDB_ManualCompaction(t *testing.T) {
1653 h := newDbHarness(t)
1656 h.db.memdbMaxLevel = 2
1658 h.putMulti(3, "p", "q")
1659 h.tablesPerLevel("1,1,1")
1661 // Compaction range falls before files
1662 h.compactRange("", "c")
1663 h.tablesPerLevel("1,1,1")
1665 // Compaction range falls after files
1666 h.compactRange("r", "z")
1667 h.tablesPerLevel("1,1,1")
1669 // Compaction range overlaps files
1670 h.compactRange("p1", "p9")
1671 h.tablesPerLevel("0,0,1")
1673 // Populate a different range
1674 h.putMulti(3, "c", "e")
1675 h.tablesPerLevel("1,1,2")
1677 // Compact just the new range
1678 h.compactRange("b", "f")
1679 h.tablesPerLevel("0,0,2")
1682 h.putMulti(1, "a", "z")
1683 h.tablesPerLevel("0,1,2")
1684 h.compactRange("", "")
1685 h.tablesPerLevel("0,0,1")
1688 func TestDB_BloomFilter(t *testing.T) {
1689 h := newDbHarnessWopt(t, &opt.Options{
1690 DisableLargeBatchTransaction: true,
1691 DisableBlockCache: true,
1692 Filter: filter.NewBloomFilter(10),
1696 key := func(i int) string {
1697 return fmt.Sprintf("key%06d", i)
1702 // Populate multiple layers
1703 for i := 0; i < n; i++ {
1704 h.put(key(i), key(i))
1707 h.compactRange("a", "z")
1708 for i := 0; i < n; i += 100 {
1709 h.put(key(i), key(i))
1713 // Prevent auto compactions triggered by seeks
1714 h.stor.Stall(testutil.ModeSync, storage.TypeTable)
1716 // Lookup present keys. Should rarely read from small sstable.
1717 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
1718 for i := 0; i < n; i++ {
1719 h.getVal(key(i), key(i))
1721 cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
1722 t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
1723 if min, max := n, n+2*n/100; cnt < min || cnt > max {
1724 t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
1727 // Lookup missing keys. Should rarely read from either sstable.
1728 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
1729 for i := 0; i < n; i++ {
1730 h.get(key(i)+".missing", false)
1732 cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
1733 t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
1734 if max := 3 * n / 100; cnt > max {
1735 t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
1738 h.stor.Release(testutil.ModeSync, storage.TypeTable)
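// TestDB_Concurrent runs several goroutines doing random puts and gets on
// a shared key space for a few seconds and checks that every value read
// back decodes to a consistent key, writer and sequence counter.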
1741 func TestDB_Concurrent(t *testing.T) {
1742 const n, secs, maxkey = 4, 6, 1000
1743 h := newDbHarness(t)
1746 runtime.GOMAXPROCS(runtime.NumCPU())
1749 closeWg sync.WaitGroup
1754 for i := 0; i < n; i++ {
1757 var put, get, found uint
1759 t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
1760 i, cnt[i], put, get, found, get-found)
1764 rnd := rand.New(rand.NewSource(int64(1000 + i)))
1765 for atomic.LoadUint32(&stop) == 0 {
1768 k := rnd.Intn(maxkey)
1769 kstr := fmt.Sprintf("%016d", k)
1771 if (rnd.Int() % 2) > 0 {
1773 h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
1776 v, err := h.db.Get([]byte(kstr), h.ro)
1779 rk, ri, rx := 0, -1, uint32(0)
1780 fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
1782 t.Errorf("invalid key want=%d got=%d", k, rk)
1784 if ri < 0 || ri >= n {
1785 t.Error("invalid goroutine number: ", ri)
1787 tx := atomic.LoadUint32(&(cnt[ri]))
1789 t.Errorf("invalid seq number, %d > %d ", rx, tx)
1792 } else if err != ErrNotFound {
1793 t.Error("Get: got error: ", err)
1797 atomic.AddUint32(&cnt[i], 1)
1802 time.Sleep(secs * time.Second)
1803 atomic.StoreUint32(&stop, 1)
1807 func TestDB_ConcurrentIterator(t *testing.T) {
1808 const n, n2 = 4, 1000
1809 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
1812 runtime.GOMAXPROCS(runtime.NumCPU())
1815 closeWg sync.WaitGroup
1819 for i := 0; i < n; i++ {
1822 for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
1823 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1829 for i := 0; i < n; i++ {
1832 for k := 1000000; k > 0 && atomic.LoadUint32(&stop) == 0; k-- {
1833 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1839 cmp := comparer.DefaultComparer
1840 for i := 0; i < n2; i++ {
1843 it := h.db.NewIterator(nil, nil)
1847 if cmp.Compare(kk, pk) <= 0 {
1848 t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
1850 pk = append(pk[:0], kk...)
1852 if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
1853 t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
1855 t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
1857 if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
1858 t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
1860 t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
1864 t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
1867 if err := it.Error(); err != nil {
1868 t.Errorf("iter %d: Got error: %v", i, err)
1875 atomic.StoreUint32(&stop, 1)
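// TestDB_ConcurrentWrite runs concurrent Put and batch Write goroutines
// and checks that each key is readable as soon as its write returns.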
1879 func TestDB_ConcurrentWrite(t *testing.T) {
1880 const n, bk, niter = 10, 3, 10000
1881 h := newDbHarness(t)
1884 runtime.GOMAXPROCS(runtime.NumCPU())
1886 var wg sync.WaitGroup
1887 for i := 0; i < n; i++ {
1891 for k := 0; k < niter; k++ {
1892 kstr := fmt.Sprintf("put-%d.%d", i, k)
1893 vstr := fmt.Sprintf("v%d", k)
1895 // Key should be immediately available after Put returns.
1896 h.getVal(kstr, vstr)
1900 for i := 0; i < n; i++ {
1905 for k := 0; k < niter; k++ {
1907 for j := 0; j < bk; j++ {
1908 batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
1911 // Keys should be immediately available after the batch Write returns.
1912 for j := 0; j < bk; j++ {
1913 h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
1921 func TestDB_CreateReopenDbOnFile(t *testing.T) {
1922 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid()))
1923 if err := os.RemoveAll(dbpath); err != nil {
1924 t.Fatal("cannot remove old db: ", err)
1926 defer os.RemoveAll(dbpath)
1928 for i := 0; i < 3; i++ {
1929 stor, err := storage.OpenFile(dbpath, false)
1931 t.Fatalf("(%d) cannot open storage: %s", i, err)
1933 db, err := Open(stor, nil)
1935 t.Fatalf("(%d) cannot open db: %s", i, err)
1937 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1938 t.Fatalf("(%d) cannot write to db: %s", i, err)
1940 if err := db.Close(); err != nil {
1941 t.Fatalf("(%d) cannot close db: %s", i, err)
1943 if err := stor.Close(); err != nil {
1944 t.Fatalf("(%d) cannot close storage: %s", i, err)
1949 func TestDB_CreateReopenDbOnFile2(t *testing.T) {
1950 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid()))
1951 if err := os.RemoveAll(dbpath); err != nil {
1952 t.Fatal("cannot remove old db: ", err)
1954 defer os.RemoveAll(dbpath)
1956 for i := 0; i < 3; i++ {
1957 db, err := OpenFile(dbpath, nil)
1959 t.Fatalf("(%d) cannot open db: %s", i, err)
1961 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1962 t.Fatalf("(%d) cannot write to db: %s", i, err)
1964 if err := db.Close(); err != nil {
1965 t.Fatalf("(%d) cannot close db: %s", i, err)
1970 func TestDB_DeletionMarkersOnMemdb(t *testing.T) {
1971 h := newDbHarness(t)
1981 func TestDB_LeveldbIssue178(t *testing.T) {
1982 nKeys := (opt.DefaultCompactionTableSize / 30) * 5
1983 key1 := func(i int) string {
1984 return fmt.Sprintf("my_key_%d", i)
1986 key2 := func(i int) string {
1987 return fmt.Sprintf("my_key_%d_xxx", i)
1990 // Disable compression since it affects the creation of layers and the
1991 // code below is trying to test against a very specific scenario.
1992 h := newDbHarnessWopt(t, &opt.Options{
1993 DisableLargeBatchTransaction: true,
1994 Compression: opt.NoCompression,
1998 // Create first key range.
2000 for i := 0; i < nKeys; i++ {
2001 batch.Put([]byte(key1(i)), []byte("value for range 1 key"))
2005 // Create second key range.
2007 for i := 0; i < nKeys; i++ {
2008 batch.Put([]byte(key2(i)), []byte("value for range 2 key"))
2012 // Delete second key range.
2014 for i := 0; i < nKeys; i++ {
2015 batch.Delete([]byte(key2(i)))
2018 h.waitMemCompaction()
2020 // Run manual compaction.
2021 h.compactRange(key1(0), key1(nKeys-1))
2023 // Checking the keys.
2024 h.assertNumKeys(nKeys)
2027 func TestDB_LeveldbIssue200(t *testing.T) {
2028 h := newDbHarness(t)
2037 iter := h.db.NewIterator(nil, h.ro)
2039 // Add an element that should not be reflected in the iterator.
2042 iter.Seek([]byte("5"))
2043 assertBytes(t, []byte("5"), iter.Key())
2045 assertBytes(t, []byte("4"), iter.Key())
2047 assertBytes(t, []byte("3"), iter.Key())
2049 assertBytes(t, []byte("4"), iter.Key())
2051 assertBytes(t, []byte("5"), iter.Key())
2054 func TestDB_GoleveldbIssue74(t *testing.T) {
2055 h := newDbHarnessWopt(t, &opt.Options{
2056 DisableLargeBatchTransaction: true,
2057 WriteBuffer: 1 * opt.MiB,
2061 const n, dur = 10000, 5 * time.Second
2063 runtime.GOMAXPROCS(runtime.NumCPU())
2065 until := time.Now().Add(dur)
2066 wg := new(sync.WaitGroup)
2072 t.Logf("WRITER DONE #%d", i)
2073 atomic.StoreUint32(&done, 1)
2078 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2079 iv := fmt.Sprintf("VAL%010d", i)
2080 for k := 0; k < n; k++ {
2081 key := fmt.Sprintf("KEY%06d", k)
2082 b.Put([]byte(key), []byte(key+iv))
2083 b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
2088 snap := h.getSnapshot()
2089 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
2091 for ; iter.Next(); k++ {
2092 ptrKey := iter.Key()
2095 if _, err := snap.Get(ptrKey, nil); err != nil {
2096 t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
2098 if value, err := snap.Get(key, nil); err != nil {
2099 t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err)
2100 } else if string(value) != string(key)+iv {
2101 t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
2111 t.Fatalf("#%d %d != %d", i, k, n)
2118 t.Logf("READER DONE #%d", i)
2119 atomic.StoreUint32(&done, 1)
2122 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2123 snap := h.getSnapshot()
2124 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
2125 var prevValue string
2127 for ; iter.Next(); k++ {
2128 ptrKey := iter.Key()
2131 if _, err := snap.Get(ptrKey, nil); err != nil {
2132 t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
2135 if value, err := snap.Get(key, nil); err != nil {
2136 t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err)
2137 } else if prevValue != "" && string(value) != string(key)+prevValue {
2138 t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
2140 prevValue = string(value[len(key):])
2145 if k > 0 && k != n {
2146 t.Fatalf("#%d %d != %d", i, k, n)
2153 func TestDB_GetProperties(t *testing.T) {
2154 h := newDbHarness(t)
2157 _, err := h.db.GetProperty("leveldb.num-files-at-level")
2159 t.Error("GetProperty() failed to detect missing level")
2162 _, err = h.db.GetProperty("leveldb.num-files-at-level0")
2164 t.Error("got unexpected error", err)
2167 _, err = h.db.GetProperty("leveldb.num-files-at-level0x")
2169 t.Error("GetProperty() failed to detect invalid level")
2173 func TestDB_GoleveldbIssue72and83(t *testing.T) {
2174 h := newDbHarnessWopt(t, &opt.Options{
2175 DisableLargeBatchTransaction: true,
2176 WriteBuffer: 1 * opt.MiB,
2177 OpenFilesCacheCapacity: 3,
2181 const n, wn, dur = 10000, 100, 30 * time.Second
2183 runtime.GOMAXPROCS(runtime.NumCPU())
2185 randomData := func(prefix byte, i int) []byte {
2186 data := make([]byte, 1+4+32+64+32)
2187 _, err := crand.Reader.Read(data[1 : len(data)-8])
2192 binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i))
2193 binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value())
2197 keys := make([][]byte, n)
2198 for i := range keys {
2199 keys[i] = randomData(1, 0)
2202 until := time.Now().Add(dur)
2203 wg := new(sync.WaitGroup)
2209 t.Logf("WRITER DONE #%d", i)
2214 for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
2216 for _, k1 := range keys {
2217 k2 := randomData(2, i)
2218 b.Put(k2, randomData(42, i))
2221 if err := h.db.Write(b, h.wo); err != nil {
2222 atomic.StoreUint32(&done, 1)
2223 t.Fatalf("WRITER #%d db.Write: %v", i, err)
2230 t.Logf("READER0 DONE #%d", i)
2231 atomic.StoreUint32(&done, 1)
2234 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2235 snap := h.getSnapshot()
2236 seq := snap.elem.seq
2241 iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
2242 writei := int(seq/(n*2) - 1)
2244 for ; iter.Next(); k++ {
2247 k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
2248 k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
2249 if k1checksum0 != k1checksum1 {
2250 t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, k1checksum0, k1checksum0)
2252 k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
2253 k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
2254 if k2checksum0 != k2checksum1 {
2255 t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, k2checksum0, k2checksum1)
2257 kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
2258 if writei != kwritei {
2259 t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
2261 if _, err := snap.Get(k2, nil); err != nil {
2262 t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
2265 if err := iter.Error(); err != nil {
2266 t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
2270 if k > 0 && k != n {
2271 t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
2278 t.Logf("READER1 DONE #%d", i)
2279 atomic.StoreUint32(&done, 1)
2282 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2283 iter := h.db.NewIterator(nil, nil)
2284 seq := iter.(*dbIter).seq
2289 writei := int(seq/(n*2) - 1)
2291 for ok := iter.Last(); ok; ok = iter.Prev() {
2294 if err := iter.Error(); err != nil {
2295 t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
2298 if m := (writei+1)*n + n; k != m {
2299 t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
2307 func TestDB_TransientError(t *testing.T) {
2308 h := newDbHarnessWopt(t, &opt.Options{
2309 DisableLargeBatchTransaction: true,
2310 WriteBuffer: 128 * opt.KiB,
2311 OpenFilesCacheCapacity: 3,
2312 DisableCompactionBackoff: true,
2322 snaps [nSnap]*Snapshot
2325 for i := range snaps {
2326 vtail := fmt.Sprintf("VAL%030d", i)
2328 for k := 0; k < nKey; k++ {
2329 key := fmt.Sprintf("KEY%8d", k)
2330 b.Put([]byte(key), []byte(key+vtail))
2332 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
2333 if err := h.db.Write(b, nil); err != nil {
2334 t.Logf("WRITE #%d error: %v", i, err)
2335 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2337 if err := h.db.Write(b, nil); err == nil {
2339 } else if errors.IsCorrupted(err) {
2340 t.Fatalf("WRITE #%d corrupted: %v", i, err)
2345 snaps[i] = h.db.newSnapshot()
2347 for k := 0; k < nKey; k++ {
2348 key := fmt.Sprintf("KEY%8d", k)
2349 b.Delete([]byte(key))
2351 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
2352 if err := h.db.Write(b, nil); err != nil {
2353 t.Logf("WRITE #%d error: %v", i, err)
2354 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2356 if err := h.db.Write(b, nil); err == nil {
2358 } else if errors.IsCorrupted(err) {
2359 t.Fatalf("WRITE #%d corrupted: %v", i, err)
2364 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2366 runtime.GOMAXPROCS(runtime.NumCPU())
2368 rnd := rand.New(rand.NewSource(0xecafdaed))
2369 wg := &sync.WaitGroup{}
2370 for i, snap := range snaps {
2373 go func(i int, snap *Snapshot, sk []int) {
2376 vtail := fmt.Sprintf("VAL%030d", i)
2377 for _, k := range sk {
2378 key := fmt.Sprintf("KEY%8d", k)
2379 xvalue, err := snap.Get([]byte(key), nil)
2381 t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2383 value := key + vtail
2384 if !bytes.Equal([]byte(value), xvalue) {
2385 t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2388 }(i, snap, rnd.Perm(nKey))
2390 go func(i int, snap *Snapshot) {
2393 vtail := fmt.Sprintf("VAL%030d", i)
2394 iter := snap.NewIterator(nil, nil)
2395 defer iter.Release()
2396 for k := 0; k < nKey; k++ {
2398 if err := iter.Error(); err != nil {
2399 t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err)
2401 t.Fatalf("READER_ITER #%d K%d eoi", i, k)
2404 key := fmt.Sprintf("KEY%8d", k)
2406 if !bytes.Equal([]byte(key), xkey) {
2407 t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
2409 value := key + vtail
2410 xvalue := iter.Value()
2411 if !bytes.Equal([]byte(value), xvalue) {
2412 t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
2421 func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
2422 h := newDbHarnessWopt(t, &opt.Options{
2423 DisableLargeBatchTransaction: true,
2424 WriteBuffer: 112 * opt.KiB,
2425 CompactionTableSize: 90 * opt.KiB,
2426 CompactionExpandLimitFactor: 1,
2436 snaps [nSnap]*Snapshot
2439 for i := range snaps {
2440 vtail := fmt.Sprintf("VAL%030d", i)
2442 for k := 0; k < nKey; k++ {
2443 key := fmt.Sprintf("KEY%08d", k)
2444 b.Put([]byte(key), []byte(key+vtail))
2446 if err := h.db.Write(b, nil); err != nil {
2447 t.Fatalf("WRITE #%d error: %v", i, err)
2450 snaps[i] = h.db.newSnapshot()
2452 for k := 0; k < nKey; k++ {
2453 key := fmt.Sprintf("KEY%08d", k)
2454 b.Delete([]byte(key))
2456 if err := h.db.Write(b, nil); err != nil {
2457 t.Fatalf("WRITE #%d error: %v", i, err)
2464 for level, tables := range h.db.s.stVersion.levels {
2465 for _, table := range tables {
2466 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2470 h.compactRangeAt(0, "", "")
2472 for level, tables := range h.db.s.stVersion.levels {
2473 for _, table := range tables {
2474 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2477 h.compactRangeAt(1, "", "")
2479 for level, tables := range h.db.s.stVersion.levels {
2480 for _, table := range tables {
2481 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2484 runtime.GOMAXPROCS(runtime.NumCPU())
2486 wg := &sync.WaitGroup{}
2487 for i, snap := range snaps {
2490 go func(i int, snap *Snapshot) {
2493 vtail := fmt.Sprintf("VAL%030d", i)
2494 for k := 0; k < nKey; k++ {
2495 key := fmt.Sprintf("KEY%08d", k)
2496 xvalue, err := snap.Get([]byte(key), nil)
2498 t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2500 value := key + vtail
2501 if !bytes.Equal([]byte(value), xvalue) {
2502 t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2511 func TestDB_TableCompactionBuilder(t *testing.T) {
2512 gomega.RegisterTestingT(t)
2513 stor := testutil.NewStorage()
2514 stor.OnLog(testingLogger(t))
2515 stor.OnClose(testingPreserveOnFailed(t))
2521 DisableLargeBatchTransaction: true,
2522 WriteBuffer: 112 * opt.KiB,
2523 CompactionTableSize: 43 * opt.KiB,
2524 CompactionExpandLimitFactor: 1,
2525 CompactionGPOverlapsFactor: 1,
2526 DisableBlockCache: true,
2528 s, err := newSession(stor, o)
2532 if err := s.create(); err != nil {
2538 targetSize = 5 * o.CompactionTableSize
2539 value = bytes.Repeat([]byte{'0'}, 100)
2541 for i := 0; i < 2; i++ {
2542 tw, err := s.tops.create()
2546 for k := 0; tw.tw.BytesLen() < targetSize; k++ {
2547 key := []byte(fmt.Sprintf("%09d", k))
2549 for x := uint64(0); x < nSeq; x++ {
2550 if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil {
2555 tf, err := tw.finish()
2559 rec := &sessionRecord{}
2560 rec.addTableFile(i, tf)
2561 if err := s.commit(rec); err != nil {
2566 // Build grandparent.
2568 c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
2569 rec := &sessionRecord{}
2570 b := &tableCompactionBuilder{
2574 stat1: new(cStatStaging),
2577 tableSize: o.CompactionTableSize/3 + 961,
2579 if err := b.run(new(compactionTransactCounter)); err != nil {
2582 for _, t := range c.levels[0] {
2583 rec.delTable(c.sourceLevel, t.fd.Num)
2585 if err := s.commit(rec); err != nil {
2592 c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...))
2593 rec = &sessionRecord{}
2594 b = &tableCompactionBuilder{
2598 stat1: new(cStatStaging),
2601 tableSize: o.CompactionTableSize,
2603 if err := b.run(new(compactionTransactCounter)); err != nil {
2606 for _, t := range c.levels[0] {
2607 rec.delTable(c.sourceLevel, t.fd.Num)
2609 // Move grandparent to level-3
2610 for _, t := range v.levels[2] {
2611 rec.delTable(2, t.fd.Num)
2612 rec.addTableFile(3, t)
2614 if err := s.commit(rec); err != nil {
2620 for level, want := range []bool{false, true, false, true} {
2621 got := len(v.levels[level]) > 0
2623 t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
2626 for i, f := range v.levels[1][:len(v.levels[1])-1] {
2627 nf := v.levels[1][i+1]
2628 if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
2629 t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
2634 // Compaction with transient error.
2636 c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
2637 rec = &sessionRecord{}
2638 b = &tableCompactionBuilder{
2642 stat1: new(cStatStaging),
2645 tableSize: o.CompactionTableSize,
2647 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
2648 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
2650 if err := b.run(new(compactionTransactCounter)); err != nil {
2651 t.Logf("(expected) b.run: %v", err)
2656 if err := s.commit(rec); err != nil {
2661 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
2662 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)
2665 if len(v.levels[1]) != len(v.levels[2]) {
2666 t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
2668 for i, f0 := range v.levels[1] {
2669 f1 := v.levels[2][i]
2670 iter0 := s.tops.newIterator(f0, nil, nil)
2671 iter1 := s.tops.newIterator(f1, nil, nil)
2672 for j := 0; true; j++ {
2673 next0 := iter0.Next()
2674 next1 := iter1.Next()
2676 t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
2680 if !bytes.Equal(key0, key1) {
2681 t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
2693 func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
2695 vSize = 200 * opt.KiB
2696 tSize = 100 * opt.MiB
2701 h := newDbHarnessWopt(t, &opt.Options{
2702 DisableLargeBatchTransaction: true,
2703 Compression: opt.NoCompression,
2704 DisableBlockCache: true,
2708 h.db.memdbMaxLevel = 2
2710 key := func(x int) string {
2711 return fmt.Sprintf("v%06d", x)
2715 value := strings.Repeat("x", vSize)
2716 for i := 0; i < n; i++ {
2717 h.put(key(i), value)
2722 for i := 0; i < n; i++ {
2728 limit = n / limitDiv
2731 limitKey = key(limit)
2733 slice = &util.Range{Limit: []byte(limitKey)}
2735 initialSize0 = h.sizeOf(startKey, limitKey)
2736 initialSize1 = h.sizeOf(limitKey, maxKey)
2739 t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
2741 for r := 0; true; r++ {
2743 t.Fatal("taking too long to compact")
2747 iter := h.db.NewIterator(slice, h.ro)
2750 if err := iter.Error(); err != nil {
2751 t.Fatalf("Iter err: %v", err)
2759 size0 := h.sizeOf(startKey, limitKey)
2760 size1 := h.sizeOf(limitKey, maxKey)
2761 t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
2762 if size0 < initialSize0/10 {
2767 if initialSize1 > 0 {
2768 h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
2772 func TestDB_IterTriggeredCompaction(t *testing.T) {
2773 testDB_IterTriggeredCompaction(t, 1)
2776 func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
2777 testDB_IterTriggeredCompaction(t, 2)
2780 func TestDB_ReadOnly(t *testing.T) {
2781 h := newDbHarness(t)
2791 t.Log("Trigger read-only")
2792 if err := h.db.SetReadOnly(); err != nil {
2794 t.Fatalf("SetReadOnly error: %v", err)
2797 mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
2798 h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't write"))
2800 ro := func(key, value, wantValue string) {
2801 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
2802 t.Fatalf("unexpected error: %v", err)
2804 h.getVal(key, wantValue)
2807 ro("foo", "vx", "v1")
2812 ro("foo", "vx", "v1")
2813 ro("bar", "vx", "v2")
2817 func TestDB_BulkInsertDelete(t *testing.T) {
2818 h := newDbHarnessWopt(t, &opt.Options{
2819 DisableLargeBatchTransaction: true,
2820 Compression: opt.NoCompression,
2821 CompactionTableSize: 128 * opt.KiB,
2822 CompactionTotalSize: 1 * opt.MiB,
2823 WriteBuffer: 256 * opt.KiB,
2829 key := make([]byte, 4)
2830 value := make([]byte, 256)
2831 for i := 0; i < R; i++ {
2833 for j := 0; j < N; j++ {
2834 binary.BigEndian.PutUint32(key, uint32(offset+j))
2835 h.db.Put(key, value, nil)
2837 for j := 0; j < N; j++ {
2838 binary.BigEndian.PutUint32(key, uint32(offset+j))
2839 h.db.Delete(key, nil)
2843 if tot := h.totalTables(); tot > 10 {
2844 t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
2848 func TestDB_GracefulClose(t *testing.T) {
2849 runtime.GOMAXPROCS(4)
2850 h := newDbHarnessWopt(t, &opt.Options{
2851 DisableLargeBatchTransaction: true,
2852 Compression: opt.NoCompression,
2853 CompactionTableSize: 1 * opt.MiB,
2854 WriteBuffer: 1 * opt.MiB,
2858 var closeWait sync.WaitGroup
2863 for i := 0; i < 1000000; i++ {
2864 if !closing && h.totalTables() > 3 {
2865 t.Logf("close db during write, index=%d", i)
2873 if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
2874 t.Logf("Put error: %s (expected)", err)
2884 for i := 0; i < n; i++ {
2885 if !closing && i > n/2 {
2886 t.Logf("close db during read, index=%d", i)
2894 if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
2895 t.Logf("Get error: %s (expected)", err)
2904 iter := h.db.NewIterator(nil, h.ro)
2905 for i := 0; iter.Next(); i++ {
2906 if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
2907 t.Error("Key or value has zero length")
2910 t.Logf("close db during iter, index=%d", i)
2918 time.Sleep(time.Millisecond)
2920 if err := iter.Error(); err != nil {
2921 t.Logf("Iter error: %s (expected)", err)