OSDN Git Service

fix commands
[bytom/shuttle.git] / vendor / github.com / bytom / vendor / github.com / syndtr / goleveldb / leveldb / db_state.go
diff --git a/vendor/github.com/bytom/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/vendor/github.com/bytom/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go
new file mode 100644 (file)
index 0000000..65e1c54
--- /dev/null
@@ -0,0 +1,239 @@
+// Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+       "errors"
+       "sync/atomic"
+       "time"
+
+       "github.com/syndtr/goleveldb/leveldb/journal"
+       "github.com/syndtr/goleveldb/leveldb/memdb"
+       "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+var (
+       errHasFrozenMem = errors.New("has frozen mem")
+)
+
+type memDB struct {
+       db *DB
+       *memdb.DB
+       ref int32
+}
+
+func (m *memDB) getref() int32 {
+       return atomic.LoadInt32(&m.ref)
+}
+
+func (m *memDB) incref() {
+       atomic.AddInt32(&m.ref, 1)
+}
+
+func (m *memDB) decref() {
+       if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
+               // Only put back memdb with std capacity.
+               if m.Capacity() == m.db.s.o.GetWriteBuffer() {
+                       m.Reset()
+                       m.db.mpoolPut(m.DB)
+               }
+               m.db = nil
+               m.DB = nil
+       } else if ref < 0 {
+               panic("negative memdb ref")
+       }
+}
+
+// Get latest sequence number.
+func (db *DB) getSeq() uint64 {
+       return atomic.LoadUint64(&db.seq)
+}
+
+// Atomically adds delta to seq.
+func (db *DB) addSeq(delta uint64) {
+       atomic.AddUint64(&db.seq, delta)
+}
+
+func (db *DB) setSeq(seq uint64) {
+       atomic.StoreUint64(&db.seq, seq)
+}
+
+func (db *DB) sampleSeek(ikey internalKey) {
+       v := db.s.version()
+       if v.sampleSeek(ikey) {
+               // Trigger table compaction.
+               db.compTrigger(db.tcompCmdC)
+       }
+       v.release()
+}
+
+func (db *DB) mpoolPut(mem *memdb.DB) {
+       if !db.isClosed() {
+               select {
+               case db.memPool <- mem:
+               default:
+               }
+       }
+}
+
+func (db *DB) mpoolGet(n int) *memDB {
+       var mdb *memdb.DB
+       select {
+       case mdb = <-db.memPool:
+       default:
+       }
+       if mdb == nil || mdb.Capacity() < n {
+               mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
+       }
+       return &memDB{
+               db: db,
+               DB: mdb,
+       }
+}
+
+func (db *DB) mpoolDrain() {
+       ticker := time.NewTicker(30 * time.Second)
+       for {
+               select {
+               case <-ticker.C:
+                       select {
+                       case <-db.memPool:
+                       default:
+                       }
+               case <-db.closeC:
+                       ticker.Stop()
+                       // Make sure the pool is drained.
+                       select {
+                       case <-db.memPool:
+                       case <-time.After(time.Second):
+                       }
+                       close(db.memPool)
+                       return
+               }
+       }
+}
+
+// Create new memdb and froze the old one; need external synchronization.
+// newMem only called synchronously by the writer.
+func (db *DB) newMem(n int) (mem *memDB, err error) {
+       fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
+       w, err := db.s.stor.Create(fd)
+       if err != nil {
+               db.s.reuseFileNum(fd.Num)
+               return
+       }
+
+       db.memMu.Lock()
+       defer db.memMu.Unlock()
+
+       if db.frozenMem != nil {
+               return nil, errHasFrozenMem
+       }
+
+       if db.journal == nil {
+               db.journal = journal.NewWriter(w)
+       } else {
+               db.journal.Reset(w)
+               db.journalWriter.Close()
+               db.frozenJournalFd = db.journalFd
+       }
+       db.journalWriter = w
+       db.journalFd = fd
+       db.frozenMem = db.mem
+       mem = db.mpoolGet(n)
+       mem.incref() // for self
+       mem.incref() // for caller
+       db.mem = mem
+       // The seq only incremented by the writer. And whoever called newMem
+       // should hold write lock, so no need additional synchronization here.
+       db.frozenSeq = db.seq
+       return
+}
+
+// Get all memdbs.
+func (db *DB) getMems() (e, f *memDB) {
+       db.memMu.RLock()
+       defer db.memMu.RUnlock()
+       if db.mem != nil {
+               db.mem.incref()
+       } else if !db.isClosed() {
+               panic("nil effective mem")
+       }
+       if db.frozenMem != nil {
+               db.frozenMem.incref()
+       }
+       return db.mem, db.frozenMem
+}
+
+// Get effective memdb.
+func (db *DB) getEffectiveMem() *memDB {
+       db.memMu.RLock()
+       defer db.memMu.RUnlock()
+       if db.mem != nil {
+               db.mem.incref()
+       } else if !db.isClosed() {
+               panic("nil effective mem")
+       }
+       return db.mem
+}
+
+// Check whether we has frozen memdb.
+func (db *DB) hasFrozenMem() bool {
+       db.memMu.RLock()
+       defer db.memMu.RUnlock()
+       return db.frozenMem != nil
+}
+
+// Get frozen memdb.
+func (db *DB) getFrozenMem() *memDB {
+       db.memMu.RLock()
+       defer db.memMu.RUnlock()
+       if db.frozenMem != nil {
+               db.frozenMem.incref()
+       }
+       return db.frozenMem
+}
+
+// Drop frozen memdb; assume that frozen memdb isn't nil.
+func (db *DB) dropFrozenMem() {
+       db.memMu.Lock()
+       if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
+               db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
+       } else {
+               db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
+       }
+       db.frozenJournalFd = storage.FileDesc{}
+       db.frozenMem.decref()
+       db.frozenMem = nil
+       db.memMu.Unlock()
+}
+
+// Clear mems ptr; used by DB.Close().
+func (db *DB) clearMems() {
+       db.memMu.Lock()
+       db.mem = nil
+       db.frozenMem = nil
+       db.memMu.Unlock()
+}
+
+// Set closed flag; return true if not already closed.
+func (db *DB) setClosed() bool {
+       return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
+}
+
+// Check whether DB was closed.
+func (db *DB) isClosed() bool {
+       return atomic.LoadUint32(&db.closed) != 0
+}
+
+// Check read ok status.
+func (db *DB) ok() error {
+       if db.isClosed() {
+               return ErrClosed
+       }
+       return nil
+}