OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / table.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "fmt"
11         "sort"
12         "sync/atomic"
13
14         "github.com/syndtr/goleveldb/leveldb/cache"
15         "github.com/syndtr/goleveldb/leveldb/iterator"
16         "github.com/syndtr/goleveldb/leveldb/opt"
17         "github.com/syndtr/goleveldb/leveldb/storage"
18         "github.com/syndtr/goleveldb/leveldb/table"
19         "github.com/syndtr/goleveldb/leveldb/util"
20 )
21
22 // tFile holds basic information about a table.
23 type tFile struct {
24         fd         storage.FileDesc
25         seekLeft   int32
26         size       int64
27         imin, imax internalKey
28 }
29
30 // Returns true if given key is after largest key of this table.
31 func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
32         return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
33 }
34
35 // Returns true if given key is before smallest key of this table.
36 func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
37         return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
38 }
39
40 // Returns true if given key range overlaps with this table key range.
41 func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
42         return !t.after(icmp, umin) && !t.before(icmp, umax)
43 }
44
45 // Cosumes one seek and return current seeks left.
46 func (t *tFile) consumeSeek() int32 {
47         return atomic.AddInt32(&t.seekLeft, -1)
48 }
49
50 // Creates new tFile.
51 func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
52         f := &tFile{
53                 fd:   fd,
54                 size: size,
55                 imin: imin,
56                 imax: imax,
57         }
58
59         // We arrange to automatically compact this file after
60         // a certain number of seeks.  Let's assume:
61         //   (1) One seek costs 10ms
62         //   (2) Writing or reading 1MB costs 10ms (100MB/s)
63         //   (3) A compaction of 1MB does 25MB of IO:
64         //         1MB read from this level
65         //         10-12MB read from next level (boundaries may be misaligned)
66         //         10-12MB written to next level
67         // This implies that 25 seeks cost the same as the compaction
68         // of 1MB of data.  I.e., one seek costs approximately the
69         // same as the compaction of 40KB of data.  We are a little
70         // conservative and allow approximately one seek for every 16KB
71         // of data before triggering a compaction.
72         f.seekLeft = int32(size / 16384)
73         if f.seekLeft < 100 {
74                 f.seekLeft = 100
75         }
76
77         return f
78 }
79
80 func tableFileFromRecord(r atRecord) *tFile {
81         return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax)
82 }
83
84 // tFiles hold multiple tFile.
85 type tFiles []*tFile
86
87 func (tf tFiles) Len() int      { return len(tf) }
88 func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
89
90 func (tf tFiles) nums() string {
91         x := "[ "
92         for i, f := range tf {
93                 if i != 0 {
94                         x += ", "
95                 }
96                 x += fmt.Sprint(f.fd.Num)
97         }
98         x += " ]"
99         return x
100 }
101
102 // Returns true if i smallest key is less than j.
103 // This used for sort by key in ascending order.
104 func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
105         a, b := tf[i], tf[j]
106         n := icmp.Compare(a.imin, b.imin)
107         if n == 0 {
108                 return a.fd.Num < b.fd.Num
109         }
110         return n < 0
111 }
112
113 // Returns true if i file number is greater than j.
114 // This used for sort by file number in descending order.
115 func (tf tFiles) lessByNum(i, j int) bool {
116         return tf[i].fd.Num > tf[j].fd.Num
117 }
118
119 // Sorts tables by key in ascending order.
120 func (tf tFiles) sortByKey(icmp *iComparer) {
121         sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
122 }
123
124 // Sorts tables by file number in descending order.
125 func (tf tFiles) sortByNum() {
126         sort.Sort(&tFilesSortByNum{tFiles: tf})
127 }
128
129 // Returns sum of all tables size.
130 func (tf tFiles) size() (sum int64) {
131         for _, t := range tf {
132                 sum += t.size
133         }
134         return sum
135 }
136
137 // Searches smallest index of tables whose its smallest
138 // key is after or equal with given key.
139 func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
140         return sort.Search(len(tf), func(i int) bool {
141                 return icmp.Compare(tf[i].imin, ikey) >= 0
142         })
143 }
144
145 // Searches smallest index of tables whose its largest
146 // key is after or equal with given key.
147 func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
148         return sort.Search(len(tf), func(i int) bool {
149                 return icmp.Compare(tf[i].imax, ikey) >= 0
150         })
151 }
152
153 // Returns true if given key range overlaps with one or more
154 // tables key range. If unsorted is true then binary search will not be used.
155 func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
156         if unsorted {
157                 // Check against all files.
158                 for _, t := range tf {
159                         if t.overlaps(icmp, umin, umax) {
160                                 return true
161                         }
162                 }
163                 return false
164         }
165
166         i := 0
167         if len(umin) > 0 {
168                 // Find the earliest possible internal key for min.
169                 i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
170         }
171         if i >= len(tf) {
172                 // Beginning of range is after all files, so no overlap.
173                 return false
174         }
175         return !tf[i].before(icmp, umax)
176 }
177
178 // Returns tables whose its key range overlaps with given key range.
179 // Range will be expanded if ukey found hop across tables.
180 // If overlapped is true then the search will be restarted if umax
181 // expanded.
182 // The dst content will be overwritten.
183 func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
184         dst = dst[:0]
185         for i := 0; i < len(tf); {
186                 t := tf[i]
187                 if t.overlaps(icmp, umin, umax) {
188                         if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
189                                 umin = t.imin.ukey()
190                                 dst = dst[:0]
191                                 i = 0
192                                 continue
193                         } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
194                                 umax = t.imax.ukey()
195                                 // Restart search if it is overlapped.
196                                 if overlapped {
197                                         dst = dst[:0]
198                                         i = 0
199                                         continue
200                                 }
201                         }
202
203                         dst = append(dst, t)
204                 }
205                 i++
206         }
207
208         return dst
209 }
210
211 // Returns tables key range.
212 func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
213         for i, t := range tf {
214                 if i == 0 {
215                         imin, imax = t.imin, t.imax
216                         continue
217                 }
218                 if icmp.Compare(t.imin, imin) < 0 {
219                         imin = t.imin
220                 }
221                 if icmp.Compare(t.imax, imax) > 0 {
222                         imax = t.imax
223                 }
224         }
225
226         return
227 }
228
229 // Creates iterator index from tables.
230 func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
231         if slice != nil {
232                 var start, limit int
233                 if slice.Start != nil {
234                         start = tf.searchMax(icmp, internalKey(slice.Start))
235                 }
236                 if slice.Limit != nil {
237                         limit = tf.searchMin(icmp, internalKey(slice.Limit))
238                 } else {
239                         limit = tf.Len()
240                 }
241                 tf = tf[start:limit]
242         }
243         return iterator.NewArrayIndexer(&tFilesArrayIndexer{
244                 tFiles: tf,
245                 tops:   tops,
246                 icmp:   icmp,
247                 slice:  slice,
248                 ro:     ro,
249         })
250 }
251
252 // Tables iterator index.
253 type tFilesArrayIndexer struct {
254         tFiles
255         tops  *tOps
256         icmp  *iComparer
257         slice *util.Range
258         ro    *opt.ReadOptions
259 }
260
261 func (a *tFilesArrayIndexer) Search(key []byte) int {
262         return a.searchMax(a.icmp, internalKey(key))
263 }
264
265 func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
266         if i == 0 || i == a.Len()-1 {
267                 return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
268         }
269         return a.tops.newIterator(a.tFiles[i], nil, a.ro)
270 }
271
272 // Helper type for sortByKey.
273 type tFilesSortByKey struct {
274         tFiles
275         icmp *iComparer
276 }
277
278 func (x *tFilesSortByKey) Less(i, j int) bool {
279         return x.lessByKey(x.icmp, i, j)
280 }
281
282 // Helper type for sortByNum.
283 type tFilesSortByNum struct {
284         tFiles
285 }
286
287 func (x *tFilesSortByNum) Less(i, j int) bool {
288         return x.lessByNum(i, j)
289 }
290
291 // Table operations.
292 type tOps struct {
293         s      *session
294         noSync bool
295         cache  *cache.Cache
296         bcache *cache.Cache
297         bpool  *util.BufferPool
298 }
299
300 // Creates an empty table and returns table writer.
301 func (t *tOps) create() (*tWriter, error) {
302         fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()}
303         fw, err := t.s.stor.Create(fd)
304         if err != nil {
305                 return nil, err
306         }
307         return &tWriter{
308                 t:  t,
309                 fd: fd,
310                 w:  fw,
311                 tw: table.NewWriter(fw, t.s.o.Options),
312         }, nil
313 }
314
315 // Builds table from src iterator.
316 func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
317         w, err := t.create()
318         if err != nil {
319                 return
320         }
321
322         defer func() {
323                 if err != nil {
324                         w.drop()
325                 }
326         }()
327
328         for src.Next() {
329                 err = w.append(src.Key(), src.Value())
330                 if err != nil {
331                         return
332                 }
333         }
334         err = src.Error()
335         if err != nil {
336                 return
337         }
338
339         n = w.tw.EntriesLen()
340         f, err = w.finish()
341         return
342 }
343
344 // Opens table. It returns a cache handle, which should
345 // be released after use.
346 func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
347         ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
348                 var r storage.Reader
349                 r, err = t.s.stor.Open(f.fd)
350                 if err != nil {
351                         return 0, nil
352                 }
353
354                 var bcache *cache.NamespaceGetter
355                 if t.bcache != nil {
356                         bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
357                 }
358
359                 var tr *table.Reader
360                 tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
361                 if err != nil {
362                         r.Close()
363                         return 0, nil
364                 }
365                 return 1, tr
366
367         })
368         if ch == nil && err == nil {
369                 err = ErrClosed
370         }
371         return
372 }
373
374 // Finds key/value pair whose key is greater than or equal to the
375 // given key.
376 func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
377         ch, err := t.open(f)
378         if err != nil {
379                 return nil, nil, err
380         }
381         defer ch.Release()
382         return ch.Value().(*table.Reader).Find(key, true, ro)
383 }
384
385 // Finds key that is greater than or equal to the given key.
386 func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
387         ch, err := t.open(f)
388         if err != nil {
389                 return nil, err
390         }
391         defer ch.Release()
392         return ch.Value().(*table.Reader).FindKey(key, true, ro)
393 }
394
395 // Returns approximate offset of the given key.
396 func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
397         ch, err := t.open(f)
398         if err != nil {
399                 return
400         }
401         defer ch.Release()
402         return ch.Value().(*table.Reader).OffsetOf(key)
403 }
404
405 // Creates an iterator from the given table.
406 func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
407         ch, err := t.open(f)
408         if err != nil {
409                 return iterator.NewEmptyIterator(err)
410         }
411         iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
412         iter.SetReleaser(ch)
413         return iter
414 }
415
416 // Removes table from persistent storage. It waits until
417 // no one use the the table.
418 func (t *tOps) remove(f *tFile) {
419         t.cache.Delete(0, uint64(f.fd.Num), func() {
420                 if err := t.s.stor.Remove(f.fd); err != nil {
421                         t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
422                 } else {
423                         t.s.logf("table@remove removed @%d", f.fd.Num)
424                 }
425                 if t.bcache != nil {
426                         t.bcache.EvictNS(uint64(f.fd.Num))
427                 }
428         })
429 }
430
431 // Closes the table ops instance. It will close all tables,
432 // regadless still used or not.
433 func (t *tOps) close() {
434         t.bpool.Close()
435         t.cache.Close()
436         if t.bcache != nil {
437                 t.bcache.CloseWeak()
438         }
439 }
440
441 // Creates new initialized table ops instance.
442 func newTableOps(s *session) *tOps {
443         var (
444                 cacher cache.Cacher
445                 bcache *cache.Cache
446                 bpool  *util.BufferPool
447         )
448         if s.o.GetOpenFilesCacheCapacity() > 0 {
449                 cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
450         }
451         if !s.o.GetDisableBlockCache() {
452                 var bcacher cache.Cacher
453                 if s.o.GetBlockCacheCapacity() > 0 {
454                         bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
455                 }
456                 bcache = cache.NewCache(bcacher)
457         }
458         if !s.o.GetDisableBufferPool() {
459                 bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
460         }
461         return &tOps{
462                 s:      s,
463                 noSync: s.o.GetNoSync(),
464                 cache:  cache.NewCache(cacher),
465                 bcache: bcache,
466                 bpool:  bpool,
467         }
468 }
469
470 // tWriter wraps the table writer. It keep track of file descriptor
471 // and added key range.
472 type tWriter struct {
473         t *tOps
474
475         fd storage.FileDesc
476         w  storage.Writer
477         tw *table.Writer
478
479         first, last []byte
480 }
481
482 // Append key/value pair to the table.
483 func (w *tWriter) append(key, value []byte) error {
484         if w.first == nil {
485                 w.first = append([]byte{}, key...)
486         }
487         w.last = append(w.last[:0], key...)
488         return w.tw.Append(key, value)
489 }
490
491 // Returns true if the table is empty.
492 func (w *tWriter) empty() bool {
493         return w.first == nil
494 }
495
496 // Closes the storage.Writer.
497 func (w *tWriter) close() {
498         if w.w != nil {
499                 w.w.Close()
500                 w.w = nil
501         }
502 }
503
504 // Finalizes the table and returns table file.
505 func (w *tWriter) finish() (f *tFile, err error) {
506         defer w.close()
507         err = w.tw.Close()
508         if err != nil {
509                 return
510         }
511         if !w.t.noSync {
512                 err = w.w.Sync()
513                 if err != nil {
514                         return
515                 }
516         }
517         f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
518         return
519 }
520
521 // Drops the table.
522 func (w *tWriter) drop() {
523         w.close()
524         w.t.s.stor.Remove(w.fd)
525         w.t.s.reuseFileNum(w.fd.Num)
526         w.tw = nil
527         w.first = nil
528         w.last = nil
529 }