OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / session.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "fmt"
11         "io"
12         "os"
13         "sync"
14
15         "github.com/syndtr/goleveldb/leveldb/errors"
16         "github.com/syndtr/goleveldb/leveldb/journal"
17         "github.com/syndtr/goleveldb/leveldb/opt"
18         "github.com/syndtr/goleveldb/leveldb/storage"
19 )
20
21 // ErrManifestCorrupted records manifest corruption. This error will be
22 // wrapped with errors.ErrCorrupted.
23 type ErrManifestCorrupted struct {
24         Field  string
25         Reason string
26 }
27
28 func (e *ErrManifestCorrupted) Error() string {
29         return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
30 }
31
32 func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
33         return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
34 }
35
36 // session represent a persistent database session.
37 type session struct {
38         // Need 64-bit alignment.
39         stNextFileNum    int64 // current unused file number
40         stJournalNum     int64 // current journal file number; need external synchronization
41         stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
42         stTempFileNum    int64
43         stSeqNum         uint64 // last mem compacted seq; need external synchronization
44
45         stor     storage.Storage
46         storLock storage.Locker
47         o        *cachedOptions
48         icmp     *iComparer
49         tops     *tOps
50         fileRef  map[int64]int
51
52         manifest       *journal.Writer
53         manifestWriter storage.Writer
54         manifestFd     storage.FileDesc
55
56         stCompPtrs []internalKey // compaction pointers; need external synchronization
57         stVersion  *version      // current version
58         vmu        sync.Mutex
59 }
60
61 // Creates new initialized session instance.
62 func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
63         if stor == nil {
64                 return nil, os.ErrInvalid
65         }
66         storLock, err := stor.Lock()
67         if err != nil {
68                 return
69         }
70         s = &session{
71                 stor:     stor,
72                 storLock: storLock,
73                 fileRef:  make(map[int64]int),
74         }
75         s.setOptions(o)
76         s.tops = newTableOps(s)
77         s.setVersion(newVersion(s))
78         s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
79         return
80 }
81
82 // Close session.
83 func (s *session) close() {
84         s.tops.close()
85         if s.manifest != nil {
86                 s.manifest.Close()
87         }
88         if s.manifestWriter != nil {
89                 s.manifestWriter.Close()
90         }
91         s.manifest = nil
92         s.manifestWriter = nil
93         s.setVersion(&version{s: s, closing: true})
94 }
95
96 // Release session lock.
97 func (s *session) release() {
98         s.storLock.Unlock()
99 }
100
101 // Create a new database session; need external synchronization.
102 func (s *session) create() error {
103         // create manifest
104         return s.newManifest(nil, nil)
105 }
106
107 // Recover a database session; need external synchronization.
108 func (s *session) recover() (err error) {
109         defer func() {
110                 if os.IsNotExist(err) {
111                         // Don't return os.ErrNotExist if the underlying storage contains
112                         // other files that belong to LevelDB. So the DB won't get trashed.
113                         if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
114                                 err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
115                         }
116                 }
117         }()
118
119         fd, err := s.stor.GetMeta()
120         if err != nil {
121                 return
122         }
123
124         reader, err := s.stor.Open(fd)
125         if err != nil {
126                 return
127         }
128         defer reader.Close()
129
130         var (
131                 // Options.
132                 strict = s.o.GetStrict(opt.StrictManifest)
133
134                 jr      = journal.NewReader(reader, dropper{s, fd}, strict, true)
135                 rec     = &sessionRecord{}
136                 staging = s.stVersion.newStaging()
137         )
138         for {
139                 var r io.Reader
140                 r, err = jr.Next()
141                 if err != nil {
142                         if err == io.EOF {
143                                 err = nil
144                                 break
145                         }
146                         return errors.SetFd(err, fd)
147                 }
148
149                 err = rec.decode(r)
150                 if err == nil {
151                         // save compact pointers
152                         for _, r := range rec.compPtrs {
153                                 s.setCompPtr(r.level, internalKey(r.ikey))
154                         }
155                         // commit record to version staging
156                         staging.commit(rec)
157                 } else {
158                         err = errors.SetFd(err, fd)
159                         if strict || !errors.IsCorrupted(err) {
160                                 return
161                         }
162                         s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
163                 }
164                 rec.resetCompPtrs()
165                 rec.resetAddedTables()
166                 rec.resetDeletedTables()
167         }
168
169         switch {
170         case !rec.has(recComparer):
171                 return newErrManifestCorrupted(fd, "comparer", "missing")
172         case rec.comparer != s.icmp.uName():
173                 return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
174         case !rec.has(recNextFileNum):
175                 return newErrManifestCorrupted(fd, "next-file-num", "missing")
176         case !rec.has(recJournalNum):
177                 return newErrManifestCorrupted(fd, "journal-file-num", "missing")
178         case !rec.has(recSeqNum):
179                 return newErrManifestCorrupted(fd, "seq-num", "missing")
180         }
181
182         s.manifestFd = fd
183         s.setVersion(staging.finish())
184         s.setNextFileNum(rec.nextFileNum)
185         s.recordCommited(rec)
186         return nil
187 }
188
189 // Commit session; need external synchronization.
190 func (s *session) commit(r *sessionRecord) (err error) {
191         v := s.version()
192         defer v.release()
193
194         // spawn new version based on current version
195         nv := v.spawn(r)
196
197         if s.manifest == nil {
198                 // manifest journal writer not yet created, create one
199                 err = s.newManifest(r, nv)
200         } else {
201                 err = s.flushManifest(r)
202         }
203
204         // finally, apply new version if no error rise
205         if err == nil {
206                 s.setVersion(nv)
207         }
208
209         return
210 }