Hulk did something
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
new file mode 100644
index 0000000..089cd00
--- /dev/null
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go
@@ -0,0 +1,302 @@
+// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+       "sync/atomic"
+
+       "github.com/syndtr/goleveldb/leveldb/iterator"
+       "github.com/syndtr/goleveldb/leveldb/memdb"
+       "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
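+// pickMemdbLevel returns the level, up to maxLevel, at which a table flushed
+// from the memdb and covering the user-key range [umin,umax] should be placed.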
+func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
+       v := s.version()
+       defer v.release()
+       return v.pickMemdbLevel(umin, umax, maxLevel)
+}
+
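+// flushMemdb builds a sorted table from the given memdb, records it in rec at
+// the chosen level, and returns that level.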
+func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
+       // Create sorted table.
+       iter := mdb.NewIterator(nil)
+       defer iter.Release()
+       t, n, err := s.tops.createFrom(iter)
+       if err != nil {
+               return 0, err
+       }
+
+       // Picking a level other than zero can cause compaction issues with
+       // large bulk inserts and deletes on a strictly increasing key-space.
+       // The problem is that small deletion markers get trapped at a lower
+       // level, while key/value entries keep growing at a higher level.
+       // Since the key-space is strictly increasing it does not overlap
+       // with higher levels, so the maximum possible level is always
+       // picked, while overlapping deletion markers are pushed into a
+       // lower level.
+       // See: https://github.com/syndtr/goleveldb/issues/127.
+       flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
+       rec.addTableFile(flushLevel, t)
+
+       s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
+       return flushLevel, nil
+}
+
+// Pick a compaction based on current state; need external synchronization.
+func (s *session) pickCompaction() *compaction {
+       v := s.version()
+
+       var sourceLevel int
+       var t0 tFiles
+       if v.cScore >= 1 {
+               sourceLevel = v.cLevel
+               cptr := s.getCompPtr(sourceLevel)
+               tables := v.levels[sourceLevel]
+               for _, t := range tables {
+                       if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
+                               t0 = append(t0, t)
+                               break
+                       }
+               }
+               if len(t0) == 0 {
+                       t0 = append(t0, tables[0])
+               }
+       } else {
+               if p := atomic.LoadPointer(&v.cSeek); p != nil {
+                       ts := (*tSet)(p)
+                       sourceLevel = ts.level
+                       t0 = append(t0, ts.table)
+               } else {
+                       v.release()
+                       return nil
+               }
+       }
+
+       return newCompaction(s, v, sourceLevel, t0)
+}
+
+// Create a compaction from the given level and key range; need external synchronization.
+func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
+       v := s.version()
+
+       if sourceLevel >= len(v.levels) {
+               v.release()
+               return nil
+       }
+
+       t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
+       if len(t0) == 0 {
+               v.release()
+               return nil
+       }
+
+       // Avoid compacting too much in one shot in case the range is large.
+       // But we cannot do this for level-0 since level-0 files can overlap
+       // and we must not pick one file and drop another older file if the
+       // two files overlap.
+       if !noLimit && sourceLevel > 0 {
+               limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
+               total := int64(0)
+               for i, t := range t0 {
+                       total += t.size
+                       if total >= limit {
+                               s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
+                               t0 = t0[:i+1]
+                               break
+                       }
+               }
+       }
+
+       return newCompaction(s, v, sourceLevel, t0)
+}
+
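+// newCompaction creates a compaction for the given source level and initial
+// table set, expands the input set, and snapshots the resulting state.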
+func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
+       c := &compaction{
+               s:             s,
+               v:             v,
+               sourceLevel:   sourceLevel,
+               levels:        [2]tFiles{t0, nil},
+               maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
+               tPtrs:         make([]int, len(v.levels)),
+       }
+       c.expand()
+       c.save()
+       return c
+}
+
+// compaction represents a compaction state.
+type compaction struct {
+       s *session
+       v *version
+
+       sourceLevel   int
+       levels        [2]tFiles
+       maxGPOverlaps int64
+
+       gp                tFiles // grandparent (sourceLevel+2) tables overlapping the compaction range
+       gpi               int    // current index into gp, advanced by shouldStopBefore
+       seenKey           bool   // whether shouldStopBefore has processed at least one key
+       gpOverlappedBytes int64  // grandparent bytes overlapped by the current output table
+       imin, imax        internalKey // internal key range covered by the compaction inputs
+       tPtrs             []int  // per-level table positions used by baseLevelForKey
+       released          bool   // whether the referenced version has been released
+
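+       // Snapshot of the resumable state above, taken by save and restored by restore.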
+       snapGPI               int
+       snapSeenKey           bool
+       snapGPOverlappedBytes int64
+       snapTPtrs             []int
+}
+
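+// save snapshots the current compaction state so restore can roll back to it.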
+func (c *compaction) save() {
+       c.snapGPI = c.gpi
+       c.snapSeenKey = c.seenKey
+       c.snapGPOverlappedBytes = c.gpOverlappedBytes
+       c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
+}
+
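+// restore rolls the compaction state back to the last snapshot taken by save.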
+func (c *compaction) restore() {
+       c.gpi = c.snapGPI
+       c.seenKey = c.snapSeenKey
+       c.gpOverlappedBytes = c.snapGPOverlappedBytes
+       c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
+}
+
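+// release releases the version referenced by the compaction; it is safe to
+// call more than once.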
+func (c *compaction) release() {
+       if !c.released {
+               c.released = true
+               c.v.release()
+       }
+}
+
+// Expand the compaction input tables; need external synchronization.
+func (c *compaction) expand() {
+       limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
+       vt0 := c.v.levels[c.sourceLevel]
+       vt1 := tFiles{}
+       if level := c.sourceLevel + 1; level < len(c.v.levels) {
+               vt1 = c.v.levels[level]
+       }
+
+       t0, t1 := c.levels[0], c.levels[1]
+       imin, imax := t0.getRange(c.s.icmp)
+       // We expand t0 here just in case ukey hops across tables.
+       t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
+       if len(t0) != len(c.levels[0]) {
+               imin, imax = t0.getRange(c.s.icmp)
+       }
+       t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
+       // Get entire range covered by compaction.
+       amin, amax := append(t0, t1...).getRange(c.s.icmp)
+
+       // See if we can grow the number of inputs in "sourceLevel" without
+       // changing the number of "sourceLevel+1" files we pick up.
+       if len(t1) > 0 {
+               exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
+               if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
+                       xmin, xmax := exp0.getRange(c.s.icmp)
+                       exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
+                       if len(exp1) == len(t1) {
+                               c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
+                                       c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
+                                       len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
+                               imin, imax = xmin, xmax
+                               t0, t1 = exp0, exp1
+                               amin, amax = append(t0, t1...).getRange(c.s.icmp)
+                       }
+               }
+       }
+
+       // Compute the set of grandparent files that overlap this compaction
+       // (parent == sourceLevel+1; grandparent == sourceLevel+2)
+       if level := c.sourceLevel + 2; level < len(c.v.levels) {
+               c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
+       }
+
+       c.levels[0], c.levels[1] = t0, t1
+       c.imin, c.imax = imin, imax
+}
+
+// Check whether the compaction is trivial: a single source table, no overlap
+// with the next level, and limited grandparent overlap, so the table can be
+// moved down without merging.
+func (c *compaction) trivial() bool {
+       return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
+}
+
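+// baseLevelForKey reports whether ukey cannot exist in any level deeper than
+// the compaction output level (sourceLevel+2 and below), which allows deletion
+// markers for it to be dropped during the compaction.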
+func (c *compaction) baseLevelForKey(ukey []byte) bool {
+       for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
+               tables := c.v.levels[level]
+               for c.tPtrs[level] < len(tables) {
+                       t := tables[c.tPtrs[level]]
+                       if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
+                               // We've advanced far enough.
+                               if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
+                                       // Key falls in this file's range, so definitely not base level.
+                                       return false
+                               }
+                               break
+                       }
+                       c.tPtrs[level]++
+               }
+       }
+       return true
+}
+
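+// shouldStopBefore reports whether the current output table should be finished
+// before adding ikey, limiting how much any single output overlaps the
+// grandparent level.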
+func (c *compaction) shouldStopBefore(ikey internalKey) bool {
+       for ; c.gpi < len(c.gp); c.gpi++ {
+               gp := c.gp[c.gpi]
+               if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
+                       break
+               }
+               if c.seenKey {
+                       c.gpOverlappedBytes += gp.size
+               }
+       }
+       c.seenKey = true
+
+       if c.gpOverlappedBytes > c.maxGPOverlaps {
+               // Too much overlap for current output; start new output.
+               c.gpOverlappedBytes = 0
+               return true
+       }
+       return false
+}
+
+// Creates an iterator.
+func (c *compaction) newIterator() iterator.Iterator {
+       // Creates iterator slice.
+       icap := len(c.levels)
+       if c.sourceLevel == 0 {
+               // Special case for level-0.
+               icap = len(c.levels[0]) + 1
+       }
+       its := make([]iterator.Iterator, 0, icap)
+
+       // Options.
+       ro := &opt.ReadOptions{
+               DontFillCache: true,
+               Strict:        opt.StrictOverride,
+       }
+       strict := c.s.o.GetStrict(opt.StrictCompaction)
+       if strict {
+               ro.Strict |= opt.StrictReader
+       }
+
+       for i, tables := range c.levels {
+               if len(tables) == 0 {
+                       continue
+               }
+
+               // Level-0 tables are not sorted and may overlap each other.
+               if c.sourceLevel+i == 0 {
+                       for _, t := range tables {
+                               its = append(its, c.s.tops.newIterator(t, nil, ro))
+                       }
+               } else {
+                       it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
+                       its = append(its, it)
+               }
+       }
+
+       return iterator.NewMergedIterator(its, c.s.icmp, strict)
+}
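
For context, the functions in this file are normally reached through the public goleveldb API rather than called directly. A minimal sketch of forcing a whole-range manual compaction, which this version of the library ultimately serves through getCompactionRange (the database path below is only a placeholder), could look like this:

package main

import (
        "log"

        "github.com/syndtr/goleveldb/leveldb"
        "github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
        // Open (or create) the database; "/tmp/example-db" is a hypothetical path.
        db, err := leveldb.OpenFile("/tmp/example-db", nil)
        if err != nil {
                log.Fatal(err)
        }
        defer db.Close()

        // Compact the whole key space; zero-value Start/Limit mean an unbounded range.
        if err := db.CompactRange(util.Range{}); err != nil {
                log.Fatal(err)
        }
}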