OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / manualtest / dbstress / main.go
1 package main
2
3 import (
4         "crypto/rand"
5         "encoding/binary"
6         "flag"
7         "fmt"
8         "log"
9         mrand "math/rand"
10         "net/http"
11         _ "net/http/pprof"
12         "os"
13         "os/signal"
14         "path"
15         "runtime"
16         "strconv"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "time"
21
22         "github.com/syndtr/goleveldb/leveldb"
23         "github.com/syndtr/goleveldb/leveldb/errors"
24         "github.com/syndtr/goleveldb/leveldb/opt"
25         "github.com/syndtr/goleveldb/leveldb/storage"
26         "github.com/syndtr/goleveldb/leveldb/table"
27         "github.com/syndtr/goleveldb/leveldb/util"
28 )
29
30 var (
31         dbPath                 = path.Join(os.TempDir(), "goleveldb-testdb")
32         openFilesCacheCapacity = 500
33         keyLen                 = 63
34         valueLen               = 256
35         numKeys                = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
36         httpProf               = "127.0.0.1:5454"
37         transactionProb        = 0.5
38         enableBlockCache       = false
39         enableCompression      = false
40         enableBufferPool       = false
41
42         wg         = new(sync.WaitGroup)
43         done, fail uint32
44
45         bpool *util.BufferPool
46 )
47
// arrayInt is a list of ints that can be filled from a comma-separated string.
// Its String and Set methods let a *arrayInt be registered with flag.Var.
type arrayInt []int

// String renders the list as decimal numbers separated by commas, without
// spaces. An empty list yields the empty string.
func (a arrayInt) String() string {
	parts := make([]string, len(a))
	for idx, v := range a {
		parts[idx] = strconv.Itoa(v)
	}
	return strings.Join(parts, ",")
}

// Set replaces the list with the integers parsed from str.
//
// str is split on commas; surrounding whitespace is trimmed from every field
// and empty fields are skipped. If any field is not a valid integer the
// strconv error is returned and the receiver is left untouched. When no field
// contains a number the receiver becomes nil.
func (a *arrayInt) Set(str string) error {
	var parsed arrayInt
	for _, field := range strings.Split(str, ",") {
		field = strings.TrimSpace(field)
		if field == "" {
			continue
		}
		v, err := strconv.Atoi(field)
		if err != nil {
			return err
		}
		parsed = append(parsed, v)
	}
	*a = parsed
	return nil
}
76
77 func init() {
78         flag.StringVar(&dbPath, "db", dbPath, "testdb path")
79         flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity")
80         flag.IntVar(&keyLen, "keylen", keyLen, "key length")
81         flag.IntVar(&valueLen, "valuelen", valueLen, "value length")
82         flag.Var(&numKeys, "numkeys", "num keys")
83         flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr")
84         flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction")
85         flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
86         flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
87         flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
88 }
89
90 func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
91         if dataLen < (2+4+4)*2+4 {
92                 panic("dataLen is too small")
93         }
94         if cap(dst) < dataLen {
95                 dst = make([]byte, dataLen)
96         } else {
97                 dst = dst[:dataLen]
98         }
99         half := (dataLen - 4) / 2
100         if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil {
101                 panic(err)
102         }
103         dst[0] = ns
104         dst[1] = prefix
105         binary.LittleEndian.PutUint32(dst[half-8:], i)
106         binary.LittleEndian.PutUint32(dst[half-8:], i)
107         binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value())
108         full := half * 2
109         copy(dst[half:full], dst[:half])
110         if full < dataLen-4 {
111                 if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil {
112                         panic(err)
113                 }
114         }
115         binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value())
116         return dst
117 }
118
// dataSplit returns the two equally sized halves of a record, ignoring the
// last four bytes (and one extra byte before them when the remaining length is
// odd). Both results alias the input slice.
func dataSplit(data []byte) (data0, data1 []byte) {
	half := (len(data) - 4) / 2
	data0 = data[:half]
	data1 = data[half : 2*half]
	return data0, data1
}
123
// dataNS returns the first byte of a record, which randomData sets to the
// namespace.
func dataNS(data []byte) byte {
	const nsOffset = 0
	return data[nsOffset]
}
127
// dataPrefix returns the second byte of a record, which randomData sets to the
// prefix.
func dataPrefix(data []byte) byte {
	const prefixOffset = 1
	return data[prefixOffset]
}
131
// dataI decodes the little-endian uint32 stored eight bytes before the end of
// the record's first half, i.e. the position where randomData writes i.
func dataI(data []byte) uint32 {
	half := (len(data) - 4) / 2
	field := data[half-8:]
	return binary.LittleEndian.Uint32(field)
}
135
136 func dataChecksum(data []byte) (uint32, uint32) {
137         checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:])
138         checksum1 := util.NewCRC(data[:len(data)-4]).Value()
139         return checksum0, checksum1
140 }
141
142 func dataPrefixSlice(ns, prefix byte) *util.Range {
143         return util.BytesPrefix([]byte{ns, prefix})
144 }
145
146 func dataNsSlice(ns byte) *util.Range {
147         return util.BytesPrefix([]byte{ns})
148 }
149
150 type testingStorage struct {
151         storage.Storage
152 }
153
154 func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
155         r, err := ts.Open(fd)
156         if err != nil {
157                 log.Fatal(err)
158         }
159         defer r.Close()
160
161         size, err := r.Seek(0, os.SEEK_END)
162         if err != nil {
163                 log.Fatal(err)
164         }
165
166         o := &opt.Options{
167                 DisableLargeBatchTransaction: true,
168                 Strict: opt.NoStrict,
169         }
170         if checksum {
171                 o.Strict = opt.StrictBlockChecksum | opt.StrictReader
172         }
173         tr, err := table.NewReader(r, size, fd, nil, bpool, o)
174         if err != nil {
175                 log.Fatal(err)
176         }
177         defer tr.Release()
178
179         checkData := func(i int, t string, data []byte) bool {
180                 if len(data) == 0 {
181                         panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
182                 }
183
184                 checksum0, checksum1 := dataChecksum(data)
185                 if checksum0 != checksum1 {
186                         atomic.StoreUint32(&fail, 1)
187                         atomic.StoreUint32(&done, 1)
188                         corrupted = true
189
190                         data0, data1 := dataSplit(data)
191                         data0c0, data0c1 := dataChecksum(data0)
192                         data1c0, data1c1 := dataChecksum(data1)
193                         log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
194                                 fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
195                         return true
196                 }
197                 return false
198         }
199
200         iter := tr.NewIterator(nil, nil)
201         defer iter.Release()
202         for i := 0; iter.Next(); i++ {
203                 ukey, _, kt, kerr := parseIkey(iter.Key())
204                 if kerr != nil {
205                         atomic.StoreUint32(&fail, 1)
206                         atomic.StoreUint32(&done, 1)
207                         corrupted = true
208
209                         log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
210                         return
211                 }
212                 if checkData(i, "key", ukey) {
213                         return
214                 }
215                 if kt == ktVal && checkData(i, "value", iter.Value()) {
216                         return
217                 }
218         }
219         if err := iter.Error(); err != nil {
220                 if errors.IsCorrupted(err) {
221                         atomic.StoreUint32(&fail, 1)
222                         atomic.StoreUint32(&done, 1)
223                         corrupted = true
224
225                         log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
226                 } else {
227                         log.Fatal(err)
228                 }
229         }
230
231         return
232 }
233
234 func (ts *testingStorage) Remove(fd storage.FileDesc) error {
235         if atomic.LoadUint32(&fail) == 1 {
236                 return nil
237         }
238
239         if fd.Type == storage.TypeTable {
240                 if ts.scanTable(fd, true) {
241                         return nil
242                 }
243         }
244         return ts.Storage.Remove(fd)
245 }
246
// latencyStats accumulates timing information for a series of operations.
// It is not safe for concurrent use; callers synchronise access themselves.
type latencyStats struct {
	mark          time.Time     // start of the measurement in progress; zero when idle
	dur, min, max time.Duration // total elapsed time, and smallest/largest per-operation time
	num           int           // total number of operations recorded
}

// start begins a measurement; it must be followed by record.
func (s *latencyStats) start() {
	s.mark = time.Now()
}

// record ends the measurement begun by start and attributes the elapsed time
// to n operations. The elapsed time is always added to the total; the
// per-operation min/max and the operation count are only updated when n is
// positive, which avoids a division by zero for an empty batch.
//
// It panics with "not started" if start was not called first.
func (s *latencyStats) record(n int) {
	if s.mark.IsZero() {
		panic("not started")
	}
	dur := time.Since(s.mark)
	s.mark = time.Time{}
	s.dur += dur
	if n <= 0 {
		return
	}
	dur1 := dur / time.Duration(n)
	// min == 0 doubles as "no sample yet".
	if dur1 < s.min || s.min == 0 {
		s.min = dur1
	}
	if dur1 > s.max {
		s.max = dur1
	}
	s.num += n
}

// ratePerSec returns the number of recorded operations per second of measured
// time, using fractional seconds. With no measured time it returns the raw
// operation count.
func (s *latencyStats) ratePerSec() int {
	if s.dur <= 0 {
		return s.num
	}
	return int(float64(s.num) / s.dur.Seconds())
}

// avg returns the mean time per operation, or 0 when nothing was recorded.
func (s *latencyStats) avg() time.Duration {
	if s.num > 0 {
		return s.dur / time.Duration(s.num)
	}
	return 0
}

// add merges the totals and extremes of x into s.
func (s *latencyStats) add(x *latencyStats) {
	if x.min < s.min || s.min == 0 {
		s.min = x.min
	}
	if x.max > s.max {
		s.max = x.max
	}
	s.dur += x.dur
	s.num += x.num
}
299
300 func main() {
301         flag.Parse()
302
303         if enableBufferPool {
304                 bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
305         }
306
307         log.Printf("Test DB stored at %q", dbPath)
308         if httpProf != "" {
309                 log.Printf("HTTP pprof listening at %q", httpProf)
310                 runtime.SetBlockProfileRate(1)
311                 go func() {
312                         if err := http.ListenAndServe(httpProf, nil); err != nil {
313                                 log.Fatalf("HTTPPROF: %v", err)
314                         }
315                 }()
316         }
317
318         runtime.GOMAXPROCS(runtime.NumCPU())
319
320         os.RemoveAll(dbPath)
321         stor, err := storage.OpenFile(dbPath, false)
322         if err != nil {
323                 log.Fatal(err)
324         }
325         tstor := &testingStorage{stor}
326         defer tstor.Close()
327
328         fatalf := func(err error, format string, v ...interface{}) {
329                 atomic.StoreUint32(&fail, 1)
330                 atomic.StoreUint32(&done, 1)
331                 log.Printf("FATAL: "+format, v...)
332                 if err != nil && errors.IsCorrupted(err) {
333                         cerr := err.(*errors.ErrCorrupted)
334                         if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
335                                 log.Print("FATAL: corruption detected, scanning...")
336                                 if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
337                                         log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
338                                 }
339                         }
340                 }
341                 runtime.Goexit()
342         }
343
344         if openFilesCacheCapacity == 0 {
345                 openFilesCacheCapacity = -1
346         }
347         o := &opt.Options{
348                 OpenFilesCacheCapacity: openFilesCacheCapacity,
349                 DisableBufferPool:      !enableBufferPool,
350                 DisableBlockCache:      !enableBlockCache,
351                 ErrorIfExist:           true,
352                 Compression:            opt.NoCompression,
353         }
354         if enableCompression {
355                 o.Compression = opt.DefaultCompression
356         }
357
358         db, err := leveldb.Open(tstor, o)
359         if err != nil {
360                 log.Fatal(err)
361         }
362         defer db.Close()
363
364         var (
365                 mu              = &sync.Mutex{}
366                 gGetStat        = &latencyStats{}
367                 gIterStat       = &latencyStats{}
368                 gWriteStat      = &latencyStats{}
369                 gTrasactionStat = &latencyStats{}
370                 startTime       = time.Now()
371
372                 writeReq    = make(chan *leveldb.Batch)
373                 writeAck    = make(chan error)
374                 writeAckAck = make(chan struct{})
375         )
376
377         go func() {
378                 for b := range writeReq {
379
380                         var err error
381                         if mrand.Float64() < transactionProb {
382                                 log.Print("> Write using transaction")
383                                 gTrasactionStat.start()
384                                 var tr *leveldb.Transaction
385                                 if tr, err = db.OpenTransaction(); err == nil {
386                                         if err = tr.Write(b, nil); err == nil {
387                                                 if err = tr.Commit(); err == nil {
388                                                         gTrasactionStat.record(b.Len())
389                                                 }
390                                         } else {
391                                                 tr.Discard()
392                                         }
393                                 }
394                         } else {
395                                 gWriteStat.start()
396                                 if err = db.Write(b, nil); err == nil {
397                                         gWriteStat.record(b.Len())
398                                 }
399                         }
400                         writeAck <- err
401                         <-writeAckAck
402                 }
403         }()
404
405         go func() {
406                 for {
407                         time.Sleep(3 * time.Second)
408
409                         log.Print("------------------------")
410
411                         log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
412                         mu.Lock()
413                         log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
414                                 gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
415                         log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d",
416                                 gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
417                         log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
418                                 gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
419                         log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d",
420                                 gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec())
421                         mu.Unlock()
422
423                         cachedblock, _ := db.GetProperty("leveldb.cachedblock")
424                         openedtables, _ := db.GetProperty("leveldb.openedtables")
425                         alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
426                         aliveiters, _ := db.GetProperty("leveldb.aliveiters")
427                         blockpool, _ := db.GetProperty("leveldb.blockpool")
428                         log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q",
429                                 cachedblock, openedtables, alivesnaps, aliveiters, blockpool)
430
431                         log.Print("------------------------")
432                 }
433         }()
434
435         for ns, numKey := range numKeys {
436                 func(ns, numKey int) {
437                         log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)
438
439                         keys := make([][]byte, numKey)
440                         for i := range keys {
441                                 keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen)
442                         }
443
444                         wg.Add(1)
445                         go func() {
446                                 var wi uint32
447                                 defer func() {
448                                         log.Printf("[%02d] WRITER DONE #%d", ns, wi)
449                                         wg.Done()
450                                 }()
451
452                                 var (
453                                         b       = new(leveldb.Batch)
454                                         k2, v2  []byte
455                                         nReader int32
456                                 )
457                                 for atomic.LoadUint32(&done) == 0 {
458                                         log.Printf("[%02d] WRITER #%d", ns, wi)
459
460                                         b.Reset()
461                                         for _, k1 := range keys {
462                                                 k2 = randomData(k2, byte(ns), 2, wi, keyLen)
463                                                 v2 = randomData(v2, byte(ns), 3, wi, valueLen)
464                                                 b.Put(k2, v2)
465                                                 b.Put(k1, k2)
466                                         }
467                                         writeReq <- b
468                                         if err := <-writeAck; err != nil {
469                                                 writeAckAck <- struct{}{}
470                                                 fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
471                                         }
472
473                                         snap, err := db.GetSnapshot()
474                                         if err != nil {
475                                                 writeAckAck <- struct{}{}
476                                                 fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
477                                         }
478
479                                         writeAckAck <- struct{}{}
480
481                                         wg.Add(1)
482                                         atomic.AddInt32(&nReader, 1)
483                                         go func(snapwi uint32, snap *leveldb.Snapshot) {
484                                                 var (
485                                                         ri       int
486                                                         iterStat = &latencyStats{}
487                                                         getStat  = &latencyStats{}
488                                                 )
489                                                 defer func() {
490                                                         mu.Lock()
491                                                         gGetStat.add(getStat)
492                                                         gIterStat.add(iterStat)
493                                                         mu.Unlock()
494
495                                                         atomic.AddInt32(&nReader, -1)
496                                                         log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
497                                                         snap.Release()
498                                                         wg.Done()
499                                                 }()
500
501                                                 stopi := snapwi + 3
502                                                 for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
503                                                         var n int
504                                                         iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
505                                                         iterStat.start()
506                                                         for iter.Next() {
507                                                                 k1 := iter.Key()
508                                                                 k2 := iter.Value()
509                                                                 iterStat.record(1)
510
511                                                                 if dataNS(k2) != byte(ns) {
512                                                                         fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
513                                                                 }
514
515                                                                 kwritei := dataI(k2)
516                                                                 if kwritei != snapwi {
517                                                                         fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
518                                                                 }
519
520                                                                 getStat.start()
521                                                                 v2, err := snap.Get(k2, nil)
522                                                                 if err != nil {
523                                                                         fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
524                                                                 }
525                                                                 getStat.record(1)
526
527                                                                 if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
528                                                                         err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
529                                                                         fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
530                                                                 }
531
532                                                                 n++
533                                                                 iterStat.start()
534                                                         }
535                                                         iter.Release()
536                                                         if err := iter.Error(); err != nil {
537                                                                 fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
538                                                         }
539                                                         if n != numKey {
540                                                                 fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
541                                                         }
542
543                                                         ri++
544                                                 }
545                                         }(wi, snap)
546
547                                         atomic.AddUint32(&wi, 1)
548                                 }
549                         }()
550
551                         delB := new(leveldb.Batch)
552                         wg.Add(1)
553                         go func() {
554                                 var (
555                                         i        int
556                                         iterStat = &latencyStats{}
557                                 )
558                                 defer func() {
559                                         log.Printf("[%02d] SCANNER DONE #%d", ns, i)
560                                         wg.Done()
561                                 }()
562
563                                 time.Sleep(2 * time.Second)
564
565                                 for atomic.LoadUint32(&done) == 0 {
566                                         var n int
567                                         delB.Reset()
568                                         iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
569                                         iterStat.start()
570                                         for iter.Next() && atomic.LoadUint32(&done) == 0 {
571                                                 k := iter.Key()
572                                                 v := iter.Value()
573                                                 iterStat.record(1)
574
575                                                 for ci, x := range [...][]byte{k, v} {
576                                                         checksum0, checksum1 := dataChecksum(x)
577                                                         if checksum0 != checksum1 {
578                                                                 if ci == 0 {
579                                                                         fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
580                                                                 } else {
581                                                                         fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
582                                                                 }
583                                                         }
584                                                 }
585
586                                                 if dataPrefix(k) == 2 || mrand.Int()%999 == 0 {
587                                                         delB.Delete(k)
588                                                 }
589
590                                                 n++
591                                                 iterStat.start()
592                                         }
593                                         iter.Release()
594                                         if err := iter.Error(); err != nil {
595                                                 fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
596                                         }
597
598                                         if n > 0 {
599                                                 log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
600                                         }
601
602                                         if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
603                                                 t := time.Now()
604                                                 writeReq <- delB
605                                                 if err := <-writeAck; err != nil {
606                                                         writeAckAck <- struct{}{}
607                                                         fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
608                                                 } else {
609                                                         writeAckAck <- struct{}{}
610                                                 }
611                                                 log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
612                                         }
613
614                                         i++
615                                 }
616                         }()
617                 }(ns, numKey)
618         }
619
620         go func() {
621                 sig := make(chan os.Signal)
622                 signal.Notify(sig, os.Interrupt, os.Kill)
623                 log.Printf("Got signal: %v, exiting...", <-sig)
624                 atomic.StoreUint32(&done, 1)
625         }()
626
627         wg.Wait()
628 }