// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

	"github.com/golang/snappy"

	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"

	ErrNotFound       = errors.ErrNotFound
	ErrReaderReleased = errors.New("leveldb/table: reader released")
	ErrIterReleased   = errors.New("leveldb/table: iterator released")
// ErrCorrupted describes an error due to corruption. This error will be wrapped
// with errors.ErrCorrupted.
type ErrCorrupted struct {

func (e *ErrCorrupted) Error() string {
	return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason)

func max(x, y int) int {

	bpool *util.BufferPool

func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
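	// The restart array at the end of the block holds offsets of entries whose
	// keys are stored without prefix compression. Binary-search it (within the
	// [rstart, rlimit) range) for the first restart point whose key is greater
	// than the sought key; the match, if any, lives in the restart range that
	// starts just before it.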
	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
		offset++ // shared prefix length is always zero at a restart point, so skip it
		v1, n1 := binary.Uvarint(b.data[offset:])   // key length
		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
		return cmp.Compare(b.data[m:m+int(v1)], key) > 0
		// The smallest key is greater than the key sought.
	offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
func (b *block) restartIndex(rstart, rlimit, offset int) int {
	return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
		return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset

func (b *block) restartOffset(index int) int {
	return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))

func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
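	// Each entry is encoded as three varints (shared key prefix length,
	// unshared key length, value length) followed by the unshared key bytes
	// and the value bytes. The full key is the previous key's first nShared
	// bytes plus the unshared bytes decoded here.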
	if offset >= b.restartsOffset {
		if offset != b.restartsOffset {
			err = &ErrCorrupted{Reason: "entries offset not aligned"}
	v0, n0 := binary.Uvarint(b.data[offset:])       // Shared prefix length
	v1, n1 := binary.Uvarint(b.data[offset+n0:])    // Unshared key length
	v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
	n = m + int(v1) + int(v2)
	if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
		err = &ErrCorrupted{Reason: "entries corrupted"}
	key = b.data[offset+m : offset+m+int(v1)]
	value = b.data[offset+m+int(v1) : offset+n]
func (b *block) Release() {

	dirReleased dir = iota - 1

type blockIter struct {
	blockReleaser util.Releaser
	releaser      util.Releaser

	// Previous offset, only filled by Next.

	// Iterator direction.

	// Restart index slice range.

	// Offset slice range.
func (i *blockIter) sErr(err error) {

func (i *blockIter) reset() {
	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	i.restartIndex = i.riStart
	i.offset = i.offsetStart

func (i *blockIter) isFirst() bool {
		return i.prevOffset == i.offsetRealStart
		return len(i.prevNode) == 1 && i.restartIndex == i.riStart

func (i *blockIter) isLast() bool {
	case dirForward, dirBackward:
		return i.offset == i.offsetLimit

func (i *blockIter) First() bool {
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]

func (i *blockIter) Last() bool {
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]

func (i *blockIter) Seek(key []byte) bool {
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
	i.offset = max(i.offsetStart, offset)
	if i.dir == dirSOI || i.dir == dirEOI {
		if i.tr.cmp.Compare(i.key, key) >= 0 {
func (i *blockIter) Next() bool {
	if i.dir == dirEOI || i.err != nil {
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		i.restartIndex = i.riStart
		i.offset = i.offsetStart
	} else if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	for i.offset < i.offsetRealStart {
		key, value, nShared, n, err := i.block.entry(i.offset)
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
		i.key = append(i.key[:nShared], key...)
	if i.offset >= i.offsetLimit {
		if i.offset != i.offsetLimit {
			i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
	key, value, nShared, n, err := i.block.entry(i.offset)
		i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
	i.key = append(i.key[:nShared], key...)
	i.prevOffset = i.offset
func (i *blockIter) Prev() bool {
	if i.dir == dirSOI || i.err != nil {
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
	if i.dir == dirForward {
		i.offset = i.prevOffset
		if i.offset == i.offsetRealStart {
		ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
	} else if i.dir == dirEOI {
		// At the end of the iterator.
		i.restartIndex = i.riLimit
		i.offset = i.offsetLimit
		if i.offset == i.offsetRealStart {
	} else if len(i.prevNode) == 1 {
		// This is the end of a restart range.
		i.offset = i.prevNode[0]
		i.prevNode = i.prevNode[:0]
		if i.restartIndex == i.riStart {
		// In the middle of a restart range; get the entry from the cache.
		n := len(i.prevNode) - 3
		node := i.prevNode[n:]
		i.prevNode = i.prevNode[:n]
		i.key = append(i.key[:0], i.prevKeys[ko:]...)
		i.prevKeys = i.prevKeys[:ko]
		i.value = i.block.data[vo:vl]

	// Build entries cache.
	offset := i.block.restartOffset(ri)
	if offset == i.offset {
		offset = i.block.restartOffset(ri)
	i.prevNode = append(i.prevNode, offset)
		key, value, nShared, n, err := i.block.entry(offset)
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
		if offset >= i.offsetRealStart {
				// Append 3 values:
				// 1. Previous keys offset
				// 2. Value offset in the data block
				// 3. Value length
				i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
				i.prevKeys = append(i.prevKeys, i.key...)
		i.key = append(i.key[:nShared], key...)
		// Stop if the target offset is reached.
		if offset >= i.offset {
			if offset != i.offset {
				i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
func (i *blockIter) Key() []byte {
	if i.err != nil || i.dir <= dirEOI {

func (i *blockIter) Value() []byte {
	if i.err != nil || i.dir <= dirEOI {

func (i *blockIter) Release() {
	if i.dir != dirReleased {
		if i.blockReleaser != nil {
			i.blockReleaser.Release()
			i.blockReleaser = nil
		if i.releaser != nil {

func (i *blockIter) SetReleaser(releaser util.Releaser) {
	if i.dir == dirReleased {
		panic(util.ErrReleased)
	if i.releaser != nil && releaser != nil {
		panic(util.ErrHasReleaser)
	i.releaser = releaser

func (i *blockIter) Valid() bool {
	return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)

func (i *blockIter) Error() error {

type filterBlock struct {
	bpool *util.BufferPool

func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
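	// The filter block stores one filter for every 2^baseLg bytes of data block
	// offsets, followed by an array of per-filter offsets starting at oOffset.
	// Pick the filter that covers the data block at the given offset and check
	// whether it may contain the key.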
	i := int(offset >> b.baseLg)
	if i < b.filtersNum {
		o := b.data[b.oOffset+i*4:]
		n := int(binary.LittleEndian.Uint32(o))
		m := int(binary.LittleEndian.Uint32(o[4:]))
		if n < m && m <= b.oOffset {
			return filter.Contains(b.data[n:m], key)

func (b *filterBlock) Release() {

type indexIter struct {

func (i *indexIter) Get() iterator.Iterator {
	dataBH, n := decodeBlockHandle(value)
		return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
	var slice *util.Range
	if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
	return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)

// Reader is a table reader.
	cache *cache.NamespaceGetter
	bpool *util.BufferPool
	cmp   comparer.Comparer

	metaBH, indexBH, filterBH blockHandle
	filterBlock               *filterBlock

func (r *Reader) blockKind(bh blockHandle) string {
	case r.metaBH.offset:
	case r.indexBH.offset:
	case r.filterBH.offset:
		if r.filterBH.length > 0 {
			return "filter-block"

func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
	return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}

func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
	return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)

func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
	if cerr, ok := err.(*ErrCorrupted); ok {
		cerr.Pos = int64(bh.offset)
		cerr.Size = int64(bh.length)
		cerr.Kind = r.blockKind(bh)
		return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}

func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
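	// On disk every block is followed by a 5-byte trailer: a 1-byte compression
	// type and a 4-byte checksum. The trailer is read together with the block
	// and stripped after the checks below.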
	data := r.bpool.Get(int(bh.length + blockTrailerLen))
	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
		checksum0 := binary.LittleEndian.Uint32(data[n:])
		checksum1 := util.NewCRC(data[:n]).Value()
		if checksum0 != checksum1 {
			return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
	switch data[bh.length] {
	case blockTypeNoCompression:
		data = data[:bh.length]
	case blockTypeSnappyCompression:
		decLen, err := snappy.DecodedLen(data[:bh.length])
			return nil, r.newErrCorruptedBH(bh, err.Error())
		decData := r.bpool.Get(decLen)
		decData, err = snappy.Decode(decData, data[:bh.length])
			return nil, r.newErrCorruptedBH(bh, err.Error())
		return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
	data, err := r.readRawBlock(bh, verifyChecksum)
	restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
		restartsLen: restartsLen,
		restartsOffset: len(data) - (restartsLen+1)*4,

func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				b, err = r.readBlock(bh, verifyChecksum)
				return cap(b.data), b
			ch = r.cache.Get(bh.offset, nil)
			b, ok := ch.Value().(*block)
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
		} else if err != nil {
	b, err := r.readBlock(bh, verifyChecksum)
func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
	data, err := r.readRawBlock(bh, true)
		return nil, r.newErrCorruptedBH(bh, "too short")
	oOffset := int(binary.LittleEndian.Uint32(data[m:]))
		return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
		baseLg: uint(data[n-1]),
		filtersNum: (m - oOffset) / 4,

func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				b, err = r.readFilterBlock(bh)
				return cap(b.data), b
			ch = r.cache.Get(bh.offset, nil)
			b, ok := ch.Value().(*filterBlock)
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
		} else if err != nil {
	b, err := r.readFilterBlock(bh)
func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
	if r.indexBlock == nil {
		return r.readBlockCached(r.indexBH, true, fillCache)
	return r.indexBlock, util.NoopReleaser{}, nil

func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
	if r.filterBlock == nil {
		return r.readFilterBlockCached(r.filterBH, fillCache)
	return r.filterBlock, util.NoopReleaser{}, nil

func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
		blockReleaser: bReleaser,
		// Valid key should never be nil.
		key: make([]byte, 0),
		riLimit: b.restartsLen,
		offsetLimit: b.restartsOffset,
		if slice.Start != nil {
			if bi.Seek(slice.Start) {
				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
				bi.offsetStart = b.restartOffset(bi.riStart)
				bi.offsetRealStart = bi.prevOffset
				bi.riStart = b.restartsLen
				bi.offsetStart = b.restartsOffset
				bi.offsetRealStart = b.restartsOffset
		if slice.Limit != nil {
			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
				bi.offsetLimit = bi.prevOffset
				bi.riLimit = bi.restartIndex + 1
		if bi.offsetStart > bi.offsetLimit {
			bi.sErr(errors.New("leveldb/table: invalid slice range"))
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
	b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
		return iterator.NewEmptyIterator(err)
	return r.newBlockIter(b, rel, slice, false)

func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
		return iterator.NewEmptyIterator(r.err)
	return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
// NewIterator creates an iterator from the table.
//
// Slice allows slicing the iterator to only contain keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// table. A nil Range.Limit is treated as a key after all keys in the table.
//
// The returned iterator is not safe for concurrent use and should be released
// after use.
//
// Also read Iterator documentation of the leveldb/iterator package.
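//
// A minimal usage sketch (the enclosing setup and the process function are
// assumed, not part of this package):
//
//	it := r.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
//	defer it.Release()
//	for it.Next() {
//		// Key/Value are only valid until the next call to Next; copy them
//		// if they need to be retained.
//		process(it.Key(), it.Value())
//	}
//	if err := it.Error(); err != nil {
//		// handle the error
//	}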
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
		return iterator.NewEmptyIterator(r.err)
	fillCache := !ro.GetDontFillCache()
	indexBlock, rel, err := r.getIndexBlock(fillCache)
		return iterator.NewEmptyIterator(err)
		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
		fillCache: !ro.GetDontFillCache(),
	return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))

func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
	indexBlock, rel, err := r.getIndexBlock(true)
	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()
	if !index.Seek(key) {
		if err = index.Error(); err == nil {
	dataBH, n := decodeBlockHandle(index.Value())
		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
		return nil, nil, r.err
	// The filter should only be used for exact matches.
	if filtered && r.filter != nil {
		filterBlock, frel, ferr := r.getFilterBlock(true)
			if !filterBlock.contains(r.filter, dataBH.offset, key) {
				return nil, nil, ErrNotFound
		} else if !errors.IsCorrupted(ferr) {
			return nil, nil, ferr
	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
		if err = data.Error(); err != nil {
		// The nearest greater-than key is the first key of the next block.
			if err = index.Error(); err == nil {
		dataBH, n = decodeBlockHandle(index.Value())
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return nil, nil, r.err
		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
			if err = data.Error(); err == nil {
	// The key doesn't use the block buffer, so there is no need to copy it.
			// The value does use the block buffer, and since the buffer will be
			// recycled, it needs to be copied.
			value = append([]byte{}, data.Value()...)
// Find finds the key/value pair whose key is greater than or equal to the
// given key. It returns ErrNotFound if the table doesn't contain such a pair.
//
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and will immediately return ErrNotFound if
// 'filter data' indicates that such a pair doesn't exist.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Find returns.
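//
// A minimal sketch (key is assumed to be supplied by the caller):
//
//	rkey, value, err := r.Find(key, true, nil)
//	switch {
//	case err == nil:
//		// rkey is the smallest key in the table that is >= key.
//	case err == ErrNotFound:
//		// The table has no key >= key (or the filter ruled it out).
//	default:
//		// I/O or corruption error.
//	}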
func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
	return r.find(key, filtered, ro, false)
// FindKey finds the key that is greater than or equal to the given key.
// It returns ErrNotFound if the table doesn't contain such a key.
//
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and will immediately return ErrNotFound if
// 'filter data' indicates that such a key doesn't exist.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after FindKey returns.
func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
	rkey, _, err = r.find(key, filtered, ro, true)
// Get gets the value for the given key. It returns errors.ErrNotFound
// if the table does not contain the key.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Get returns.
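//
// A minimal sketch:
//
//	value, err := r.Get(key, nil)
//	if err == ErrNotFound {
//		// The table does not contain key.
//	}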
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	rkey, value, err := r.find(key, false, ro, false)
	if err == nil && r.cmp.Compare(rkey, key) != 0 {
// OffsetOf returns the approximate offset of the given key within the table.
//
// It is safe to modify the contents of the argument after OffsetOf returns.
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
	indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()
		dataBH, n := decodeBlockHandle(index.Value())
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
		offset = int64(dataBH.offset)
// Release implements util.Releaser.
// It also closes the file if it is an io.Closer.
func (r *Reader) Release() {
	if closer, ok := r.reader.(io.Closer); ok {
	if r.indexBlock != nil {
		r.indexBlock.Release()
	if r.filterBlock != nil {
		r.filterBlock.Release()
	r.err = ErrReaderReleased
// NewReader creates a new initialized table reader for the file.
// The cache and bpool are optional and can be nil.
//
// The returned table reader instance is safe for concurrent use.
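//
// A minimal sketch, as seen from a client package (f, size, fd and o are
// assumed to be prepared by the caller):
//
//	r, err := table.NewReader(f, size, fd, nil, nil, o)
//	if err != nil {
//		return err
//	}
//	defer r.Release()
//	value, err := r.Get(key, nil)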
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
		return nil, errors.New("leveldb/table: nil file")
		cmp: o.GetComparer(),
		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
	if size < footerLen {
		r.err = r.newErrCorrupted(0, size, "table", "too small")
	footerPos := size - footerLen
	var footer [footerLen]byte
	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
	if string(footer[footerLen-len(magic):footerLen]) != magic {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")

	// Decode the metaindex block handle.
	r.metaBH, n = decodeBlockHandle(footer[:])
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")

	// Decode the index block handle.
	r.indexBH, n = decodeBlockHandle(footer[n:])
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")

	// Read metaindex block.
	metaBlock, err := r.readBlock(r.metaBH, true)
		if errors.IsCorrupted(err) {
	r.dataEnd = int64(r.metaBH.offset)
	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
	for metaIter.Next() {
		key := string(metaIter.Key())
		if !strings.HasPrefix(key, "filter.") {
		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
			for _, f0 := range o.GetAltFilters() {
				if f0.Name() == fn {
		if r.filter != nil {
			filterBH, n := decodeBlockHandle(metaIter.Value())
			r.filterBH = filterBH
			r.dataEnd = int64(filterBH.offset)
	// Cache the index and filter blocks locally, since we don't have a global cache.
		r.indexBlock, err = r.readBlock(r.indexBH, true)
			if errors.IsCorrupted(err) {
		if r.filter != nil {
			r.filterBlock, err = r.readFilterBlock(r.filterBH)
				if !errors.IsCorrupted(err) {
				// Don't use the filter, then.