OSDN Git Service

Thanos did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_compaction.go
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go
deleted file mode 100644 (file)
index b6563e8..0000000
+++ /dev/null
@@ -1,826 +0,0 @@
-// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
-// All rights reserved.
-//
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package leveldb
-
-import (
-       "sync"
-       "time"
-
-       "github.com/syndtr/goleveldb/leveldb/errors"
-       "github.com/syndtr/goleveldb/leveldb/opt"
-       "github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-var (
-       errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
-)
-
-type cStat struct {
-       duration time.Duration
-       read     int64
-       write    int64
-}
-
-func (p *cStat) add(n *cStatStaging) {
-       p.duration += n.duration
-       p.read += n.read
-       p.write += n.write
-}
-
-func (p *cStat) get() (duration time.Duration, read, write int64) {
-       return p.duration, p.read, p.write
-}
-
-type cStatStaging struct {
-       start    time.Time
-       duration time.Duration
-       on       bool
-       read     int64
-       write    int64
-}
-
-func (p *cStatStaging) startTimer() {
-       if !p.on {
-               p.start = time.Now()
-               p.on = true
-       }
-}
-
-func (p *cStatStaging) stopTimer() {
-       if p.on {
-               p.duration += time.Since(p.start)
-               p.on = false
-       }
-}
-
-type cStats struct {
-       lk    sync.Mutex
-       stats []cStat
-}
-
-func (p *cStats) addStat(level int, n *cStatStaging) {
-       p.lk.Lock()
-       if level >= len(p.stats) {
-               newStats := make([]cStat, level+1)
-               copy(newStats, p.stats)
-               p.stats = newStats
-       }
-       p.stats[level].add(n)
-       p.lk.Unlock()
-}
-
-func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
-       p.lk.Lock()
-       defer p.lk.Unlock()
-       if level < len(p.stats) {
-               return p.stats[level].get()
-       }
-       return
-}
-
-func (db *DB) compactionError() {
-       var err error
-noerr:
-       // No error.
-       for {
-               select {
-               case err = <-db.compErrSetC:
-                       switch {
-                       case err == nil:
-                       case err == ErrReadOnly, errors.IsCorrupted(err):
-                               goto hasperr
-                       default:
-                               goto haserr
-                       }
-               case <-db.closeC:
-                       return
-               }
-       }
-haserr:
-       // Transient error.
-       for {
-               select {
-               case db.compErrC <- err:
-               case err = <-db.compErrSetC:
-                       switch {
-                       case err == nil:
-                               goto noerr
-                       case err == ErrReadOnly, errors.IsCorrupted(err):
-                               goto hasperr
-                       default:
-                       }
-               case <-db.closeC:
-                       return
-               }
-       }
-hasperr:
-       // Persistent error.
-       for {
-               select {
-               case db.compErrC <- err:
-               case db.compPerErrC <- err:
-               case db.writeLockC <- struct{}{}:
-                       // Hold write lock, so that write won't pass-through.
-                       db.compWriteLocking = true
-               case <-db.closeC:
-                       if db.compWriteLocking {
-                               // We should release the lock or Close will hang.
-                               <-db.writeLockC
-                       }
-                       return
-               }
-       }
-}
-
-type compactionTransactCounter int
-
-func (cnt *compactionTransactCounter) incr() {
-       *cnt++
-}
-
-type compactionTransactInterface interface {
-       run(cnt *compactionTransactCounter) error
-       revert() error
-}
-
-func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
-       defer func() {
-               if x := recover(); x != nil {
-                       if x == errCompactionTransactExiting {
-                               if err := t.revert(); err != nil {
-                                       db.logf("%s revert error %q", name, err)
-                               }
-                       }
-                       panic(x)
-               }
-       }()
-
-       const (
-               backoffMin = 1 * time.Second
-               backoffMax = 8 * time.Second
-               backoffMul = 2 * time.Second
-       )
-       var (
-               backoff  = backoffMin
-               backoffT = time.NewTimer(backoff)
-               lastCnt  = compactionTransactCounter(0)
-
-               disableBackoff = db.s.o.GetDisableCompactionBackoff()
-       )
-       for n := 0; ; n++ {
-               // Check whether the DB is closed.
-               if db.isClosed() {
-                       db.logf("%s exiting", name)
-                       db.compactionExitTransact()
-               } else if n > 0 {
-                       db.logf("%s retrying N·%d", name, n)
-               }
-
-               // Execute.
-               cnt := compactionTransactCounter(0)
-               err := t.run(&cnt)
-               if err != nil {
-                       db.logf("%s error I·%d %q", name, cnt, err)
-               }
-
-               // Set compaction error status.
-               select {
-               case db.compErrSetC <- err:
-               case perr := <-db.compPerErrC:
-                       if err != nil {
-                               db.logf("%s exiting (persistent error %q)", name, perr)
-                               db.compactionExitTransact()
-                       }
-               case <-db.closeC:
-                       db.logf("%s exiting", name)
-                       db.compactionExitTransact()
-               }
-               if err == nil {
-                       return
-               }
-               if errors.IsCorrupted(err) {
-                       db.logf("%s exiting (corruption detected)", name)
-                       db.compactionExitTransact()
-               }
-
-               if !disableBackoff {
-                       // Reset backoff duration if counter is advancing.
-                       if cnt > lastCnt {
-                               backoff = backoffMin
-                               lastCnt = cnt
-                       }
-
-                       // Backoff.
-                       backoffT.Reset(backoff)
-                       if backoff < backoffMax {
-                               backoff *= backoffMul
-                               if backoff > backoffMax {
-                                       backoff = backoffMax
-                               }
-                       }
-                       select {
-                       case <-backoffT.C:
-                       case <-db.closeC:
-                               db.logf("%s exiting", name)
-                               db.compactionExitTransact()
-                       }
-               }
-       }
-}
-
-type compactionTransactFunc struct {
-       runFunc    func(cnt *compactionTransactCounter) error
-       revertFunc func() error
-}
-
-func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
-       return t.runFunc(cnt)
-}
-
-func (t *compactionTransactFunc) revert() error {
-       if t.revertFunc != nil {
-               return t.revertFunc()
-       }
-       return nil
-}
-
-func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
-       db.compactionTransact(name, &compactionTransactFunc{run, revert})
-}
-
-func (db *DB) compactionExitTransact() {
-       panic(errCompactionTransactExiting)
-}
-
-func (db *DB) compactionCommit(name string, rec *sessionRecord) {
-       db.compCommitLk.Lock()
-       defer db.compCommitLk.Unlock() // Defer is necessary.
-       db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
-               return db.s.commit(rec)
-       }, nil)
-}
-
-func (db *DB) memCompaction() {
-       mdb := db.getFrozenMem()
-       if mdb == nil {
-               return
-       }
-       defer mdb.decref()
-
-       db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
-
-       // Don't compact empty memdb.
-       if mdb.Len() == 0 {
-               db.logf("memdb@flush skipping")
-               // drop frozen memdb
-               db.dropFrozenMem()
-               return
-       }
-
-       // Pause table compaction.
-       resumeC := make(chan struct{})
-       select {
-       case db.tcompPauseC <- (chan<- struct{})(resumeC):
-       case <-db.compPerErrC:
-               close(resumeC)
-               resumeC = nil
-       case <-db.closeC:
-               db.compactionExitTransact()
-       }
-
-       var (
-               rec        = &sessionRecord{}
-               stats      = &cStatStaging{}
-               flushLevel int
-       )
-
-       // Generate tables.
-       db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
-               stats.startTimer()
-               flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
-               stats.stopTimer()
-               return
-       }, func() error {
-               for _, r := range rec.addedTables {
-                       db.logf("memdb@flush revert @%d", r.num)
-                       if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       })
-
-       rec.setJournalNum(db.journalFd.Num)
-       rec.setSeqNum(db.frozenSeq)
-
-       // Commit.
-       stats.startTimer()
-       db.compactionCommit("memdb", rec)
-       stats.stopTimer()
-
-       db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
-
-       for _, r := range rec.addedTables {
-               stats.write += r.size
-       }
-       db.compStats.addStat(flushLevel, stats)
-
-       // Drop frozen memdb.
-       db.dropFrozenMem()
-
-       // Resume table compaction.
-       if resumeC != nil {
-               select {
-               case <-resumeC:
-                       close(resumeC)
-               case <-db.closeC:
-                       db.compactionExitTransact()
-               }
-       }
-
-       // Trigger table compaction.
-       db.compTrigger(db.tcompCmdC)
-}
-
-type tableCompactionBuilder struct {
-       db           *DB
-       s            *session
-       c            *compaction
-       rec          *sessionRecord
-       stat0, stat1 *cStatStaging
-
-       snapHasLastUkey bool
-       snapLastUkey    []byte
-       snapLastSeq     uint64
-       snapIter        int
-       snapKerrCnt     int
-       snapDropCnt     int
-
-       kerrCnt int
-       dropCnt int
-
-       minSeq    uint64
-       strict    bool
-       tableSize int
-
-       tw *tWriter
-}
-
-func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
-       // Create new table if not already.
-       if b.tw == nil {
-               // Check for pause event.
-               if b.db != nil {
-                       select {
-                       case ch := <-b.db.tcompPauseC:
-                               b.db.pauseCompaction(ch)
-                       case <-b.db.closeC:
-                               b.db.compactionExitTransact()
-                       default:
-                       }
-               }
-
-               // Create new table.
-               var err error
-               b.tw, err = b.s.tops.create()
-               if err != nil {
-                       return err
-               }
-       }
-
-       // Write key/value into table.
-       return b.tw.append(key, value)
-}
-
-func (b *tableCompactionBuilder) needFlush() bool {
-       return b.tw.tw.BytesLen() >= b.tableSize
-}
-
-func (b *tableCompactionBuilder) flush() error {
-       t, err := b.tw.finish()
-       if err != nil {
-               return err
-       }
-       b.rec.addTableFile(b.c.sourceLevel+1, t)
-       b.stat1.write += t.size
-       b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
-       b.tw = nil
-       return nil
-}
-
-func (b *tableCompactionBuilder) cleanup() {
-       if b.tw != nil {
-               b.tw.drop()
-               b.tw = nil
-       }
-}
-
-func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
-       snapResumed := b.snapIter > 0
-       hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
-       lastUkey := append([]byte{}, b.snapLastUkey...)
-       lastSeq := b.snapLastSeq
-       b.kerrCnt = b.snapKerrCnt
-       b.dropCnt = b.snapDropCnt
-       // Restore compaction state.
-       b.c.restore()
-
-       defer b.cleanup()
-
-       b.stat1.startTimer()
-       defer b.stat1.stopTimer()
-
-       iter := b.c.newIterator()
-       defer iter.Release()
-       for i := 0; iter.Next(); i++ {
-               // Incr transact counter.
-               cnt.incr()
-
-               // Skip until last state.
-               if i < b.snapIter {
-                       continue
-               }
-
-               resumed := false
-               if snapResumed {
-                       resumed = true
-                       snapResumed = false
-               }
-
-               ikey := iter.Key()
-               ukey, seq, kt, kerr := parseInternalKey(ikey)
-
-               if kerr == nil {
-                       shouldStop := !resumed && b.c.shouldStopBefore(ikey)
-
-                       if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
-                               // First occurrence of this user key.
-
-                               // Only rotate tables if ukey doesn't hop across.
-                               if b.tw != nil && (shouldStop || b.needFlush()) {
-                                       if err := b.flush(); err != nil {
-                                               return err
-                                       }
-
-                                       // Creates snapshot of the state.
-                                       b.c.save()
-                                       b.snapHasLastUkey = hasLastUkey
-                                       b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
-                                       b.snapLastSeq = lastSeq
-                                       b.snapIter = i
-                                       b.snapKerrCnt = b.kerrCnt
-                                       b.snapDropCnt = b.dropCnt
-                               }
-
-                               hasLastUkey = true
-                               lastUkey = append(lastUkey[:0], ukey...)
-                               lastSeq = keyMaxSeq
-                       }
-
-                       switch {
-                       case lastSeq <= b.minSeq:
-                               // Dropped because newer entry for same user key exist
-                               fallthrough // (A)
-                       case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
-                               // For this user key:
-                               // (1) there is no data in higher levels
-                               // (2) data in lower levels will have larger seq numbers
-                               // (3) data in layers that are being compacted here and have
-                               //     smaller seq numbers will be dropped in the next
-                               //     few iterations of this loop (by rule (A) above).
-                               // Therefore this deletion marker is obsolete and can be dropped.
-                               lastSeq = seq
-                               b.dropCnt++
-                               continue
-                       default:
-                               lastSeq = seq
-                       }
-               } else {
-                       if b.strict {
-                               return kerr
-                       }
-
-                       // Don't drop corrupted keys.
-                       hasLastUkey = false
-                       lastUkey = lastUkey[:0]
-                       lastSeq = keyMaxSeq
-                       b.kerrCnt++
-               }
-
-               if err := b.appendKV(ikey, iter.Value()); err != nil {
-                       return err
-               }
-       }
-
-       if err := iter.Error(); err != nil {
-               return err
-       }
-
-       // Finish last table.
-       if b.tw != nil && !b.tw.empty() {
-               return b.flush()
-       }
-       return nil
-}
-
-func (b *tableCompactionBuilder) revert() error {
-       for _, at := range b.rec.addedTables {
-               b.s.logf("table@build revert @%d", at.num)
-               if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
-       defer c.release()
-
-       rec := &sessionRecord{}
-       rec.addCompPtr(c.sourceLevel, c.imax)
-
-       if !noTrivial && c.trivial() {
-               t := c.levels[0][0]
-               db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
-               rec.delTable(c.sourceLevel, t.fd.Num)
-               rec.addTableFile(c.sourceLevel+1, t)
-               db.compactionCommit("table-move", rec)
-               return
-       }
-
-       var stats [2]cStatStaging
-       for i, tables := range c.levels {
-               for _, t := range tables {
-                       stats[i].read += t.size
-                       // Insert deleted tables into record
-                       rec.delTable(c.sourceLevel+i, t.fd.Num)
-               }
-       }
-       sourceSize := int(stats[0].read + stats[1].read)
-       minSeq := db.minSeq()
-       db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
-
-       b := &tableCompactionBuilder{
-               db:        db,
-               s:         db.s,
-               c:         c,
-               rec:       rec,
-               stat1:     &stats[1],
-               minSeq:    minSeq,
-               strict:    db.s.o.GetStrict(opt.StrictCompaction),
-               tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
-       }
-       db.compactionTransact("table@build", b)
-
-       // Commit.
-       stats[1].startTimer()
-       db.compactionCommit("table", rec)
-       stats[1].stopTimer()
-
-       resultSize := int(stats[1].write)
-       db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
-
-       // Save compaction stats
-       for i := range stats {
-               db.compStats.addStat(c.sourceLevel+1, &stats[i])
-       }
-}
-
-func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
-       db.logf("table@compaction range L%d %q:%q", level, umin, umax)
-       if level >= 0 {
-               if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
-                       db.tableCompaction(c, true)
-               }
-       } else {
-               // Retry until nothing to compact.
-               for {
-                       compacted := false
-
-                       // Scan for maximum level with overlapped tables.
-                       v := db.s.version()
-                       m := 1
-                       for i := m; i < len(v.levels); i++ {
-                               tables := v.levels[i]
-                               if tables.overlaps(db.s.icmp, umin, umax, false) {
-                                       m = i
-                               }
-                       }
-                       v.release()
-
-                       for level := 0; level < m; level++ {
-                               if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
-                                       db.tableCompaction(c, true)
-                                       compacted = true
-                               }
-                       }
-
-                       if !compacted {
-                               break
-                       }
-               }
-       }
-
-       return nil
-}
-
-func (db *DB) tableAutoCompaction() {
-       if c := db.s.pickCompaction(); c != nil {
-               db.tableCompaction(c, false)
-       }
-}
-
-func (db *DB) tableNeedCompaction() bool {
-       v := db.s.version()
-       defer v.release()
-       return v.needCompaction()
-}
-
-func (db *DB) pauseCompaction(ch chan<- struct{}) {
-       select {
-       case ch <- struct{}{}:
-       case <-db.closeC:
-               db.compactionExitTransact()
-       }
-}
-
-type cCmd interface {
-       ack(err error)
-}
-
-type cAuto struct {
-       ackC chan<- error
-}
-
-func (r cAuto) ack(err error) {
-       if r.ackC != nil {
-               defer func() {
-                       recover()
-               }()
-               r.ackC <- err
-       }
-}
-
-type cRange struct {
-       level    int
-       min, max []byte
-       ackC     chan<- error
-}
-
-func (r cRange) ack(err error) {
-       if r.ackC != nil {
-               defer func() {
-                       recover()
-               }()
-               r.ackC <- err
-       }
-}
-
-// This will trigger auto compaction but will not wait for it.
-func (db *DB) compTrigger(compC chan<- cCmd) {
-       select {
-       case compC <- cAuto{}:
-       default:
-       }
-}
-
-// This will trigger auto compaction and/or wait for all compaction to be done.
-func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
-       ch := make(chan error)
-       defer close(ch)
-       // Send cmd.
-       select {
-       case compC <- cAuto{ch}:
-       case err = <-db.compErrC:
-               return
-       case <-db.closeC:
-               return ErrClosed
-       }
-       // Wait cmd.
-       select {
-       case err = <-ch:
-       case err = <-db.compErrC:
-       case <-db.closeC:
-               return ErrClosed
-       }
-       return err
-}
-
-// Send range compaction request.
-func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
-       ch := make(chan error)
-       defer close(ch)
-       // Send cmd.
-       select {
-       case compC <- cRange{level, min, max, ch}:
-       case err := <-db.compErrC:
-               return err
-       case <-db.closeC:
-               return ErrClosed
-       }
-       // Wait cmd.
-       select {
-       case err = <-ch:
-       case err = <-db.compErrC:
-       case <-db.closeC:
-               return ErrClosed
-       }
-       return err
-}
-
-func (db *DB) mCompaction() {
-       var x cCmd
-
-       defer func() {
-               if x := recover(); x != nil {
-                       if x != errCompactionTransactExiting {
-                               panic(x)
-                       }
-               }
-               if x != nil {
-                       x.ack(ErrClosed)
-               }
-               db.closeW.Done()
-       }()
-
-       for {
-               select {
-               case x = <-db.mcompCmdC:
-                       switch x.(type) {
-                       case cAuto:
-                               db.memCompaction()
-                               x.ack(nil)
-                               x = nil
-                       default:
-                               panic("leveldb: unknown command")
-                       }
-               case <-db.closeC:
-                       return
-               }
-       }
-}
-
-func (db *DB) tCompaction() {
-       var x cCmd
-       var ackQ []cCmd
-
-       defer func() {
-               if x := recover(); x != nil {
-                       if x != errCompactionTransactExiting {
-                               panic(x)
-                       }
-               }
-               for i := range ackQ {
-                       ackQ[i].ack(ErrClosed)
-                       ackQ[i] = nil
-               }
-               if x != nil {
-                       x.ack(ErrClosed)
-               }
-               db.closeW.Done()
-       }()
-
-       for {
-               if db.tableNeedCompaction() {
-                       select {
-                       case x = <-db.tcompCmdC:
-                       case ch := <-db.tcompPauseC:
-                               db.pauseCompaction(ch)
-                               continue
-                       case <-db.closeC:
-                               return
-                       default:
-                       }
-               } else {
-                       for i := range ackQ {
-                               ackQ[i].ack(nil)
-                               ackQ[i] = nil
-                       }
-                       ackQ = ackQ[:0]
-                       select {
-                       case x = <-db.tcompCmdC:
-                       case ch := <-db.tcompPauseC:
-                               db.pauseCompaction(ch)
-                               continue
-                       case <-db.closeC:
-                               return
-                       }
-               }
-               if x != nil {
-                       switch cmd := x.(type) {
-                       case cAuto:
-                               ackQ = append(ackQ, x)
-                       case cRange:
-                               x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
-                       default:
-                               panic("leveldb: unknown command")
-                       }
-                       x = nil
-               }
-               db.tableAutoCompaction()
-       }
-}