22 "github.com/syndtr/goleveldb/leveldb"
23 "github.com/syndtr/goleveldb/leveldb/errors"
24 "github.com/syndtr/goleveldb/leveldb/opt"
25 "github.com/syndtr/goleveldb/leveldb/storage"
26 "github.com/syndtr/goleveldb/leveldb/table"
27 "github.com/syndtr/goleveldb/leveldb/util"
// Package-level settings and shared state. The settings below are the initial
// values of the variables that the command-line flags are bound to.

// Location of the test database: a fixed directory name under the OS temp dir.
31 dbPath = path.Join(os.TempDir(), "goleveldb-testdb")
// Passed to leveldb.Open as OpenFilesCacheCapacity (0 is rewritten to -1 first).
32 openFilesCacheCapacity = 500
// One workload is started per entry; the entry's index is used as the
// namespace number and its value as the number of keys in that namespace.
35 numKeys = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
// Listen address handed to http.ListenAndServe for profiling.
36 httpProf = "127.0.0.1:5454"
// Feature toggles. enableBlockCache and enableBufferPool are inverted into the
// DisableBlockCache / DisableBufferPool options; enableCompression switches
// the Compression option from opt.NoCompression to opt.DefaultCompression.
38 enableBlockCache = false
39 enableCompression = false
40 enableBufferPool = false
42 wg = new(sync.WaitGroup)
// Buffer pool given to table.NewReader by scanTable. It is only assigned when
// enableBufferPool is set, so it is nil otherwise.
45 bpool *util.BufferPool
// String renders the list as text by appending the strconv.Itoa form of each
// element to str. Together with Set it lets *arrayInt be registered with
// flag.Var (see the "numkeys" flag).
// NOTE(review): the loop header and any separator handling are not visible
// here; presumably elements are comma-separated to mirror Set — confirm.
50 func (a arrayInt) String() string {
56 str += strconv.Itoa(n)
// Set parses a comma-separated list of integers. str is split on ",", each
// field is stripped of surrounding whitespace and converted with strconv.Atoi.
// NOTE(review): how a conversion error is reported and how the parsed values
// are stored into *a is not visible here; presumably the Atoi error is
// returned and the values replace the current list — confirm.
61 func (a *arrayInt) Set(str string) error {
63 for _, s := range strings.Split(str, ",") {
// Tolerate spaces around the commas.
64 s = strings.TrimSpace(s)
66 n, err := strconv.Atoi(s)
// Command-line flag registration. Every flag is bound to a package-level
// variable and uses that variable's current value as its default.
78 flag.StringVar(&dbPath, "db", dbPath, "testdb path")
79 flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity")
// Lengths passed to randomData as dataLen for generated keys and values.
80 flag.IntVar(&keyLen, "keylen", keyLen, "key length")
81 flag.IntVar(&valueLen, "valuelen", valueLen, "value length")
// numKeys is parsed through arrayInt's String/Set methods.
82 flag.Var(&numKeys, "numkeys", "num keys")
83 flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr")
// Probability (compared against mrand.Float64()) that a batch is written
// through a transaction instead of a plain db.Write.
// NOTE(review): the help text spells "probablity"; should read "probability".
84 flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction")
85 flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
86 flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
87 flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
// randomData fills dst with a self-verifying record of dataLen bytes. A new
// buffer is allocated when cap(dst) is smaller than dataLen.
//
// With half = (dataLen-4)/2, the visible statements produce this layout:
//
//	[2 : half-8]        random bytes from rand.Reader
//	[half-8 : half-4]   i, little-endian uint32
//	[half-4 : half]     CRC of dst[:half-4]
//	[half : full]       copy of dst[:half]
//	[full : dataLen-4]  random padding, only when full < dataLen-4
//	[dataLen-4 :]       CRC of dst[:dataLen-4]
//
// It panics when dataLen is below (2+4+4)*2+4 = 24 bytes.
// NOTE(review): bytes 0-1 are skipped by the random fill; presumably they
// receive ns and prefix, and full is presumably half*2 — those statements and
// the return are not visible here, confirm.
90 func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
// Each half needs 2 leading bytes + 4-byte index + 4-byte CRC, plus a final
// 4-byte CRC for the whole record.
91 if dataLen < (2+4+4)*2+4 {
92 panic("dataLen is too small")
94 if cap(dst) < dataLen {
95 dst = make([]byte, dataLen)
99 half := (dataLen - 4) / 2
100 if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil {
// NOTE(review): the next two statements are identical; the second write is
// redundant.
105 binary.LittleEndian.PutUint32(dst[half-8:], i)
106 binary.LittleEndian.PutUint32(dst[half-8:], i)
// Checksum of the first half, excluding the checksum slot itself.
107 binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value())
// Duplicate the first half (including its checksum) into the second half.
109 copy(dst[half:full], dst[:half])
110 if full < dataLen-4 {
111 if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil {
// Trailing checksum over everything that precedes it; dataChecksum verifies it.
115 binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value())
// dataSplit returns the two consecutive n-byte halves of data, where
// n = (len(data)-4)/2. The bytes after the second half (the 4-byte trailing
// checksum and any padding) are not included. The results alias data.
119 func dataSplit(data []byte) (data0, data1 []byte) {
120 n := (len(data) - 4) / 2
121 return data[:n], data[n : n+n]
// dataNS extracts a single byte from data; the reader compares the result
// with the expected namespace number.
// NOTE(review): the body is not visible here; presumably it returns data[0].
124 func dataNS(data []byte) byte {
// dataPrefix extracts a single byte from data; the scanner compares the result
// with 2, the prefix value the writer passes to randomData for its k2 keys.
// NOTE(review): the body is not visible here; presumably it returns data[1].
128 func dataPrefix(data []byte) byte {
// dataI decodes the little-endian uint32 that starts 8 bytes before the
// midpoint (len(data)-4)/2. That is the offset at which randomData stores
// its i argument.
132 func dataI(data []byte) uint32 {
133 return binary.LittleEndian.Uint32(data[(len(data)-4)/2-8:])
// dataChecksum returns the checksum stored in the last 4 bytes of data
// (little-endian) followed by the checksum recomputed with util.NewCRC over
// all preceding bytes. The record is intact when the two values are equal.
// data must be at least 4 bytes long, otherwise the slicing panics.
136 func dataChecksum(data []byte) (uint32, uint32) {
// Stored value.
137 checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:])
// Recomputed value.
138 checksum1 := util.NewCRC(data[:len(data)-4]).Value()
139 return checksum0, checksum1
// dataPrefixSlice returns the util.BytesPrefix range for the two-byte prefix
// {ns, prefix}, for use as an iterator range.
142 func dataPrefixSlice(ns, prefix byte) *util.Range {
143 return util.BytesPrefix([]byte{ns, prefix})
// dataNsSlice returns the util.BytesPrefix range for the one-byte prefix {ns},
// for use as an iterator range.
146 func dataNsSlice(ns byte) *util.Range {
147 return util.BytesPrefix([]byte{ns})
// testingStorage wraps the storage handed to leveldb.Open. It is built as
// testingStorage{stor}, and its methods reach the wrapped value as ts.Storage.
// NOTE(review): the field list is not visible here; presumably a single
// embedded storage.Storage — confirm.
150 type testingStorage struct {
// scanTable opens table file fd, walks every entry and verifies the embedded
// checksums of each key and, for value-type entries, each value. Each kind
// of corruption it finds is logged with a "FATAL:" prefix. For a checksum
// mismatch the global fail and done flags are set.
//
// The corrupted result reports whether corruption was found.
// NOTE(review): the error-handling branches and return statements are not
// visible here, nor are the guards around the fail/done stores on the
// corrupted-ikey and iterator-error paths; the exact conditions under which
// corrupted is true and those flags are set should be confirmed.
154 func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
155 r, err := ts.Open(fd)
// Seeking to the end yields the file size that table.NewReader needs.
// NOTE(review): os.SEEK_END is deprecated in favour of io.SeekEnd.
161 size, err := r.Seek(0, os.SEEK_END)
167 DisableLargeBatchTransaction: true,
168 Strict: opt.NoStrict,
// Enable strict block-checksum and reader checks.
// NOTE(review): presumably guarded by the checksum argument; the condition
// is not visible here.
171 o.Strict = opt.StrictBlockChecksum | opt.StrictReader
173 tr, err := table.NewReader(r, size, fd, nil, bpool, o)
// checkData verifies the trailing checksum of one key or value. i is the
// entry index and t a label ("key" or "value") used in log messages. On a
// mismatch it raises the fail/done flags and logs both halves of the record
// together with whether each half's own checksum is valid.
179 checkData := func(i int, t string, data []byte) bool {
181 panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
184 checksum0, checksum1 := dataChecksum(data)
185 if checksum0 != checksum1 {
186 atomic.StoreUint32(&fail, 1)
187 atomic.StoreUint32(&done, 1)
// Each half carries its own checksum, which shows which copy was damaged.
190 data0, data1 := dataSplit(data)
191 data0c0, data0c1 := dataChecksum(data0)
192 data1c0, data1c1 := dataChecksum(data1)
193 log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
194 fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
200 iter := tr.NewIterator(nil, nil)
202 for i := 0; iter.Next(); i++ {
// Table keys are internal keys; split off the user key and the key type.
203 ukey, _, kt, kerr := parseIkey(iter.Key())
205 atomic.StoreUint32(&fail, 1)
206 atomic.StoreUint32(&done, 1)
209 log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
212 if checkData(i, "key", ukey) {
// Only entries of type ktVal have their value checked.
215 if kt == ktVal && checkData(i, "value", iter.Value()) {
219 if err := iter.Error(); err != nil {
220 if errors.IsCorrupted(err) {
221 atomic.StoreUint32(&fail, 1)
222 atomic.StoreUint32(&done, 1)
225 log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
// Remove intercepts file deletion so that table files are verified before
// they disappear. It checks the global fail flag, runs scanTable with
// checksum=true on table files, and otherwise forwards to the wrapped
// storage's Remove.
// NOTE(review): what is returned when fail is already set, or when the scan
// reports corruption, is not visible here; presumably the file is kept for
// inspection — confirm.
234 func (ts *testingStorage) Remove(fd storage.FileDesc) error {
235 if atomic.LoadUint32(&fail) == 1 {
239 if fd.Type == storage.TypeTable {
240 if ts.scanTable(fd, true) {
244 return ts.Storage.Remove(fd)
// latencyStats accumulates timing information for one kind of operation.
// Its methods also use the fields mark (a start timestamp) and num (an
// operation count), whose declarations are not visible here.
247 type latencyStats struct {
// dur is the accumulated duration; min and max are per-operation latencies.
// A zero min means "not recorded yet" (see record and add).
249 dur, min, max time.Duration
// start begins a measurement that is later completed by record.
// NOTE(review): the body is not visible here; presumably it stores
// time.Now() in s.mark, which record reads — confirm.
253 func (s *latencyStats) start() {
// record completes the measurement begun at s.mark. n is the number of
// operations the measurement covered; the elapsed time is divided by n to get
// a per-operation latency, which is compared with the current minimum.
// NOTE(review): n == 0 would cause an integer division by zero unless a guard
// exists in lines not visible here. The updates of dur, max and num are also
// not visible.
257 func (s *latencyStats) record(n int) {
261 dur := time.Now().Sub(s.mark)
262 dur1 := dur / time.Duration(n)
// s.min == 0 means no sample has been recorded yet.
263 if dur1 < s.min || s.min == 0 {
// ratePerSec returns the number of recorded operations per whole second of
// accumulated duration. The duration is truncated to whole seconds by the
// integer division, so sub-second remainders are ignored.
// NOTE(review): a guard against durSec == 0 is presumably present in a line
// not visible here; without one the division would panic.
274 func (s *latencyStats) ratePerSec() int {
275 durSec := s.dur / time.Second
277 return s.num / int(durSec)
// avg returns the mean latency: the accumulated duration divided by the
// number of recorded operations.
// NOTE(review): a guard against s.num == 0 is presumably present in a line
// not visible here; without one the division would panic.
282 func (s *latencyStats) avg() time.Duration {
284 return s.dur / time.Duration(s.num)
// add merges the statistics in x into s. The visible part takes x.min when it
// is smaller than s.min or when s.min is still unset (zero).
// NOTE(review): the merging of max, dur and num is not visible here.
289 func (s *latencyStats) add(x *latencyStats) {
290 if x.min < s.min || s.min == 0 {
// Program body: opens the database on top of testingStorage, starts a single
// write goroutine plus per-namespace writer, reader and scanner workloads,
// and runs until a signal arrives or a failure sets the done flag.
// NOTE(review): the enclosing function header and many intermediate lines are
// not visible here; the comments below describe only the visible statements.

// Create the shared buffer pool only when requested; bpool stays nil otherwise.
303 if enableBufferPool {
304 bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
307 log.Printf("Test DB stored at %q", dbPath)
// Profiling endpoint: block profiling is enabled and an HTTP server is started
// on httpProf; a listen failure terminates the process.
309 log.Printf("HTTP pprof listening at %q", httpProf)
310 runtime.SetBlockProfileRate(1)
312 if err := http.ListenAndServe(httpProf, nil); err != nil {
313 log.Fatalf("HTTPPROF: %v", err)
318 runtime.GOMAXPROCS(runtime.NumCPU())
// Open the on-disk storage and wrap it so that table removals are verified.
321 stor, err := storage.OpenFile(dbPath, false)
325 tstor := &testingStorage{stor}
// fatalf records a failure: it raises the fail and done flags, logs the
// message and, when err is a corruption error that names a table file,
// rescans that table to locate the damaged key/value pair.
// NOTE(review): the type assertion below is unchecked; if errors.IsCorrupted
// also accepts error types other than *errors.ErrCorrupted this would panic.
// Confirm, and consider the two-value assertion form.
328 fatalf := func(err error, format string, v ...interface{}) {
329 atomic.StoreUint32(&fail, 1)
330 atomic.StoreUint32(&done, 1)
331 log.Printf("FATAL: "+format, v...)
332 if err != nil && errors.IsCorrupted(err) {
333 cerr := err.(*errors.ErrCorrupted)
334 if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
335 log.Print("FATAL: corruption detected, scanning...")
336 if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
337 log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
// A capacity of 0 on the command line is rewritten to -1 before it is used
// as OpenFilesCacheCapacity.
344 if openFilesCacheCapacity == 0 {
345 openFilesCacheCapacity = -1
348 OpenFilesCacheCapacity: openFilesCacheCapacity,
349 DisableBufferPool: !enableBufferPool,
350 DisableBlockCache: !enableBlockCache,
352 Compression: opt.NoCompression,
354 if enableCompression {
355 o.Compression = opt.DefaultCompression
358 db, err := leveldb.Open(tstor, o)
// Global latency accumulators, reported periodically below.
366 gGetStat = &latencyStats{}
367 gIterStat = &latencyStats{}
368 gWriteStat = &latencyStats{}
369 gTrasactionStat = &latencyStats{}
370 startTime = time.Now()
// Write hand-off protocol: a producer sends a batch on writeReq, receives the
// result on writeAck and then sends on writeAckAck. All three channels are
// unbuffered.
372 writeReq = make(chan *leveldb.Batch)
373 writeAck = make(chan error)
374 writeAckAck = make(chan struct{})
// Write loop: consumes batches from writeReq.
378 for b := range writeReq {
// With probability transactionProb the batch goes through a transaction
// (open, write, commit); otherwise it is written directly. Latency is
// recorded per batch entry (b.Len()) only on success.
381 if mrand.Float64() < transactionProb {
382 log.Print("> Write using transaction")
383 gTrasactionStat.start()
384 var tr *leveldb.Transaction
385 if tr, err = db.OpenTransaction(); err == nil {
386 if err = tr.Write(b, nil); err == nil {
387 if err = tr.Commit(); err == nil {
388 gTrasactionStat.record(b.Len())
396 if err = db.Write(b, nil); err == nil {
397 gWriteStat.record(b.Len())
// Statistics report: latency summaries and database properties.
407 time.Sleep(3 * time.Second)
409 log.Print("------------------------")
411 log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
413 log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
414 gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
415 log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d",
416 gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
417 log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
418 gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
419 log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d",
420 gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec())
// Errors from GetProperty are deliberately ignored; the values are only logged.
423 cachedblock, _ := db.GetProperty("leveldb.cachedblock")
424 openedtables, _ := db.GetProperty("leveldb.openedtables")
425 alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
426 aliveiters, _ := db.GetProperty("leveldb.aliveiters")
427 blockpool, _ := db.GetProperty("leveldb.blockpool")
428 log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q",
429 cachedblock, openedtables, alivesnaps, aliveiters, blockpool)
431 log.Print("------------------------")
// One workload per numKeys entry; the index is the namespace number.
435 for ns, numKey := range numKeys {
436 func(ns, numKey int) {
437 log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)
// Fixed k1 keys for this namespace, generated once with prefix 1.
439 keys := make([][]byte, numKey)
440 for i := range keys {
441 keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen)
448 log.Printf("[%02d] WRITER DONE #%d", ns, wi)
453 b = new(leveldb.Batch)
// Writer: repeats until the done flag is raised. In each round it generates
// a k2 key (prefix 2) and a value (prefix 3) per k1 key, both stamped with
// the writer iteration wi, and reuses the k2/v2 buffers.
457 for atomic.LoadUint32(&done) == 0 {
458 log.Printf("[%02d] WRITER #%d", ns, wi)
461 for _, k1 := range keys {
462 k2 = randomData(k2, byte(ns), 2, wi, keyLen)
463 v2 = randomData(v2, byte(ns), 3, wi, valueLen)
// Wait for the write result; writeAckAck is sent on every path, before
// fatalf on failure and after the snapshot has been taken on success.
468 if err := <-writeAck; err != nil {
469 writeAckAck <- struct{}{}
470 fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
473 snap, err := db.GetSnapshot()
475 writeAckAck <- struct{}{}
476 fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
479 writeAckAck <- struct{}{}
// Reader: verifies the snapshot taken after write round snapwi.
482 atomic.AddInt32(&nReader, 1)
483 go func(snapwi uint32, snap *leveldb.Snapshot) {
486 iterStat = &latencyStats{}
487 getStat = &latencyStats{}
// Fold this reader's statistics into the global accumulators.
491 gGetStat.add(getStat)
492 gIterStat.add(iterStat)
495 atomic.AddInt32(&nReader, -1)
496 log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
// Keep reading for at least 3 rounds and for as long as the writer has not
// reached stopi, unless the done flag is raised.
502 for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
// Iterate over this namespace's k1 entries (prefix 1).
504 iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
// The k2 key must carry this namespace's number.
511 if dataNS(k2) != byte(ns) {
512 fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
// The write iteration found in the key (kwritei) must match the snapshot's
// iteration.
516 if kwritei != snapwi {
517 fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
521 v2, err := snap.Get(k2, nil)
523 fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
// A value checksum mismatch is wrapped in an ErrCorrupted with a synthetic
// file descriptor.
// NOTE(review): the storage.FileDesc literal is unkeyed; go vet flags
// unkeyed fields in imported struct types.
527 if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
528 err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
529 fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
536 if err := iter.Error(); err != nil {
537 fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
540 fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
547 atomic.AddUint32(&wi, 1)
// Scanner: walks the whole namespace, verifies every key and value checksum
// and collects deletions into delB.
551 delB := new(leveldb.Batch)
556 iterStat = &latencyStats{}
559 log.Printf("[%02d] SCANNER DONE #%d", ns, i)
563 time.Sleep(2 * time.Second)
565 for atomic.LoadUint32(&done) == 0 {
568 iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
570 for iter.Next() && atomic.LoadUint32(&done) == 0 {
// ci selects the key (0) or the value (1).
575 for ci, x := range [...][]byte{k, v} {
576 checksum0, checksum1 := dataChecksum(x)
577 if checksum0 != checksum1 {
579 fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
581 fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
// Selects every entry with prefix 2, plus roughly one in 999 of the others.
// NOTE(review): presumably selected entries are added to delB for deletion;
// the branch body is not visible here.
586 if dataPrefix(k) == 2 || mrand.Int()%999 == 0 {
594 if err := iter.Error(); err != nil {
595 fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
599 log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
// Submit the collected deletions using the same ack / ack-ack handshake as
// the writer.
602 if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
605 if err := <-writeAck; err != nil {
606 writeAckAck <- struct{}{}
607 fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
609 writeAckAck <- struct{}{}
611 log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
// Block until a signal arrives, then raise the done flag so the loops exit.
// NOTE(review): signal.Notify never blocks when delivering, so a signal that
// arrives before the receive below is ready is dropped on this unbuffered
// channel; the os/signal docs call for a buffer of at least 1. os.Kill cannot
// be caught, so listing it has no effect.
621 sig := make(chan os.Signal)
622 signal.Notify(sig, os.Interrupt, os.Kill)
623 log.Printf("Got signal: %v, exiting...", <-sig)
624 atomic.StoreUint32(&done, 1)