OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_compaction.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "sync"
11         "time"
12
13         "github.com/syndtr/goleveldb/leveldb/errors"
14         "github.com/syndtr/goleveldb/leveldb/opt"
15         "github.com/syndtr/goleveldb/leveldb/storage"
16 )
17
// errCompactionTransactExiting is the value compactionExitTransact panics
// with. compactionTransact, mCompaction and tCompaction compare recovered
// panic values against it to tell a deliberate exit from a genuine panic.
var errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
21
// cStat is the running total of compaction statistics for one level.
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

// add folds the staged values in n into the totals held by p.
func (p *cStat) add(n *cStatStaging) {
	p.duration, p.read, p.write = p.duration+n.duration, p.read+n.read, p.write+n.write
}

// get returns the accumulated duration, bytes read and bytes written.
func (p *cStat) get() (duration time.Duration, read, write int64) {
	duration, read, write = p.duration, p.read, p.write
	return duration, read, write
}

// cStatStaging collects statistics for a single compaction before they are
// merged into a cStat. The timer may be started and stopped several times;
// duration is the sum of all completed start/stop intervals.
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool
	read     int64
	write    int64
}

// startTimer begins a timing interval. It does nothing if one is already
// running, so the original start time is kept.
func (p *cStatStaging) startTimer() {
	if p.on {
		return
	}
	p.start, p.on = time.Now(), true
}

// stopTimer ends the running interval, if any, and adds its length to
// duration.
func (p *cStatStaging) stopTimer() {
	if !p.on {
		return
	}
	p.on = false
	p.duration += time.Since(p.start)
}

// cStats holds per-level compaction statistics, indexed by level and guarded
// by lk.
type cStats struct {
	lk    sync.Mutex
	stats []cStat
}

// addStat merges n into the statistics of the given level, growing the
// per-level slice when the level has not been seen before.
func (p *cStats) addStat(level int, n *cStatStaging) {
	p.lk.Lock()
	if have := len(p.stats); level >= have {
		grown := make([]cStat, level+1)
		copy(grown, p.stats)
		p.stats = grown
	}
	p.stats[level].add(n)
	p.lk.Unlock()
}

// getStat returns the statistics recorded for level, or zero values when
// nothing has been recorded at that level yet.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
	p.lk.Lock()
	defer p.lk.Unlock()
	if level >= len(p.stats) {
		return 0, 0, 0
	}
	return p.stats[level].get()
}
84
// compactionError is a loop that tracks the compaction error status and
// publishes it on the DB's error channels. It moves between three states,
// driven by the values received from db.compErrSetC:
//
//   - noerr: nothing is published; the loop only waits for a status update.
//   - haserr: the last (transient) error is offered on db.compErrC until a
//     nil status clears it or a persistent error replaces it.
//   - hasperr: the error is offered on both db.compErrC and db.compPerErrC,
//     and the loop also tries to take db.writeLockC. This state no longer
//     reads db.compErrSetC, so it is only left when db.closeC fires.
//
// ErrReadOnly and errors for which errors.IsCorrupted reports true are the
// ones treated as persistent. The function returns once db.closeC is readable.
func (db *DB) compactionError() {
	var err error
noerr:
	// No error.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				// Still no error; keep waiting.
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case <-db.closeC:
			return
		}
	}
haserr:
	// Transient error.
	for {
		select {
		case db.compErrC <- err:
			// A reader picked up the current error; keep offering it.
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				// Another transient error: stay here and publish the new one.
			}
		case <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// Hold write lock, so that write won't pass-through.
			db.compWriteLocking = true
		case <-db.closeC:
			if db.compWriteLocking {
				// We should release the lock or Close will hang.
				<-db.writeLockC
			}
			return
		}
	}
}
138
// compactionTransactCounter counts the progress a transaction made during a
// single run attempt.
type compactionTransactCounter int

// incr advances the counter by one.
func (cnt *compactionTransactCounter) incr() {
	*cnt = *cnt + 1
}
144
// compactionTransactInterface is the unit of work executed by
// compactionTransact.
type compactionTransactInterface interface {
	// run performs one attempt of the transaction. cnt is a progress counter
	// the implementation may advance; compactionTransact resets its retry
	// backoff when an attempt gets further than the earlier ones did.
	run(cnt *compactionTransactCounter) error
	// revert undoes the transaction's effects. compactionTransact calls it
	// when the transaction is aborted through compactionExitTransact.
	revert() error
}
149
150 func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
151         defer func() {
152                 if x := recover(); x != nil {
153                         if x == errCompactionTransactExiting {
154                                 if err := t.revert(); err != nil {
155                                         db.logf("%s revert error %q", name, err)
156                                 }
157                         }
158                         panic(x)
159                 }
160         }()
161
162         const (
163                 backoffMin = 1 * time.Second
164                 backoffMax = 8 * time.Second
165                 backoffMul = 2 * time.Second
166         )
167         var (
168                 backoff  = backoffMin
169                 backoffT = time.NewTimer(backoff)
170                 lastCnt  = compactionTransactCounter(0)
171
172                 disableBackoff = db.s.o.GetDisableCompactionBackoff()
173         )
174         for n := 0; ; n++ {
175                 // Check whether the DB is closed.
176                 if db.isClosed() {
177                         db.logf("%s exiting", name)
178                         db.compactionExitTransact()
179                 } else if n > 0 {
180                         db.logf("%s retrying N·%d", name, n)
181                 }
182
183                 // Execute.
184                 cnt := compactionTransactCounter(0)
185                 err := t.run(&cnt)
186                 if err != nil {
187                         db.logf("%s error I·%d %q", name, cnt, err)
188                 }
189
190                 // Set compaction error status.
191                 select {
192                 case db.compErrSetC <- err:
193                 case perr := <-db.compPerErrC:
194                         if err != nil {
195                                 db.logf("%s exiting (persistent error %q)", name, perr)
196                                 db.compactionExitTransact()
197                         }
198                 case <-db.closeC:
199                         db.logf("%s exiting", name)
200                         db.compactionExitTransact()
201                 }
202                 if err == nil {
203                         return
204                 }
205                 if errors.IsCorrupted(err) {
206                         db.logf("%s exiting (corruption detected)", name)
207                         db.compactionExitTransact()
208                 }
209
210                 if !disableBackoff {
211                         // Reset backoff duration if counter is advancing.
212                         if cnt > lastCnt {
213                                 backoff = backoffMin
214                                 lastCnt = cnt
215                         }
216
217                         // Backoff.
218                         backoffT.Reset(backoff)
219                         if backoff < backoffMax {
220                                 backoff *= backoffMul
221                                 if backoff > backoffMax {
222                                         backoff = backoffMax
223                                 }
224                         }
225                         select {
226                         case <-backoffT.C:
227                         case <-db.closeC:
228                                 db.logf("%s exiting", name)
229                                 db.compactionExitTransact()
230                         }
231                 }
232         }
233 }
234
235 type compactionTransactFunc struct {
236         runFunc    func(cnt *compactionTransactCounter) error
237         revertFunc func() error
238 }
239
240 func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
241         return t.runFunc(cnt)
242 }
243
244 func (t *compactionTransactFunc) revert() error {
245         if t.revertFunc != nil {
246                 return t.revertFunc()
247         }
248         return nil
249 }
250
251 func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
252         db.compactionTransact(name, &compactionTransactFunc{run, revert})
253 }
254
// compactionExitTransact aborts the compaction work in progress by panicking
// with errCompactionTransactExiting. It never returns normally.
func (db *DB) compactionExitTransact() {
	panic(errCompactionTransactExiting)
}
258
259 func (db *DB) compactionCommit(name string, rec *sessionRecord) {
260         db.compCommitLk.Lock()
261         defer db.compCommitLk.Unlock() // Defer is necessary.
262         db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
263                 return db.s.commit(rec)
264         }, nil)
265 }
266
// memCompaction flushes the frozen memdb, if there is one, into table files
// and commits the result to the session. Table compaction is paused while
// the flush runs and is triggered again afterwards.
//
// Like the other compaction routines it may exit by panicking with
// errCompactionTransactExiting (through compactionExitTransact) when the DB
// is closed.
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))

	// Don't compact empty memdb.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")
		// drop frozen memdb
		db.dropFrozenMem()
		return
	}

	// Pause table compaction. resumeC is handed over on tcompPauseC; the
	// receiving side then blocks sending on it (see pauseCompaction) until
	// this function receives from it at the end.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		// A persistent error was received instead of a pause hand-off.
		// Nothing was paused, so clear resumeC to skip the resume step below.
		close(resumeC)
		resumeC = nil
	case <-db.closeC:
		db.compactionExitTransact()
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int
	)

	// Generate tables. On an aborted transaction the revert function removes
	// every table file already recorded in rec.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit. The commit time is added to the same stats as the flush time.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	// Account the size of every new table as bytes written at flushLevel.
	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)

	// Drop frozen memdb.
	db.dropFrozenMem()

	// Resume table compaction: receiving from resumeC unblocks the paused
	// side, after which the channel is no longer needed.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case <-db.closeC:
			db.compactionExitTransact()
		}
	}

	// Trigger table compaction.
	db.compTrigger(db.tcompCmdC)
}
348
// tableCompactionBuilder holds the state of one table compaction. It is
// passed to compactionTransact, which drives it through its run and revert
// methods; the snap* fields let a retried run skip the input it had already
// turned into finished tables.
type tableCompactionBuilder struct {
	// db may be nil, in which case appendKV skips its pause/close check.
	db *DB
	s  *session
	// c is the compaction being executed.
	c *compaction
	// rec receives every table produced by flush.
	rec *sessionRecord
	// stat1 accumulates the written size and the time spent in run.
	// stat0 is not referenced by the code visible in this file.
	stat0, stat1 *cStatStaging

	// Snapshot of the run state. It is written after each successful table
	// flush and read back at the start of run.
	snapHasLastUkey bool
	snapLastUkey    []byte
	snapLastSeq     uint64
	snapIter        int
	snapKerrCnt     int
	snapDropCnt     int

	// kerrCnt counts keys that failed to parse as internal keys; dropCnt
	// counts entries that were dropped instead of being written.
	kerrCnt int
	dropCnt int

	// minSeq is the sequence threshold used by run to decide which entries
	// can be dropped; strict makes run fail on an unparsable key; tableSize
	// is the byte length at which needFlush asks for a new table.
	minSeq    uint64
	strict    bool
	tableSize int

	// tw is the table currently being written, or nil when none is open.
	tw *tWriter
}
372
373 func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
374         // Create new table if not already.
375         if b.tw == nil {
376                 // Check for pause event.
377                 if b.db != nil {
378                         select {
379                         case ch := <-b.db.tcompPauseC:
380                                 b.db.pauseCompaction(ch)
381                         case <-b.db.closeC:
382                                 b.db.compactionExitTransact()
383                         default:
384                         }
385                 }
386
387                 // Create new table.
388                 var err error
389                 b.tw, err = b.s.tops.create()
390                 if err != nil {
391                         return err
392                 }
393         }
394
395         // Write key/value into table.
396         return b.tw.append(key, value)
397 }
398
399 func (b *tableCompactionBuilder) needFlush() bool {
400         return b.tw.tw.BytesLen() >= b.tableSize
401 }
402
403 func (b *tableCompactionBuilder) flush() error {
404         t, err := b.tw.finish()
405         if err != nil {
406                 return err
407         }
408         b.rec.addTableFile(b.c.sourceLevel+1, t)
409         b.stat1.write += t.size
410         b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
411         b.tw = nil
412         return nil
413 }
414
415 func (b *tableCompactionBuilder) cleanup() {
416         if b.tw != nil {
417                 b.tw.drop()
418                 b.tw = nil
419         }
420 }
421
// run performs one attempt of the table compaction: it iterates over the
// compaction input, drops entries that are no longer needed and writes the
// rest into new tables, starting a new table when the current one is large
// enough or the compaction asks to stop before the next key.
//
// Every time a table is flushed the iteration state is saved in the snap*
// fields. A later attempt starts from that snapshot and skips the first
// b.snapIter input entries, so tables finished by an earlier attempt are not
// produced again. cnt is incremented once per input entry, skipped ones
// included.
//
// It returns the first error from flushing, appending or the iterator, or
// the key parse error when b.strict is set.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
	snapResumed := b.snapIter > 0
	hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
	lastUkey := append([]byte{}, b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt
	// Restore compaction state.
	b.c.restore()

	// Drop any table left open when this attempt ends.
	defer b.cleanup()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Incr transact counter.
		cnt.incr()

		// Skip until last state.
		if i < b.snapIter {
			continue
		}

		// resumed is true only for the first entry processed after skipping
		// to a snapshot; the stop-before check is bypassed for that entry.
		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First occurrence of this user key.

				// Only rotate tables if ukey doesn't hop across.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Creates snapshot of the state.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// Dropped because a newer entry for the same user key exists.
				fallthrough // (A)
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger seq numbers
				// (3) data in layers that are being compacted here and have
				//     smaller seq numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Don't drop corrupted keys. The last-key state is reset so the
			// next parsable key is handled as a first occurrence.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish last table.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}
529
530 func (b *tableCompactionBuilder) revert() error {
531         for _, at := range b.rec.addedTables {
532                 b.s.logf("table@build revert @%d", at.num)
533                 if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
534                         return err
535                 }
536         }
537         return nil
538 }
539
540 func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
541         defer c.release()
542
543         rec := &sessionRecord{}
544         rec.addCompPtr(c.sourceLevel, c.imax)
545
546         if !noTrivial && c.trivial() {
547                 t := c.levels[0][0]
548                 db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
549                 rec.delTable(c.sourceLevel, t.fd.Num)
550                 rec.addTableFile(c.sourceLevel+1, t)
551                 db.compactionCommit("table-move", rec)
552                 return
553         }
554
555         var stats [2]cStatStaging
556         for i, tables := range c.levels {
557                 for _, t := range tables {
558                         stats[i].read += t.size
559                         // Insert deleted tables into record
560                         rec.delTable(c.sourceLevel+i, t.fd.Num)
561                 }
562         }
563         sourceSize := int(stats[0].read + stats[1].read)
564         minSeq := db.minSeq()
565         db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
566
567         b := &tableCompactionBuilder{
568                 db:        db,
569                 s:         db.s,
570                 c:         c,
571                 rec:       rec,
572                 stat1:     &stats[1],
573                 minSeq:    minSeq,
574                 strict:    db.s.o.GetStrict(opt.StrictCompaction),
575                 tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
576         }
577         db.compactionTransact("table@build", b)
578
579         // Commit.
580         stats[1].startTimer()
581         db.compactionCommit("table", rec)
582         stats[1].stopTimer()
583
584         resultSize := int(stats[1].write)
585         db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
586
587         // Save compaction stats
588         for i := range stats {
589                 db.compStats.addStat(c.sourceLevel+1, &stats[i])
590         }
591 }
592
593 func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
594         db.logf("table@compaction range L%d %q:%q", level, umin, umax)
595         if level >= 0 {
596                 if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
597                         db.tableCompaction(c, true)
598                 }
599         } else {
600                 // Retry until nothing to compact.
601                 for {
602                         compacted := false
603
604                         // Scan for maximum level with overlapped tables.
605                         v := db.s.version()
606                         m := 1
607                         for i := m; i < len(v.levels); i++ {
608                                 tables := v.levels[i]
609                                 if tables.overlaps(db.s.icmp, umin, umax, false) {
610                                         m = i
611                                 }
612                         }
613                         v.release()
614
615                         for level := 0; level < m; level++ {
616                                 if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
617                                         db.tableCompaction(c, true)
618                                         compacted = true
619                                 }
620                         }
621
622                         if !compacted {
623                                 break
624                         }
625                 }
626         }
627
628         return nil
629 }
630
631 func (db *DB) tableAutoCompaction() {
632         if c := db.s.pickCompaction(); c != nil {
633                 db.tableCompaction(c, false)
634         }
635 }
636
// tableNeedCompaction reports whether the current session version needs
// compaction. The version is released again before returning.
func (db *DB) tableNeedCompaction() bool {
	v := db.s.version()
	defer v.release()
	return v.needCompaction()
}
642
643 func (db *DB) pauseCompaction(ch chan<- struct{}) {
644         select {
645         case ch <- struct{}{}:
646         case <-db.closeC:
647                 db.compactionExitTransact()
648         }
649 }
650
// cCmd is a command sent to the compaction goroutines (mCompaction and
// tCompaction).
type cCmd interface {
	// ack reports the outcome of the command back to whoever issued it.
	ack(err error)
}
654
// cAuto is the command requesting an automatic compaction. ackC is optional:
// when nil, nobody is notified of the result.
type cAuto struct {
	ackC chan<- error
}

// ack sends err on ackC, if there is one. The requester closes the channel
// once it stops waiting, so a send that panics because of that is silently
// dropped.
func (r cAuto) ack(err error) {
	if r.ackC == nil {
		return
	}
	defer func() {
		_ = recover()
	}()
	r.ackC <- err
}
667
// cRange is the command requesting a compaction of the key range [min, max]
// at the given level. ackC is optional: when nil, nobody is notified of the
// result.
type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

// ack sends err on ackC, if there is one. The requester closes the channel
// once it stops waiting, so a send that panics because of that is silently
// dropped.
func (r cRange) ack(err error) {
	if r.ackC == nil {
		return
	}
	defer func() {
		_ = recover()
	}()
	r.ackC <- err
}
682
683 // This will trigger auto compaction but will not wait for it.
684 func (db *DB) compTrigger(compC chan<- cCmd) {
685         select {
686         case compC <- cAuto{}:
687         default:
688         }
689 }
690
691 // This will trigger auto compaction and/or wait for all compaction to be done.
692 func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
693         ch := make(chan error)
694         defer close(ch)
695         // Send cmd.
696         select {
697         case compC <- cAuto{ch}:
698         case err = <-db.compErrC:
699                 return
700         case <-db.closeC:
701                 return ErrClosed
702         }
703         // Wait cmd.
704         select {
705         case err = <-ch:
706         case err = <-db.compErrC:
707         case <-db.closeC:
708                 return ErrClosed
709         }
710         return err
711 }
712
713 // Send range compaction request.
714 func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
715         ch := make(chan error)
716         defer close(ch)
717         // Send cmd.
718         select {
719         case compC <- cRange{level, min, max, ch}:
720         case err := <-db.compErrC:
721                 return err
722         case <-db.closeC:
723                 return ErrClosed
724         }
725         // Wait cmd.
726         select {
727         case err = <-ch:
728         case err = <-db.compErrC:
729         case <-db.closeC:
730                 return ErrClosed
731         }
732         return err
733 }
734
735 func (db *DB) mCompaction() {
736         var x cCmd
737
738         defer func() {
739                 if x := recover(); x != nil {
740                         if x != errCompactionTransactExiting {
741                                 panic(x)
742                         }
743                 }
744                 if x != nil {
745                         x.ack(ErrClosed)
746                 }
747                 db.closeW.Done()
748         }()
749
750         for {
751                 select {
752                 case x = <-db.mcompCmdC:
753                         switch x.(type) {
754                         case cAuto:
755                                 db.memCompaction()
756                                 x.ack(nil)
757                                 x = nil
758                         default:
759                                 panic("leveldb: unknown command")
760                         }
761                 case <-db.closeC:
762                         return
763                 }
764         }
765 }
766
// tCompaction is the table compaction loop. Each iteration picks up at most
// one command from db.tcompCmdC, honours pause requests from db.tcompPauseC,
// and then runs an auto compaction.
//
// Auto commands are not acknowledged right away: they are queued in ackQ and
// acknowledged with nil once tableNeedCompaction reports that no compaction
// is needed. Range commands are executed and acknowledged immediately.
//
// The loop ends when db.closeC fires or a compaction aborts with
// errCompactionTransactExiting; queued and in-progress commands are then
// acknowledged with ErrClosed and db.closeW is marked done. Any other panic
// is re-raised without doing either.
func (db *DB) tCompaction() {
	var x cCmd
	var ackQ []cCmd

	defer func() {
		// Note: the x declared in this if statement is the recovered panic
		// value and shadows the command x only inside the if.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range ackQ {
			ackQ[i].ack(ErrClosed)
			ackQ[i] = nil
		}
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			// Work is pending: poll for a command or pause request without
			// blocking, then go on to compact.
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			default:
			}
		} else {
			// Nothing left to compact: release the waiting auto commands,
			// then block until something arrives.
			for i := range ackQ {
				ackQ[i].ack(nil)
				ackQ[i] = nil
			}
			ackQ = ackQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				ackQ = append(ackQ, x)
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}