1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
13 "github.com/syndtr/goleveldb/leveldb/errors"
14 "github.com/syndtr/goleveldb/leveldb/opt"
15 "github.com/syndtr/goleveldb/leveldb/storage"
19 errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
// cStat holds the cumulative compaction statistics of a single level.
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

// add folds a finished staging record into the cumulative stat.
func (p *cStat) add(n *cStatStaging) {
	p.duration += n.duration
	p.read += n.read
	p.write += n.write
}

// get returns the accumulated duration and byte counts.
func (p *cStat) get() (duration time.Duration, read, write int64) {
	return p.duration, p.read, p.write
}

// cStatStaging accumulates statistics for one in-flight compaction. Its
// timer may be started and stopped repeatedly; elapsed intervals sum into
// duration.
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool
	read     int64
	write    int64
}

// startTimer begins timing unless the timer is already running.
func (p *cStatStaging) startTimer() {
	if !p.on {
		p.start = time.Now()
		p.on = true
	}
}

// stopTimer stops timing and adds the elapsed interval to duration.
// Stopping an already-stopped timer is a no-op.
func (p *cStatStaging) stopTimer() {
	if p.on {
		p.duration += time.Since(p.start)
		p.on = false
	}
}
65 func (p *cStats) addStat(level int, n *cStatStaging) {
67 if level >= len(p.stats) {
68 newStats := make([]cStat, level+1)
69 copy(newStats, p.stats)
76 func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
79 if level < len(p.stats) {
80 return p.stats[level].get()
// compactionError is a long-lived goroutine that owns the compaction error
// status: it receives updates on db.compErrSetC and serves the current value
// to readers of db.compErrC.
// NOTE(review): this listing is truncated — only a sample of the original
// lines is visible; the select/loop scaffolding is missing and the comments
// below are hedged accordingly.
85 func (db *DB) compactionError() {
// A compaction goroutine publishes a new error status.
91 case err = <-db.compErrSetC:
// Read-only and corruption errors are apparently treated as a distinct
// (persistent) class — TODO confirm against the full source.
94 case err == ErrReadOnly, errors.IsCorrupted(err):
// Serve the current error while still accepting status updates.
107 case db.compErrC <- err:
108 case err = <-db.compErrSetC:
112 case err == ErrReadOnly, errors.IsCorrupted(err):
124 case db.compErrC <- err:
// Persistent errors are apparently also served on compPerErrC.
125 case db.compPerErrC <- err:
// Take the write lock so writes no longer pass through after a
// persistent error.
126 case db.writeLockC <- struct{}{}:
127 // Hold write lock, so that write won't pass-through.
128 db.compWriteLocking = true
// On exit: if the write lock was taken above it must be released,
// otherwise Close would block forever.
130 if db.compWriteLocking {
131 // We should release the lock or Close will hang.
// compactionTransactCounter counts units of work done by one attempt of a
// compaction transaction; compactionTransact uses it to detect whether a
// retry is making progress.
type compactionTransactCounter int

// incr advances the counter by one.
func (cnt *compactionTransactCounter) incr() {
	*cnt++
}

// compactionTransactInterface is the contract executed by compactionTransact:
// run performs the (resumable) work and revert undoes partial effects when
// the transaction is aborted.
type compactionTransactInterface interface {
	run(cnt *compactionTransactCounter) error
	revert() error
}
// compactionTransact executes a compaction transaction t with retries and
// exponential backoff; unrecoverable conditions abort via
// db.compactionExitTransact (a panic), which the deferred recover below turns
// into a t.revert call.
// NOTE(review): truncated listing — only a sample of the original lines is
// visible; the retry loop and select scaffolding are missing.
150 func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
// Deferred recover: the errCompactionTransactExiting sentinel means a
// deliberate abort — revert partial work and log any revert failure.
152 if x := recover(); x != nil {
153 if x == errCompactionTransactExiting {
154 if err := t.revert(); err != nil {
155 db.logf("%s revert error %q", name, err)
// Backoff tuning: start at 1s, cap at 8s, multiply per retry.
163 backoffMin = 1 * time.Second
164 backoffMax = 8 * time.Second
165 backoffMul = 2 * time.Second
// The timer is reused across retries (Reset below) rather than re-allocated.
169 backoffT = time.NewTimer(backoff)
170 lastCnt = compactionTransactCounter(0)
172 disableBackoff = db.s.o.GetDisableCompactionBackoff()
175 // Check whether the DB is closed.
177 db.logf("%s exiting", name)
178 db.compactionExitTransact()
180 db.logf("%s retrying N·%d", name, n)
// Fresh per-attempt progress counter; t.run increments it as it works.
184 cnt := compactionTransactCounter(0)
187 db.logf("%s error I·%d %q", name, cnt, err)
190 // Set compaction error status.
192 case db.compErrSetC <- err:
// A persistent error elsewhere means retrying is pointless: exit.
193 case perr := <-db.compPerErrC:
195 db.logf("%s exiting (persistent error %q)", name, perr)
196 db.compactionExitTransact()
199 db.logf("%s exiting", name)
200 db.compactionExitTransact()
// Corruption is not retryable either.
205 if errors.IsCorrupted(err) {
206 db.logf("%s exiting (corruption detected)", name)
207 db.compactionExitTransact()
211 // Reset backoff duration if counter is advancing.
// Sleep for the current backoff, then grow it up to the cap.
218 backoffT.Reset(backoff)
219 if backoff < backoffMax {
220 backoff *= backoffMul
221 if backoff > backoffMax {
228 db.logf("%s exiting", name)
229 db.compactionExitTransact()
235 type compactionTransactFunc struct {
236 runFunc func(cnt *compactionTransactCounter) error
237 revertFunc func() error
240 func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
241 return t.runFunc(cnt)
244 func (t *compactionTransactFunc) revert() error {
245 if t.revertFunc != nil {
246 return t.revertFunc()
251 func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
252 db.compactionTransact(name, &compactionTransactFunc{run, revert})
255 func (db *DB) compactionExitTransact() {
256 panic(errCompactionTransactExiting)
259 func (db *DB) compactionCommit(name string, rec *sessionRecord) {
260 db.compCommitLk.Lock()
261 defer db.compCommitLk.Unlock() // Defer is necessary.
262 db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
263 return db.s.commit(rec)
// memCompaction flushes the frozen memdb into table files, commits the
// resulting session record, then drops the frozen memdb and triggers table
// compaction.
// NOTE(review): truncated listing — only a sample of the original lines is
// visible; comments are hedged where scaffolding is missing.
267 func (db *DB) memCompaction() {
268 mdb := db.getFrozenMem()
274 db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
276 // Don't compact empty memdb.
278 db.logf("memdb@flush skipping")
284 // Pause table compaction.
// Hand the table-compaction goroutine a resume channel; if a persistent
// error (or close) wins the select instead, abort the transaction.
285 resumeC := make(chan struct{})
287 case db.tcompPauseC <- (chan<- struct{})(resumeC):
288 case <-db.compPerErrC:
292 db.compactionExitTransact()
296 rec = &sessionRecord{}
297 stats = &cStatStaging{}
// The flush runs inside a compaction transaction so it can be retried and,
// on abort, reverted (table files removed below).
302 db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
304 flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
// Revert: remove any table files already created by the failed flush.
308 for _, r := range rec.addedTables {
309 db.logf("memdb@flush revert @%d", r.num)
310 if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
// Record the journal/sequence state this flush supersedes.
317 rec.setJournalNum(db.journalFd.Num)
318 rec.setSeqNum(db.frozenSeq)
322 db.compactionCommit("memdb", rec)
325 db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
// Account bytes written into the staging stats before publishing them.
327 for _, r := range rec.addedTables {
328 stats.write += r.size
330 db.compStats.addStat(flushLevel, stats)
332 // Drop frozen memdb.
335 // Resume table compaction.
341 db.compactionExitTransact()
345 // Trigger table compaction.
346 db.compTrigger(db.tcompCmdC)
// tableCompactionBuilder carries the state of one table-compaction run so it
// can be retried and resumed via compactionTransact.
// NOTE(review): truncated listing — most fields are not visible here.
349 type tableCompactionBuilder struct {
// stat0/stat1 are staging stats; stat1 is written to in flush (target
// level) — the role of stat0 is not visible here, TODO confirm.
354 stat0, stat1 *cStatStaging
373 func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
374 // Create new table if not already.
376 // Check for pause event.
379 case ch := <-b.db.tcompPauseC:
380 b.db.pauseCompaction(ch)
382 b.db.compactionExitTransact()
389 b.tw, err = b.s.tops.create()
395 // Write key/value into table.
396 return b.tw.append(key, value)
399 func (b *tableCompactionBuilder) needFlush() bool {
400 return b.tw.tw.BytesLen() >= b.tableSize
403 func (b *tableCompactionBuilder) flush() error {
404 t, err := b.tw.finish()
408 b.rec.addTableFile(b.c.sourceLevel+1, t)
409 b.stat1.write += t.size
410 b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
415 func (b *tableCompactionBuilder) cleanup() {
// run executes the table rewrite for a compaction: it iterates the merged
// input, drops shadowed and obsolete entries, rotates output tables, and
// snapshots its progress so a retried transaction resumes where it left off
// instead of restarting.
// NOTE(review): truncated listing — only a sample of the original lines is
// visible; the drop/keep switch and resume logic are partially missing.
422 func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
// Restore state captured by the last snapshot (non-zero on a retry).
423 snapResumed := b.snapIter > 0
424 hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
425 lastUkey := append([]byte{}, b.snapLastUkey...)
426 lastSeq := b.snapLastSeq
427 b.kerrCnt = b.snapKerrCnt
428 b.dropCnt = b.snapDropCnt
429 // Restore compaction state.
435 defer b.stat1.stopTimer()
437 iter := b.c.newIterator()
439 for i := 0; iter.Next(); i++ {
440 // Incr transact counter.
443 // Skip until last state.
455 ukey, seq, kt, kerr := parseInternalKey(ikey)
// When resuming, the stop condition is apparently suppressed — TODO confirm
// how `resumed` relates to snapResumed in the full source.
458 shouldStop := !resumed && b.c.shouldStopBefore(ikey)
460 if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
461 // First occurrence of this user key.
463 // Only rotate tables if ukey doesn't hop across.
464 if b.tw != nil && (shouldStop || b.needFlush()) {
465 if err := b.flush(); err != nil {
469 // Creates snapshot of the state.
471 b.snapHasLastUkey = hasLastUkey
472 b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
473 b.snapLastSeq = lastSeq
475 b.snapKerrCnt = b.kerrCnt
476 b.snapDropCnt = b.dropCnt
480 lastUkey = append(lastUkey[:0], ukey...)
// Drop rules for the current entry:
485 case lastSeq <= b.minSeq:
486 // Dropped because newer entry for same user key exist
488 case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
489 // For this user key:
490 // (1) there is no data in higher levels
491 // (2) data in lower levels will have larger seq numbers
492 // (3) data in layers that are being compacted here and have
493 // smaller seq numbers will be dropped in the next
494 // few iterations of this loop (by rule (A) above).
495 // Therefore this deletion marker is obsolete and can be dropped.
507 // Don't drop corrupted keys.
509 lastUkey = lastUkey[:0]
514 if err := b.appendKV(ikey, iter.Value()); err != nil {
519 if err := iter.Error(); err != nil {
523 // Finish last table.
524 if b.tw != nil && !b.tw.empty() {
530 func (b *tableCompactionBuilder) revert() error {
531 for _, at := range b.rec.addedTables {
532 b.s.logf("table@build revert @%d", at.num)
533 if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
// tableCompaction executes compaction c. A trivial compaction (a single
// table that can simply move down one level) is committed as a pure metadata
// move unless noTrivial is set; otherwise the inputs are rewritten via
// tableCompactionBuilder under compactionTransact.
// NOTE(review): truncated listing — scaffolding is missing; hedged below.
540 func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
543 rec := &sessionRecord{}
// Record the compaction pointer so future compactions resume past imax.
544 rec.addCompPtr(c.sourceLevel, c.imax)
546 if !noTrivial && c.trivial() {
548 db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
549 rec.delTable(c.sourceLevel, t.fd.Num)
550 rec.addTableFile(c.sourceLevel+1, t)
551 db.compactionCommit("table-move", rec)
// stats[0] accumulates source-level reads, stats[1] the target level.
555 var stats [2]cStatStaging
556 for i, tables := range c.levels {
557 for _, t := range tables {
558 stats[i].read += t.size
559 // Insert deleted tables into record
560 rec.delTable(c.sourceLevel+i, t.fd.Num)
563 sourceSize := int(stats[0].read + stats[1].read)
564 minSeq := db.minSeq()
565 db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
567 b := &tableCompactionBuilder{
574 strict: db.s.o.GetStrict(opt.StrictCompaction),
575 tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
577 db.compactionTransact("table@build", b)
580 stats[1].startTimer()
581 db.compactionCommit("table", rec)
584 resultSize := int(stats[1].write)
585 db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
587 // Save compaction stats
// NOTE(review): both stat entries are attributed to the target level
// (sourceLevel+1) — looks deliberate, but worth confirming upstream.
588 for i := range stats {
589 db.compStats.addStat(c.sourceLevel+1, &stats[i])
// tableRangeCompaction compacts tables overlapping the user-key range
// [umin, umax]. A concrete level compacts just that level; otherwise it
// apparently sweeps all overlapping levels until nothing is left — the
// branch structure is not fully visible in this truncated listing.
593 func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
594 db.logf("table@compaction range L%d %q:%q", level, umin, umax)
// Compact only the requested level when one is given.
596 if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
597 db.tableCompaction(c, true)
600 // Retry until nothing to compact.
604 // Scan for maximum level with overlapped tables.
607 for i := m; i < len(v.levels); i++ {
608 tables := v.levels[i]
609 if tables.overlaps(db.s.icmp, umin, umax, false) {
// Compact every level up to the deepest overlapping one.
615 for level := 0; level < m; level++ {
616 if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
617 db.tableCompaction(c, true)
631 func (db *DB) tableAutoCompaction() {
632 if c := db.s.pickCompaction(); c != nil {
633 db.tableCompaction(c, false)
637 func (db *DB) tableNeedCompaction() bool {
640 return v.needCompaction()
643 func (db *DB) pauseCompaction(ch chan<- struct{}) {
645 case ch <- struct{}{}:
647 db.compactionExitTransact()
// cCmd is a command for the compaction goroutines; ack reports completion
// (or failure) back to the requester.
type cCmd interface {
	ack(err error)
}

// cAuto requests an automatic compaction; a nil ackC means fire-and-forget.
type cAuto struct {
	ackC chan<- error
}

// ack delivers the result; a send on a closed ack channel (the waiter gave
// up) is swallowed via recover.
func (r cAuto) ack(err error) {
	if r.ackC != nil {
		defer func() {
			recover()
		}()
		r.ackC <- err
	}
}

// cRange requests compaction of a level over the user-key range [min, max].
type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

// ack delivers the result; a send on a closed ack channel is swallowed.
func (r cRange) ack(err error) {
	if r.ackC != nil {
		defer func() {
			recover()
		}()
		r.ackC <- err
	}
}
683 // This will trigger auto compaction but will not wait for it.
684 func (db *DB) compTrigger(compC chan<- cCmd) {
686 case compC <- cAuto{}:
691 // This will trigger auto compaction and/or wait for all compaction to be done.
692 func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
693 ch := make(chan error)
697 case compC <- cAuto{ch}:
698 case err = <-db.compErrC:
706 case err = <-db.compErrC:
713 // Send range compaction request.
714 func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
715 ch := make(chan error)
719 case compC <- cRange{level, min, max, ch}:
720 case err := <-db.compErrC:
728 case err = <-db.compErrC:
// mCompaction is the memdb-compaction goroutine: it services commands from
// db.mcompCmdC and runs memCompaction.
// NOTE(review): truncated listing — the loop/select scaffolding is missing.
735 func (db *DB) mCompaction() {
// Re-raise any panic other than the deliberate transact-exit sentinel.
739 if x := recover(); x != nil {
740 if x != errCompactionTransactExiting {
752 case x = <-db.mcompCmdC:
// Only cAuto (or channel close) is expected on mcompCmdC.
759 panic("leveldb: unknown command")
// tCompaction is the table-compaction goroutine: it services commands from
// db.tcompCmdC, honors pause requests on db.tcompPauseC, and runs automatic
// compactions while the current version needs them.
// NOTE(review): truncated listing — loop/select scaffolding is missing and
// the comments below are hedged accordingly.
767 func (db *DB) tCompaction() {
// On shutdown: re-raise unexpected panics and NACK all queued waiters.
772 if x := recover(); x != nil {
773 if x != errCompactionTransactExiting {
777 for i := range ackQ {
778 ackQ[i].ack(ErrClosed)
// Busy path: compaction is needed, so poll the channels without blocking.
788 if db.tableNeedCompaction() {
790 case x = <-db.tcompCmdC:
791 case ch := <-db.tcompPauseC:
792 db.pauseCompaction(ch)
// Idle path: ack pending waiters, then block for the next event.
799 for i := range ackQ {
805 case x = <-db.tcompCmdC:
806 case ch := <-db.tcompPauseC:
807 db.pauseCompaction(ch)
// Dispatch the received command.
814 switch cmd := x.(type) {
// Auto requests are queued and acked once compaction settles.
816 ackQ = append(ackQ, x)
// Range requests run synchronously; ack carries the result.
818 x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
820 panic("leveldb: unknown command")
824 db.tableAutoCompaction()