package main import ( "crypto/rand" "encoding/binary" "flag" "fmt" "log" mrand "math/rand" "net/http" _ "net/http/pprof" "os" "os/signal" "path" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/table" "github.com/syndtr/goleveldb/leveldb/util" ) var ( dbPath = path.Join(os.TempDir(), "goleveldb-testdb") openFilesCacheCapacity = 500 keyLen = 63 valueLen = 256 numKeys = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743} httpProf = "127.0.0.1:5454" transactionProb = 0.5 enableBlockCache = false enableCompression = false enableBufferPool = false wg = new(sync.WaitGroup) done, fail uint32 bpool *util.BufferPool ) type arrayInt []int func (a arrayInt) String() string { var str string for i, n := range a { if i > 0 { str += "," } str += strconv.Itoa(n) } return str } func (a *arrayInt) Set(str string) error { var na arrayInt for _, s := range strings.Split(str, ",") { s = strings.TrimSpace(s) if s != "" { n, err := strconv.Atoi(s) if err != nil { return err } na = append(na, n) } } *a = na return nil } func init() { flag.StringVar(&dbPath, "db", dbPath, "testdb path") flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity") flag.IntVar(&keyLen, "keylen", keyLen, "key length") flag.IntVar(&valueLen, "valuelen", valueLen, "value length") flag.Var(&numKeys, "numkeys", "num keys") flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr") flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction") flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool") flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache") flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression") } func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte { if dataLen < (2+4+4)*2+4 { panic("dataLen is too small") } if cap(dst) < dataLen { dst = make([]byte, dataLen) } else { dst = dst[:dataLen] } half := (dataLen - 4) / 2 if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil { panic(err) } dst[0] = ns dst[1] = prefix binary.LittleEndian.PutUint32(dst[half-8:], i) binary.LittleEndian.PutUint32(dst[half-8:], i) binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value()) full := half * 2 copy(dst[half:full], dst[:half]) if full < dataLen-4 { if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil { panic(err) } } binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value()) return dst } func dataSplit(data []byte) (data0, data1 []byte) { n := (len(data) - 4) / 2 return data[:n], data[n : n+n] } func dataNS(data []byte) byte { return data[0] } func dataPrefix(data []byte) byte { return data[1] } func dataI(data []byte) uint32 { return binary.LittleEndian.Uint32(data[(len(data)-4)/2-8:]) } func dataChecksum(data []byte) (uint32, uint32) { checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:]) checksum1 := util.NewCRC(data[:len(data)-4]).Value() return checksum0, checksum1 } func dataPrefixSlice(ns, prefix byte) *util.Range { return util.BytesPrefix([]byte{ns, prefix}) } func dataNsSlice(ns byte) *util.Range { return util.BytesPrefix([]byte{ns}) } type testingStorage struct { storage.Storage } func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) { r, err := ts.Open(fd) if err != nil { log.Fatal(err) } defer r.Close() size, err := r.Seek(0, os.SEEK_END) if err != nil { log.Fatal(err) } o := &opt.Options{ DisableLargeBatchTransaction: true, Strict: opt.NoStrict, } if checksum { o.Strict = opt.StrictBlockChecksum | opt.StrictReader } tr, err := table.NewReader(r, size, fd, nil, bpool, o) if err != nil { log.Fatal(err) } defer tr.Release() checkData := func(i int, t string, data []byte) bool { if len(data) == 0 { panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t)) } checksum0, checksum1 := dataChecksum(data) if checksum0 != checksum1 { atomic.StoreUint32(&fail, 1) atomic.StoreUint32(&done, 1) corrupted = true data0, data1 := dataSplit(data) data0c0, data0c1 := dataChecksum(data0) data1c0, data1c1 := dataChecksum(data1) log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)", fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1) return true } return false } iter := tr.NewIterator(nil, nil) defer iter.Release() for i := 0; iter.Next(); i++ { ukey, _, kt, kerr := parseIkey(iter.Key()) if kerr != nil { atomic.StoreUint32(&fail, 1) atomic.StoreUint32(&done, 1) corrupted = true log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr) return } if checkData(i, "key", ukey) { return } if kt == ktVal && checkData(i, "value", iter.Value()) { return } } if err := iter.Error(); err != nil { if errors.IsCorrupted(err) { atomic.StoreUint32(&fail, 1) atomic.StoreUint32(&done, 1) corrupted = true log.Printf("FATAL: [%v] Corruption detected: %v", fd, err) } else { log.Fatal(err) } } return } func (ts *testingStorage) Remove(fd storage.FileDesc) error { if atomic.LoadUint32(&fail) == 1 { return nil } if fd.Type == storage.TypeTable { if ts.scanTable(fd, true) { return nil } } return ts.Storage.Remove(fd) } type latencyStats struct { mark time.Time dur, min, max time.Duration num int } func (s *latencyStats) start() { s.mark = time.Now() } func (s *latencyStats) record(n int) { if s.mark.IsZero() { panic("not started") } dur := time.Now().Sub(s.mark) dur1 := dur / time.Duration(n) if dur1 < s.min || s.min == 0 { s.min = dur1 } if dur1 > s.max { s.max = dur1 } s.dur += dur s.num += n s.mark = time.Time{} } func (s *latencyStats) ratePerSec() int { durSec := s.dur / time.Second if durSec > 0 { return s.num / int(durSec) } return s.num } func (s *latencyStats) avg() time.Duration { if s.num > 0 { return s.dur / time.Duration(s.num) } return 0 } func (s *latencyStats) add(x *latencyStats) { if x.min < s.min || s.min == 0 { s.min = x.min } if x.max > s.max { s.max = x.max } s.dur += x.dur s.num += x.num } func main() { flag.Parse() if enableBufferPool { bpool = util.NewBufferPool(opt.DefaultBlockSize + 128) } log.Printf("Test DB stored at %q", dbPath) if httpProf != "" { log.Printf("HTTP pprof listening at %q", httpProf) runtime.SetBlockProfileRate(1) go func() { if err := http.ListenAndServe(httpProf, nil); err != nil { log.Fatalf("HTTPPROF: %v", err) } }() } runtime.GOMAXPROCS(runtime.NumCPU()) os.RemoveAll(dbPath) stor, err := storage.OpenFile(dbPath, false) if err != nil { log.Fatal(err) } tstor := &testingStorage{stor} defer tstor.Close() fatalf := func(err error, format string, v ...interface{}) { atomic.StoreUint32(&fail, 1) atomic.StoreUint32(&done, 1) log.Printf("FATAL: "+format, v...) if err != nil && errors.IsCorrupted(err) { cerr := err.(*errors.ErrCorrupted) if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable { log.Print("FATAL: corruption detected, scanning...") if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) { log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd) } } } runtime.Goexit() } if openFilesCacheCapacity == 0 { openFilesCacheCapacity = -1 } o := &opt.Options{ OpenFilesCacheCapacity: openFilesCacheCapacity, DisableBufferPool: !enableBufferPool, DisableBlockCache: !enableBlockCache, ErrorIfExist: true, Compression: opt.NoCompression, } if enableCompression { o.Compression = opt.DefaultCompression } db, err := leveldb.Open(tstor, o) if err != nil { log.Fatal(err) } defer db.Close() var ( mu = &sync.Mutex{} gGetStat = &latencyStats{} gIterStat = &latencyStats{} gWriteStat = &latencyStats{} gTrasactionStat = &latencyStats{} startTime = time.Now() writeReq = make(chan *leveldb.Batch) writeAck = make(chan error) writeAckAck = make(chan struct{}) ) go func() { for b := range writeReq { var err error if mrand.Float64() < transactionProb { log.Print("> Write using transaction") gTrasactionStat.start() var tr *leveldb.Transaction if tr, err = db.OpenTransaction(); err == nil { if err = tr.Write(b, nil); err == nil { if err = tr.Commit(); err == nil { gTrasactionStat.record(b.Len()) } } else { tr.Discard() } } } else { gWriteStat.start() if err = db.Write(b, nil); err == nil { gWriteStat.record(b.Len()) } } writeAck <- err <-writeAckAck } }() go func() { for { time.Sleep(3 * time.Second) log.Print("------------------------") log.Printf("> Elapsed=%v", time.Now().Sub(startTime)) mu.Lock() log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d", gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec()) log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d", gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec()) log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d", gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec()) log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d", gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec()) mu.Unlock() cachedblock, _ := db.GetProperty("leveldb.cachedblock") openedtables, _ := db.GetProperty("leveldb.openedtables") alivesnaps, _ := db.GetProperty("leveldb.alivesnaps") aliveiters, _ := db.GetProperty("leveldb.aliveiters") blockpool, _ := db.GetProperty("leveldb.blockpool") log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q", cachedblock, openedtables, alivesnaps, aliveiters, blockpool) log.Print("------------------------") } }() for ns, numKey := range numKeys { func(ns, numKey int) { log.Printf("[%02d] STARTING: numKey=%d", ns, numKey) keys := make([][]byte, numKey) for i := range keys { keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen) } wg.Add(1) go func() { var wi uint32 defer func() { log.Printf("[%02d] WRITER DONE #%d", ns, wi) wg.Done() }() var ( b = new(leveldb.Batch) k2, v2 []byte nReader int32 ) for atomic.LoadUint32(&done) == 0 { log.Printf("[%02d] WRITER #%d", ns, wi) b.Reset() for _, k1 := range keys { k2 = randomData(k2, byte(ns), 2, wi, keyLen) v2 = randomData(v2, byte(ns), 3, wi, valueLen) b.Put(k2, v2) b.Put(k1, k2) } writeReq <- b if err := <-writeAck; err != nil { writeAckAck <- struct{}{} fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err) } snap, err := db.GetSnapshot() if err != nil { writeAckAck <- struct{}{} fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err) } writeAckAck <- struct{}{} wg.Add(1) atomic.AddInt32(&nReader, 1) go func(snapwi uint32, snap *leveldb.Snapshot) { var ( ri int iterStat = &latencyStats{} getStat = &latencyStats{} ) defer func() { mu.Lock() gGetStat.add(getStat) gIterStat.add(iterStat) mu.Unlock() atomic.AddInt32(&nReader, -1) log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg()) snap.Release() wg.Done() }() stopi := snapwi + 3 for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 { var n int iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil) iterStat.start() for iter.Next() { k1 := iter.Key() k2 := iter.Value() iterStat.record(1) if dataNS(k2) != byte(ns) { fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2)) } kwritei := dataI(k2) if kwritei != snapwi { fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei) } getStat.start() v2, err := snap.Get(k2, nil) if err != nil { fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) } getStat.record(1) if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 { err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)} fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) } n++ iterStat.start() } iter.Release() if err := iter.Error(); err != nil { fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err) } if n != numKey { fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n) } ri++ } }(wi, snap) atomic.AddUint32(&wi, 1) } }() delB := new(leveldb.Batch) wg.Add(1) go func() { var ( i int iterStat = &latencyStats{} ) defer func() { log.Printf("[%02d] SCANNER DONE #%d", ns, i) wg.Done() }() time.Sleep(2 * time.Second) for atomic.LoadUint32(&done) == 0 { var n int delB.Reset() iter := db.NewIterator(dataNsSlice(byte(ns)), nil) iterStat.start() for iter.Next() && atomic.LoadUint32(&done) == 0 { k := iter.Key() v := iter.Value() iterStat.record(1) for ci, x := range [...][]byte{k, v} { checksum0, checksum1 := dataChecksum(x) if checksum0 != checksum1 { if ci == 0 { fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) } else { fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) } } } if dataPrefix(k) == 2 || mrand.Int()%999 == 0 { delB.Delete(k) } n++ iterStat.start() } iter.Release() if err := iter.Error(); err != nil { fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err) } if n > 0 { log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg()) } if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 { t := time.Now() writeReq <- delB if err := <-writeAck; err != nil { writeAckAck <- struct{}{} fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err) } else { writeAckAck <- struct{}{} } log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t)) } i++ } }() }(ns, numKey) } go func() { sig := make(chan os.Signal) signal.Notify(sig, os.Interrupt, os.Kill) log.Printf("Got signal: %v, exiting...", <-sig) atomic.StoreUint32(&done, 1) }() wg.Wait() }