1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
13 "github.com/syndtr/goleveldb/leveldb/journal"
14 "github.com/syndtr/goleveldb/leveldb/storage"
24 func (d dropper) Drop(err error) {
25 if e, ok := err.(*journal.ErrCorrupted); ok {
26 d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
28 d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
32 func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) }
33 func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
37 func (s *session) newTemp() storage.FileDesc {
38 num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
39 return storage.FileDesc{storage.TypeTemp, num}
42 func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
43 ref += s.fileRef[fd.Num]
45 s.fileRef[fd.Num] = ref
47 delete(s.fileRef, fd.Num)
49 panic(fmt.Sprintf("negative ref: %v", fd))
56 // Get current version. This will incr version ref, must call
57 // version.release (exactly once) after use.
58 func (s *session) version() *version {
65 func (s *session) tLen(level int) int {
68 return s.stVersion.tLen(level)
71 // Set current version to v.
72 func (s *session) setVersion(v *version) {
75 // Hold by session. It is important to call this first before releasing
76 // current version, otherwise the still used files might get released.
78 if s.stVersion != nil {
79 // Release current version.
80 s.stVersion.releaseNB()
85 // Get current unused file number.
86 func (s *session) nextFileNum() int64 {
87 return atomic.LoadInt64(&s.stNextFileNum)
90 // Set current unused file number to num.
91 func (s *session) setNextFileNum(num int64) {
92 atomic.StoreInt64(&s.stNextFileNum, num)
95 // Mark file number as used.
96 func (s *session) markFileNum(num int64) {
97 nextFileNum := num + 1
99 old, x := s.stNextFileNum, nextFileNum
103 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
109 // Allocate a file number.
110 func (s *session) allocFileNum() int64 {
111 return atomic.AddInt64(&s.stNextFileNum, 1) - 1
114 // Reuse given file number.
115 func (s *session) reuseFileNum(num int64) {
117 old, x := s.stNextFileNum, num
121 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
127 // Set compaction ptr at given level; need external synchronization.
128 func (s *session) setCompPtr(level int, ik internalKey) {
129 if level >= len(s.stCompPtrs) {
130 newCompPtrs := make([]internalKey, level+1)
131 copy(newCompPtrs, s.stCompPtrs)
132 s.stCompPtrs = newCompPtrs
134 s.stCompPtrs[level] = append(internalKey{}, ik...)
137 // Get compaction ptr at given level; need external synchronization.
138 func (s *session) getCompPtr(level int) internalKey {
139 if level >= len(s.stCompPtrs) {
142 return s.stCompPtrs[level]
145 // Manifest related utils.
147 // Fill given session record obj with current states; need external
149 func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
150 r.setNextFileNum(s.nextFileNum())
153 if !r.has(recJournalNum) {
154 r.setJournalNum(s.stJournalNum)
157 if !r.has(recSeqNum) {
158 r.setSeqNum(s.stSeqNum)
161 for level, ik := range s.stCompPtrs {
163 r.addCompPtr(level, ik)
167 r.setComparer(s.icmp.uName())
171 // Mark if record has been committed, this will update session state;
172 // need external synchronization.
173 func (s *session) recordCommited(rec *sessionRecord) {
174 if rec.has(recJournalNum) {
175 s.stJournalNum = rec.journalNum
178 if rec.has(recPrevJournalNum) {
179 s.stPrevJournalNum = rec.prevJournalNum
182 if rec.has(recSeqNum) {
183 s.stSeqNum = rec.seqNum
186 for _, r := range rec.compPtrs {
187 s.setCompPtr(r.level, internalKey(r.ikey))
191 // Create a new manifest file; need external synchronization.
192 func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
193 fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()}
194 writer, err := s.stor.Create(fd)
198 jw := journal.NewWriter(writer)
205 rec = &sessionRecord{}
207 s.fillRecord(rec, true)
212 s.recordCommited(rec)
213 if s.manifest != nil {
216 if s.manifestWriter != nil {
217 s.manifestWriter.Close()
219 if !s.manifestFd.Zero() {
220 s.stor.Remove(s.manifestFd)
223 s.manifestWriter = writer
228 s.reuseFileNum(fd.Num)
244 err = s.stor.SetMeta(fd)
248 // Flush record to disk.
249 func (s *session) flushManifest(rec *sessionRecord) (err error) {
250 s.fillRecord(rec, false)
251 w, err := s.manifest.Next()
259 err = s.manifest.Flush()
263 if !s.o.GetNoSync() {
264 err = s.manifestWriter.Sync()
269 s.recordCommited(rec)