OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "container/list"
11         "fmt"
12         "io"
13         "os"
14         "runtime"
15         "strings"
16         "sync"
17         "sync/atomic"
18         "time"
19
20         "github.com/syndtr/goleveldb/leveldb/errors"
21         "github.com/syndtr/goleveldb/leveldb/iterator"
22         "github.com/syndtr/goleveldb/leveldb/journal"
23         "github.com/syndtr/goleveldb/leveldb/memdb"
24         "github.com/syndtr/goleveldb/leveldb/opt"
25         "github.com/syndtr/goleveldb/leveldb/storage"
26         "github.com/syndtr/goleveldb/leveldb/table"
27         "github.com/syndtr/goleveldb/leveldb/util"
28 )
29
30 // DB is a LevelDB database.
31 type DB struct {
32         // Need 64-bit alignment.
33         seq uint64
34
35         // Session.
36         s *session
37
38         // MemDB.
39         memMu           sync.RWMutex
40         memPool         chan *memdb.DB
41         mem, frozenMem  *memDB
42         journal         *journal.Writer
43         journalWriter   storage.Writer
44         journalFd       storage.FileDesc
45         frozenJournalFd storage.FileDesc
46         frozenSeq       uint64
47
48         // Snapshot.
49         snapsMu   sync.Mutex
50         snapsList *list.List
51
52         // Stats.
53         aliveSnaps, aliveIters int32
54
55         // Write.
56         batchPool    sync.Pool
57         writeMergeC  chan writeMerge
58         writeMergedC chan bool
59         writeLockC   chan struct{}
60         writeAckC    chan error
61         writeDelay   time.Duration
62         writeDelayN  int
63         tr           *Transaction
64
65         // Compaction.
66         compCommitLk     sync.Mutex
67         tcompCmdC        chan cCmd
68         tcompPauseC      chan chan<- struct{}
69         mcompCmdC        chan cCmd
70         compErrC         chan error
71         compPerErrC      chan error
72         compErrSetC      chan error
73         compWriteLocking bool
74         compStats        cStats
75         memdbMaxLevel    int // For testing.
76
77         // Close.
78         closeW sync.WaitGroup
79         closeC chan struct{}
80         closed uint32
81         closer io.Closer
82 }
83
84 func openDB(s *session) (*DB, error) {
85         s.log("db@open opening")
86         start := time.Now()
87         db := &DB{
88                 s: s,
89                 // Initial sequence
90                 seq: s.stSeqNum,
91                 // MemDB
92                 memPool: make(chan *memdb.DB, 1),
93                 // Snapshot
94                 snapsList: list.New(),
95                 // Write
96                 batchPool:    sync.Pool{New: newBatch},
97                 writeMergeC:  make(chan writeMerge),
98                 writeMergedC: make(chan bool),
99                 writeLockC:   make(chan struct{}, 1),
100                 writeAckC:    make(chan error),
101                 // Compaction
102                 tcompCmdC:   make(chan cCmd),
103                 tcompPauseC: make(chan chan<- struct{}),
104                 mcompCmdC:   make(chan cCmd),
105                 compErrC:    make(chan error),
106                 compPerErrC: make(chan error),
107                 compErrSetC: make(chan error),
108                 // Close
109                 closeC: make(chan struct{}),
110         }
111
112         // Read-only mode.
113         readOnly := s.o.GetReadOnly()
114
115         if readOnly {
116                 // Recover journals (read-only mode).
117                 if err := db.recoverJournalRO(); err != nil {
118                         return nil, err
119                 }
120         } else {
121                 // Recover journals.
122                 if err := db.recoverJournal(); err != nil {
123                         return nil, err
124                 }
125
126                 // Remove any obsolete files.
127                 if err := db.checkAndCleanFiles(); err != nil {
128                         // Close journal.
129                         if db.journal != nil {
130                                 db.journal.Close()
131                                 db.journalWriter.Close()
132                         }
133                         return nil, err
134                 }
135
136         }
137
138         // Doesn't need to be included in the wait group.
139         go db.compactionError()
140         go db.mpoolDrain()
141
142         if readOnly {
143                 db.SetReadOnly()
144         } else {
145                 db.closeW.Add(2)
146                 go db.tCompaction()
147                 go db.mCompaction()
148                 // go db.jWriter()
149         }
150
151         s.logf("db@open done T·%v", time.Since(start))
152
153         runtime.SetFinalizer(db, (*DB).Close)
154         return db, nil
155 }
156
157 // Open opens or creates a DB for the given storage.
158 // The DB will be created if not exist, unless ErrorIfMissing is true.
159 // Also, if ErrorIfExist is true and the DB exist Open will returns
160 // os.ErrExist error.
161 //
162 // Open will return an error with type of ErrCorrupted if corruption
163 // detected in the DB. Use errors.IsCorrupted to test whether an error is
164 // due to corruption. Corrupted DB can be recovered with Recover function.
165 //
166 // The returned DB instance is safe for concurrent use.
167 // The DB must be closed after use, by calling Close method.
168 func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
169         s, err := newSession(stor, o)
170         if err != nil {
171                 return
172         }
173         defer func() {
174                 if err != nil {
175                         s.close()
176                         s.release()
177                 }
178         }()
179
180         err = s.recover()
181         if err != nil {
182                 if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
183                         return
184                 }
185                 err = s.create()
186                 if err != nil {
187                         return
188                 }
189         } else if s.o.GetErrorIfExist() {
190                 err = os.ErrExist
191                 return
192         }
193
194         return openDB(s)
195 }
196
197 // OpenFile opens or creates a DB for the given path.
198 // The DB will be created if not exist, unless ErrorIfMissing is true.
199 // Also, if ErrorIfExist is true and the DB exist OpenFile will returns
200 // os.ErrExist error.
201 //
202 // OpenFile uses standard file-system backed storage implementation as
203 // described in the leveldb/storage package.
204 //
205 // OpenFile will return an error with type of ErrCorrupted if corruption
206 // detected in the DB. Use errors.IsCorrupted to test whether an error is
207 // due to corruption. Corrupted DB can be recovered with Recover function.
208 //
209 // The returned DB instance is safe for concurrent use.
210 // The DB must be closed after use, by calling Close method.
211 func OpenFile(path string, o *opt.Options) (db *DB, err error) {
212         stor, err := storage.OpenFile(path, o.GetReadOnly())
213         if err != nil {
214                 return
215         }
216         db, err = Open(stor, o)
217         if err != nil {
218                 stor.Close()
219         } else {
220                 db.closer = stor
221         }
222         return
223 }
224
225 // Recover recovers and opens a DB with missing or corrupted manifest files
226 // for the given storage. It will ignore any manifest files, valid or not.
227 // The DB must already exist or it will returns an error.
228 // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
229 //
230 // The returned DB instance is safe for concurrent use.
231 // The DB must be closed after use, by calling Close method.
232 func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
233         s, err := newSession(stor, o)
234         if err != nil {
235                 return
236         }
237         defer func() {
238                 if err != nil {
239                         s.close()
240                         s.release()
241                 }
242         }()
243
244         err = recoverTable(s, o)
245         if err != nil {
246                 return
247         }
248         return openDB(s)
249 }
250
251 // RecoverFile recovers and opens a DB with missing or corrupted manifest files
252 // for the given path. It will ignore any manifest files, valid or not.
253 // The DB must already exist or it will returns an error.
254 // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
255 //
256 // RecoverFile uses standard file-system backed storage implementation as described
257 // in the leveldb/storage package.
258 //
259 // The returned DB instance is safe for concurrent use.
260 // The DB must be closed after use, by calling Close method.
261 func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
262         stor, err := storage.OpenFile(path, false)
263         if err != nil {
264                 return
265         }
266         db, err = Recover(stor, o)
267         if err != nil {
268                 stor.Close()
269         } else {
270                 db.closer = stor
271         }
272         return
273 }
274
275 func recoverTable(s *session, o *opt.Options) error {
276         o = dupOptions(o)
277         // Mask StrictReader, lets StrictRecovery doing its job.
278         o.Strict &= ^opt.StrictReader
279
280         // Get all tables and sort it by file number.
281         fds, err := s.stor.List(storage.TypeTable)
282         if err != nil {
283                 return err
284         }
285         sortFds(fds)
286
287         var (
288                 maxSeq                                                            uint64
289                 recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int
290
291                 // We will drop corrupted table.
292                 strict = o.GetStrict(opt.StrictRecovery)
293                 noSync = o.GetNoSync()
294
295                 rec   = &sessionRecord{}
296                 bpool = util.NewBufferPool(o.GetBlockSize() + 5)
297         )
298         buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
299                 tmpFd = s.newTemp()
300                 writer, err := s.stor.Create(tmpFd)
301                 if err != nil {
302                         return
303                 }
304                 defer func() {
305                         writer.Close()
306                         if err != nil {
307                                 s.stor.Remove(tmpFd)
308                                 tmpFd = storage.FileDesc{}
309                         }
310                 }()
311
312                 // Copy entries.
313                 tw := table.NewWriter(writer, o)
314                 for iter.Next() {
315                         key := iter.Key()
316                         if validInternalKey(key) {
317                                 err = tw.Append(key, iter.Value())
318                                 if err != nil {
319                                         return
320                                 }
321                         }
322                 }
323                 err = iter.Error()
324                 if err != nil {
325                         return
326                 }
327                 err = tw.Close()
328                 if err != nil {
329                         return
330                 }
331                 if !noSync {
332                         err = writer.Sync()
333                         if err != nil {
334                                 return
335                         }
336                 }
337                 size = int64(tw.BytesLen())
338                 return
339         }
340         recoverTable := func(fd storage.FileDesc) error {
341                 s.logf("table@recovery recovering @%d", fd.Num)
342                 reader, err := s.stor.Open(fd)
343                 if err != nil {
344                         return err
345                 }
346                 var closed bool
347                 defer func() {
348                         if !closed {
349                                 reader.Close()
350                         }
351                 }()
352
353                 // Get file size.
354                 size, err := reader.Seek(0, 2)
355                 if err != nil {
356                         return err
357                 }
358
359                 var (
360                         tSeq                                     uint64
361                         tgoodKey, tcorruptedKey, tcorruptedBlock int
362                         imin, imax                               []byte
363                 )
364                 tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
365                 if err != nil {
366                         return err
367                 }
368                 iter := tr.NewIterator(nil, nil)
369                 if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
370                         itererr.SetErrorCallback(func(err error) {
371                                 if errors.IsCorrupted(err) {
372                                         s.logf("table@recovery block corruption @%d %q", fd.Num, err)
373                                         tcorruptedBlock++
374                                 }
375                         })
376                 }
377
378                 // Scan the table.
379                 for iter.Next() {
380                         key := iter.Key()
381                         _, seq, _, kerr := parseInternalKey(key)
382                         if kerr != nil {
383                                 tcorruptedKey++
384                                 continue
385                         }
386                         tgoodKey++
387                         if seq > tSeq {
388                                 tSeq = seq
389                         }
390                         if imin == nil {
391                                 imin = append([]byte{}, key...)
392                         }
393                         imax = append(imax[:0], key...)
394                 }
395                 if err := iter.Error(); err != nil {
396                         iter.Release()
397                         return err
398                 }
399                 iter.Release()
400
401                 goodKey += tgoodKey
402                 corruptedKey += tcorruptedKey
403                 corruptedBlock += tcorruptedBlock
404
405                 if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
406                         droppedTable++
407                         s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
408                         return nil
409                 }
410
411                 if tgoodKey > 0 {
412                         if tcorruptedKey > 0 || tcorruptedBlock > 0 {
413                                 // Rebuild the table.
414                                 s.logf("table@recovery rebuilding @%d", fd.Num)
415                                 iter := tr.NewIterator(nil, nil)
416                                 tmpFd, newSize, err := buildTable(iter)
417                                 iter.Release()
418                                 if err != nil {
419                                         return err
420                                 }
421                                 closed = true
422                                 reader.Close()
423                                 if err := s.stor.Rename(tmpFd, fd); err != nil {
424                                         return err
425                                 }
426                                 size = newSize
427                         }
428                         if tSeq > maxSeq {
429                                 maxSeq = tSeq
430                         }
431                         recoveredKey += tgoodKey
432                         // Add table to level 0.
433                         rec.addTable(0, fd.Num, size, imin, imax)
434                         s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
435                 } else {
436                         droppedTable++
437                         s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
438                 }
439
440                 return nil
441         }
442
443         // Recover all tables.
444         if len(fds) > 0 {
445                 s.logf("table@recovery F·%d", len(fds))
446
447                 // Mark file number as used.
448                 s.markFileNum(fds[len(fds)-1].Num)
449
450                 for _, fd := range fds {
451                         if err := recoverTable(fd); err != nil {
452                                 return err
453                         }
454                 }
455
456                 s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
457         }
458
459         // Set sequence number.
460         rec.setSeqNum(maxSeq)
461
462         // Create new manifest.
463         if err := s.create(); err != nil {
464                 return err
465         }
466
467         // Commit.
468         return s.commit(rec)
469 }
470
471 func (db *DB) recoverJournal() error {
472         // Get all journals and sort it by file number.
473         rawFds, err := db.s.stor.List(storage.TypeJournal)
474         if err != nil {
475                 return err
476         }
477         sortFds(rawFds)
478
479         // Journals that will be recovered.
480         var fds []storage.FileDesc
481         for _, fd := range rawFds {
482                 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
483                         fds = append(fds, fd)
484                 }
485         }
486
487         var (
488                 ofd storage.FileDesc // Obsolete file.
489                 rec = &sessionRecord{}
490         )
491
492         // Recover journals.
493         if len(fds) > 0 {
494                 db.logf("journal@recovery F·%d", len(fds))
495
496                 // Mark file number as used.
497                 db.s.markFileNum(fds[len(fds)-1].Num)
498
499                 var (
500                         // Options.
501                         strict      = db.s.o.GetStrict(opt.StrictJournal)
502                         checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
503                         writeBuffer = db.s.o.GetWriteBuffer()
504
505                         jr       *journal.Reader
506                         mdb      = memdb.New(db.s.icmp, writeBuffer)
507                         buf      = &util.Buffer{}
508                         batchSeq uint64
509                         batchLen int
510                 )
511
512                 for _, fd := range fds {
513                         db.logf("journal@recovery recovering @%d", fd.Num)
514
515                         fr, err := db.s.stor.Open(fd)
516                         if err != nil {
517                                 return err
518                         }
519
520                         // Create or reset journal reader instance.
521                         if jr == nil {
522                                 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
523                         } else {
524                                 jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
525                         }
526
527                         // Flush memdb and remove obsolete journal file.
528                         if !ofd.Zero() {
529                                 if mdb.Len() > 0 {
530                                         if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
531                                                 fr.Close()
532                                                 return err
533                                         }
534                                 }
535
536                                 rec.setJournalNum(fd.Num)
537                                 rec.setSeqNum(db.seq)
538                                 if err := db.s.commit(rec); err != nil {
539                                         fr.Close()
540                                         return err
541                                 }
542                                 rec.resetAddedTables()
543
544                                 db.s.stor.Remove(ofd)
545                                 ofd = storage.FileDesc{}
546                         }
547
548                         // Replay journal to memdb.
549                         mdb.Reset()
550                         for {
551                                 r, err := jr.Next()
552                                 if err != nil {
553                                         if err == io.EOF {
554                                                 break
555                                         }
556
557                                         fr.Close()
558                                         return errors.SetFd(err, fd)
559                                 }
560
561                                 buf.Reset()
562                                 if _, err := buf.ReadFrom(r); err != nil {
563                                         if err == io.ErrUnexpectedEOF {
564                                                 // This is error returned due to corruption, with strict == false.
565                                                 continue
566                                         }
567
568                                         fr.Close()
569                                         return errors.SetFd(err, fd)
570                                 }
571                                 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
572                                 if err != nil {
573                                         if !strict && errors.IsCorrupted(err) {
574                                                 db.s.logf("journal error: %v (skipped)", err)
575                                                 // We won't apply sequence number as it might be corrupted.
576                                                 continue
577                                         }
578
579                                         fr.Close()
580                                         return errors.SetFd(err, fd)
581                                 }
582
583                                 // Save sequence number.
584                                 db.seq = batchSeq + uint64(batchLen)
585
586                                 // Flush it if large enough.
587                                 if mdb.Size() >= writeBuffer {
588                                         if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
589                                                 fr.Close()
590                                                 return err
591                                         }
592
593                                         mdb.Reset()
594                                 }
595                         }
596
597                         fr.Close()
598                         ofd = fd
599                 }
600
601                 // Flush the last memdb.
602                 if mdb.Len() > 0 {
603                         if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
604                                 return err
605                         }
606                 }
607         }
608
609         // Create a new journal.
610         if _, err := db.newMem(0); err != nil {
611                 return err
612         }
613
614         // Commit.
615         rec.setJournalNum(db.journalFd.Num)
616         rec.setSeqNum(db.seq)
617         if err := db.s.commit(rec); err != nil {
618                 // Close journal on error.
619                 if db.journal != nil {
620                         db.journal.Close()
621                         db.journalWriter.Close()
622                 }
623                 return err
624         }
625
626         // Remove the last obsolete journal file.
627         if !ofd.Zero() {
628                 db.s.stor.Remove(ofd)
629         }
630
631         return nil
632 }
633
634 func (db *DB) recoverJournalRO() error {
635         // Get all journals and sort it by file number.
636         rawFds, err := db.s.stor.List(storage.TypeJournal)
637         if err != nil {
638                 return err
639         }
640         sortFds(rawFds)
641
642         // Journals that will be recovered.
643         var fds []storage.FileDesc
644         for _, fd := range rawFds {
645                 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
646                         fds = append(fds, fd)
647                 }
648         }
649
650         var (
651                 // Options.
652                 strict      = db.s.o.GetStrict(opt.StrictJournal)
653                 checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
654                 writeBuffer = db.s.o.GetWriteBuffer()
655
656                 mdb = memdb.New(db.s.icmp, writeBuffer)
657         )
658
659         // Recover journals.
660         if len(fds) > 0 {
661                 db.logf("journal@recovery RO·Mode F·%d", len(fds))
662
663                 var (
664                         jr       *journal.Reader
665                         buf      = &util.Buffer{}
666                         batchSeq uint64
667                         batchLen int
668                 )
669
670                 for _, fd := range fds {
671                         db.logf("journal@recovery recovering @%d", fd.Num)
672
673                         fr, err := db.s.stor.Open(fd)
674                         if err != nil {
675                                 return err
676                         }
677
678                         // Create or reset journal reader instance.
679                         if jr == nil {
680                                 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
681                         } else {
682                                 jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
683                         }
684
685                         // Replay journal to memdb.
686                         for {
687                                 r, err := jr.Next()
688                                 if err != nil {
689                                         if err == io.EOF {
690                                                 break
691                                         }
692
693                                         fr.Close()
694                                         return errors.SetFd(err, fd)
695                                 }
696
697                                 buf.Reset()
698                                 if _, err := buf.ReadFrom(r); err != nil {
699                                         if err == io.ErrUnexpectedEOF {
700                                                 // This is error returned due to corruption, with strict == false.
701                                                 continue
702                                         }
703
704                                         fr.Close()
705                                         return errors.SetFd(err, fd)
706                                 }
707                                 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
708                                 if err != nil {
709                                         if !strict && errors.IsCorrupted(err) {
710                                                 db.s.logf("journal error: %v (skipped)", err)
711                                                 // We won't apply sequence number as it might be corrupted.
712                                                 continue
713                                         }
714
715                                         fr.Close()
716                                         return errors.SetFd(err, fd)
717                                 }
718
719                                 // Save sequence number.
720                                 db.seq = batchSeq + uint64(batchLen)
721                         }
722
723                         fr.Close()
724                 }
725         }
726
727         // Set memDB.
728         db.mem = &memDB{db: db, DB: mdb, ref: 1}
729
730         return nil
731 }
732
733 func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
734         mk, mv, err := mdb.Find(ikey)
735         if err == nil {
736                 ukey, _, kt, kerr := parseInternalKey(mk)
737                 if kerr != nil {
738                         // Shouldn't have had happen.
739                         panic(kerr)
740                 }
741                 if icmp.uCompare(ukey, ikey.ukey()) == 0 {
742                         if kt == keyTypeDel {
743                                 return true, nil, ErrNotFound
744                         }
745                         return true, mv, nil
746
747                 }
748         } else if err != ErrNotFound {
749                 return true, nil, err
750         }
751         return
752 }
753
754 func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
755         ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
756
757         if auxm != nil {
758                 if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
759                         return append([]byte{}, mv...), me
760                 }
761         }
762
763         em, fm := db.getMems()
764         for _, m := range [...]*memDB{em, fm} {
765                 if m == nil {
766                         continue
767                 }
768                 defer m.decref()
769
770                 if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
771                         return append([]byte{}, mv...), me
772                 }
773         }
774
775         v := db.s.version()
776         value, cSched, err := v.get(auxt, ikey, ro, false)
777         v.release()
778         if cSched {
779                 // Trigger table compaction.
780                 db.compTrigger(db.tcompCmdC)
781         }
782         return
783 }
784
785 func nilIfNotFound(err error) error {
786         if err == ErrNotFound {
787                 return nil
788         }
789         return err
790 }
791
792 func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
793         ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
794
795         if auxm != nil {
796                 if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
797                         return me == nil, nilIfNotFound(me)
798                 }
799         }
800
801         em, fm := db.getMems()
802         for _, m := range [...]*memDB{em, fm} {
803                 if m == nil {
804                         continue
805                 }
806                 defer m.decref()
807
808                 if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
809                         return me == nil, nilIfNotFound(me)
810                 }
811         }
812
813         v := db.s.version()
814         _, cSched, err := v.get(auxt, ikey, ro, true)
815         v.release()
816         if cSched {
817                 // Trigger table compaction.
818                 db.compTrigger(db.tcompCmdC)
819         }
820         if err == nil {
821                 ret = true
822         } else if err == ErrNotFound {
823                 err = nil
824         }
825         return
826 }
827
828 // Get gets the value for the given key. It returns ErrNotFound if the
829 // DB does not contains the key.
830 //
831 // The returned slice is its own copy, it is safe to modify the contents
832 // of the returned slice.
833 // It is safe to modify the contents of the argument after Get returns.
834 func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
835         err = db.ok()
836         if err != nil {
837                 return
838         }
839
840         se := db.acquireSnapshot()
841         defer db.releaseSnapshot(se)
842         return db.get(nil, nil, key, se.seq, ro)
843 }
844
845 // Has returns true if the DB does contains the given key.
846 //
847 // It is safe to modify the contents of the argument after Has returns.
848 func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
849         err = db.ok()
850         if err != nil {
851                 return
852         }
853
854         se := db.acquireSnapshot()
855         defer db.releaseSnapshot(se)
856         return db.has(nil, nil, key, se.seq, ro)
857 }
858
859 // NewIterator returns an iterator for the latest snapshot of the
860 // underlying DB.
861 // The returned iterator is not safe for concurrent use, but it is safe to use
862 // multiple iterators concurrently, with each in a dedicated goroutine.
863 // It is also safe to use an iterator concurrently with modifying its
864 // underlying DB. The resultant key/value pairs are guaranteed to be
865 // consistent.
866 //
867 // Slice allows slicing the iterator to only contains keys in the given
868 // range. A nil Range.Start is treated as a key before all keys in the
869 // DB. And a nil Range.Limit is treated as a key after all keys in
870 // the DB.
871 //
872 // The iterator must be released after use, by calling Release method.
873 //
874 // Also read Iterator documentation of the leveldb/iterator package.
875 func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
876         if err := db.ok(); err != nil {
877                 return iterator.NewEmptyIterator(err)
878         }
879
880         se := db.acquireSnapshot()
881         defer db.releaseSnapshot(se)
882         // Iterator holds 'version' lock, 'version' is immutable so snapshot
883         // can be released after iterator created.
884         return db.newIterator(nil, nil, se.seq, slice, ro)
885 }
886
887 // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
888 // is a frozen snapshot of a DB state at a particular point in time. The
889 // content of snapshot are guaranteed to be consistent.
890 //
891 // The snapshot must be released after use, by calling Release method.
892 func (db *DB) GetSnapshot() (*Snapshot, error) {
893         if err := db.ok(); err != nil {
894                 return nil, err
895         }
896
897         return db.newSnapshot(), nil
898 }
899
900 // GetProperty returns value of the given property name.
901 //
902 // Property names:
903 //      leveldb.num-files-at-level{n}
904 //              Returns the number of files at level 'n'.
905 //      leveldb.stats
906 //              Returns statistics of the underlying DB.
907 //      leveldb.sstables
908 //              Returns sstables list for each level.
909 //      leveldb.blockpool
910 //              Returns block pool stats.
911 //      leveldb.cachedblock
912 //              Returns size of cached block.
913 //      leveldb.openedtables
914 //              Returns number of opened tables.
915 //      leveldb.alivesnaps
916 //              Returns number of alive snapshots.
917 //      leveldb.aliveiters
918 //              Returns number of alive iterators.
919 func (db *DB) GetProperty(name string) (value string, err error) {
920         err = db.ok()
921         if err != nil {
922                 return
923         }
924
925         const prefix = "leveldb."
926         if !strings.HasPrefix(name, prefix) {
927                 return "", ErrNotFound
928         }
929         p := name[len(prefix):]
930
931         v := db.s.version()
932         defer v.release()
933
934         numFilesPrefix := "num-files-at-level"
935         switch {
936         case strings.HasPrefix(p, numFilesPrefix):
937                 var level uint
938                 var rest string
939                 n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
940                 if n != 1 {
941                         err = ErrNotFound
942                 } else {
943                         value = fmt.Sprint(v.tLen(int(level)))
944                 }
945         case p == "stats":
946                 value = "Compactions\n" +
947                         " Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
948                         "-------+------------+---------------+---------------+---------------+---------------\n"
949                 for level, tables := range v.levels {
950                         duration, read, write := db.compStats.getStat(level)
951                         if len(tables) == 0 && duration == 0 {
952                                 continue
953                         }
954                         value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
955                                 level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
956                                 float64(read)/1048576.0, float64(write)/1048576.0)
957                 }
958         case p == "sstables":
959                 for level, tables := range v.levels {
960                         value += fmt.Sprintf("--- level %d ---\n", level)
961                         for _, t := range tables {
962                                 value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
963                         }
964                 }
965         case p == "blockpool":
966                 value = fmt.Sprintf("%v", db.s.tops.bpool)
967         case p == "cachedblock":
968                 if db.s.tops.bcache != nil {
969                         value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
970                 } else {
971                         value = "<nil>"
972                 }
973         case p == "openedtables":
974                 value = fmt.Sprintf("%d", db.s.tops.cache.Size())
975         case p == "alivesnaps":
976                 value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
977         case p == "aliveiters":
978                 value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
979         default:
980                 err = ErrNotFound
981         }
982
983         return
984 }
985
986 // SizeOf calculates approximate sizes of the given key ranges.
987 // The length of the returned sizes are equal with the length of the given
988 // ranges. The returned sizes measure storage space usage, so if the user
989 // data compresses by a factor of ten, the returned sizes will be one-tenth
990 // the size of the corresponding user data size.
991 // The results may not include the sizes of recently written data.
992 func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
993         if err := db.ok(); err != nil {
994                 return nil, err
995         }
996
997         v := db.s.version()
998         defer v.release()
999
1000         sizes := make(Sizes, 0, len(ranges))
1001         for _, r := range ranges {
1002                 imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
1003                 imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
1004                 start, err := v.offsetOf(imin)
1005                 if err != nil {
1006                         return nil, err
1007                 }
1008                 limit, err := v.offsetOf(imax)
1009                 if err != nil {
1010                         return nil, err
1011                 }
1012                 var size int64
1013                 if limit >= start {
1014                         size = limit - start
1015                 }
1016                 sizes = append(sizes, size)
1017         }
1018
1019         return sizes, nil
1020 }
1021
1022 // Close closes the DB. This will also releases any outstanding snapshot,
1023 // abort any in-flight compaction and discard open transaction.
1024 //
1025 // It is not safe to close a DB until all outstanding iterators are released.
1026 // It is valid to call Close multiple times. Other methods should not be
1027 // called after the DB has been closed.
1028 func (db *DB) Close() error {
1029         if !db.setClosed() {
1030                 return ErrClosed
1031         }
1032
1033         start := time.Now()
1034         db.log("db@close closing")
1035
1036         // Clear the finalizer.
1037         runtime.SetFinalizer(db, nil)
1038
1039         // Get compaction error.
1040         var err error
1041         select {
1042         case err = <-db.compErrC:
1043                 if err == ErrReadOnly {
1044                         err = nil
1045                 }
1046         default:
1047         }
1048
1049         // Signal all goroutines.
1050         close(db.closeC)
1051
1052         // Discard open transaction.
1053         if db.tr != nil {
1054                 db.tr.Discard()
1055         }
1056
1057         // Acquire writer lock.
1058         db.writeLockC <- struct{}{}
1059
1060         // Wait for all gorotines to exit.
1061         db.closeW.Wait()
1062
1063         // Closes journal.
1064         if db.journal != nil {
1065                 db.journal.Close()
1066                 db.journalWriter.Close()
1067                 db.journal = nil
1068                 db.journalWriter = nil
1069         }
1070
1071         if db.writeDelayN > 0 {
1072                 db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
1073         }
1074
1075         // Close session.
1076         db.s.close()
1077         db.logf("db@close done T·%v", time.Since(start))
1078         db.s.release()
1079
1080         if db.closer != nil {
1081                 if err1 := db.closer.Close(); err == nil {
1082                         err = err1
1083                 }
1084                 db.closer = nil
1085         }
1086
1087         // Clear memdbs.
1088         db.clearMems()
1089
1090         return err
1091 }