1 // Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
14 "github.com/syndtr/goleveldb/leveldb/journal"
15 "github.com/syndtr/goleveldb/leveldb/memdb"
16 "github.com/syndtr/goleveldb/leveldb/storage"
20 errHasFrozenMem = errors.New("has frozen mem")
29 func (m *memDB) getref() int32 {
30 return atomic.LoadInt32(&m.ref)
33 func (m *memDB) incref() {
34 atomic.AddInt32(&m.ref, 1)
37 func (m *memDB) decref() {
38 if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
39 // Only put back memdb with std capacity.
40 if m.Capacity() == m.db.s.o.GetWriteBuffer() {
47 panic("negative memdb ref")
51 // Get latest sequence number.
52 func (db *DB) getSeq() uint64 {
53 return atomic.LoadUint64(&db.seq)
56 // Atomically adds delta to seq.
57 func (db *DB) addSeq(delta uint64) {
58 atomic.AddUint64(&db.seq, delta)
61 func (db *DB) setSeq(seq uint64) {
62 atomic.StoreUint64(&db.seq, seq)
65 func (db *DB) sampleSeek(ikey internalKey) {
67 if v.sampleSeek(ikey) {
68 // Trigger table compaction.
69 db.compTrigger(db.tcompCmdC)
74 func (db *DB) mpoolPut(mem *memdb.DB) {
77 case db.memPool <- mem:
83 func (db *DB) mpoolGet(n int) *memDB {
86 case mdb = <-db.memPool:
89 if mdb == nil || mdb.Capacity() < n {
90 mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
98 func (db *DB) mpoolDrain() {
99 ticker := time.NewTicker(30 * time.Second)
109 // Make sure the pool is drained.
112 case <-time.After(time.Second):
120 // Create new memdb and froze the old one; need external synchronization.
121 // newMem only called synchronously by the writer.
122 func (db *DB) newMem(n int) (mem *memDB, err error) {
123 fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
124 w, err := db.s.stor.Create(fd)
126 db.s.reuseFileNum(fd.Num)
131 defer db.memMu.Unlock()
133 if db.frozenMem != nil {
134 return nil, errHasFrozenMem
137 if db.journal == nil {
138 db.journal = journal.NewWriter(w)
141 db.journalWriter.Close()
142 db.frozenJournalFd = db.journalFd
146 db.frozenMem = db.mem
148 mem.incref() // for self
149 mem.incref() // for caller
151 // The seq only incremented by the writer. And whoever called newMem
152 // should hold write lock, so no need additional synchronization here.
153 db.frozenSeq = db.seq
158 func (db *DB) getMems() (e, f *memDB) {
160 defer db.memMu.RUnlock()
163 } else if !db.isClosed() {
164 panic("nil effective mem")
166 if db.frozenMem != nil {
167 db.frozenMem.incref()
169 return db.mem, db.frozenMem
172 // Get effective memdb.
173 func (db *DB) getEffectiveMem() *memDB {
175 defer db.memMu.RUnlock()
178 } else if !db.isClosed() {
179 panic("nil effective mem")
184 // Check whether we has frozen memdb.
185 func (db *DB) hasFrozenMem() bool {
187 defer db.memMu.RUnlock()
188 return db.frozenMem != nil
192 func (db *DB) getFrozenMem() *memDB {
194 defer db.memMu.RUnlock()
195 if db.frozenMem != nil {
196 db.frozenMem.incref()
201 // Drop frozen memdb; assume that frozen memdb isn't nil.
202 func (db *DB) dropFrozenMem() {
204 if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
205 db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
207 db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
209 db.frozenJournalFd = storage.FileDesc{}
210 db.frozenMem.decref()
215 // Clear mems ptr; used by DB.Close().
216 func (db *DB) clearMems() {
223 // Set closed flag; return true if not already closed.
224 func (db *DB) setClosed() bool {
225 return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
228 // Check whether DB was closed.
229 func (db *DB) isClosed() bool {
230 return atomic.LoadUint32(&db.closed) != 0
233 // Check read ok status.
234 func (db *DB) ok() error {