OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / db_write.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "time"
11
12         "github.com/syndtr/goleveldb/leveldb/memdb"
13         "github.com/syndtr/goleveldb/leveldb/opt"
14         "github.com/syndtr/goleveldb/leveldb/util"
15 )
16
17 func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
18         wr, err := db.journal.Next()
19         if err != nil {
20                 return err
21         }
22         if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
23                 return err
24         }
25         if err := db.journal.Flush(); err != nil {
26                 return err
27         }
28         if sync {
29                 return db.journalWriter.Sync()
30         }
31         return nil
32 }
33
34 func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
35         retryLimit := 3
36 retry:
37         // Wait for pending memdb compaction.
38         err = db.compTriggerWait(db.mcompCmdC)
39         if err != nil {
40                 return
41         }
42         retryLimit--
43
44         // Create new memdb and journal.
45         mem, err = db.newMem(n)
46         if err != nil {
47                 if err == errHasFrozenMem {
48                         if retryLimit <= 0 {
49                                 panic("BUG: still has frozen memdb")
50                         }
51                         goto retry
52                 }
53                 return
54         }
55
56         // Schedule memdb compaction.
57         if wait {
58                 err = db.compTriggerWait(db.mcompCmdC)
59         } else {
60                 db.compTrigger(db.mcompCmdC)
61         }
62         return
63 }
64
65 func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
66         delayed := false
67         slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
68         pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
69         flush := func() (retry bool) {
70                 mdb = db.getEffectiveMem()
71                 if mdb == nil {
72                         err = ErrClosed
73                         return false
74                 }
75                 defer func() {
76                         if retry {
77                                 mdb.decref()
78                                 mdb = nil
79                         }
80                 }()
81                 tLen := db.s.tLen(0)
82                 mdbFree = mdb.Free()
83                 switch {
84                 case tLen >= slowdownTrigger && !delayed:
85                         delayed = true
86                         time.Sleep(time.Millisecond)
87                 case mdbFree >= n:
88                         return false
89                 case tLen >= pauseTrigger:
90                         delayed = true
91                         err = db.compTriggerWait(db.tcompCmdC)
92                         if err != nil {
93                                 return false
94                         }
95                 default:
96                         // Allow memdb to grow if it has no entry.
97                         if mdb.Len() == 0 {
98                                 mdbFree = n
99                         } else {
100                                 mdb.decref()
101                                 mdb, err = db.rotateMem(n, false)
102                                 if err == nil {
103                                         mdbFree = mdb.Free()
104                                 } else {
105                                         mdbFree = 0
106                                 }
107                         }
108                         return false
109                 }
110                 return true
111         }
112         start := time.Now()
113         for flush() {
114         }
115         if delayed {
116                 db.writeDelay += time.Since(start)
117                 db.writeDelayN++
118         } else if db.writeDelayN > 0 {
119                 db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
120                 db.writeDelay = 0
121                 db.writeDelayN = 0
122         }
123         return
124 }
125
126 type writeMerge struct {
127         sync       bool
128         batch      *Batch
129         keyType    keyType
130         key, value []byte
131 }
132
133 func (db *DB) unlockWrite(overflow bool, merged int, err error) {
134         for i := 0; i < merged; i++ {
135                 db.writeAckC <- err
136         }
137         if overflow {
138                 // Pass lock to the next write (that failed to merge).
139                 db.writeMergedC <- false
140         } else {
141                 // Release lock.
142                 <-db.writeLockC
143         }
144 }
145
146 // ourBatch if defined should equal with batch.
147 func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
148         // Try to flush memdb. This method would also trying to throttle writes
149         // if it is too fast and compaction cannot catch-up.
150         mdb, mdbFree, err := db.flush(batch.internalLen)
151         if err != nil {
152                 db.unlockWrite(false, 0, err)
153                 return err
154         }
155         defer mdb.decref()
156
157         var (
158                 overflow bool
159                 merged   int
160                 batches  = []*Batch{batch}
161         )
162
163         if merge {
164                 // Merge limit.
165                 var mergeLimit int
166                 if batch.internalLen > 128<<10 {
167                         mergeLimit = (1 << 20) - batch.internalLen
168                 } else {
169                         mergeLimit = 128 << 10
170                 }
171                 mergeCap := mdbFree - batch.internalLen
172                 if mergeLimit > mergeCap {
173                         mergeLimit = mergeCap
174                 }
175
176         merge:
177                 for mergeLimit > 0 {
178                         select {
179                         case incoming := <-db.writeMergeC:
180                                 if incoming.batch != nil {
181                                         // Merge batch.
182                                         if incoming.batch.internalLen > mergeLimit {
183                                                 overflow = true
184                                                 break merge
185                                         }
186                                         batches = append(batches, incoming.batch)
187                                         mergeLimit -= incoming.batch.internalLen
188                                 } else {
189                                         // Merge put.
190                                         internalLen := len(incoming.key) + len(incoming.value) + 8
191                                         if internalLen > mergeLimit {
192                                                 overflow = true
193                                                 break merge
194                                         }
195                                         if ourBatch == nil {
196                                                 ourBatch = db.batchPool.Get().(*Batch)
197                                                 ourBatch.Reset()
198                                                 batches = append(batches, ourBatch)
199                                         }
200                                         // We can use same batch since concurrent write doesn't
201                                         // guarantee write order.
202                                         ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
203                                         mergeLimit -= internalLen
204                                 }
205                                 sync = sync || incoming.sync
206                                 merged++
207                                 db.writeMergedC <- true
208
209                         default:
210                                 break merge
211                         }
212                 }
213         }
214
215         // Seq number.
216         seq := db.seq + 1
217
218         // Write journal.
219         if err := db.writeJournal(batches, seq, sync); err != nil {
220                 db.unlockWrite(overflow, merged, err)
221                 return err
222         }
223
224         // Put batches.
225         for _, batch := range batches {
226                 if err := batch.putMem(seq, mdb.DB); err != nil {
227                         panic(err)
228                 }
229                 seq += uint64(batch.Len())
230         }
231
232         // Incr seq number.
233         db.addSeq(uint64(batchesLen(batches)))
234
235         // Rotate memdb if it's reach the threshold.
236         if batch.internalLen >= mdbFree {
237                 db.rotateMem(0, false)
238         }
239
240         db.unlockWrite(overflow, merged, nil)
241         return nil
242 }
243
244 // Write apply the given batch to the DB. The batch records will be applied
245 // sequentially. Write might be used concurrently, when used concurrently and
246 // batch is small enough, write will try to merge the batches. Set NoWriteMerge
247 // option to true to disable write merge.
248 //
249 // It is safe to modify the contents of the arguments after Write returns but
250 // not before. Write will not modify content of the batch.
251 func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
252         if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
253                 return err
254         }
255
256         // If the batch size is larger than write buffer, it may justified to write
257         // using transaction instead. Using transaction the batch will be written
258         // into tables directly, skipping the journaling.
259         if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
260                 tr, err := db.OpenTransaction()
261                 if err != nil {
262                         return err
263                 }
264                 if err := tr.Write(batch, wo); err != nil {
265                         tr.Discard()
266                         return err
267                 }
268                 return tr.Commit()
269         }
270
271         merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
272         sync := wo.GetSync() && !db.s.o.GetNoSync()
273
274         // Acquire write lock.
275         if merge {
276                 select {
277                 case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
278                         if <-db.writeMergedC {
279                                 // Write is merged.
280                                 return <-db.writeAckC
281                         }
282                         // Write is not merged, the write lock is handed to us. Continue.
283                 case db.writeLockC <- struct{}{}:
284                         // Write lock acquired.
285                 case err := <-db.compPerErrC:
286                         // Compaction error.
287                         return err
288                 case <-db.closeC:
289                         // Closed
290                         return ErrClosed
291                 }
292         } else {
293                 select {
294                 case db.writeLockC <- struct{}{}:
295                         // Write lock acquired.
296                 case err := <-db.compPerErrC:
297                         // Compaction error.
298                         return err
299                 case <-db.closeC:
300                         // Closed
301                         return ErrClosed
302                 }
303         }
304
305         return db.writeLocked(batch, nil, merge, sync)
306 }
307
308 func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
309         if err := db.ok(); err != nil {
310                 return err
311         }
312
313         merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
314         sync := wo.GetSync() && !db.s.o.GetNoSync()
315
316         // Acquire write lock.
317         if merge {
318                 select {
319                 case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
320                         if <-db.writeMergedC {
321                                 // Write is merged.
322                                 return <-db.writeAckC
323                         }
324                         // Write is not merged, the write lock is handed to us. Continue.
325                 case db.writeLockC <- struct{}{}:
326                         // Write lock acquired.
327                 case err := <-db.compPerErrC:
328                         // Compaction error.
329                         return err
330                 case <-db.closeC:
331                         // Closed
332                         return ErrClosed
333                 }
334         } else {
335                 select {
336                 case db.writeLockC <- struct{}{}:
337                         // Write lock acquired.
338                 case err := <-db.compPerErrC:
339                         // Compaction error.
340                         return err
341                 case <-db.closeC:
342                         // Closed
343                         return ErrClosed
344                 }
345         }
346
347         batch := db.batchPool.Get().(*Batch)
348         batch.Reset()
349         batch.appendRec(kt, key, value)
350         return db.writeLocked(batch, batch, merge, sync)
351 }
352
353 // Put sets the value for the given key. It overwrites any previous value
354 // for that key; a DB is not a multi-map. Write merge also applies for Put, see
355 // Write.
356 //
357 // It is safe to modify the contents of the arguments after Put returns but not
358 // before.
359 func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
360         return db.putRec(keyTypeVal, key, value, wo)
361 }
362
363 // Delete deletes the value for the given key. Delete will not returns error if
364 // key doesn't exist. Write merge also applies for Delete, see Write.
365 //
366 // It is safe to modify the contents of the arguments after Delete returns but
367 // not before.
368 func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
369         return db.putRec(keyTypeDel, key, nil, wo)
370 }
371
372 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
373         iter := mem.NewIterator(nil)
374         defer iter.Release()
375         return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
376                 (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
377 }
378
379 // CompactRange compacts the underlying DB for the given key range.
380 // In particular, deleted and overwritten versions are discarded,
381 // and the data is rearranged to reduce the cost of operations
382 // needed to access the data. This operation should typically only
383 // be invoked by users who understand the underlying implementation.
384 //
385 // A nil Range.Start is treated as a key before all keys in the DB.
386 // And a nil Range.Limit is treated as a key after all keys in the DB.
387 // Therefore if both is nil then it will compact entire DB.
388 func (db *DB) CompactRange(r util.Range) error {
389         if err := db.ok(); err != nil {
390                 return err
391         }
392
393         // Lock writer.
394         select {
395         case db.writeLockC <- struct{}{}:
396         case err := <-db.compPerErrC:
397                 return err
398         case <-db.closeC:
399                 return ErrClosed
400         }
401
402         // Check for overlaps in memdb.
403         mdb := db.getEffectiveMem()
404         if mdb == nil {
405                 return ErrClosed
406         }
407         defer mdb.decref()
408         if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
409                 // Memdb compaction.
410                 if _, err := db.rotateMem(0, false); err != nil {
411                         <-db.writeLockC
412                         return err
413                 }
414                 <-db.writeLockC
415                 if err := db.compTriggerWait(db.mcompCmdC); err != nil {
416                         return err
417                 }
418         } else {
419                 <-db.writeLockC
420         }
421
422         // Table compaction.
423         return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
424 }
425
426 // SetReadOnly makes DB read-only. It will stay read-only until reopened.
427 func (db *DB) SetReadOnly() error {
428         if err := db.ok(); err != nil {
429                 return err
430         }
431
432         // Lock writer.
433         select {
434         case db.writeLockC <- struct{}{}:
435                 db.compWriteLocking = true
436         case err := <-db.compPerErrC:
437                 return err
438         case <-db.closeC:
439                 return ErrClosed
440         }
441
442         // Set compaction read-only.
443         select {
444         case db.compErrSetC <- ErrReadOnly:
445         case perr := <-db.compPerErrC:
446                 return perr
447         case <-db.closeC:
448                 return ErrClosed
449         }
450
451         return nil
452 }