1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
12 "github.com/syndtr/goleveldb/leveldb/iterator"
13 "github.com/syndtr/goleveldb/leveldb/memdb"
14 "github.com/syndtr/goleveldb/leveldb/opt"
17 func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
20 return v.pickMemdbLevel(umin, umax, maxLevel)
23 func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
24 // Create sorted table.
25 iter := mdb.NewIterator(nil)
27 t, n, err := s.tops.createFrom(iter)
32 // Pick level other than zero can cause compaction issue with large
33 // bulk insert and delete on strictly incrementing key-space. The
34 // problem is that the small deletion markers trapped at lower level,
35 // while key/value entries keep growing at higher level. Since the
36 // key-space is strictly incrementing it will not overlaps with
37 // higher level, thus maximum possible level is always picked, while
38 // overlapping deletion marker pushed into lower level.
39 // See: https://github.com/syndtr/goleveldb/issues/127.
40 flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
41 rec.addTableFile(flushLevel, t)
43 s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
44 return flushLevel, nil
47 // Pick a compaction based on current state; need external synchronization.
48 func (s *session) pickCompaction() *compaction {
54 sourceLevel = v.cLevel
55 cptr := s.getCompPtr(sourceLevel)
56 tables := v.levels[sourceLevel]
57 for _, t := range tables {
58 if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
64 t0 = append(t0, tables[0])
67 if p := atomic.LoadPointer(&v.cSeek); p != nil {
69 sourceLevel = ts.level
70 t0 = append(t0, ts.table)
77 return newCompaction(s, v, sourceLevel, t0)
80 // Create compaction from given level and range; need external synchronization.
81 func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
84 if sourceLevel >= len(v.levels) {
89 t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
95 // Avoid compacting too much in one shot in case the range is large.
96 // But we cannot do this for level-0 since level-0 files can overlap
97 // and we must not pick one file and drop another older file if the
99 if !noLimit && sourceLevel > 0 {
100 limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
102 for i, t := range t0 {
105 s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
112 return newCompaction(s, v, sourceLevel, t0)
115 func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
119 sourceLevel: sourceLevel,
120 levels: [2]tFiles{t0, nil},
121 maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
122 tPtrs: make([]int, len(v.levels)),
129 // compaction represent a compaction state.
130 type compaction struct {
141 gpOverlappedBytes int64
142 imin, imax internalKey
148 snapGPOverlappedBytes int64
152 func (c *compaction) save() {
154 c.snapSeenKey = c.seenKey
155 c.snapGPOverlappedBytes = c.gpOverlappedBytes
156 c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
159 func (c *compaction) restore() {
161 c.seenKey = c.snapSeenKey
162 c.gpOverlappedBytes = c.snapGPOverlappedBytes
163 c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
166 func (c *compaction) release() {
173 // Expand compacted tables; need external synchronization.
174 func (c *compaction) expand() {
175 limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
176 vt0 := c.v.levels[c.sourceLevel]
178 if level := c.sourceLevel + 1; level < len(c.v.levels) {
179 vt1 = c.v.levels[level]
182 t0, t1 := c.levels[0], c.levels[1]
183 imin, imax := t0.getRange(c.s.icmp)
184 // We expand t0 here just incase ukey hop across tables.
185 t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
186 if len(t0) != len(c.levels[0]) {
187 imin, imax = t0.getRange(c.s.icmp)
189 t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
190 // Get entire range covered by compaction.
191 amin, amax := append(t0, t1...).getRange(c.s.icmp)
193 // See if we can grow the number of inputs in "sourceLevel" without
194 // changing the number of "sourceLevel+1" files we pick up.
196 exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
197 if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
198 xmin, xmax := exp0.getRange(c.s.icmp)
199 exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
200 if len(exp1) == len(t1) {
201 c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
202 c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
203 len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
204 imin, imax = xmin, xmax
206 amin, amax = append(t0, t1...).getRange(c.s.icmp)
211 // Compute the set of grandparent files that overlap this compaction
212 // (parent == sourceLevel+1; grandparent == sourceLevel+2)
213 if level := c.sourceLevel + 2; level < len(c.v.levels) {
214 c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
217 c.levels[0], c.levels[1] = t0, t1
218 c.imin, c.imax = imin, imax
221 // Check whether compaction is trivial.
222 func (c *compaction) trivial() bool {
223 return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
226 func (c *compaction) baseLevelForKey(ukey []byte) bool {
227 for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
228 tables := c.v.levels[level]
229 for c.tPtrs[level] < len(tables) {
230 t := tables[c.tPtrs[level]]
231 if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
232 // We've advanced far enough.
233 if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
234 // Key falls in this file's range, so definitely not base level.
245 func (c *compaction) shouldStopBefore(ikey internalKey) bool {
246 for ; c.gpi < len(c.gp); c.gpi++ {
248 if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
252 c.gpOverlappedBytes += gp.size
257 if c.gpOverlappedBytes > c.maxGPOverlaps {
258 // Too much overlap for current output; start new output.
259 c.gpOverlappedBytes = 0
265 // Creates an iterator.
266 func (c *compaction) newIterator() iterator.Iterator {
267 // Creates iterator slice.
268 icap := len(c.levels)
269 if c.sourceLevel == 0 {
270 // Special case for level-0.
271 icap = len(c.levels[0]) + 1
273 its := make([]iterator.Iterator, 0, icap)
276 ro := &opt.ReadOptions{
278 Strict: opt.StrictOverride,
280 strict := c.s.o.GetStrict(opt.StrictCompaction)
282 ro.Strict |= opt.StrictReader
285 for i, tables := range c.levels {
286 if len(tables) == 0 {
290 // Level-0 is not sorted and may overlaps each other.
291 if c.sourceLevel+i == 0 {
292 for _, t := range tables {
293 its = append(its, c.s.tops.newIterator(t, nil, ro))
296 it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
297 its = append(its, it)
301 return iterator.NewMergedIterator(its, c.s.icmp, strict)