OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / table / reader.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package table
8
9 import (
10         "encoding/binary"
11         "fmt"
12         "io"
13         "sort"
14         "strings"
15         "sync"
16
17         "github.com/golang/snappy"
18
19         "github.com/syndtr/goleveldb/leveldb/cache"
20         "github.com/syndtr/goleveldb/leveldb/comparer"
21         "github.com/syndtr/goleveldb/leveldb/errors"
22         "github.com/syndtr/goleveldb/leveldb/filter"
23         "github.com/syndtr/goleveldb/leveldb/iterator"
24         "github.com/syndtr/goleveldb/leveldb/opt"
25         "github.com/syndtr/goleveldb/leveldb/storage"
26         "github.com/syndtr/goleveldb/leveldb/util"
27 )
28
29 // Reader errors.
30 var (
31         ErrNotFound       = errors.ErrNotFound
32         ErrReaderReleased = errors.New("leveldb/table: reader released")
33         ErrIterReleased   = errors.New("leveldb/table: iterator released")
34 )
35
36 // ErrCorrupted describes error due to corruption. This error will be wrapped
37 // with errors.ErrCorrupted.
38 type ErrCorrupted struct {
39         Pos    int64
40         Size   int64
41         Kind   string
42         Reason string
43 }
44
45 func (e *ErrCorrupted) Error() string {
46         return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason)
47 }
48
49 func max(x, y int) int {
50         if x > y {
51                 return x
52         }
53         return y
54 }
55
56 type block struct {
57         bpool          *util.BufferPool
58         bh             blockHandle
59         data           []byte
60         restartsLen    int
61         restartsOffset int
62 }
63
64 func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
65         index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
66                 offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
67                 offset++                                    // shared always zero, since this is a restart point
68                 v1, n1 := binary.Uvarint(b.data[offset:])   // key length
69                 _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
70                 m := offset + n1 + n2
71                 return cmp.Compare(b.data[m:m+int(v1)], key) > 0
72         }) + rstart - 1
73         if index < rstart {
74                 // The smallest key is greater-than key sought.
75                 index = rstart
76         }
77         offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
78         return
79 }
80
81 func (b *block) restartIndex(rstart, rlimit, offset int) int {
82         return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
83                 return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
84         }) + rstart - 1
85 }
86
87 func (b *block) restartOffset(index int) int {
88         return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
89 }
90
91 func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
92         if offset >= b.restartsOffset {
93                 if offset != b.restartsOffset {
94                         err = &ErrCorrupted{Reason: "entries offset not aligned"}
95                 }
96                 return
97         }
98         v0, n0 := binary.Uvarint(b.data[offset:])       // Shared prefix length
99         v1, n1 := binary.Uvarint(b.data[offset+n0:])    // Key length
100         v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
101         m := n0 + n1 + n2
102         n = m + int(v1) + int(v2)
103         if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
104                 err = &ErrCorrupted{Reason: "entries corrupted"}
105                 return
106         }
107         key = b.data[offset+m : offset+m+int(v1)]
108         value = b.data[offset+m+int(v1) : offset+n]
109         nShared = int(v0)
110         return
111 }
112
113 func (b *block) Release() {
114         b.bpool.Put(b.data)
115         b.bpool = nil
116         b.data = nil
117 }
118
119 type dir int
120
121 const (
122         dirReleased dir = iota - 1
123         dirSOI
124         dirEOI
125         dirBackward
126         dirForward
127 )
128
129 type blockIter struct {
130         tr            *Reader
131         block         *block
132         blockReleaser util.Releaser
133         releaser      util.Releaser
134         key, value    []byte
135         offset        int
136         // Previous offset, only filled by Next.
137         prevOffset   int
138         prevNode     []int
139         prevKeys     []byte
140         restartIndex int
141         // Iterator direction.
142         dir dir
143         // Restart index slice range.
144         riStart int
145         riLimit int
146         // Offset slice range.
147         offsetStart     int
148         offsetRealStart int
149         offsetLimit     int
150         // Error.
151         err error
152 }
153
154 func (i *blockIter) sErr(err error) {
155         i.err = err
156         i.key = nil
157         i.value = nil
158         i.prevNode = nil
159         i.prevKeys = nil
160 }
161
162 func (i *blockIter) reset() {
163         if i.dir == dirBackward {
164                 i.prevNode = i.prevNode[:0]
165                 i.prevKeys = i.prevKeys[:0]
166         }
167         i.restartIndex = i.riStart
168         i.offset = i.offsetStart
169         i.dir = dirSOI
170         i.key = i.key[:0]
171         i.value = nil
172 }
173
174 func (i *blockIter) isFirst() bool {
175         switch i.dir {
176         case dirForward:
177                 return i.prevOffset == i.offsetRealStart
178         case dirBackward:
179                 return len(i.prevNode) == 1 && i.restartIndex == i.riStart
180         }
181         return false
182 }
183
184 func (i *blockIter) isLast() bool {
185         switch i.dir {
186         case dirForward, dirBackward:
187                 return i.offset == i.offsetLimit
188         }
189         return false
190 }
191
192 func (i *blockIter) First() bool {
193         if i.err != nil {
194                 return false
195         } else if i.dir == dirReleased {
196                 i.err = ErrIterReleased
197                 return false
198         }
199
200         if i.dir == dirBackward {
201                 i.prevNode = i.prevNode[:0]
202                 i.prevKeys = i.prevKeys[:0]
203         }
204         i.dir = dirSOI
205         return i.Next()
206 }
207
208 func (i *blockIter) Last() bool {
209         if i.err != nil {
210                 return false
211         } else if i.dir == dirReleased {
212                 i.err = ErrIterReleased
213                 return false
214         }
215
216         if i.dir == dirBackward {
217                 i.prevNode = i.prevNode[:0]
218                 i.prevKeys = i.prevKeys[:0]
219         }
220         i.dir = dirEOI
221         return i.Prev()
222 }
223
224 func (i *blockIter) Seek(key []byte) bool {
225         if i.err != nil {
226                 return false
227         } else if i.dir == dirReleased {
228                 i.err = ErrIterReleased
229                 return false
230         }
231
232         ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
233         if err != nil {
234                 i.sErr(err)
235                 return false
236         }
237         i.restartIndex = ri
238         i.offset = max(i.offsetStart, offset)
239         if i.dir == dirSOI || i.dir == dirEOI {
240                 i.dir = dirForward
241         }
242         for i.Next() {
243                 if i.tr.cmp.Compare(i.key, key) >= 0 {
244                         return true
245                 }
246         }
247         return false
248 }
249
250 func (i *blockIter) Next() bool {
251         if i.dir == dirEOI || i.err != nil {
252                 return false
253         } else if i.dir == dirReleased {
254                 i.err = ErrIterReleased
255                 return false
256         }
257
258         if i.dir == dirSOI {
259                 i.restartIndex = i.riStart
260                 i.offset = i.offsetStart
261         } else if i.dir == dirBackward {
262                 i.prevNode = i.prevNode[:0]
263                 i.prevKeys = i.prevKeys[:0]
264         }
265         for i.offset < i.offsetRealStart {
266                 key, value, nShared, n, err := i.block.entry(i.offset)
267                 if err != nil {
268                         i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
269                         return false
270                 }
271                 if n == 0 {
272                         i.dir = dirEOI
273                         return false
274                 }
275                 i.key = append(i.key[:nShared], key...)
276                 i.value = value
277                 i.offset += n
278         }
279         if i.offset >= i.offsetLimit {
280                 i.dir = dirEOI
281                 if i.offset != i.offsetLimit {
282                         i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
283                 }
284                 return false
285         }
286         key, value, nShared, n, err := i.block.entry(i.offset)
287         if err != nil {
288                 i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
289                 return false
290         }
291         if n == 0 {
292                 i.dir = dirEOI
293                 return false
294         }
295         i.key = append(i.key[:nShared], key...)
296         i.value = value
297         i.prevOffset = i.offset
298         i.offset += n
299         i.dir = dirForward
300         return true
301 }
302
303 func (i *blockIter) Prev() bool {
304         if i.dir == dirSOI || i.err != nil {
305                 return false
306         } else if i.dir == dirReleased {
307                 i.err = ErrIterReleased
308                 return false
309         }
310
311         var ri int
312         if i.dir == dirForward {
313                 // Change direction.
314                 i.offset = i.prevOffset
315                 if i.offset == i.offsetRealStart {
316                         i.dir = dirSOI
317                         return false
318                 }
319                 ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
320                 i.dir = dirBackward
321         } else if i.dir == dirEOI {
322                 // At the end of iterator.
323                 i.restartIndex = i.riLimit
324                 i.offset = i.offsetLimit
325                 if i.offset == i.offsetRealStart {
326                         i.dir = dirSOI
327                         return false
328                 }
329                 ri = i.riLimit - 1
330                 i.dir = dirBackward
331         } else if len(i.prevNode) == 1 {
332                 // This is the end of a restart range.
333                 i.offset = i.prevNode[0]
334                 i.prevNode = i.prevNode[:0]
335                 if i.restartIndex == i.riStart {
336                         i.dir = dirSOI
337                         return false
338                 }
339                 i.restartIndex--
340                 ri = i.restartIndex
341         } else {
342                 // In the middle of restart range, get from cache.
343                 n := len(i.prevNode) - 3
344                 node := i.prevNode[n:]
345                 i.prevNode = i.prevNode[:n]
346                 // Get the key.
347                 ko := node[0]
348                 i.key = append(i.key[:0], i.prevKeys[ko:]...)
349                 i.prevKeys = i.prevKeys[:ko]
350                 // Get the value.
351                 vo := node[1]
352                 vl := vo + node[2]
353                 i.value = i.block.data[vo:vl]
354                 i.offset = vl
355                 return true
356         }
357         // Build entries cache.
358         i.key = i.key[:0]
359         i.value = nil
360         offset := i.block.restartOffset(ri)
361         if offset == i.offset {
362                 ri--
363                 if ri < 0 {
364                         i.dir = dirSOI
365                         return false
366                 }
367                 offset = i.block.restartOffset(ri)
368         }
369         i.prevNode = append(i.prevNode, offset)
370         for {
371                 key, value, nShared, n, err := i.block.entry(offset)
372                 if err != nil {
373                         i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
374                         return false
375                 }
376                 if offset >= i.offsetRealStart {
377                         if i.value != nil {
378                                 // Appends 3 variables:
379                                 // 1. Previous keys offset
380                                 // 2. Value offset in the data block
381                                 // 3. Value length
382                                 i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
383                                 i.prevKeys = append(i.prevKeys, i.key...)
384                         }
385                         i.value = value
386                 }
387                 i.key = append(i.key[:nShared], key...)
388                 offset += n
389                 // Stop if target offset reached.
390                 if offset >= i.offset {
391                         if offset != i.offset {
392                                 i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
393                                 return false
394                         }
395
396                         break
397                 }
398         }
399         i.restartIndex = ri
400         i.offset = offset
401         return true
402 }
403
404 func (i *blockIter) Key() []byte {
405         if i.err != nil || i.dir <= dirEOI {
406                 return nil
407         }
408         return i.key
409 }
410
411 func (i *blockIter) Value() []byte {
412         if i.err != nil || i.dir <= dirEOI {
413                 return nil
414         }
415         return i.value
416 }
417
418 func (i *blockIter) Release() {
419         if i.dir != dirReleased {
420                 i.tr = nil
421                 i.block = nil
422                 i.prevNode = nil
423                 i.prevKeys = nil
424                 i.key = nil
425                 i.value = nil
426                 i.dir = dirReleased
427                 if i.blockReleaser != nil {
428                         i.blockReleaser.Release()
429                         i.blockReleaser = nil
430                 }
431                 if i.releaser != nil {
432                         i.releaser.Release()
433                         i.releaser = nil
434                 }
435         }
436 }
437
438 func (i *blockIter) SetReleaser(releaser util.Releaser) {
439         if i.dir == dirReleased {
440                 panic(util.ErrReleased)
441         }
442         if i.releaser != nil && releaser != nil {
443                 panic(util.ErrHasReleaser)
444         }
445         i.releaser = releaser
446 }
447
448 func (i *blockIter) Valid() bool {
449         return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)
450 }
451
452 func (i *blockIter) Error() error {
453         return i.err
454 }
455
456 type filterBlock struct {
457         bpool      *util.BufferPool
458         data       []byte
459         oOffset    int
460         baseLg     uint
461         filtersNum int
462 }
463
464 func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
465         i := int(offset >> b.baseLg)
466         if i < b.filtersNum {
467                 o := b.data[b.oOffset+i*4:]
468                 n := int(binary.LittleEndian.Uint32(o))
469                 m := int(binary.LittleEndian.Uint32(o[4:]))
470                 if n < m && m <= b.oOffset {
471                         return filter.Contains(b.data[n:m], key)
472                 } else if n == m {
473                         return false
474                 }
475         }
476         return true
477 }
478
479 func (b *filterBlock) Release() {
480         b.bpool.Put(b.data)
481         b.bpool = nil
482         b.data = nil
483 }
484
485 type indexIter struct {
486         *blockIter
487         tr    *Reader
488         slice *util.Range
489         // Options
490         fillCache bool
491 }
492
493 func (i *indexIter) Get() iterator.Iterator {
494         value := i.Value()
495         if value == nil {
496                 return nil
497         }
498         dataBH, n := decodeBlockHandle(value)
499         if n == 0 {
500                 return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
501         }
502
503         var slice *util.Range
504         if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
505                 slice = i.slice
506         }
507         return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
508 }
509
510 // Reader is a table reader.
511 type Reader struct {
512         mu     sync.RWMutex
513         fd     storage.FileDesc
514         reader io.ReaderAt
515         cache  *cache.NamespaceGetter
516         err    error
517         bpool  *util.BufferPool
518         // Options
519         o              *opt.Options
520         cmp            comparer.Comparer
521         filter         filter.Filter
522         verifyChecksum bool
523
524         dataEnd                   int64
525         metaBH, indexBH, filterBH blockHandle
526         indexBlock                *block
527         filterBlock               *filterBlock
528 }
529
530 func (r *Reader) blockKind(bh blockHandle) string {
531         switch bh.offset {
532         case r.metaBH.offset:
533                 return "meta-block"
534         case r.indexBH.offset:
535                 return "index-block"
536         case r.filterBH.offset:
537                 if r.filterBH.length > 0 {
538                         return "filter-block"
539                 }
540         }
541         return "data-block"
542 }
543
544 func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
545         return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
546 }
547
548 func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
549         return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
550 }
551
552 func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
553         if cerr, ok := err.(*ErrCorrupted); ok {
554                 cerr.Pos = int64(bh.offset)
555                 cerr.Size = int64(bh.length)
556                 cerr.Kind = r.blockKind(bh)
557                 return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
558         }
559         return err
560 }
561
562 func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
563         data := r.bpool.Get(int(bh.length + blockTrailerLen))
564         if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
565                 return nil, err
566         }
567
568         if verifyChecksum {
569                 n := bh.length + 1
570                 checksum0 := binary.LittleEndian.Uint32(data[n:])
571                 checksum1 := util.NewCRC(data[:n]).Value()
572                 if checksum0 != checksum1 {
573                         r.bpool.Put(data)
574                         return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
575                 }
576         }
577
578         switch data[bh.length] {
579         case blockTypeNoCompression:
580                 data = data[:bh.length]
581         case blockTypeSnappyCompression:
582                 decLen, err := snappy.DecodedLen(data[:bh.length])
583                 if err != nil {
584                         r.bpool.Put(data)
585                         return nil, r.newErrCorruptedBH(bh, err.Error())
586                 }
587                 decData := r.bpool.Get(decLen)
588                 decData, err = snappy.Decode(decData, data[:bh.length])
589                 r.bpool.Put(data)
590                 if err != nil {
591                         r.bpool.Put(decData)
592                         return nil, r.newErrCorruptedBH(bh, err.Error())
593                 }
594                 data = decData
595         default:
596                 r.bpool.Put(data)
597                 return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
598         }
599         return data, nil
600 }
601
602 func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
603         data, err := r.readRawBlock(bh, verifyChecksum)
604         if err != nil {
605                 return nil, err
606         }
607         restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
608         b := &block{
609                 bpool:          r.bpool,
610                 bh:             bh,
611                 data:           data,
612                 restartsLen:    restartsLen,
613                 restartsOffset: len(data) - (restartsLen+1)*4,
614         }
615         return b, nil
616 }
617
618 func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
619         if r.cache != nil {
620                 var (
621                         err error
622                         ch  *cache.Handle
623                 )
624                 if fillCache {
625                         ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
626                                 var b *block
627                                 b, err = r.readBlock(bh, verifyChecksum)
628                                 if err != nil {
629                                         return 0, nil
630                                 }
631                                 return cap(b.data), b
632                         })
633                 } else {
634                         ch = r.cache.Get(bh.offset, nil)
635                 }
636                 if ch != nil {
637                         b, ok := ch.Value().(*block)
638                         if !ok {
639                                 ch.Release()
640                                 return nil, nil, errors.New("leveldb/table: inconsistent block type")
641                         }
642                         return b, ch, err
643                 } else if err != nil {
644                         return nil, nil, err
645                 }
646         }
647
648         b, err := r.readBlock(bh, verifyChecksum)
649         return b, b, err
650 }
651
652 func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
653         data, err := r.readRawBlock(bh, true)
654         if err != nil {
655                 return nil, err
656         }
657         n := len(data)
658         if n < 5 {
659                 return nil, r.newErrCorruptedBH(bh, "too short")
660         }
661         m := n - 5
662         oOffset := int(binary.LittleEndian.Uint32(data[m:]))
663         if oOffset > m {
664                 return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
665         }
666         b := &filterBlock{
667                 bpool:      r.bpool,
668                 data:       data,
669                 oOffset:    oOffset,
670                 baseLg:     uint(data[n-1]),
671                 filtersNum: (m - oOffset) / 4,
672         }
673         return b, nil
674 }
675
676 func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
677         if r.cache != nil {
678                 var (
679                         err error
680                         ch  *cache.Handle
681                 )
682                 if fillCache {
683                         ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
684                                 var b *filterBlock
685                                 b, err = r.readFilterBlock(bh)
686                                 if err != nil {
687                                         return 0, nil
688                                 }
689                                 return cap(b.data), b
690                         })
691                 } else {
692                         ch = r.cache.Get(bh.offset, nil)
693                 }
694                 if ch != nil {
695                         b, ok := ch.Value().(*filterBlock)
696                         if !ok {
697                                 ch.Release()
698                                 return nil, nil, errors.New("leveldb/table: inconsistent block type")
699                         }
700                         return b, ch, err
701                 } else if err != nil {
702                         return nil, nil, err
703                 }
704         }
705
706         b, err := r.readFilterBlock(bh)
707         return b, b, err
708 }
709
710 func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
711         if r.indexBlock == nil {
712                 return r.readBlockCached(r.indexBH, true, fillCache)
713         }
714         return r.indexBlock, util.NoopReleaser{}, nil
715 }
716
717 func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
718         if r.filterBlock == nil {
719                 return r.readFilterBlockCached(r.filterBH, fillCache)
720         }
721         return r.filterBlock, util.NoopReleaser{}, nil
722 }
723
724 func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
725         bi := &blockIter{
726                 tr:            r,
727                 block:         b,
728                 blockReleaser: bReleaser,
729                 // Valid key should never be nil.
730                 key:             make([]byte, 0),
731                 dir:             dirSOI,
732                 riStart:         0,
733                 riLimit:         b.restartsLen,
734                 offsetStart:     0,
735                 offsetRealStart: 0,
736                 offsetLimit:     b.restartsOffset,
737         }
738         if slice != nil {
739                 if slice.Start != nil {
740                         if bi.Seek(slice.Start) {
741                                 bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
742                                 bi.offsetStart = b.restartOffset(bi.riStart)
743                                 bi.offsetRealStart = bi.prevOffset
744                         } else {
745                                 bi.riStart = b.restartsLen
746                                 bi.offsetStart = b.restartsOffset
747                                 bi.offsetRealStart = b.restartsOffset
748                         }
749                 }
750                 if slice.Limit != nil {
751                         if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
752                                 bi.offsetLimit = bi.prevOffset
753                                 bi.riLimit = bi.restartIndex + 1
754                         }
755                 }
756                 bi.reset()
757                 if bi.offsetStart > bi.offsetLimit {
758                         bi.sErr(errors.New("leveldb/table: invalid slice range"))
759                 }
760         }
761         return bi
762 }
763
764 func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
765         b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
766         if err != nil {
767                 return iterator.NewEmptyIterator(err)
768         }
769         return r.newBlockIter(b, rel, slice, false)
770 }
771
772 func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
773         r.mu.RLock()
774         defer r.mu.RUnlock()
775
776         if r.err != nil {
777                 return iterator.NewEmptyIterator(r.err)
778         }
779
780         return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
781 }
782
783 // NewIterator creates an iterator from the table.
784 //
785 // Slice allows slicing the iterator to only contains keys in the given
786 // range. A nil Range.Start is treated as a key before all keys in the
787 // table. And a nil Range.Limit is treated as a key after all keys in
788 // the table.
789 //
790 // The returned iterator is not safe for concurrent use and should be released
791 // after use.
792 //
793 // Also read Iterator documentation of the leveldb/iterator package.
794 func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
795         r.mu.RLock()
796         defer r.mu.RUnlock()
797
798         if r.err != nil {
799                 return iterator.NewEmptyIterator(r.err)
800         }
801
802         fillCache := !ro.GetDontFillCache()
803         indexBlock, rel, err := r.getIndexBlock(fillCache)
804         if err != nil {
805                 return iterator.NewEmptyIterator(err)
806         }
807         index := &indexIter{
808                 blockIter: r.newBlockIter(indexBlock, rel, slice, true),
809                 tr:        r,
810                 slice:     slice,
811                 fillCache: !ro.GetDontFillCache(),
812         }
813         return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
814 }
815
816 func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
817         r.mu.RLock()
818         defer r.mu.RUnlock()
819
820         if r.err != nil {
821                 err = r.err
822                 return
823         }
824
825         indexBlock, rel, err := r.getIndexBlock(true)
826         if err != nil {
827                 return
828         }
829         defer rel.Release()
830
831         index := r.newBlockIter(indexBlock, nil, nil, true)
832         defer index.Release()
833
834         if !index.Seek(key) {
835                 if err = index.Error(); err == nil {
836                         err = ErrNotFound
837                 }
838                 return
839         }
840
841         dataBH, n := decodeBlockHandle(index.Value())
842         if n == 0 {
843                 r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
844                 return nil, nil, r.err
845         }
846
847         // The filter should only used for exact match.
848         if filtered && r.filter != nil {
849                 filterBlock, frel, ferr := r.getFilterBlock(true)
850                 if ferr == nil {
851                         if !filterBlock.contains(r.filter, dataBH.offset, key) {
852                                 frel.Release()
853                                 return nil, nil, ErrNotFound
854                         }
855                         frel.Release()
856                 } else if !errors.IsCorrupted(ferr) {
857                         return nil, nil, ferr
858                 }
859         }
860
861         data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
862         if !data.Seek(key) {
863                 data.Release()
864                 if err = data.Error(); err != nil {
865                         return
866                 }
867
868                 // The nearest greater-than key is the first key of the next block.
869                 if !index.Next() {
870                         if err = index.Error(); err == nil {
871                                 err = ErrNotFound
872                         }
873                         return
874                 }
875
876                 dataBH, n = decodeBlockHandle(index.Value())
877                 if n == 0 {
878                         r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
879                         return nil, nil, r.err
880                 }
881
882                 data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
883                 if !data.Next() {
884                         data.Release()
885                         if err = data.Error(); err == nil {
886                                 err = ErrNotFound
887                         }
888                         return
889                 }
890         }
891
892         // Key doesn't use block buffer, no need to copy the buffer.
893         rkey = data.Key()
894         if !noValue {
895                 if r.bpool == nil {
896                         value = data.Value()
897                 } else {
898                         // Value does use block buffer, and since the buffer will be
899                         // recycled, it need to be copied.
900                         value = append([]byte{}, data.Value()...)
901                 }
902         }
903         data.Release()
904         return
905 }
906
907 // Find finds key/value pair whose key is greater than or equal to the
908 // given key. It returns ErrNotFound if the table doesn't contain
909 // such pair.
910 // If filtered is true then the nearest 'block' will be checked against
911 // 'filter data' (if present) and will immediately return ErrNotFound if
912 // 'filter data' indicates that such pair doesn't exist.
913 //
914 // The caller may modify the contents of the returned slice as it is its
915 // own copy.
916 // It is safe to modify the contents of the argument after Find returns.
917 func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
918         return r.find(key, filtered, ro, false)
919 }
920
921 // FindKey finds key that is greater than or equal to the given key.
922 // It returns ErrNotFound if the table doesn't contain such key.
923 // If filtered is true then the nearest 'block' will be checked against
924 // 'filter data' (if present) and will immediately return ErrNotFound if
925 // 'filter data' indicates that such key doesn't exist.
926 //
927 // The caller may modify the contents of the returned slice as it is its
928 // own copy.
929 // It is safe to modify the contents of the argument after Find returns.
930 func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
931         rkey, _, err = r.find(key, filtered, ro, true)
932         return
933 }
934
935 // Get gets the value for the given key. It returns errors.ErrNotFound
936 // if the table does not contain the key.
937 //
938 // The caller may modify the contents of the returned slice as it is its
939 // own copy.
940 // It is safe to modify the contents of the argument after Find returns.
941 func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
942         r.mu.RLock()
943         defer r.mu.RUnlock()
944
945         if r.err != nil {
946                 err = r.err
947                 return
948         }
949
950         rkey, value, err := r.find(key, false, ro, false)
951         if err == nil && r.cmp.Compare(rkey, key) != 0 {
952                 value = nil
953                 err = ErrNotFound
954         }
955         return
956 }
957
958 // OffsetOf returns approximate offset for the given key.
959 //
960 // It is safe to modify the contents of the argument after Get returns.
961 func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
962         r.mu.RLock()
963         defer r.mu.RUnlock()
964
965         if r.err != nil {
966                 err = r.err
967                 return
968         }
969
970         indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
971         if err != nil {
972                 return
973         }
974         defer rel.Release()
975
976         index := r.newBlockIter(indexBlock, nil, nil, true)
977         defer index.Release()
978         if index.Seek(key) {
979                 dataBH, n := decodeBlockHandle(index.Value())
980                 if n == 0 {
981                         r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
982                         return
983                 }
984                 offset = int64(dataBH.offset)
985                 return
986         }
987         err = index.Error()
988         if err == nil {
989                 offset = r.dataEnd
990         }
991         return
992 }
993
994 // Release implements util.Releaser.
995 // It also close the file if it is an io.Closer.
996 func (r *Reader) Release() {
997         r.mu.Lock()
998         defer r.mu.Unlock()
999
1000         if closer, ok := r.reader.(io.Closer); ok {
1001                 closer.Close()
1002         }
1003         if r.indexBlock != nil {
1004                 r.indexBlock.Release()
1005                 r.indexBlock = nil
1006         }
1007         if r.filterBlock != nil {
1008                 r.filterBlock.Release()
1009                 r.filterBlock = nil
1010         }
1011         r.reader = nil
1012         r.cache = nil
1013         r.bpool = nil
1014         r.err = ErrReaderReleased
1015 }
1016
1017 // NewReader creates a new initialized table reader for the file.
1018 // The fi, cache and bpool is optional and can be nil.
1019 //
1020 // The returned table reader instance is safe for concurrent use.
1021 func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
1022         if f == nil {
1023                 return nil, errors.New("leveldb/table: nil file")
1024         }
1025
1026         r := &Reader{
1027                 fd:             fd,
1028                 reader:         f,
1029                 cache:          cache,
1030                 bpool:          bpool,
1031                 o:              o,
1032                 cmp:            o.GetComparer(),
1033                 verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
1034         }
1035
1036         if size < footerLen {
1037                 r.err = r.newErrCorrupted(0, size, "table", "too small")
1038                 return r, nil
1039         }
1040
1041         footerPos := size - footerLen
1042         var footer [footerLen]byte
1043         if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
1044                 return nil, err
1045         }
1046         if string(footer[footerLen-len(magic):footerLen]) != magic {
1047                 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
1048                 return r, nil
1049         }
1050
1051         var n int
1052         // Decode the metaindex block handle.
1053         r.metaBH, n = decodeBlockHandle(footer[:])
1054         if n == 0 {
1055                 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
1056                 return r, nil
1057         }
1058
1059         // Decode the index block handle.
1060         r.indexBH, n = decodeBlockHandle(footer[n:])
1061         if n == 0 {
1062                 r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
1063                 return r, nil
1064         }
1065
1066         // Read metaindex block.
1067         metaBlock, err := r.readBlock(r.metaBH, true)
1068         if err != nil {
1069                 if errors.IsCorrupted(err) {
1070                         r.err = err
1071                         return r, nil
1072                 }
1073                 return nil, err
1074         }
1075
1076         // Set data end.
1077         r.dataEnd = int64(r.metaBH.offset)
1078
1079         // Read metaindex.
1080         metaIter := r.newBlockIter(metaBlock, nil, nil, true)
1081         for metaIter.Next() {
1082                 key := string(metaIter.Key())
1083                 if !strings.HasPrefix(key, "filter.") {
1084                         continue
1085                 }
1086                 fn := key[7:]
1087                 if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
1088                         r.filter = f0
1089                 } else {
1090                         for _, f0 := range o.GetAltFilters() {
1091                                 if f0.Name() == fn {
1092                                         r.filter = f0
1093                                         break
1094                                 }
1095                         }
1096                 }
1097                 if r.filter != nil {
1098                         filterBH, n := decodeBlockHandle(metaIter.Value())
1099                         if n == 0 {
1100                                 continue
1101                         }
1102                         r.filterBH = filterBH
1103                         // Update data end.
1104                         r.dataEnd = int64(filterBH.offset)
1105                         break
1106                 }
1107         }
1108         metaIter.Release()
1109         metaBlock.Release()
1110
1111         // Cache index and filter block locally, since we don't have global cache.
1112         if cache == nil {
1113                 r.indexBlock, err = r.readBlock(r.indexBH, true)
1114                 if err != nil {
1115                         if errors.IsCorrupted(err) {
1116                                 r.err = err
1117                                 return r, nil
1118                         }
1119                         return nil, err
1120                 }
1121                 if r.filter != nil {
1122                         r.filterBlock, err = r.readFilterBlock(r.filterBH)
1123                         if err != nil {
1124                                 if !errors.IsCorrupted(err) {
1125                                         return nil, err
1126                                 }
1127
1128                                 // Don't use filter then.
1129                                 r.filter = nil
1130                         }
1131                 }
1132         }
1133
1134         return r, nil
1135 }