OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / session_util.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "fmt"
11         "sync/atomic"
12
13         "github.com/syndtr/goleveldb/leveldb/journal"
14         "github.com/syndtr/goleveldb/leveldb/storage"
15 )
16
17 // Logging.
18
19 type dropper struct {
20         s  *session
21         fd storage.FileDesc
22 }
23
24 func (d dropper) Drop(err error) {
25         if e, ok := err.(*journal.ErrCorrupted); ok {
26                 d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
27         } else {
28                 d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
29         }
30 }
31
32 func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
33 func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
34
35 // File utils.
36
37 func (s *session) newTemp() storage.FileDesc {
38         num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
39         return storage.FileDesc{storage.TypeTemp, num}
40 }
41
42 func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
43         ref += s.fileRef[fd.Num]
44         if ref > 0 {
45                 s.fileRef[fd.Num] = ref
46         } else if ref == 0 {
47                 delete(s.fileRef, fd.Num)
48         } else {
49                 panic(fmt.Sprintf("negative ref: %v", fd))
50         }
51         return ref
52 }
53
54 // Session state.
55
56 // Get current version. This will incr version ref, must call
57 // version.release (exactly once) after use.
58 func (s *session) version() *version {
59         s.vmu.Lock()
60         defer s.vmu.Unlock()
61         s.stVersion.incref()
62         return s.stVersion
63 }
64
65 func (s *session) tLen(level int) int {
66         s.vmu.Lock()
67         defer s.vmu.Unlock()
68         return s.stVersion.tLen(level)
69 }
70
71 // Set current version to v.
72 func (s *session) setVersion(v *version) {
73         s.vmu.Lock()
74         defer s.vmu.Unlock()
75         // Hold by session. It is important to call this first before releasing
76         // current version, otherwise the still used files might get released.
77         v.incref()
78         if s.stVersion != nil {
79                 // Release current version.
80                 s.stVersion.releaseNB()
81         }
82         s.stVersion = v
83 }
84
85 // Get current unused file number.
86 func (s *session) nextFileNum() int64 {
87         return atomic.LoadInt64(&s.stNextFileNum)
88 }
89
90 // Set current unused file number to num.
91 func (s *session) setNextFileNum(num int64) {
92         atomic.StoreInt64(&s.stNextFileNum, num)
93 }
94
95 // Mark file number as used.
96 func (s *session) markFileNum(num int64) {
97         nextFileNum := num + 1
98         for {
99                 old, x := s.stNextFileNum, nextFileNum
100                 if old > x {
101                         x = old
102                 }
103                 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
104                         break
105                 }
106         }
107 }
108
109 // Allocate a file number.
110 func (s *session) allocFileNum() int64 {
111         return atomic.AddInt64(&s.stNextFileNum, 1) - 1
112 }
113
114 // Reuse given file number.
115 func (s *session) reuseFileNum(num int64) {
116         for {
117                 old, x := s.stNextFileNum, num
118                 if old != x+1 {
119                         x = old
120                 }
121                 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
122                         break
123                 }
124         }
125 }
126
127 // Set compaction ptr at given level; need external synchronization.
128 func (s *session) setCompPtr(level int, ik internalKey) {
129         if level >= len(s.stCompPtrs) {
130                 newCompPtrs := make([]internalKey, level+1)
131                 copy(newCompPtrs, s.stCompPtrs)
132                 s.stCompPtrs = newCompPtrs
133         }
134         s.stCompPtrs[level] = append(internalKey{}, ik...)
135 }
136
137 // Get compaction ptr at given level; need external synchronization.
138 func (s *session) getCompPtr(level int) internalKey {
139         if level >= len(s.stCompPtrs) {
140                 return nil
141         }
142         return s.stCompPtrs[level]
143 }
144
145 // Manifest related utils.
146
147 // Fill given session record obj with current states; need external
148 // synchronization.
149 func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
150         r.setNextFileNum(s.nextFileNum())
151
152         if snapshot {
153                 if !r.has(recJournalNum) {
154                         r.setJournalNum(s.stJournalNum)
155                 }
156
157                 if !r.has(recSeqNum) {
158                         r.setSeqNum(s.stSeqNum)
159                 }
160
161                 for level, ik := range s.stCompPtrs {
162                         if ik != nil {
163                                 r.addCompPtr(level, ik)
164                         }
165                 }
166
167                 r.setComparer(s.icmp.uName())
168         }
169 }
170
171 // Mark if record has been committed, this will update session state;
172 // need external synchronization.
173 func (s *session) recordCommited(rec *sessionRecord) {
174         if rec.has(recJournalNum) {
175                 s.stJournalNum = rec.journalNum
176         }
177
178         if rec.has(recPrevJournalNum) {
179                 s.stPrevJournalNum = rec.prevJournalNum
180         }
181
182         if rec.has(recSeqNum) {
183                 s.stSeqNum = rec.seqNum
184         }
185
186         for _, r := range rec.compPtrs {
187                 s.setCompPtr(r.level, internalKey(r.ikey))
188         }
189 }
190
191 // Create a new manifest file; need external synchronization.
192 func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
193         fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()}
194         writer, err := s.stor.Create(fd)
195         if err != nil {
196                 return
197         }
198         jw := journal.NewWriter(writer)
199
200         if v == nil {
201                 v = s.version()
202                 defer v.release()
203         }
204         if rec == nil {
205                 rec = &sessionRecord{}
206         }
207         s.fillRecord(rec, true)
208         v.fillRecord(rec)
209
210         defer func() {
211                 if err == nil {
212                         s.recordCommited(rec)
213                         if s.manifest != nil {
214                                 s.manifest.Close()
215                         }
216                         if s.manifestWriter != nil {
217                                 s.manifestWriter.Close()
218                         }
219                         if !s.manifestFd.Zero() {
220                                 s.stor.Remove(s.manifestFd)
221                         }
222                         s.manifestFd = fd
223                         s.manifestWriter = writer
224                         s.manifest = jw
225                 } else {
226                         writer.Close()
227                         s.stor.Remove(fd)
228                         s.reuseFileNum(fd.Num)
229                 }
230         }()
231
232         w, err := jw.Next()
233         if err != nil {
234                 return
235         }
236         err = rec.encode(w)
237         if err != nil {
238                 return
239         }
240         err = jw.Flush()
241         if err != nil {
242                 return
243         }
244         err = s.stor.SetMeta(fd)
245         return
246 }
247
248 // Flush record to disk.
249 func (s *session) flushManifest(rec *sessionRecord) (err error) {
250         s.fillRecord(rec, false)
251         w, err := s.manifest.Next()
252         if err != nil {
253                 return
254         }
255         err = rec.encode(w)
256         if err != nil {
257                 return
258         }
259         err = s.manifest.Flush()
260         if err != nil {
261                 return
262         }
263         if !s.o.GetNoSync() {
264                 err = s.manifestWriter.Sync()
265                 if err != nil {
266                         return
267                 }
268         }
269         s.recordCommited(rec)
270         return
271 }