OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_test.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "bytes"
11         "container/list"
12         crand "crypto/rand"
13         "encoding/binary"
14         "fmt"
15         "math/rand"
16         "os"
17         "path/filepath"
18         "runtime"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "testing"
23         "time"
24         "unsafe"
25
26         "github.com/onsi/gomega"
27
28         "github.com/syndtr/goleveldb/leveldb/comparer"
29         "github.com/syndtr/goleveldb/leveldb/errors"
30         "github.com/syndtr/goleveldb/leveldb/filter"
31         "github.com/syndtr/goleveldb/leveldb/iterator"
32         "github.com/syndtr/goleveldb/leveldb/opt"
33         "github.com/syndtr/goleveldb/leveldb/storage"
34         "github.com/syndtr/goleveldb/leveldb/testutil"
35         "github.com/syndtr/goleveldb/leveldb/util"
36 )
37
// tkey formats i as a fixed-width, zero-padded 16-digit decimal key so
// that lexicographic key order matches numeric order.
func tkey(i int) []byte {
	s := fmt.Sprintf("%016d", i)
	return []byte(s)
}
41
42 func tval(seed, n int) []byte {
43         r := rand.New(rand.NewSource(int64(seed)))
44         return randomString(r, n)
45 }
46
// testingLogger adapts t.Log to the func(string) callback shape used by
// the test storage's OnLog hook.
func testingLogger(t *testing.T) func(log string) {
	return func(msg string) {
		t.Log(msg)
	}
}
52
// testingPreserveOnFailed returns an OnClose hook that asks the test
// storage to preserve its files only when the test has already failed.
func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
	return func() (preserve bool, err error) {
		return t.Failed(), nil
	}
}
59
// dbHarness bundles a DB under test with its backing test storage and
// the options used for every operation performed through the helpers.
type dbHarness struct {
	t *testing.T // test context; helpers report failures through it

	stor *testutil.Storage // test storage with logging/fault hooks
	o    *opt.Options      // open options, shared across reopens
	db   *DB               // DB currently open on stor
	ro   *opt.ReadOptions  // read options (nil = defaults)
	wo   *opt.WriteOptions // write options (nil = defaults)
}
69
70 func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness {
71         h := new(dbHarness)
72         h.init(t, o)
73         return h
74 }
75
76 func newDbHarness(t *testing.T) *dbHarness {
77         return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
78 }
79
// init wires the harness to fresh test storage, registers the logging
// and preserve-on-failure hooks, and opens the initial DB. A failed
// open is fatal to the test.
func (h *dbHarness) init(t *testing.T, o *opt.Options) {
	gomega.RegisterTestingT(t)
	h.t = t
	h.stor = testutil.NewStorage()
	h.stor.OnLog(testingLogger(t))
	h.stor.OnClose(testingPreserveOnFailed(t))
	h.o = o
	h.ro = nil
	h.wo = nil

	if err := h.openDB0(); err != nil {
		// So that it will come after fatal message.
		// (Fatal exits via Goexit, so the deferred Close still runs.)
		defer h.stor.Close()
		h.t.Fatal("Open (init): got error: ", err)
	}
}
96
97 func (h *dbHarness) openDB0() (err error) {
98         h.t.Log("opening DB")
99         h.db, err = Open(h.stor, h.o)
100         return
101 }
102
103 func (h *dbHarness) openDB() {
104         if err := h.openDB0(); err != nil {
105                 h.t.Fatal("Open: got error: ", err)
106         }
107 }
108
109 func (h *dbHarness) closeDB0() error {
110         h.t.Log("closing DB")
111         return h.db.Close()
112 }
113
// closeDB closes the DB (reporting, not aborting, on error), verifies
// via CloseCheck that all storage files were released, and forces a GC.
// Note it leaves h.db set; reopenDB relies on that.
func (h *dbHarness) closeDB() {
	if h.db != nil {
		if err := h.closeDB0(); err != nil {
			h.t.Error("Close: got error: ", err)
		}
	}
	h.stor.CloseCheck()
	runtime.GC()
}
123
// reopenDB cycles the DB through close/open, forcing the reopened
// instance to go through recovery.
func (h *dbHarness) reopenDB() {
	if h.db != nil {
		h.closeDB()
	}
	h.openDB()
}
130
131 func (h *dbHarness) close() {
132         if h.db != nil {
133                 h.closeDB0()
134                 h.db = nil
135         }
136         h.stor.Close()
137         h.stor = nil
138         runtime.GC()
139 }
140
141 func (h *dbHarness) openAssert(want bool) {
142         db, err := Open(h.stor, h.o)
143         if err != nil {
144                 if want {
145                         h.t.Error("Open: assert: got error: ", err)
146                 } else {
147                         h.t.Log("Open: assert: got error (expected): ", err)
148                 }
149         } else {
150                 if !want {
151                         h.t.Error("Open: assert: expect error")
152                 }
153                 db.Close()
154         }
155 }
156
157 func (h *dbHarness) write(batch *Batch) {
158         if err := h.db.Write(batch, h.wo); err != nil {
159                 h.t.Error("Write: got error: ", err)
160         }
161 }
162
163 func (h *dbHarness) put(key, value string) {
164         if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil {
165                 h.t.Error("Put: got error: ", err)
166         }
167 }
168
169 func (h *dbHarness) putMulti(n int, low, hi string) {
170         for i := 0; i < n; i++ {
171                 h.put(low, "begin")
172                 h.put(hi, "end")
173                 h.compactMem()
174         }
175 }
176
// maxNextLevelOverlappingBytes scans every table in the middle levels
// and measures how many bytes of the next level overlap that table's
// key range, failing the test if the worst case exceeds want.
func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
	t := h.t
	db := h.db

	var (
		maxOverlaps int64 // largest overlap observed, in bytes
		maxLevel    int   // level at which maxOverlaps was observed
	)
	v := db.s.version()
	if len(v.levels) > 2 {
		// Skip level 0 and the last level: the slice below covers
		// levels 1..N-2, each of which has a "next" level to check.
		for i, tt := range v.levels[1 : len(v.levels)-1] {
			level := i + 1
			next := v.levels[level+1]
			// NOTE: this t (a table entry) deliberately shadows the
			// *testing.T declared above for the duration of the loop.
			for _, t := range tt {
				r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
				sum := r.size()
				if sum > maxOverlaps {
					maxOverlaps = sum
					maxLevel = level
				}
			}
		}
	}
	v.release()

	if maxOverlaps > want {
		t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
	} else {
		t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
	}
}
208
209 func (h *dbHarness) delete(key string) {
210         t := h.t
211         db := h.db
212
213         err := db.Delete([]byte(key), h.wo)
214         if err != nil {
215                 t.Error("Delete: got error: ", err)
216         }
217 }
218
219 func (h *dbHarness) assertNumKeys(want int) {
220         iter := h.db.NewIterator(nil, h.ro)
221         defer iter.Release()
222         got := 0
223         for iter.Next() {
224                 got++
225         }
226         if err := iter.Error(); err != nil {
227                 h.t.Error("assertNumKeys: ", err)
228         }
229         if want != got {
230                 h.t.Errorf("assertNumKeys: want=%d got=%d", want, got)
231         }
232 }
233
234 func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) {
235         t := h.t
236         v, err := db.Get([]byte(key), h.ro)
237         switch err {
238         case ErrNotFound:
239                 if expectFound {
240                         t.Errorf("Get: key '%s' not found, want found", key)
241                 }
242         case nil:
243                 found = true
244                 if !expectFound {
245                         t.Errorf("Get: key '%s' found, want not found", key)
246                 }
247         default:
248                 t.Error("Get: got error: ", err)
249         }
250         return
251 }
252
// get looks key up on the DB itself; see getr.
func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) {
	return h.getr(h.db, key, expectFound)
}
256
257 func (h *dbHarness) getValr(db Reader, key, value string) {
258         t := h.t
259         found, r := h.getr(db, key, true)
260         if !found {
261                 return
262         }
263         rval := string(r)
264         if rval != value {
265                 t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value)
266         }
267 }
268
// getVal asserts that key holds value on the DB itself; see getValr.
func (h *dbHarness) getVal(key, value string) {
	h.getValr(h.db, key, value)
}
272
// allEntriesFor renders every internal entry stored for key — live
// values, deletion markers ("DEL") and unparsable keys ("CORRUPTED")
// — as a string like "[ v2, v1 ]" in raw iteration order, and compares
// it against want.
func (h *dbHarness) allEntriesFor(key, want string) {
	t := h.t
	db := h.db
	s := db.s

	// Seek with the maximum sequence number so the raw scan starts at
	// the first internal entry recorded for key.
	ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal)
	iter := db.newRawIterator(nil, nil, nil, nil)
	if !iter.Seek(ikey) && iter.Error() != nil {
		t.Error("AllEntries: error during seek, err: ", iter.Error())
		return
	}
	res := "[ "
	first := true
	for iter.Valid() {
		if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil {
			// Stop once iteration leaves the requested user key.
			if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
				break
			}
			if !first {
				res += ", "
			}
			first = false
			switch kt {
			case keyTypeVal:
				res += string(iter.Value())
			case keyTypeDel:
				res += "DEL"
			}
		} else {
			if !first {
				res += ", "
			}
			first = false
			res += "CORRUPTED"
		}
		iter.Next()
	}
	// Closing space only when at least one entry was rendered, so an
	// empty result is "[ ]"-free: it reads "[ ]" vs "[ x ]"? No — it
	// is "[ ]" only with entries; with none the result is "[ ]".
	if !first {
		res += " "
	}
	res += "]"
	if res != want {
		t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want)
	}
}
318
319 // Return a string that contains all key,value pairs in order,
320 // formatted like "(k1->v1)(k2->v2)".
321 func (h *dbHarness) getKeyVal(want string) {
322         t := h.t
323         db := h.db
324
325         s, err := db.GetSnapshot()
326         if err != nil {
327                 t.Fatal("GetSnapshot: got error: ", err)
328         }
329         res := ""
330         iter := s.NewIterator(nil, nil)
331         for iter.Next() {
332                 res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value()))
333         }
334         iter.Release()
335
336         if res != want {
337                 t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want)
338         }
339         s.Release()
340 }
341
342 func (h *dbHarness) waitCompaction() {
343         t := h.t
344         db := h.db
345         if err := db.compTriggerWait(db.tcompCmdC); err != nil {
346                 t.Error("compaction error: ", err)
347         }
348 }
349
350 func (h *dbHarness) waitMemCompaction() {
351         t := h.t
352         db := h.db
353
354         if err := db.compTriggerWait(db.mcompCmdC); err != nil {
355                 t.Error("compaction error: ", err)
356         }
357 }
358
// compactMem forces the current memtable to be rotated out and flushed
// to a table file, failing the test if the flush errors or leaves no
// tables at all.
func (h *dbHarness) compactMem() {
	t := h.t
	db := h.db

	t.Log("starting memdb compaction")

	// Acquire the writer lock so rotateMem can swap memtables without
	// racing concurrent writers; released again on return.
	db.writeLockC <- struct{}{}
	defer func() {
		<-db.writeLockC
	}()

	if _, err := db.rotateMem(0, true); err != nil {
		t.Error("compaction error: ", err)
	}

	if h.totalTables() == 0 {
		t.Error("zero tables after mem compaction")
	}

	t.Log("memdb compaction done")
}
380
381 func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
382         t := h.t
383         db := h.db
384
385         var _min, _max []byte
386         if min != "" {
387                 _min = []byte(min)
388         }
389         if max != "" {
390                 _max = []byte(max)
391         }
392
393         t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)
394
395         if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil {
396                 if wanterr {
397                         t.Log("CompactRangeAt: got error (expected): ", err)
398                 } else {
399                         t.Error("CompactRangeAt: got error: ", err)
400                 }
401         } else if wanterr {
402                 t.Error("CompactRangeAt: expect error")
403         }
404
405         t.Log("table range compaction done")
406 }
407
// compactRangeAt compacts a level range and treats any error as a
// test failure; see compactRangeAtErr.
func (h *dbHarness) compactRangeAt(level int, min, max string) {
	h.compactRangeAtErr(level, min, max, false)
}
411
412 func (h *dbHarness) compactRange(min, max string) {
413         t := h.t
414         db := h.db
415
416         t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
417
418         var r util.Range
419         if min != "" {
420                 r.Start = []byte(min)
421         }
422         if max != "" {
423                 r.Limit = []byte(max)
424         }
425         if err := db.CompactRange(r); err != nil {
426                 t.Error("CompactRange: got error: ", err)
427         }
428
429         t.Log("DB range compaction done")
430 }
431
432 func (h *dbHarness) sizeOf(start, limit string) int64 {
433         sz, err := h.db.SizeOf([]util.Range{
434                 {[]byte(start), []byte(limit)},
435         })
436         if err != nil {
437                 h.t.Error("SizeOf: got error: ", err)
438         }
439         return sz.Sum()
440 }
441
442 func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) {
443         sz := h.sizeOf(start, limit)
444         if sz < low || sz > hi {
445                 h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
446                         shorten(start), shorten(limit), low, hi, sz)
447         }
448 }
449
450 func (h *dbHarness) getSnapshot() (s *Snapshot) {
451         s, err := h.db.GetSnapshot()
452         if err != nil {
453                 h.t.Fatal("GetSnapshot: got error: ", err)
454         }
455         return
456 }
457
// getTablesPerLevel returns a comma-separated table count per level
// with all trailing zero-count levels trimmed off (e.g. "1,0,2",
// never "1,0,2,0,0").
func (h *dbHarness) getTablesPerLevel() string {
	res := ""
	nz := 0 // length of res up to and including the last non-zero count
	v := h.db.s.version()
	for level, tables := range v.levels {
		if level > 0 {
			res += ","
		}
		res += fmt.Sprint(len(tables))
		if len(tables) > 0 {
			nz = len(res)
		}
	}
	v.release()
	return res[:nz]
}
474
475 func (h *dbHarness) tablesPerLevel(want string) {
476         res := h.getTablesPerLevel()
477         if res != want {
478                 h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
479         }
480 }
481
482 func (h *dbHarness) totalTables() (n int) {
483         v := h.db.s.version()
484         for _, tables := range v.levels {
485                 n += len(tables)
486         }
487         v.release()
488         return
489 }
490
// keyValue is the minimal read-only key/value pair view (satisfied by
// iterators) consumed by testKeyVal.
type keyValue interface {
	Key() []byte
	Value() []byte
}
495
496 func testKeyVal(t *testing.T, kv keyValue, want string) {
497         res := string(kv.Key()) + "->" + string(kv.Value())
498         if res != want {
499                 t.Errorf("invalid key/value, want=%q, got=%q", want, res)
500         }
501 }
502
// numKey formats num as a fixed-width key of the form "key000123".
func numKey(num int) string {
	return "key" + fmt.Sprintf("%06d", num)
}
506
// testingBloomFilter is the shared 10-bits-per-key bloom filter used by
// the option permutations in truno.
var testingBloomFilter = filter.NewBloomFilter(10)
508
// truno runs f against four DB configurations derived from o: (0) the
// options as given, (1) with a bloom filter added, (2) additionally
// with compression disabled, and (3) same options but with the DB
// reopened before f runs. Note that o is replaced (never written
// through) across iterations, so each case builds on the previous
// case's copy.
func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
	for i := 0; i < 4; i++ {
		func() {
			switch i {
			case 0:
				// Use o exactly as supplied.
			case 1:
				// Add a bloom filter, copying o so the caller's
				// options value is never mutated.
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Filter: testingBloomFilter,
					}
				} else {
					old := o
					o = &opt.Options{}
					*o = *old
					o.Filter = testingBloomFilter
				}
			case 2:
				// Disable compression on top of the previous options.
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Compression:                  opt.NoCompression,
					}
				} else {
					old := o
					o = &opt.Options{}
					*o = *old
					o.Compression = opt.NoCompression
				}
			}
			h := newDbHarnessWopt(t, o)
			defer h.close()
			switch i {
			case 3:
				// Exercise recovery by reopening before f runs.
				h.reopenDB()
			}
			f(h)
		}()
	}
}
549
// trun is truno with default (nil) base options.
func trun(t *testing.T, f func(h *dbHarness)) {
	truno(t, nil, f)
}
553
// testAligned reports an error unless offset is 8-byte aligned (the
// sync/atomic package requires 64-bit alignment for 64-bit atomics on
// 32-bit platforms).
func testAligned(t *testing.T, name string, offset uintptr) {
	if rem := offset % 8; rem != 0 {
		t.Errorf("field %s offset is not 64-bit aligned", name)
	}
}
559
// Test_FieldsAligned verifies that several DB/session counter fields
// sit at 64-bit-aligned offsets — presumably because they are accessed
// with sync/atomic, which requires this on 32-bit platforms (TODO:
// confirm against the field usages).
func Test_FieldsAligned(t *testing.T) {
	p1 := new(DB)
	testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
	p2 := new(session)
	testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
	testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
	testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
	testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
}
569
// TestDB_Locking checks that a second Open on the same storage fails
// while the DB is held open and succeeds once it has been closed.
func TestDB_Locking(t *testing.T) {
	h := newDbHarness(t)
	defer h.stor.Close() // storage only; the DB is closed explicitly below
	h.openAssert(false)
	h.closeDB()
	h.openAssert(true)
}
577
// TestDB_Empty checks lookups against an empty DB, both before and
// after a reopen.
func TestDB_Empty(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
586
// TestDB_ReadWrite checks basic put/get behavior, including overwrite
// of an existing key, surviving a reopen.
func TestDB_ReadWrite(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("bar", "v2")
		h.put("foo", "v3")
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")
	})
}
601
// TestDB_PutDeleteGet checks that a deleted key stays deleted, both
// before and after a reopen.
func TestDB_PutDeleteGet(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.getVal("foo", "v2")
		h.delete("foo")
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
615
616 func TestDB_EmptyBatch(t *testing.T) {
617         h := newDbHarness(t)
618         defer h.close()
619
620         h.get("foo", false)
621         err := h.db.Write(new(Batch), h.wo)
622         if err != nil {
623                 t.Error("writing empty batch yield error: ", err)
624         }
625         h.get("foo", false)
626 }
627
// TestDB_GetFromFrozen checks that a key is still readable while its
// memtable is frozen (rotated out but not yet flushed, because table
// syncs are stalled), and after recovery on reopen.
func TestDB_GetFromFrozen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  100100,
	})
	defer h.close()

	h.put("foo", "v1")
	h.getVal("foo", "v1")

	h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls
	h.put("k1", strings.Repeat("x", 100000))           // Fill memtable
	h.put("k2", strings.Repeat("y", 100000))           // Trigger compaction
	// Poll (bounded) until the memtable has been frozen.
	for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
		time.Sleep(10 * time.Microsecond)
	}
	if h.db.getFrozenMem() == nil {
		// Release the stall before bailing so shutdown can proceed.
		h.stor.Release(testutil.ModeSync, storage.TypeTable)
		t.Fatal("No frozen mem")
	}
	h.getVal("foo", "v1")
	h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls

	h.reopenDB()
	h.getVal("foo", "v1")
	h.get("k1", true)
	h.get("k2", true)
}
656
// TestDB_GetFromTable checks that a key remains readable after its
// memtable has been flushed to a table file.
func TestDB_GetFromTable(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.getVal("foo", "v1")
	})
}
664
// TestDB_GetSnapshot checks that a snapshot keeps observing the values
// present when it was taken, across later writes and a memtable flush,
// for both a short and a long (200-byte) key.
func TestDB_GetSnapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		bar := strings.Repeat("b", 200)
		h.put("foo", "v1")
		h.put(bar, "v1")

		snap, err := h.db.GetSnapshot()
		if err != nil {
			t.Fatal("GetSnapshot: got error: ", err)
		}

		h.put("foo", "v2")
		h.put(bar, "v2")

		// Live reads see v2; the snapshot still sees v1.
		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		h.compactMem()

		// Same expectations after the memtable flush.
		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		snap.Release()

		h.reopenDB()
		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
	})
}
698
// TestDB_GetLevel0Ordering checks that reads return the newest value
// when several level-0 tables contain entries for the same keys.
func TestDB_GetLevel0Ordering(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Each iteration flushes a memtable, stacking level-0 tables
		// that all contain "foo" and "bar".
		for i := 0; i < 4; i++ {
			h.put("bar", fmt.Sprintf("b%d", i))
			h.put("foo", fmt.Sprintf("v%d", i))
			h.compactMem()
		}
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")

		v := h.db.s.version()
		t0len := v.tLen(0)
		v.release()
		if t0len < 2 {
			t.Errorf("level-0 tables is less than 2, got %d", t0len)
		}

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")
	})
}
723
// TestDB_GetOrderedByLevels checks that a newer value in a shallower
// level shadows an older value pushed down by a range compaction.
func TestDB_GetOrderedByLevels(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.compactRange("a", "z") // push v1 down a level
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.compactMem()
		h.getVal("foo", "v2")
	})
}
735
// TestDB_GetPicksCorrectFile checks that Get consults the right table
// file when a non-level-0 level holds several disjoint files.
func TestDB_GetPicksCorrectFile(t *testing.T) {
	trun(t, func(h *dbHarness) {
		// Arrange to have multiple files in a non-level-0 level.
		h.put("a", "va")
		h.compactMem()
		h.compactRange("a", "b")
		h.put("x", "vx")
		h.compactMem()
		h.compactRange("x", "y")
		h.put("f", "vf")
		h.compactMem()
		h.compactRange("f", "g")

		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")

		// Full-range compaction, then re-check every key.
		h.compactRange("", "")
		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")
	})
}
759
// TestDB_GetEncountersEmptyLevel checks that read-triggered seek
// compaction picks the correct source level when an intermediate level
// is empty.
func TestDB_GetEncountersEmptyLevel(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Arrange for the following to happen:
		//   * sstable A in level 0
		//   * nothing in level 1
		//   * sstable B in level 2
		// Then do enough Get() calls to arrange for an automatic compaction
		// of sstable A.  A bug would cause the compaction to be marked as
		// occurring at level 1 (instead of the correct level 0).

		// Step 1: First place sstables in levels 0 and 2
		for i := 0; ; i++ {
			if i >= 100 {
				t.Fatal("could not fill levels-0 and level-2")
			}
			v := h.db.s.version()
			if v.tLen(0) > 0 && v.tLen(2) > 0 {
				v.release()
				break
			}
			v.release()
			h.put("a", "begin")
			h.put("z", "end")
			h.compactMem()

			h.getVal("a", "begin")
			h.getVal("z", "end")
		}

		// Step 2: clear level 1 if necessary.
		h.compactRangeAt(1, "", "")
		h.tablesPerLevel("1,0,1")

		h.getVal("a", "begin")
		h.getVal("z", "end")

		// Step 3: read a bunch of times
		for i := 0; i < 200; i++ {
			h.get("missing", false)
		}

		// Step 4: Wait for compaction to finish
		h.waitCompaction()

		// The reads above should have compacted level 0 away.
		v := h.db.s.version()
		if v.tLen(0) > 0 {
			t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
		}
		v.release()

		h.getVal("a", "begin")
		h.getVal("z", "end")
	})
}
816
// TestDB_IterMultiWithDelete checks that iterators skip a deleted key
// when stepping backwards, both before and after a memtable flush.
func TestDB_IterMultiWithDelete(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("a", "va")
		h.put("b", "vb")
		h.put("c", "vc")
		h.delete("b")
		h.get("b", false)

		// Prev from "c" must skip the deleted "b" and land on "a".
		iter := h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		iter.Prev()
		testKeyVal(t, iter, "a->va")
		iter.Release()

		h.compactMem()

		// Same expectation once the data lives in a table file.
		iter = h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		iter.Prev()
		testKeyVal(t, iter, "a->va")
		iter.Release()
	})
}
842
// TestDB_IteratorPinsRef checks that an iterator keeps observing the
// DB state from its creation time, even after heavy writes and the
// compactions they trigger.
func TestDB_IteratorPinsRef(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "hello")

	// Get iterator that will yield the current contents of the DB.
	iter := h.db.NewIterator(nil, nil)

	// Write to force compactions
	h.put("foo", "newvalue1")
	for i := 0; i < 100; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
	h.put("foo", "newvalue2")

	// The iterator must still see only the original single entry.
	iter.First()
	testKeyVal(t, iter, "foo->hello")
	if iter.Next() {
		t.Errorf("expect eof")
	}
	iter.Release()
}
866
// TestDB_Recover checks journal recovery across several reopen cycles
// interleaved with writes and overwrites.
func TestDB_Recover(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("baz", "v5")

		h.reopenDB()
		h.getVal("foo", "v1")

		h.getVal("foo", "v1")
		h.getVal("baz", "v5")
		h.put("bar", "v2")
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.put("foo", "v4")
		h.getVal("foo", "v4")
		h.getVal("bar", "v2")
		h.getVal("baz", "v5")
	})
}
888
// TestDB_RecoverWithEmptyJournal checks recovery when a reopen leaves
// behind a journal with no entries (the back-to-back reopen below).
func TestDB_RecoverWithEmptyJournal(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("foo", "v2")

		h.reopenDB()
		h.reopenDB()
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
	})
}
902
// TestDB_RecoverDuringMemtableCompaction stalls table syncs so the
// writes below cannot finish flushing, then reopens and verifies that
// all of them are recovered from the journal.
func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {

		h.stor.Stall(testutil.ModeSync, storage.TypeTable)
		h.put("big1", strings.Repeat("x", 10000000)) // exceeds the write buffer
		h.put("big2", strings.Repeat("y", 1000))
		h.put("bar", "v2")
		h.stor.Release(testutil.ModeSync, storage.TypeTable)

		h.reopenDB()
		h.getVal("bar", "v2")
		h.getVal("big1", strings.Repeat("x", 10000000))
		h.getVal("big2", strings.Repeat("y", 1000))
	})
}
918
919 func TestDB_MinorCompactionsHappen(t *testing.T) {
920         h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
921         defer h.close()
922
923         n := 500
924
925         key := func(i int) string {
926                 return fmt.Sprintf("key%06d", i)
927         }
928
929         for i := 0; i < n; i++ {
930                 h.put(key(i), key(i)+strings.Repeat("v", 1000))
931         }
932
933         for i := 0; i < n; i++ {
934                 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
935         }
936
937         h.reopenDB()
938         for i := 0; i < n; i++ {
939                 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
940         }
941 }
942
// TestDB_RecoverWithLargeJournal checks that recovering a large journal
// through a small write buffer flushes multiple level-0 tables while
// preserving every value.
func TestDB_RecoverWithLargeJournal(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("big1", strings.Repeat("1", 200000))
	h.put("big2", strings.Repeat("2", 200000))
	h.put("small3", strings.Repeat("3", 10))
	h.put("small4", strings.Repeat("4", 10))
	h.tablesPerLevel("")

	// Make sure that if we re-open with a small write buffer size that
	// we flush table files in the middle of a large journal file.
	h.o.WriteBuffer = 100000
	h.reopenDB()
	h.getVal("big1", strings.Repeat("1", 200000))
	h.getVal("big2", strings.Repeat("2", 200000))
	h.getVal("small3", strings.Repeat("3", 10))
	h.getVal("small4", strings.Repeat("4", 10))
	v := h.db.s.version()
	if v.tLen(0) <= 1 {
		t.Errorf("tables-0 less than one")
	}
	v.release()
}
967
// TestDB_CompactionsGenerateMultipleFiles fills level-0 with ~8MB of
// uncompressed data and checks that compacting it into level-1 splits
// the data across multiple table files.
func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  10000000,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	// A fresh DB must start with an empty level-0.
	v := h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	v.release()

	n := 80

	// Write 8MB (80 values, each 100K)
	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}

	// Reopening moves updates to level-0
	h.reopenDB()
	h.compactRangeAt(0, "", "")

	v = h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	// The compaction output must be split across several level-1 tables.
	if v.tLen(1) <= 1 {
		t.Errorf("level-1 tables less than 1, got %d", v.tLen(1))
	}
	v.release()

	// All values must survive the compaction intact.
	for i := 0; i < n; i++ {
		h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
}
1006
1007 func TestDB_RepeatedWritesToSameKey(t *testing.T) {
1008         h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
1009         defer h.close()
1010
1011         maxTables := h.o.GetWriteL0PauseTrigger() + 7
1012
1013         value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1014         for i := 0; i < 5*maxTables; i++ {
1015                 h.put("key", value)
1016                 n := h.totalTables()
1017                 if n > maxTables {
1018                         t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1019                 }
1020         }
1021 }
1022
1023 func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
1024         h := newDbHarnessWopt(t, &opt.Options{
1025                 DisableLargeBatchTransaction: true,
1026                 WriteBuffer:                  100000,
1027         })
1028         defer h.close()
1029
1030         h.reopenDB()
1031
1032         maxTables := h.o.GetWriteL0PauseTrigger() + 7
1033
1034         value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1035         for i := 0; i < 5*maxTables; i++ {
1036                 h.put("key", value)
1037                 n := h.totalTables()
1038                 if n > maxTables {
1039                         t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1040                 }
1041         }
1042 }
1043
// TestDB_SparseMerge checks that compactions bound the overlap they
// create with the next level: a sparse update touching three prefixes
// must not trigger a compaction that rewrites the bulky "B" range in
// one shot.
func TestDB_SparseMerge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
	defer h.close()

	h.putMulti(7, "A", "Z")

	// Suppose there is:
	//    small amount of data with prefix A
	//    large amount of data with prefix B
	//    small amount of data with prefix C
	// and that recent updates have made small changes to all three prefixes.
	// Check that we do not do a compaction that merges all of B in one shot.
	h.put("A", "va")
	value := strings.Repeat("x", 1000)
	for i := 0; i < 100000; i++ {
		h.put(fmt.Sprintf("B%010d", i), value)
	}
	h.put("C", "vc")
	h.compactMem()
	h.compactRangeAt(0, "", "")
	h.waitCompaction()

	// Make sparse update
	h.put("A", "va2")
	h.put("B100", "bvalue2")
	h.put("C", "vc2")
	h.compactMem()

	// After each compaction step, next-level overlap must stay <= ~20MB.
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(0, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(1, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
}
1081
// TestDB_SizeOf checks approximate size estimation: an empty DB and
// memtable-only data report zero, and once data is flushed, range size
// estimates must fall between the lower/upper bounds derived from the
// uncompressed payload size.
func TestDB_SizeOf(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		WriteBuffer:                  10000000,
	})
	defer h.close()

	h.sizeAssert("", "xyz", 0, 0)
	h.reopenDB()
	h.sizeAssert("", "xyz", 0, 0)

	// Write 8MB (80 values, each 100K)
	n := 80
	s1 := 100000 // lower bound per value (exact payload size)
	s2 := 105000 // upper bound per value (payload plus ~5% overhead)

	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10))
	}

	// 0 because SizeOf() does not account for memtable space
	h.sizeAssert("", numKey(50), 0, 0)

	// Repeat the checks while incrementally compacting sub-ranges of
	// level-0; estimates must stay within bounds throughout.
	for r := 0; r < 3; r++ {
		h.reopenDB()

		for cs := 0; cs < n; cs += 10 {
			for i := 0; i < n; i += 10 {
				h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i))
				h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1)))
				h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10))
			}

			h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50))
			h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50))

			h.compactRangeAt(0, numKey(cs), numKey(cs+9))
		}

		v := h.db.s.version()
		if v.tLen(0) != 0 {
			t.Errorf("level-0 tables was not zero, got %d", v.tLen(0))
		}
		if v.tLen(1) == 0 {
			t.Error("level-1 tables was zero")
		}
		v.release()
	}
}
1132
// TestDB_SizeOf_MixOfSmallAndLarge verifies size estimates when table
// files have widely varying sizes, repeated over reopen/compact cycles.
func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	// Uncompressed payload size for each consecutive key.
	sizes := []int64{
		10000,
		10000,
		100000,
		10000,
		100000,
		10000,
		300000,
		10000,
	}

	for i, n := range sizes {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10))
	}

	for r := 0; r < 3; r++ {
		h.reopenDB()

		// The estimate up to key i must equal the running sum of the
		// preceding sizes, with ~1000 bytes of slack.
		var x int64
		for i, n := range sizes {
			y := x
			if i > 0 {
				y += 1000
			}
			h.sizeAssert("", numKey(i), x, y)
			x += n
		}

		h.sizeAssert(numKey(3), numKey(5), 110000, 111000)

		h.compactRangeAt(0, "", "")
	}
}
1173
// TestDB_Snapshot verifies snapshot isolation: every snapshot keeps
// returning the value that was current when it was taken, independent
// of later writes and of the order in which snapshots are released.
func TestDB_Snapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		s1 := h.getSnapshot()
		h.put("foo", "v2")
		s2 := h.getSnapshot()
		h.put("foo", "v3")
		s3 := h.getSnapshot()
		h.put("foo", "v4")

		// Each snapshot sees its own version; the live DB sees v4.
		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getValr(s3, "foo", "v3")
		h.getVal("foo", "v4")

		// Releasing one snapshot must not disturb the others.
		s3.Release()
		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s1.Release()
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s2.Release()
		h.getVal("foo", "v4")
	})
}
1202
1203 func TestDB_SnapshotList(t *testing.T) {
1204         db := &DB{snapsList: list.New()}
1205         e0a := db.acquireSnapshot()
1206         e0b := db.acquireSnapshot()
1207         db.seq = 1
1208         e1 := db.acquireSnapshot()
1209         db.seq = 2
1210         e2 := db.acquireSnapshot()
1211
1212         if db.minSeq() != 0 {
1213                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1214         }
1215         db.releaseSnapshot(e0a)
1216         if db.minSeq() != 0 {
1217                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1218         }
1219         db.releaseSnapshot(e2)
1220         if db.minSeq() != 0 {
1221                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1222         }
1223         db.releaseSnapshot(e0b)
1224         if db.minSeq() != 1 {
1225                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1226         }
1227         e2 = db.acquireSnapshot()
1228         if db.minSeq() != 1 {
1229                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1230         }
1231         db.releaseSnapshot(e1)
1232         if db.minSeq() != 2 {
1233                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1234         }
1235         db.releaseSnapshot(e2)
1236         if db.minSeq() != 2 {
1237                 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1238         }
1239 }
1240
// TestDB_HiddenValuesAreRemoved checks that superseded values and
// deletion markers for a key are dropped by compaction, but only once
// the compaction reaches the base level for that key.
func TestDB_HiddenValuesAreRemoved(t *testing.T) {
	trun(t, func(h *dbHarness) {
		s := h.db.s

		// Direct memtable flushes to level m (verified by the tLen
		// assertions below).
		m := 2
		h.db.memdbMaxLevel = m

		h.put("foo", "v1")
		h.compactMem()
		v := s.version()
		num := v.tLen(m)
		v.release()
		if num != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
		}

		// Place a table at level last-1 to prevent merging with preceding mutation
		h.put("a", "begin")
		h.put("z", "end")
		h.compactMem()
		v = s.version()
		if v.tLen(m) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
		}
		if v.tLen(m-1) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
		}
		v.release()

		h.delete("foo")
		h.put("foo", "v2")
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactMem()
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactRangeAt(m-2, "", "z")
		// DEL eliminated, but v1 remains because we aren't compacting that level
		// (DEL can be eliminated because v2 hides v1).
		h.allEntriesFor("foo", "[ v2, v1 ]")
		h.compactRangeAt(m-1, "", "")
		// Merging last-1 w/ last, so we are the base level for "foo", so
		// DEL is removed.  (as is v1).
		h.allEntriesFor("foo", "[ v2 ]")
	})
}
1285
// TestDB_DeletionMarkers2 checks that a deletion marker is preserved
// while a lower level still holds the old value, and that both marker
// and value disappear once compacted into the base level.
func TestDB_DeletionMarkers2(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()
	s := h.db.s

	// Direct memtable flushes to level m (verified by the tLen
	// assertions below).
	m := 2
	h.db.memdbMaxLevel = m

	h.put("foo", "v1")
	h.compactMem()
	v := s.version()
	num := v.tLen(m)
	v.release()
	if num != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
	}

	// Place a table at level last-1 to prevent merging with preceding mutation
	h.put("a", "begin")
	h.put("z", "end")
	h.compactMem()
	v = s.version()
	if v.tLen(m) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
	}
	if v.tLen(m-1) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
	}
	v.release()

	h.delete("foo")
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactMem() // Moves to level last-2
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-2, "", "")
	// DEL kept: "last" file overlaps
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-1, "", "")
	// Merging last-1 w/ last, so we are the base level for "foo", so
	// DEL is removed.  (as is v1).
	h.allEntriesFor("foo", "[ ]")
}
1328
// TestDB_CompactionTableOpenError injects table-open failures during a
// whole-DB compaction and verifies that the failed compaction loses no
// data once the error is cleared and the DB reopened.
func TestDB_CompactionTableOpenError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		// NOTE(review): negative capacity appears intended to disable
		// the open-files cache so every table access re-opens the file
		// and hits the injected error — confirm against opt docs.
		OpenFilesCacheCapacity:       -1,
	})
	defer h.close()

	h.db.memdbMaxLevel = 2

	// Build two rounds of 10 tables, each holding 10 keys.
	im := 10
	jm := 10
	for r := 0; r < 2; r++ {
		for i := 0; i < im; i++ {
			for j := 0; j < jm; j++ {
				h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
			}
			h.compactMem()
		}
	}

	if n := h.totalTables(); n != im*2 {
		t.Errorf("total tables is %d, want %d", n, im*2)
	}

	// Make table opens fail, then trigger a full compaction and wait
	// for it; the expected error is only logged.
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
	go h.db.CompactRange(util.Range{})
	if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
		t.Log("compaction error: ", err)
	}
	h.closeDB0()
	h.openDB()
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)

	// No data may be lost despite the failed compaction.
	for i := 0; i < im; i++ {
		for j := 0; j < jm; j++ {
			h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
		}
	}
}
1368
// TestDB_OverlapInLevel0 is a regression test: a memtable compaction
// must detect overlap with existing level-0 files so that a deletion is
// not pushed to a deeper level where an older value would resurface.
func TestDB_OverlapInLevel0(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
		h.put("100", "v100")
		h.put("999", "v999")
		h.compactMem()
		h.delete("100")
		h.delete("999")
		h.compactMem()
		h.tablesPerLevel("0,1,1")

		// Make files spanning the following ranges in level-0:
		//  files[0]  200 .. 900
		//  files[1]  300 .. 500
		// Note that files are sorted by min key.
		h.put("300", "v300")
		h.put("500", "v500")
		h.compactMem()
		h.put("200", "v200")
		h.put("600", "v600")
		h.put("900", "v900")
		h.compactMem()
		h.tablesPerLevel("2,1,1")

		// Compact away the placeholder files we created initially
		h.compactRangeAt(1, "", "")
		h.compactRangeAt(2, "", "")
		h.tablesPerLevel("2")

		// Do a memtable compaction.  Before bug-fix, the compaction would
		// not detect the overlap with level-0 files and would incorrectly place
		// the deletion in a deeper level.
		h.delete("600")
		h.compactMem()
		h.tablesPerLevel("3")
		h.get("600", false)
	})
}
1409
// TestDB_L0_CompactionBug_Issue44_a is a regression test for upstream
// LevelDB issue 44: interleaving reopens with deletes and re-puts must
// not resurrect deleted keys or lose live ones.
func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("b", "v")
	h.reopenDB()
	h.delete("b")
	h.delete("a")
	h.reopenDB()
	h.delete("a")
	h.reopenDB()
	h.put("a", "v")
	h.reopenDB()
	h.reopenDB()
	// Only "a" should remain, before and after compaction settles.
	h.getKeyVal("(a->v)")
	h.waitCompaction()
	h.getKeyVal("(a->v)")
}
1429
// TestDB_L0_CompactionBug_Issue44_b is the second regression scenario
// for upstream LevelDB issue 44, mixing writes of the empty key,
// deletes of absent keys, and frequent reopens.
func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("e")
	h.put("", "")
	h.reopenDB()
	h.put("c", "cv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.put("", "")
	h.waitCompaction()
	h.reopenDB()
	h.put("d", "dv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("d")
	h.delete("b")
	h.reopenDB()
	// Only the empty key and "c" should survive, before and after the
	// background compaction settles.
	h.getKeyVal("(->)(c->cv)")
	h.waitCompaction()
	h.getKeyVal("(->)(c->cv)")
}
1458
1459 func TestDB_SingleEntryMemCompaction(t *testing.T) {
1460         trun(t, func(h *dbHarness) {
1461                 for i := 0; i < 10; i++ {
1462                         h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer))
1463                         h.compactMem()
1464                         h.put("key", strings.Repeat("v", opt.DefaultBlockSize))
1465                         h.compactMem()
1466                         h.put("k", "v")
1467                         h.compactMem()
1468                         h.put("", "")
1469                         h.compactMem()
1470                         h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2))
1471                         h.compactMem()
1472                 }
1473         })
1474 }
1475
// TestDB_ManifestWriteError simulates manifest write (i==0) and sync
// (i==1) failures during a merging compaction, and verifies that the
// failure surfaces and no data is lost after clearing it and reopening.
func TestDB_ManifestWriteError(t *testing.T) {
	for i := 0; i < 2; i++ {
		func() {
			h := newDbHarness(t)
			defer h.close()

			h.put("foo", "bar")
			h.getVal("foo", "bar")

			// Mem compaction (will succeed)
			h.compactMem()
			h.getVal("foo", "bar")
			v := h.db.s.version()
			if n := v.tLen(0); n != 1 {
				t.Errorf("invalid total tables, want=1 got=%d", n)
			}
			v.release()

			if i == 0 {
				h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
			} else {
				h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
			}

			// Merging compaction (will fail)
			h.compactRangeAtErr(0, "", "", true)

			h.db.Close()
			h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
			h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)

			// Should not lose data
			h.openDB()
			h.getVal("foo", "bar")
		}()
	}
}
1513
// assertErr checks err against the expectation wanterr: an expected
// error is only logged; an unexpected error, or a missing expected
// error, fails the test.
func assertErr(t *testing.T, err error, wanterr bool) {
	switch {
	case err != nil && wanterr:
		t.Log("AssertErr: got error (expected): ", err)
	case err != nil:
		t.Error("AssertErr: got error: ", err)
	case wanterr:
		t.Error("AssertErr: expect error")
	}
}
1525
1526 func TestDB_ClosedIsClosed(t *testing.T) {
1527         h := newDbHarness(t)
1528         db := h.db
1529
1530         var iter, iter2 iterator.Iterator
1531         var snap *Snapshot
1532         func() {
1533                 defer h.close()
1534
1535                 h.put("k", "v")
1536                 h.getVal("k", "v")
1537
1538                 iter = db.NewIterator(nil, h.ro)
1539                 iter.Seek([]byte("k"))
1540                 testKeyVal(t, iter, "k->v")
1541
1542                 var err error
1543                 snap, err = db.GetSnapshot()
1544                 if err != nil {
1545                         t.Fatal("GetSnapshot: got error: ", err)
1546                 }
1547
1548                 h.getValr(snap, "k", "v")
1549
1550                 iter2 = snap.NewIterator(nil, h.ro)
1551                 iter2.Seek([]byte("k"))
1552                 testKeyVal(t, iter2, "k->v")
1553
1554                 h.put("foo", "v2")
1555                 h.delete("foo")
1556
1557                 // closing DB
1558                 iter.Release()
1559                 iter2.Release()
1560         }()
1561
1562         assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true)
1563         _, err := db.Get([]byte("k"), h.ro)
1564         assertErr(t, err, true)
1565
1566         if iter.Valid() {
1567                 t.Errorf("iter.Valid should false")
1568         }
1569         assertErr(t, iter.Error(), false)
1570         testKeyVal(t, iter, "->")
1571         if iter.Seek([]byte("k")) {
1572                 t.Errorf("iter.Seek should false")
1573         }
1574         assertErr(t, iter.Error(), true)
1575
1576         assertErr(t, iter2.Error(), false)
1577
1578         _, err = snap.Get([]byte("k"), h.ro)
1579         assertErr(t, err, true)
1580
1581         _, err = db.GetSnapshot()
1582         assertErr(t, err, true)
1583
1584         iter3 := db.NewIterator(nil, h.ro)
1585         assertErr(t, iter3.Error(), true)
1586
1587         iter3 = snap.NewIterator(nil, h.ro)
1588         assertErr(t, iter3.Error(), true)
1589
1590         assertErr(t, db.Delete([]byte("k"), h.wo), true)
1591
1592         _, err = db.GetProperty("leveldb.stats")
1593         assertErr(t, err, true)
1594
1595         _, err = db.SizeOf([]util.Range{{[]byte("a"), []byte("z")}})
1596         assertErr(t, err, true)
1597
1598         assertErr(t, db.CompactRange(util.Range{}), true)
1599
1600         assertErr(t, db.Close(), true)
1601 }
1602
// numberComparer orders keys of the form "[<number>]" by their numeric
// value, so that e.g. "[10]" and "[0xa]" compare equal.
type numberComparer struct{}

// num parses the integer between the surrounding brackets of x.
// fmt.Sscan uses base-prefix detection, so hex (0x) forms work too.
func (numberComparer) num(x []byte) (n int) {
	fmt.Sscan(string(x[1:len(x)-1]), &n)
	return
}

// Name returns the comparer identifier recorded in the DB manifest.
func (numberComparer) Name() string {
	return "test.NumberComparer"
}

// Compare orders a and b by their parsed numeric values.
func (p numberComparer) Compare(a, b []byte) int {
	an, bn := p.num(a), p.num(b)
	return an - bn
}

// Separator and Successor intentionally perform no key shortening.
func (numberComparer) Separator(dst, a, b []byte) []byte { return nil }
func (numberComparer) Successor(dst, b []byte) []byte    { return nil }
1620
// TestDB_CustomComparer runs the DB with numberComparer, under which
// keys like "[10]" and "[0xa]" address the same entry, and checks that
// lookups, misses and compactions all honor the custom ordering.
func TestDB_CustomComparer(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Comparer:                     numberComparer{},
		WriteBuffer:                  1000,
	})
	defer h.close()

	h.put("[10]", "ten")
	h.put("[0x14]", "twenty")
	// Check twice: first from the memtable, then again after compaction.
	for i := 0; i < 2; i++ {
		h.getVal("[10]", "ten")
		h.getVal("[0xa]", "ten")
		h.getVal("[20]", "twenty")
		h.getVal("[0x14]", "twenty")
		h.get("[15]", false)
		h.get("[0xf]", false)
		h.compactMem()
		h.compactRange("[0]", "[9999]")
	}

	// Bulk writes against the tiny write buffer to force table churn
	// under the custom ordering.
	for n := 0; n < 2; n++ {
		for i := 0; i < 100; i++ {
			v := fmt.Sprintf("[%d]", i*10)
			h.put(v, v)
		}
		h.compactMem()
		h.compactRange("[0]", "[1000000]")
	}
}
1651
// TestDB_ManualCompaction exercises CompactRange over specific key
// ranges and asserts the resulting per-level table layout after each
// step.
func TestDB_ManualCompaction(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.db.memdbMaxLevel = 2

	h.putMulti(3, "p", "q")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls before files
	h.compactRange("", "c")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls after files
	h.compactRange("r", "z")
	h.tablesPerLevel("1,1,1")

	// Compaction range overlaps files
	h.compactRange("p1", "p9")
	h.tablesPerLevel("0,0,1")

	// Populate a different range
	h.putMulti(3, "c", "e")
	h.tablesPerLevel("1,1,2")

	// Compact just the new range
	h.compactRange("b", "f")
	h.tablesPerLevel("0,0,2")

	// Compact all
	h.putMulti(1, "a", "z")
	h.tablesPerLevel("0,1,2")
	h.compactRange("", "")
	h.tablesPerLevel("0,0,1")
}
1687
// TestDB_BloomFilter verifies that a 10-bit bloom filter keeps sstable
// read I/O near one read per lookup for present keys and close to zero
// for missing keys.
func TestDB_BloomFilter(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		DisableBlockCache:            true,
		Filter:                       filter.NewBloomFilter(10),
	})
	defer h.close()

	key := func(i int) string {
		return fmt.Sprintf("key%06d", i)
	}

	const n = 10000

	// Populate multiple layers
	for i := 0; i < n; i++ {
		h.put(key(i), key(i))
	}
	h.compactMem()
	h.compactRange("a", "z")
	for i := 0; i < n; i += 100 {
		h.put(key(i), key(i))
	}
	h.compactMem()

	// Prevent auto compactions triggered by seeks
	h.stor.Stall(testutil.ModeSync, storage.TypeTable)

	// Lookup present keys. Should rarely read from small sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i))
	}
	cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
	// Expect roughly one table read per lookup, with at most 2% slack.
	if min, max := n, n+2*n/100; cnt < min || cnt > max {
		t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
	}

	// Lookup missing keys. Should rarely read from either sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.get(key(i)+".missing", false)
	}
	cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
	// The filter should suppress all but ~3% of reads for misses.
	if max := 3 * n / 100; cnt > max {
		t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
	}

	h.stor.Release(testutil.ModeSync, storage.TypeTable)
}
1740
// TestDB_Concurrent hammers the DB from n goroutines for `secs`
// seconds, each randomly putting or getting keys, and validates that
// every value read is self-consistent: matching key, a valid writer
// id, and a per-writer counter that is never from the future.
func TestDB_Concurrent(t *testing.T) {
	const n, secs, maxkey = 4, 6, 1000
	h := newDbHarness(t)
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var (
		closeWg sync.WaitGroup
		stop    uint32
		cnt     [n]uint32
	)

	for i := 0; i < n; i++ {
		closeWg.Add(1)
		go func(i int) {
			var put, get, found uint
			defer func() {
				t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
					i, cnt[i], put, get, found, get-found)
				closeWg.Done()
			}()

			rnd := rand.New(rand.NewSource(int64(1000 + i)))
			for atomic.LoadUint32(&stop) == 0 {
				x := cnt[i]

				k := rnd.Intn(maxkey)
				kstr := fmt.Sprintf("%016d", k)

				if (rnd.Int() % 2) > 0 {
					put++
					// Value encodes "key.writer-id.writer-counter",
					// padded to a fixed width.
					h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
				} else {
					get++
					v, err := h.db.Get([]byte(kstr), h.ro)
					if err == nil {
						found++
						rk, ri, rx := 0, -1, uint32(0)
						fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
						if rk != k {
							t.Errorf("invalid key want=%d got=%d", k, rk)
						}
						if ri < 0 || ri >= n {
							t.Error("invalid goroutine number: ", ri)
						} else {
							// The writer's live counter must be at least
							// the counter stored in the value read.
							tx := atomic.LoadUint32(&(cnt[ri]))
							if rx > tx {
								t.Errorf("invalid seq number, %d > %d ", rx, tx)
							}
						}
					} else if err != ErrNotFound {
						t.Error("Get: got error: ", err)
						return
					}
				}
				atomic.AddUint32(&cnt[i], 1)
			}
		}(i)
	}

	time.Sleep(secs * time.Second)
	atomic.StoreUint32(&stop, 1)
	closeWg.Wait()
}
1806
1807 func TestDB_ConcurrentIterator(t *testing.T) {
1808         const n, n2 = 4, 1000
1809         h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
1810         defer h.close()
1811
1812         runtime.GOMAXPROCS(runtime.NumCPU())
1813
1814         var (
1815                 closeWg sync.WaitGroup
1816                 stop    uint32
1817         )
1818
1819         for i := 0; i < n; i++ {
1820                 closeWg.Add(1)
1821                 go func(i int) {
1822                         for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
1823                                 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1824                         }
1825                         closeWg.Done()
1826                 }(i)
1827         }
1828
1829         for i := 0; i < n; i++ {
1830                 closeWg.Add(1)
1831                 go func(i int) {
1832                         for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
1833                                 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1834                         }
1835                         closeWg.Done()
1836                 }(i)
1837         }
1838
1839         cmp := comparer.DefaultComparer
1840         for i := 0; i < n2; i++ {
1841                 closeWg.Add(1)
1842                 go func(i int) {
1843                         it := h.db.NewIterator(nil, nil)
1844                         var pk []byte
1845                         for it.Next() {
1846                                 kk := it.Key()
1847                                 if cmp.Compare(kk, pk) <= 0 {
1848                                         t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
1849                                 }
1850                                 pk = append(pk[:0], kk...)
1851                                 var k, vk, vi int
1852                                 if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
1853                                         t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
1854                                 } else if n < 1 {
1855                                         t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
1856                                 }
1857                                 if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
1858                                         t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
1859                                 } else if n < 2 {
1860                                         t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
1861                                 }
1862
1863                                 if vk != k {
1864                                         t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
1865                                 }
1866                         }
1867                         if err := it.Error(); err != nil {
1868                                 t.Errorf("iter %d: Got error: %v", i, err)
1869                         }
1870                         it.Release()
1871                         closeWg.Done()
1872                 }(i)
1873         }
1874
1875         atomic.StoreUint32(&stop, 1)
1876         closeWg.Wait()
1877 }
1878
1879 func TestDB_ConcurrentWrite(t *testing.T) {
1880         const n, bk, niter = 10, 3, 10000
1881         h := newDbHarness(t)
1882         defer h.close()
1883
1884         runtime.GOMAXPROCS(runtime.NumCPU())
1885
1886         var wg sync.WaitGroup
1887         for i := 0; i < n; i++ {
1888                 wg.Add(1)
1889                 go func(i int) {
1890                         defer wg.Done()
1891                         for k := 0; k < niter; k++ {
1892                                 kstr := fmt.Sprintf("put-%d.%d", i, k)
1893                                 vstr := fmt.Sprintf("v%d", k)
1894                                 h.put(kstr, vstr)
1895                                 // Key should immediately available after put returns.
1896                                 h.getVal(kstr, vstr)
1897                         }
1898                 }(i)
1899         }
1900         for i := 0; i < n; i++ {
1901                 wg.Add(1)
1902                 batch := &Batch{}
1903                 go func(i int) {
1904                         defer wg.Done()
1905                         for k := 0; k < niter; k++ {
1906                                 batch.Reset()
1907                                 for j := 0; j < bk; j++ {
1908                                         batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
1909                                 }
1910                                 h.write(batch)
1911                                 // Key should immediately available after put returns.
1912                                 for j := 0; j < bk; j++ {
1913                                         h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
1914                                 }
1915                         }
1916                 }(i)
1917         }
1918         wg.Wait()
1919 }
1920
1921 func TestDB_CreateReopenDbOnFile(t *testing.T) {
1922         dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid()))
1923         if err := os.RemoveAll(dbpath); err != nil {
1924                 t.Fatal("cannot remove old db: ", err)
1925         }
1926         defer os.RemoveAll(dbpath)
1927
1928         for i := 0; i < 3; i++ {
1929                 stor, err := storage.OpenFile(dbpath, false)
1930                 if err != nil {
1931                         t.Fatalf("(%d) cannot open storage: %s", i, err)
1932                 }
1933                 db, err := Open(stor, nil)
1934                 if err != nil {
1935                         t.Fatalf("(%d) cannot open db: %s", i, err)
1936                 }
1937                 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1938                         t.Fatalf("(%d) cannot write to db: %s", i, err)
1939                 }
1940                 if err := db.Close(); err != nil {
1941                         t.Fatalf("(%d) cannot close db: %s", i, err)
1942                 }
1943                 if err := stor.Close(); err != nil {
1944                         t.Fatalf("(%d) cannot close storage: %s", i, err)
1945                 }
1946         }
1947 }
1948
1949 func TestDB_CreateReopenDbOnFile2(t *testing.T) {
1950         dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid()))
1951         if err := os.RemoveAll(dbpath); err != nil {
1952                 t.Fatal("cannot remove old db: ", err)
1953         }
1954         defer os.RemoveAll(dbpath)
1955
1956         for i := 0; i < 3; i++ {
1957                 db, err := OpenFile(dbpath, nil)
1958                 if err != nil {
1959                         t.Fatalf("(%d) cannot open db: %s", i, err)
1960                 }
1961                 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1962                         t.Fatalf("(%d) cannot write to db: %s", i, err)
1963                 }
1964                 if err := db.Close(); err != nil {
1965                         t.Fatalf("(%d) cannot close db: %s", i, err)
1966                 }
1967         }
1968 }
1969
// TestDB_DeletionMarkersOnMemdb checks that a deletion recorded only in the
// memdb hides a key whose older value was already flushed to a table.
func TestDB_DeletionMarkersOnMemdb(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "v1")
	h.compactMem() // flush the memdb so "foo" lives in a table file
	h.delete("foo")
	h.get("foo", false) // the table copy must not resurface
	h.getKeyVal("")     // whole DB must now appear empty
}
1980
1981 func TestDB_LeveldbIssue178(t *testing.T) {
1982         nKeys := (opt.DefaultCompactionTableSize / 30) * 5
1983         key1 := func(i int) string {
1984                 return fmt.Sprintf("my_key_%d", i)
1985         }
1986         key2 := func(i int) string {
1987                 return fmt.Sprintf("my_key_%d_xxx", i)
1988         }
1989
1990         // Disable compression since it affects the creation of layers and the
1991         // code below is trying to test against a very specific scenario.
1992         h := newDbHarnessWopt(t, &opt.Options{
1993                 DisableLargeBatchTransaction: true,
1994                 Compression:                  opt.NoCompression,
1995         })
1996         defer h.close()
1997
1998         // Create first key range.
1999         batch := new(Batch)
2000         for i := 0; i < nKeys; i++ {
2001                 batch.Put([]byte(key1(i)), []byte("value for range 1 key"))
2002         }
2003         h.write(batch)
2004
2005         // Create second key range.
2006         batch.Reset()
2007         for i := 0; i < nKeys; i++ {
2008                 batch.Put([]byte(key2(i)), []byte("value for range 2 key"))
2009         }
2010         h.write(batch)
2011
2012         // Delete second key range.
2013         batch.Reset()
2014         for i := 0; i < nKeys; i++ {
2015                 batch.Delete([]byte(key2(i)))
2016         }
2017         h.write(batch)
2018         h.waitMemCompaction()
2019
2020         // Run manual compaction.
2021         h.compactRange(key1(0), key1(nKeys-1))
2022
2023         // Checking the keys.
2024         h.assertNumKeys(nKeys)
2025 }
2026
2027 func TestDB_LeveldbIssue200(t *testing.T) {
2028         h := newDbHarness(t)
2029         defer h.close()
2030
2031         h.put("1", "b")
2032         h.put("2", "c")
2033         h.put("3", "d")
2034         h.put("4", "e")
2035         h.put("5", "f")
2036
2037         iter := h.db.NewIterator(nil, h.ro)
2038
2039         // Add an element that should not be reflected in the iterator.
2040         h.put("25", "cd")
2041
2042         iter.Seek([]byte("5"))
2043         assertBytes(t, []byte("5"), iter.Key())
2044         iter.Prev()
2045         assertBytes(t, []byte("4"), iter.Key())
2046         iter.Prev()
2047         assertBytes(t, []byte("3"), iter.Key())
2048         iter.Next()
2049         assertBytes(t, []byte("4"), iter.Key())
2050         iter.Next()
2051         assertBytes(t, []byte("5"), iter.Key())
2052 }
2053
// TestDB_GoleveldbIssue74 regression test: a writer repeatedly fills the DB
// with KEY/PTR pairs, then walks a snapshot verifying and deleting them,
// while a reader concurrently walks its own snapshots checking that every
// PTR entry still resolves and that all values within one snapshot carry
// the same writer-iteration suffix. Both loops run until dur elapses or
// either side sets the done flag.
func TestDB_GoleveldbIssue74(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	const n, dur = 10000, 5 * time.Second

	runtime.GOMAXPROCS(runtime.NumCPU())

	until := time.Now().Add(dur)
	wg := new(sync.WaitGroup)
	wg.Add(2)
	var done uint32
	go func() {
		var i int
		defer func() {
			t.Logf("WRITER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()

		b := new(Batch)
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			// Write n KEY entries plus a PTR entry pointing at each KEY.
			iv := fmt.Sprintf("VAL%010d", i)
			for k := 0; k < n; k++ {
				key := fmt.Sprintf("KEY%06d", k)
				b.Put([]byte(key), []byte(key+iv))
				b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
			}
			h.write(b)

			// Walk a snapshot of what was just written, verifying every
			// PTR -> KEY indirection while queueing deletions for all of it.
			b.Reset()
			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			var k int
			for ; iter.Next(); k++ {
				ptrKey := iter.Key()
				key := iter.Value()

				// NOTE(review): t.Fatalf from a goroutine other than the
				// test's own is not supported by package testing; these
				// should be t.Errorf + return. Left unchanged in this
				// documentation-only pass.
				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
				}
				if value, err := snap.Get(key, nil); err != nil {
					t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err)
				} else if string(value) != string(key)+iv {
					t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
				}

				b.Delete(key)
				b.Delete(ptrKey)
			}
			h.write(b)
			iter.Release()
			snap.Release()
			// The snapshot must have seen exactly n PTR entries.
			if k != n {
				t.Fatalf("#%d %d != %d", i, k, n)
			}
		}
	}()
	go func() {
		var i int
		defer func() {
			t.Logf("READER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			var prevValue string
			var k int
			for ; iter.Next(); k++ {
				ptrKey := iter.Key()
				key := iter.Value()

				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
				}

				// Every KEY value within one snapshot must share the same
				// writer-iteration suffix as the previous one seen.
				if value, err := snap.Get(key, nil); err != nil {
					t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err)
				} else if prevValue != "" && string(value) != string(key)+prevValue {
					t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
				} else {
					prevValue = string(value[len(key):])
				}
			}
			iter.Release()
			snap.Release()
			// A snapshot either sees no PTR entries or all n of them.
			if k > 0 && k != n {
				t.Fatalf("#%d %d != %d", i, k, n)
			}
		}
	}()
	wg.Wait()
}
2152
2153 func TestDB_GetProperties(t *testing.T) {
2154         h := newDbHarness(t)
2155         defer h.close()
2156
2157         _, err := h.db.GetProperty("leveldb.num-files-at-level")
2158         if err == nil {
2159                 t.Error("GetProperty() failed to detect missing level")
2160         }
2161
2162         _, err = h.db.GetProperty("leveldb.num-files-at-level0")
2163         if err != nil {
2164                 t.Error("got unexpected error", err)
2165         }
2166
2167         _, err = h.db.GetProperty("leveldb.num-files-at-level0x")
2168         if err == nil {
2169                 t.Error("GetProperty() failed to detect invalid level")
2170         }
2171 }
2172
2173 func TestDB_GoleveldbIssue72and83(t *testing.T) {
2174         h := newDbHarnessWopt(t, &opt.Options{
2175                 DisableLargeBatchTransaction: true,
2176                 WriteBuffer:                  1 * opt.MiB,
2177                 OpenFilesCacheCapacity:       3,
2178         })
2179         defer h.close()
2180
2181         const n, wn, dur = 10000, 100, 30 * time.Second
2182
2183         runtime.GOMAXPROCS(runtime.NumCPU())
2184
2185         randomData := func(prefix byte, i int) []byte {
2186                 data := make([]byte, 1+4+32+64+32)
2187                 _, err := crand.Reader.Read(data[1 : len(data)-8])
2188                 if err != nil {
2189                         panic(err)
2190                 }
2191                 data[0] = prefix
2192                 binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i))
2193                 binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value())
2194                 return data
2195         }
2196
2197         keys := make([][]byte, n)
2198         for i := range keys {
2199                 keys[i] = randomData(1, 0)
2200         }
2201
2202         until := time.Now().Add(dur)
2203         wg := new(sync.WaitGroup)
2204         wg.Add(3)
2205         var done uint32
2206         go func() {
2207                 i := 0
2208                 defer func() {
2209                         t.Logf("WRITER DONE #%d", i)
2210                         wg.Done()
2211                 }()
2212
2213                 b := new(Batch)
2214                 for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
2215                         b.Reset()
2216                         for _, k1 := range keys {
2217                                 k2 := randomData(2, i)
2218                                 b.Put(k2, randomData(42, i))
2219                                 b.Put(k1, k2)
2220                         }
2221                         if err := h.db.Write(b, h.wo); err != nil {
2222                                 atomic.StoreUint32(&done, 1)
2223                                 t.Fatalf("WRITER #%d db.Write: %v", i, err)
2224                         }
2225                 }
2226         }()
2227         go func() {
2228                 var i int
2229                 defer func() {
2230                         t.Logf("READER0 DONE #%d", i)
2231                         atomic.StoreUint32(&done, 1)
2232                         wg.Done()
2233                 }()
2234                 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2235                         snap := h.getSnapshot()
2236                         seq := snap.elem.seq
2237                         if seq == 0 {
2238                                 snap.Release()
2239                                 continue
2240                         }
2241                         iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
2242                         writei := int(seq/(n*2) - 1)
2243                         var k int
2244                         for ; iter.Next(); k++ {
2245                                 k1 := iter.Key()
2246                                 k2 := iter.Value()
2247                                 k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
2248                                 k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
2249                                 if k1checksum0 != k1checksum1 {
2250                                         t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, k1checksum0, k1checksum0)
2251                                 }
2252                                 k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
2253                                 k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
2254                                 if k2checksum0 != k2checksum1 {
2255                                         t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, k2checksum0, k2checksum1)
2256                                 }
2257                                 kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
2258                                 if writei != kwritei {
2259                                         t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
2260                                 }
2261                                 if _, err := snap.Get(k2, nil); err != nil {
2262                                         t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
2263                                 }
2264                         }
2265                         if err := iter.Error(); err != nil {
2266                                 t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
2267                         }
2268                         iter.Release()
2269                         snap.Release()
2270                         if k > 0 && k != n {
2271                                 t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
2272                         }
2273                 }
2274         }()
2275         go func() {
2276                 var i int
2277                 defer func() {
2278                         t.Logf("READER1 DONE #%d", i)
2279                         atomic.StoreUint32(&done, 1)
2280                         wg.Done()
2281                 }()
2282                 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2283                         iter := h.db.NewIterator(nil, nil)
2284                         seq := iter.(*dbIter).seq
2285                         if seq == 0 {
2286                                 iter.Release()
2287                                 continue
2288                         }
2289                         writei := int(seq/(n*2) - 1)
2290                         var k int
2291                         for ok := iter.Last(); ok; ok = iter.Prev() {
2292                                 k++
2293                         }
2294                         if err := iter.Error(); err != nil {
2295                                 t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
2296                         }
2297                         iter.Release()
2298                         if m := (writei+1)*n + n; k != m {
2299                                 t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
2300                         }
2301                 }
2302         }()
2303
2304         wg.Wait()
2305 }
2306
// TestDB_TransientError verifies that transient (non-corruption) table
// open/read errors injected during writes do not poison the data: each round
// writes nKey keys, snapshots the state, then deletes them all, with storage
// errors injected via EmulateError; afterwards every snapshot must still
// serve its complete, correct key set.
func TestDB_TransientError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  128 * opt.KiB,
		OpenFilesCacheCapacity:       3,
		DisableCompactionBackoff:     true,
	})
	defer h.close()

	const (
		nSnap = 20
		nKey  = 10000
	)

	var (
		snaps [nSnap]*Snapshot
		b     = &Batch{}
	)
	for i := range snaps {
		vtail := fmt.Sprintf("VAL%030d", i)
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Put([]byte(key), []byte(key+vtail))
		}
		// Inject a transient table open/read error; if the write fails,
		// clear the injection and retry until it succeeds — only a
		// corruption error is fatal.
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}

		// Snapshot the fully written state, then delete every key (again
		// under an injected transient error) so the next round starts from
		// an empty live key set.
		snaps[i] = h.db.newSnapshot()
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Delete([]byte(key))
		}
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d  error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}
	}
	// Stop injecting errors before the verification phase.
	h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)

	runtime.GOMAXPROCS(runtime.NumCPU())

	// Verify every snapshot's contents two ways: random-order Gets and a
	// full in-order iteration.
	// NOTE(review): the t.Fatalf calls below run on non-test goroutines,
	// which package testing does not support; they should be t.Errorf +
	// return. Left unchanged in this documentation-only pass.
	rnd := rand.New(rand.NewSource(0xecafdaed))
	wg := &sync.WaitGroup{}
	for i, snap := range snaps {
		wg.Add(2)

		go func(i int, snap *Snapshot, sk []int) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			for _, k := range sk {
				key := fmt.Sprintf("KEY%8d", k)
				xvalue, err := snap.Get([]byte(key), nil)
				if err != nil {
					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
				}
				value := key + vtail
				if !bytes.Equal([]byte(value), xvalue) {
					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
				}
			}
		}(i, snap, rnd.Perm(nKey))

		go func(i int, snap *Snapshot) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			iter := snap.NewIterator(nil, nil)
			defer iter.Release()
			for k := 0; k < nKey; k++ {
				if !iter.Next() {
					if err := iter.Error(); err != nil {
						t.Fatalf("READER_ITER #%d K%d error: %v", i, k, err)
					} else {
						t.Fatalf("READER_ITER #%d K%d eoi", i, k)
					}
				}
				key := fmt.Sprintf("KEY%8d", k)
				xkey := iter.Key()
				if !bytes.Equal([]byte(key), xkey) {
					t.Fatalf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
				}
				value := key + vtail
				xvalue := iter.Value()
				if !bytes.Equal([]byte(value), xvalue) {
					t.Fatalf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
				}
			}
		}(i, snap)
	}

	wg.Wait()
}
2420
2421 func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
2422         h := newDbHarnessWopt(t, &opt.Options{
2423                 DisableLargeBatchTransaction: true,
2424                 WriteBuffer:                  112 * opt.KiB,
2425                 CompactionTableSize:          90 * opt.KiB,
2426                 CompactionExpandLimitFactor:  1,
2427         })
2428         defer h.close()
2429
2430         const (
2431                 nSnap = 190
2432                 nKey  = 140
2433         )
2434
2435         var (
2436                 snaps [nSnap]*Snapshot
2437                 b     = &Batch{}
2438         )
2439         for i := range snaps {
2440                 vtail := fmt.Sprintf("VAL%030d", i)
2441                 b.Reset()
2442                 for k := 0; k < nKey; k++ {
2443                         key := fmt.Sprintf("KEY%08d", k)
2444                         b.Put([]byte(key), []byte(key+vtail))
2445                 }
2446                 if err := h.db.Write(b, nil); err != nil {
2447                         t.Fatalf("WRITE #%d error: %v", i, err)
2448                 }
2449
2450                 snaps[i] = h.db.newSnapshot()
2451                 b.Reset()
2452                 for k := 0; k < nKey; k++ {
2453                         key := fmt.Sprintf("KEY%08d", k)
2454                         b.Delete([]byte(key))
2455                 }
2456                 if err := h.db.Write(b, nil); err != nil {
2457                         t.Fatalf("WRITE #%d  error: %v", i, err)
2458                 }
2459         }
2460
2461         h.compactMem()
2462
2463         h.waitCompaction()
2464         for level, tables := range h.db.s.stVersion.levels {
2465                 for _, table := range tables {
2466                         t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2467                 }
2468         }
2469
2470         h.compactRangeAt(0, "", "")
2471         h.waitCompaction()
2472         for level, tables := range h.db.s.stVersion.levels {
2473                 for _, table := range tables {
2474                         t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2475                 }
2476         }
2477         h.compactRangeAt(1, "", "")
2478         h.waitCompaction()
2479         for level, tables := range h.db.s.stVersion.levels {
2480                 for _, table := range tables {
2481                         t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2482                 }
2483         }
2484         runtime.GOMAXPROCS(runtime.NumCPU())
2485
2486         wg := &sync.WaitGroup{}
2487         for i, snap := range snaps {
2488                 wg.Add(1)
2489
2490                 go func(i int, snap *Snapshot) {
2491                         defer wg.Done()
2492
2493                         vtail := fmt.Sprintf("VAL%030d", i)
2494                         for k := 0; k < nKey; k++ {
2495                                 key := fmt.Sprintf("KEY%08d", k)
2496                                 xvalue, err := snap.Get([]byte(key), nil)
2497                                 if err != nil {
2498                                         t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2499                                 }
2500                                 value := key + vtail
2501                                 if !bytes.Equal([]byte(value), xvalue) {
2502                                         t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2503                                 }
2504                         }
2505                 }(i, snap)
2506         }
2507
2508         wg.Wait()
2509 }
2510
// TestDB_TableCompactionBuilder drives tableCompactionBuilder directly
// (bypassing the DB write path) to verify table splitting behavior:
// output tables respect the configured table size, no user key spans two
// adjacent level-1 tables, and a compaction survives injected transient
// storage errors by retrying until it succeeds with identical contents.
func TestDB_TableCompactionBuilder(t *testing.T) {
	gomega.RegisterTestingT(t)
	stor := testutil.NewStorage()
	stor.OnLog(testingLogger(t))
	stor.OnClose(testingPreserveOnFailed(t))
	defer stor.Close()

	// Number of stacked sequence numbers written per user key.
	const nSeq = 99

	o := &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  112 * opt.KiB,
		CompactionTableSize:          43 * opt.KiB,
		CompactionExpandLimitFactor:  1,
		CompactionGPOverlapsFactor:   1,
		DisableBlockCache:            true,
	}
	s, err := newSession(stor, o)
	if err != nil {
		t.Fatal(err)
	}
	if err := s.create(); err != nil {
		t.Fatal(err)
	}
	defer s.close()
	var (
		seq        uint64
		targetSize = 5 * o.CompactionTableSize
		value      = bytes.Repeat([]byte{'0'}, 100)
	)
	// Manually build one table at level 0 and one at level 1, each holding
	// nSeq versions of every key so later compactions have work to do.
	for i := 0; i < 2; i++ {
		tw, err := s.tops.create()
		if err != nil {
			t.Fatal(err)
		}
		for k := 0; tw.tw.BytesLen() < targetSize; k++ {
			key := []byte(fmt.Sprintf("%09d", k))
			seq += nSeq - 1
			// Append versions in descending sequence order, as the
			// internal key ordering requires.
			for x := uint64(0); x < nSeq; x++ {
				if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil {
					t.Fatal(err)
				}
			}
		}
		tf, err := tw.finish()
		if err != nil {
			t.Fatal(err)
		}
		rec := &sessionRecord{}
		rec.addTableFile(i, tf)
		if err := s.commit(rec); err != nil {
			t.Fatal(err)
		}
	}

	// Build grandparent.
	// Compact level-1 into level-2 with a smaller-than-default table size
	// so the grandparent level ends up with many small tables.
	v := s.version()
	c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
	rec := &sessionRecord{}
	b := &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize/3 + 961,
	}
	if err := b.run(new(compactionTransactCounter)); err != nil {
		t.Fatal(err)
	}
	// NOTE(review): this loop variable `t` shadows the *testing.T — only
	// t.fd is used so it works, but a different name would be clearer.
	for _, t := range c.levels[0] {
		rec.delTable(c.sourceLevel, t.fd.Num)
	}
	if err := s.commit(rec); err != nil {
		t.Fatal(err)
	}
	c.release()

	// Build level-1.
	// Compact level-0 into level-1; grandparent overlap (built above)
	// should force table boundaries so no user key straddles two tables.
	v = s.version()
	c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...))
	rec = &sessionRecord{}
	b = &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize,
	}
	if err := b.run(new(compactionTransactCounter)); err != nil {
		t.Fatal(err)
	}
	for _, t := range c.levels[0] {
		rec.delTable(c.sourceLevel, t.fd.Num)
	}
	// Move grandparent to level-3
	for _, t := range v.levels[2] {
		rec.delTable(2, t.fd.Num)
		rec.addTableFile(3, t)
	}
	if err := s.commit(rec); err != nil {
		t.Fatal(err)
	}
	c.release()

	// Only level-1 and level-3 should now hold tables.
	v = s.version()
	for level, want := range []bool{false, true, false, true} {
		got := len(v.levels[level]) > 0
		if want != got {
			t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
		}
	}
	// Adjacent level-1 tables must not share a user key at their boundary.
	for i, f := range v.levels[1][:len(v.levels[1])-1] {
		nf := v.levels[1][i+1]
		if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
			t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
		}
	}
	v.release()

	// Compaction with transient error.
	// Inject a one-shot sync error plus random read/write errors and keep
	// retrying the same builder until a run completes cleanly.
	v = s.version()
	c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
	rec = &sessionRecord{}
	b = &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize,
	}
	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
	for {
		if err := b.run(new(compactionTransactCounter)); err != nil {
			t.Logf("(expected) b.run: %v", err)
		} else {
			break
		}
	}
	if err := s.commit(rec); err != nil {
		t.Fatal(err)
	}
	c.release()

	// Disable error injection before verifying results.
	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)

	// The retried compaction output (level-2) must match the source
	// tables (level-1) key-for-key.
	v = s.version()
	if len(v.levels[1]) != len(v.levels[2]) {
		t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
	}
	for i, f0 := range v.levels[1] {
		f1 := v.levels[2][i]
		iter0 := s.tops.newIterator(f0, nil, nil)
		iter1 := s.tops.newIterator(f1, nil, nil)
		for j := 0; true; j++ {
			next0 := iter0.Next()
			next1 := iter1.Next()
			if next0 != next1 {
				t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
			}
			key0 := iter0.Key()
			key1 := iter1.Key()
			if !bytes.Equal(key0, key1) {
				t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
			}
			if next0 == false {
				break
			}
		}
		iter0.Release()
		iter1.Release()
	}
	v.release()
}
2692
2693 func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
2694         const (
2695                 vSize = 200 * opt.KiB
2696                 tSize = 100 * opt.MiB
2697                 mIter = 100
2698                 n     = tSize / vSize
2699         )
2700
2701         h := newDbHarnessWopt(t, &opt.Options{
2702                 DisableLargeBatchTransaction: true,
2703                 Compression:                  opt.NoCompression,
2704                 DisableBlockCache:            true,
2705         })
2706         defer h.close()
2707
2708         h.db.memdbMaxLevel = 2
2709
2710         key := func(x int) string {
2711                 return fmt.Sprintf("v%06d", x)
2712         }
2713
2714         // Fill.
2715         value := strings.Repeat("x", vSize)
2716         for i := 0; i < n; i++ {
2717                 h.put(key(i), value)
2718         }
2719         h.compactMem()
2720
2721         // Delete all.
2722         for i := 0; i < n; i++ {
2723                 h.delete(key(i))
2724         }
2725         h.compactMem()
2726
2727         var (
2728                 limit = n / limitDiv
2729
2730                 startKey = key(0)
2731                 limitKey = key(limit)
2732                 maxKey   = key(n)
2733                 slice    = &util.Range{Limit: []byte(limitKey)}
2734
2735                 initialSize0 = h.sizeOf(startKey, limitKey)
2736                 initialSize1 = h.sizeOf(limitKey, maxKey)
2737         )
2738
2739         t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
2740
2741         for r := 0; true; r++ {
2742                 if r >= mIter {
2743                         t.Fatal("taking too long to compact")
2744                 }
2745
2746                 // Iterates.
2747                 iter := h.db.NewIterator(slice, h.ro)
2748                 for iter.Next() {
2749                 }
2750                 if err := iter.Error(); err != nil {
2751                         t.Fatalf("Iter err: %v", err)
2752                 }
2753                 iter.Release()
2754
2755                 // Wait compaction.
2756                 h.waitCompaction()
2757
2758                 // Check size.
2759                 size0 := h.sizeOf(startKey, limitKey)
2760                 size1 := h.sizeOf(limitKey, maxKey)
2761                 t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
2762                 if size0 < initialSize0/10 {
2763                         break
2764                 }
2765         }
2766
2767         if initialSize1 > 0 {
2768                 h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
2769         }
2770 }
2771
// TestDB_IterTriggeredCompaction runs the iterator-triggered compaction
// scenario over the entire key space (limitDiv = 1).
func TestDB_IterTriggeredCompaction(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 1)
}
2775
// TestDB_IterTriggeredCompactionHalf runs the iterator-triggered
// compaction scenario over only the first half of the key space
// (limitDiv = 2), so the second half must stay intact.
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 2)
}
2779
2780 func TestDB_ReadOnly(t *testing.T) {
2781         h := newDbHarness(t)
2782         defer h.close()
2783
2784         h.put("foo", "v1")
2785         h.put("bar", "v2")
2786         h.compactMem()
2787
2788         h.put("xfoo", "v1")
2789         h.put("xbar", "v2")
2790
2791         t.Log("Trigger read-only")
2792         if err := h.db.SetReadOnly(); err != nil {
2793                 h.close()
2794                 t.Fatalf("SetReadOnly error: %v", err)
2795         }
2796
2797         mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
2798         h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes"))
2799
2800         ro := func(key, value, wantValue string) {
2801                 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
2802                         t.Fatalf("unexpected error: %v", err)
2803                 }
2804                 h.getVal(key, wantValue)
2805         }
2806
2807         ro("foo", "vx", "v1")
2808
2809         h.o.ReadOnly = true
2810         h.reopenDB()
2811
2812         ro("foo", "vx", "v1")
2813         ro("bar", "vx", "v2")
2814         h.assertNumKeys(4)
2815 }
2816
2817 func TestDB_BulkInsertDelete(t *testing.T) {
2818         h := newDbHarnessWopt(t, &opt.Options{
2819                 DisableLargeBatchTransaction: true,
2820                 Compression:                  opt.NoCompression,
2821                 CompactionTableSize:          128 * opt.KiB,
2822                 CompactionTotalSize:          1 * opt.MiB,
2823                 WriteBuffer:                  256 * opt.KiB,
2824         })
2825         defer h.close()
2826
2827         const R = 100
2828         const N = 2500
2829         key := make([]byte, 4)
2830         value := make([]byte, 256)
2831         for i := 0; i < R; i++ {
2832                 offset := N * i
2833                 for j := 0; j < N; j++ {
2834                         binary.BigEndian.PutUint32(key, uint32(offset+j))
2835                         h.db.Put(key, value, nil)
2836                 }
2837                 for j := 0; j < N; j++ {
2838                         binary.BigEndian.PutUint32(key, uint32(offset+j))
2839                         h.db.Delete(key, nil)
2840                 }
2841         }
2842
2843         if tot := h.totalTables(); tot > 10 {
2844                 t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
2845         }
2846 }
2847
// TestDB_GracefulClose closes the DB from a background goroutine while
// the main goroutine is (1) writing, (2) reading, and (3) iterating,
// and checks that in-flight operations fail gracefully with an error
// rather than crashing or returning corrupt data.
func TestDB_GracefulClose(t *testing.T) {
	// NOTE(review): GOMAXPROCS is changed and never restored; this leaks
	// into subsequent tests in the package — confirm that is intended.
	runtime.GOMAXPROCS(4)
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		CompactionTableSize:          1 * opt.MiB,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	var closeWait sync.WaitGroup

	// During write.
	// Once a few tables exist, close the DB concurrently and keep
	// writing until a Put fails; n records how far the writes got.
	n := 0
	closing := false
	for i := 0; i < 1000000; i++ {
		if !closing && h.totalTables() > 3 {
			t.Logf("close db during write, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
			t.Logf("Put error: %s (expected)", err)
			n = i
			break
		}
	}
	closeWait.Wait()

	// During read.
	// Reopen, then close concurrently halfway through reading back the
	// keys written above; a Get error terminates the loop.
	h.openDB()
	closing = false
	for i := 0; i < n; i++ {
		if !closing && i > n/2 {
			t.Logf("close db during read, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
			t.Logf("Get error: %s (expected)", err)
			break
		}
	}
	closeWait.Wait()

	// During iterate.
	// Reopen and close concurrently on the first iteration step; every
	// entry yielded before the iterator dies must still be well-formed.
	h.openDB()
	closing = false
	iter := h.db.NewIterator(nil, h.ro)
	for i := 0; iter.Next(); i++ {
		if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
			t.Error("Key or value has zero length")
		}
		if !closing {
			t.Logf("close db during iter, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		// Give the background close a chance to run mid-iteration.
		time.Sleep(time.Millisecond)
	}
	if err := iter.Error(); err != nil {
		t.Logf("Iter error: %s (expected)", err)
	}
	iter.Release()
	closeWait.Wait()
}