OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_state.go
1 // Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "errors"
11         "sync/atomic"
12         "time"
13
14         "github.com/syndtr/goleveldb/leveldb/journal"
15         "github.com/syndtr/goleveldb/leveldb/memdb"
16         "github.com/syndtr/goleveldb/leveldb/storage"
17 )
18
19 var (
20         errHasFrozenMem = errors.New("has frozen mem")
21 )
22
23 type memDB struct {
24         db *DB
25         *memdb.DB
26         ref int32
27 }
28
29 func (m *memDB) getref() int32 {
30         return atomic.LoadInt32(&m.ref)
31 }
32
33 func (m *memDB) incref() {
34         atomic.AddInt32(&m.ref, 1)
35 }
36
37 func (m *memDB) decref() {
38         if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
39                 // Only put back memdb with std capacity.
40                 if m.Capacity() == m.db.s.o.GetWriteBuffer() {
41                         m.Reset()
42                         m.db.mpoolPut(m.DB)
43                 }
44                 m.db = nil
45                 m.DB = nil
46         } else if ref < 0 {
47                 panic("negative memdb ref")
48         }
49 }
50
51 // Get latest sequence number.
52 func (db *DB) getSeq() uint64 {
53         return atomic.LoadUint64(&db.seq)
54 }
55
56 // Atomically adds delta to seq.
57 func (db *DB) addSeq(delta uint64) {
58         atomic.AddUint64(&db.seq, delta)
59 }
60
61 func (db *DB) setSeq(seq uint64) {
62         atomic.StoreUint64(&db.seq, seq)
63 }
64
65 func (db *DB) sampleSeek(ikey internalKey) {
66         v := db.s.version()
67         if v.sampleSeek(ikey) {
68                 // Trigger table compaction.
69                 db.compTrigger(db.tcompCmdC)
70         }
71         v.release()
72 }
73
74 func (db *DB) mpoolPut(mem *memdb.DB) {
75         if !db.isClosed() {
76                 select {
77                 case db.memPool <- mem:
78                 default:
79                 }
80         }
81 }
82
83 func (db *DB) mpoolGet(n int) *memDB {
84         var mdb *memdb.DB
85         select {
86         case mdb = <-db.memPool:
87         default:
88         }
89         if mdb == nil || mdb.Capacity() < n {
90                 mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
91         }
92         return &memDB{
93                 db: db,
94                 DB: mdb,
95         }
96 }
97
98 func (db *DB) mpoolDrain() {
99         ticker := time.NewTicker(30 * time.Second)
100         for {
101                 select {
102                 case <-ticker.C:
103                         select {
104                         case <-db.memPool:
105                         default:
106                         }
107                 case <-db.closeC:
108                         ticker.Stop()
109                         // Make sure the pool is drained.
110                         select {
111                         case <-db.memPool:
112                         case <-time.After(time.Second):
113                         }
114                         close(db.memPool)
115                         return
116                 }
117         }
118 }
119
120 // Create new memdb and froze the old one; need external synchronization.
121 // newMem only called synchronously by the writer.
122 func (db *DB) newMem(n int) (mem *memDB, err error) {
123         fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
124         w, err := db.s.stor.Create(fd)
125         if err != nil {
126                 db.s.reuseFileNum(fd.Num)
127                 return
128         }
129
130         db.memMu.Lock()
131         defer db.memMu.Unlock()
132
133         if db.frozenMem != nil {
134                 return nil, errHasFrozenMem
135         }
136
137         if db.journal == nil {
138                 db.journal = journal.NewWriter(w)
139         } else {
140                 db.journal.Reset(w)
141                 db.journalWriter.Close()
142                 db.frozenJournalFd = db.journalFd
143         }
144         db.journalWriter = w
145         db.journalFd = fd
146         db.frozenMem = db.mem
147         mem = db.mpoolGet(n)
148         mem.incref() // for self
149         mem.incref() // for caller
150         db.mem = mem
151         // The seq only incremented by the writer. And whoever called newMem
152         // should hold write lock, so no need additional synchronization here.
153         db.frozenSeq = db.seq
154         return
155 }
156
157 // Get all memdbs.
158 func (db *DB) getMems() (e, f *memDB) {
159         db.memMu.RLock()
160         defer db.memMu.RUnlock()
161         if db.mem != nil {
162                 db.mem.incref()
163         } else if !db.isClosed() {
164                 panic("nil effective mem")
165         }
166         if db.frozenMem != nil {
167                 db.frozenMem.incref()
168         }
169         return db.mem, db.frozenMem
170 }
171
172 // Get effective memdb.
173 func (db *DB) getEffectiveMem() *memDB {
174         db.memMu.RLock()
175         defer db.memMu.RUnlock()
176         if db.mem != nil {
177                 db.mem.incref()
178         } else if !db.isClosed() {
179                 panic("nil effective mem")
180         }
181         return db.mem
182 }
183
184 // Check whether we has frozen memdb.
185 func (db *DB) hasFrozenMem() bool {
186         db.memMu.RLock()
187         defer db.memMu.RUnlock()
188         return db.frozenMem != nil
189 }
190
191 // Get frozen memdb.
192 func (db *DB) getFrozenMem() *memDB {
193         db.memMu.RLock()
194         defer db.memMu.RUnlock()
195         if db.frozenMem != nil {
196                 db.frozenMem.incref()
197         }
198         return db.frozenMem
199 }
200
201 // Drop frozen memdb; assume that frozen memdb isn't nil.
202 func (db *DB) dropFrozenMem() {
203         db.memMu.Lock()
204         if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
205                 db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
206         } else {
207                 db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
208         }
209         db.frozenJournalFd = storage.FileDesc{}
210         db.frozenMem.decref()
211         db.frozenMem = nil
212         db.memMu.Unlock()
213 }
214
215 // Clear mems ptr; used by DB.Close().
216 func (db *DB) clearMems() {
217         db.memMu.Lock()
218         db.mem = nil
219         db.frozenMem = nil
220         db.memMu.Unlock()
221 }
222
223 // Set closed flag; return true if not already closed.
224 func (db *DB) setClosed() bool {
225         return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
226 }
227
228 // Check whether DB was closed.
229 func (db *DB) isClosed() bool {
230         return atomic.LoadUint32(&db.closed) != 0
231 }
232
233 // Check read ok status.
234 func (db *DB) ok() error {
235         if db.isClosed() {
236                 return ErrClosed
237         }
238         return nil
239 }