OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / session_compaction.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "sync/atomic"
11
12         "github.com/syndtr/goleveldb/leveldb/iterator"
13         "github.com/syndtr/goleveldb/leveldb/memdb"
14         "github.com/syndtr/goleveldb/leveldb/opt"
15 )
16
17 func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
18         v := s.version()
19         defer v.release()
20         return v.pickMemdbLevel(umin, umax, maxLevel)
21 }
22
23 func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
24         // Create sorted table.
25         iter := mdb.NewIterator(nil)
26         defer iter.Release()
27         t, n, err := s.tops.createFrom(iter)
28         if err != nil {
29                 return 0, err
30         }
31
32         // Pick level other than zero can cause compaction issue with large
33         // bulk insert and delete on strictly incrementing key-space. The
34         // problem is that the small deletion markers trapped at lower level,
35         // while key/value entries keep growing at higher level. Since the
36         // key-space is strictly incrementing it will not overlaps with
37         // higher level, thus maximum possible level is always picked, while
38         // overlapping deletion marker pushed into lower level.
39         // See: https://github.com/syndtr/goleveldb/issues/127.
40         flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
41         rec.addTableFile(flushLevel, t)
42
43         s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
44         return flushLevel, nil
45 }
46
47 // Pick a compaction based on current state; need external synchronization.
48 func (s *session) pickCompaction() *compaction {
49         v := s.version()
50
51         var sourceLevel int
52         var t0 tFiles
53         if v.cScore >= 1 {
54                 sourceLevel = v.cLevel
55                 cptr := s.getCompPtr(sourceLevel)
56                 tables := v.levels[sourceLevel]
57                 for _, t := range tables {
58                         if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
59                                 t0 = append(t0, t)
60                                 break
61                         }
62                 }
63                 if len(t0) == 0 {
64                         t0 = append(t0, tables[0])
65                 }
66         } else {
67                 if p := atomic.LoadPointer(&v.cSeek); p != nil {
68                         ts := (*tSet)(p)
69                         sourceLevel = ts.level
70                         t0 = append(t0, ts.table)
71                 } else {
72                         v.release()
73                         return nil
74                 }
75         }
76
77         return newCompaction(s, v, sourceLevel, t0)
78 }
79
80 // Create compaction from given level and range; need external synchronization.
81 func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
82         v := s.version()
83
84         if sourceLevel >= len(v.levels) {
85                 v.release()
86                 return nil
87         }
88
89         t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
90         if len(t0) == 0 {
91                 v.release()
92                 return nil
93         }
94
95         // Avoid compacting too much in one shot in case the range is large.
96         // But we cannot do this for level-0 since level-0 files can overlap
97         // and we must not pick one file and drop another older file if the
98         // two files overlap.
99         if !noLimit && sourceLevel > 0 {
100                 limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
101                 total := int64(0)
102                 for i, t := range t0 {
103                         total += t.size
104                         if total >= limit {
105                                 s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
106                                 t0 = t0[:i+1]
107                                 break
108                         }
109                 }
110         }
111
112         return newCompaction(s, v, sourceLevel, t0)
113 }
114
115 func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
116         c := &compaction{
117                 s:             s,
118                 v:             v,
119                 sourceLevel:   sourceLevel,
120                 levels:        [2]tFiles{t0, nil},
121                 maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
122                 tPtrs:         make([]int, len(v.levels)),
123         }
124         c.expand()
125         c.save()
126         return c
127 }
128
// compaction represents the state of a single compaction. It holds the input
// tables, the overlapping grandparent tables, and the cursors advanced while
// output is produced. The snap* fields hold a copy of the cursor state, taken
// by save and reinstated by restore.
type compaction struct {
	s *session
	v *version // version the inputs were picked from; dropped by release

	sourceLevel   int       // level being compacted
	levels        [2]tFiles // [0]: inputs from sourceLevel, [1]: inputs from sourceLevel+1
	maxGPOverlaps int64     // grandparent-overlap budget, compared against summed table sizes

	gp                tFiles      // sourceLevel+2 tables overlapping the compaction range (filled by expand)
	gpi               int         // index of the next grandparent table shouldStopBefore examines
	seenKey           bool        // set on the first shouldStopBefore call; skipped gp sizes count only after it
	gpOverlappedBytes int64       // summed size of grandparent tables passed for the current output
	imin, imax        internalKey // range of the sourceLevel inputs, as set by expand
	tPtrs             []int       // per-level table cursors used by baseLevelForKey
	released          bool        // true once release has dropped the version reference

	// Snapshot of gpi, seenKey, gpOverlappedBytes and tPtrs (see save/restore).
	snapGPI               int
	snapSeenKey           bool
	snapGPOverlappedBytes int64
	snapTPtrs             []int
}
151
152 func (c *compaction) save() {
153         c.snapGPI = c.gpi
154         c.snapSeenKey = c.seenKey
155         c.snapGPOverlappedBytes = c.gpOverlappedBytes
156         c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
157 }
158
159 func (c *compaction) restore() {
160         c.gpi = c.snapGPI
161         c.seenKey = c.snapSeenKey
162         c.gpOverlappedBytes = c.snapGPOverlappedBytes
163         c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
164 }
165
166 func (c *compaction) release() {
167         if !c.released {
168                 c.released = true
169                 c.v.release()
170         }
171 }
172
// expand widens the compaction inputs in c.levels and records their bounds;
// it needs external synchronization.
//
// Steps:
//  1. Re-query sourceLevel for tables overlapping the user-key range of the
//     initial inputs, in case a user key hops across table boundaries.
//  2. Query sourceLevel+1 (if it exists) for tables overlapping that range.
//  3. If more sourceLevel tables overlap the combined range of both levels,
//     their size plus the sourceLevel+1 inputs stays under the expand limit,
//     and adopting them does not change how many sourceLevel+1 tables are
//     needed, then use the larger set.
//  4. Query sourceLevel+2 (if it exists) for grandparent tables overlapping
//     the combined range and store them in c.gp.
//
// On return c.levels holds the final inputs, and c.imin/c.imax hold the
// internal-key range of the sourceLevel inputs.
func (c *compaction) expand() {
	limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
	vt0 := c.v.levels[c.sourceLevel]
	vt1 := tFiles{}
	if level := c.sourceLevel + 1; level < len(c.v.levels) {
		vt1 = c.v.levels[level]
	}

	t0, t1 := c.levels[0], c.levels[1]
	imin, imax := t0.getRange(c.s.icmp)
	// Expand t0 in case a user key hops across table boundaries.
	t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
	if len(t0) != len(c.levels[0]) {
		// The input set changed, so recompute its range.
		imin, imax = t0.getRange(c.s.icmp)
	}
	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
	// Get the entire range covered by the compaction (both levels).
	// NOTE(review): append may write into t0's spare capacity. That is safe
	// only as long as getOverlaps never returns a slice that aliases the
	// version's level slice; TODO confirm.
	amin, amax := append(t0, t1...).getRange(c.s.icmp)

	// See if we can grow the number of inputs in "sourceLevel" without
	// changing the number of "sourceLevel+1" files we pick up.
	if len(t1) > 0 {
		exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
		if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
			xmin, xmax := exp0.getRange(c.s.icmp)
			exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
			// Accept the larger source set only if it pulls in the same
			// number of sourceLevel+1 tables.
			if len(exp1) == len(t1) {
				c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
					c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
					len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
				imin, imax = xmin, xmax
				t0, t1 = exp0, exp1
				amin, amax = append(t0, t1...).getRange(c.s.icmp)
			}
		}
	}

	// Compute the set of grandparent files that overlap this compaction
	// (parent == sourceLevel+1; grandparent == sourceLevel+2)
	if level := c.sourceLevel + 2; level < len(c.v.levels) {
		c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
	}

	c.levels[0], c.levels[1] = t0, t1
	c.imin, c.imax = imin, imax
}
220
221 // Check whether compaction is trivial.
222 func (c *compaction) trivial() bool {
223         return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
224 }
225
226 func (c *compaction) baseLevelForKey(ukey []byte) bool {
227         for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
228                 tables := c.v.levels[level]
229                 for c.tPtrs[level] < len(tables) {
230                         t := tables[c.tPtrs[level]]
231                         if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
232                                 // We've advanced far enough.
233                                 if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
234                                         // Key falls in this file's range, so definitely not base level.
235                                         return false
236                                 }
237                                 break
238                         }
239                         c.tPtrs[level]++
240                 }
241         }
242         return true
243 }
244
245 func (c *compaction) shouldStopBefore(ikey internalKey) bool {
246         for ; c.gpi < len(c.gp); c.gpi++ {
247                 gp := c.gp[c.gpi]
248                 if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
249                         break
250                 }
251                 if c.seenKey {
252                         c.gpOverlappedBytes += gp.size
253                 }
254         }
255         c.seenKey = true
256
257         if c.gpOverlappedBytes > c.maxGPOverlaps {
258                 // Too much overlap for current output; start new output.
259                 c.gpOverlappedBytes = 0
260                 return true
261         }
262         return false
263 }
264
265 // Creates an iterator.
266 func (c *compaction) newIterator() iterator.Iterator {
267         // Creates iterator slice.
268         icap := len(c.levels)
269         if c.sourceLevel == 0 {
270                 // Special case for level-0.
271                 icap = len(c.levels[0]) + 1
272         }
273         its := make([]iterator.Iterator, 0, icap)
274
275         // Options.
276         ro := &opt.ReadOptions{
277                 DontFillCache: true,
278                 Strict:        opt.StrictOverride,
279         }
280         strict := c.s.o.GetStrict(opt.StrictCompaction)
281         if strict {
282                 ro.Strict |= opt.StrictReader
283         }
284
285         for i, tables := range c.levels {
286                 if len(tables) == 0 {
287                         continue
288                 }
289
290                 // Level-0 is not sorted and may overlaps each other.
291                 if c.sourceLevel+i == 0 {
292                         for _, t := range tables {
293                                 its = append(its, c.s.tops.newIterator(t, nil, ro))
294                         }
295                 } else {
296                         it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
297                         its = append(its, it)
298                 }
299         }
300
301         return iterator.NewMergedIterator(its, c.s.icmp, strict)
302 }