1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reservefs.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
23 errFileOpen = errors.New("leveldb/storage: file still open")
24 errReadOnly = errors.New("leveldb/storage: storage is read-only")
27 type fileLock interface {
31 type fileStorageLock struct {
35 func (lock *fileStorageLock) Unlock() {
38 defer lock.fs.mu.Unlock()
39 if lock.fs.slock == lock {
45 const logSizeThreshold = 1024 * 1024 // 1 MiB
47 // fileStorage is a file-system backed storage.
48 type fileStorage struct {
54 slock *fileStorageLock
58 // Opened file counter; if open < 0 means closed.
63 // OpenFile returns a new filesytem-backed storage implementation with the given
64 // path. This also acquire a file lock, so any subsequent attempt to open the
65 // same path will fail.
67 // The storage must be closed after use, by calling Close method.
68 func OpenFile(path string, readOnly bool) (Storage, error) {
69 if fi, err := os.Stat(path); err == nil {
71 return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path)
73 } else if os.IsNotExist(err) && !readOnly {
74 if err := os.MkdirAll(path, 0755); err != nil {
81 flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly)
97 logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
101 logSize, err = logw.Seek(0, os.SEEK_END)
115 runtime.SetFinalizer(fs, (*fileStorage).Close)
119 func (fs *fileStorage) Lock() (Locker, error) {
123 return nil, ErrClosed
126 return &fileStorageLock{}, nil
129 return nil, ErrLocked
131 fs.slock = &fileStorageLock{fs: fs}
135 func itoa(buf []byte, i int, wid int) []byte {
137 if u == 0 && wid <= 1 {
138 return append(buf, '0')
141 // Assemble decimal in reverse order.
144 for ; u > 0 || wid > 0; u /= 10 {
147 b[bp] = byte(u%10) + '0'
149 return append(buf, b[bp:]...)
152 func (fs *fileStorage) printDay(t time.Time) {
153 if fs.day == t.Day() {
157 fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n"))
160 func (fs *fileStorage) doLog(t time.Time, str string) {
161 if fs.logSize > logSizeThreshold {
166 rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old"))
170 fs.logw, err = os.OpenFile(filepath.Join(fs.path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644)
174 // Force printDay on new log file.
178 hour, min, sec := t.Clock()
179 msec := t.Nanosecond() / 1e3
181 fs.buf = itoa(fs.buf[:0], hour, 2)
182 fs.buf = append(fs.buf, ':')
183 fs.buf = itoa(fs.buf, min, 2)
184 fs.buf = append(fs.buf, ':')
185 fs.buf = itoa(fs.buf, sec, 2)
186 fs.buf = append(fs.buf, '.')
187 fs.buf = itoa(fs.buf, msec, 6)
188 fs.buf = append(fs.buf, ' ')
190 fs.buf = append(fs.buf, []byte(str)...)
191 fs.buf = append(fs.buf, '\n')
192 fs.logw.Write(fs.buf)
195 func (fs *fileStorage) Log(str string) {
207 func (fs *fileStorage) log(str string) {
209 fs.doLog(time.Now(), str)
213 func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
215 return ErrInvalidFile
228 fs.log(fmt.Sprintf("CURRENT: %v", err))
231 path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
232 w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
236 _, err = fmt.Fprintln(w, fsGenName(fd))
238 fs.log(fmt.Sprintf("write CURRENT.%d: %v", fd.Num, err))
241 if err = w.Sync(); err != nil {
242 fs.log(fmt.Sprintf("flush CURRENT.%d: %v", fd.Num, err))
245 if err = w.Close(); err != nil {
246 fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, err))
252 if err = rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
253 fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
256 // Sync root directory.
257 if err = syncDir(fs.path); err != nil {
258 fs.log(fmt.Sprintf("syncDir: %v", err))
263 func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
267 return FileDesc{}, ErrClosed
269 dir, err := os.Open(fs.path)
273 names, err := dir.Readdirnames(0)
274 // Close the dir first before checking for Readdirnames error.
275 if ce := dir.Close(); ce != nil {
276 fs.log(fmt.Sprintf("close dir: %v", ce))
281 // Find latest CURRENT file.
285 for _, name := range names {
286 if strings.HasPrefix(name, "CURRENT") {
287 pend1 := len(name) > 7
289 // Make sure it is valid name for a CURRENT file, otherwise skip it.
291 if name[7] != '.' || len(name) < 9 {
292 fs.log(fmt.Sprintf("skipping %s: invalid file name", name))
296 if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil {
297 fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1))
301 path := filepath.Join(fs.path, name)
302 r, e1 := os.OpenFile(path, os.O_RDONLY, 0)
304 return FileDesc{}, e1
306 b, e1 := ioutil.ReadAll(r)
309 return FileDesc{}, e1
312 if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) {
313 fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name))
315 rem = append(rem, name)
317 if !pend1 || cerr == nil {
318 metaFd, _ := fsParseName(name)
319 cerr = &ErrCorrupted{
321 Err: errors.New("leveldb/storage: corrupted or incomplete meta file"),
324 } else if pend1 && pendNum != fd1.Num {
325 fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num))
326 rem = append(rem, name)
327 } else if fd1.Num < fd.Num {
328 fs.log(fmt.Sprintf("skipping %s: obsolete", name))
330 rem = append(rem, name)
336 if err := r.Close(); err != nil {
337 fs.log(fmt.Sprintf("close %s: %v", name, err))
341 // Don't remove any files if there is no valid CURRENT file.
351 // Rename pending CURRENT file to an effective CURRENT.
353 path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
354 if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
355 fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err))
358 // Remove obsolete or incomplete pending CURRENT files.
359 for _, name := range rem {
360 path := filepath.Join(fs.path, name)
361 if err := os.Remove(path); err != nil {
362 fs.log(fmt.Sprintf("remove %s: %v", name, err))
369 func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
373 return nil, ErrClosed
375 dir, err := os.Open(fs.path)
379 names, err := dir.Readdirnames(0)
380 // Close the dir first before checking for Readdirnames error.
381 if cerr := dir.Close(); cerr != nil {
382 fs.log(fmt.Sprintf("close dir: %v", cerr))
385 for _, name := range names {
386 if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 {
387 fds = append(fds, fd)
394 func (fs *fileStorage) Open(fd FileDesc) (Reader, error) {
396 return nil, ErrInvalidFile
402 return nil, ErrClosed
404 of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0)
406 if fsHasOldName(fd) && os.IsNotExist(err) {
407 of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0)
416 return &fileWrap{File: of, fs: fs, fd: fd}, nil
419 func (fs *fileStorage) Create(fd FileDesc) (Writer, error) {
421 return nil, ErrInvalidFile
424 return nil, errReadOnly
430 return nil, ErrClosed
432 of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
437 return &fileWrap{File: of, fs: fs, fd: fd}, nil
440 func (fs *fileStorage) Remove(fd FileDesc) error {
442 return ErrInvalidFile
453 err := os.Remove(filepath.Join(fs.path, fsGenName(fd)))
455 if fsHasOldName(fd) && os.IsNotExist(err) {
456 if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) {
457 fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err))
461 fs.log(fmt.Sprintf("remove %s: %v", fd, err))
467 func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error {
468 if !FileDescOk(oldfd) || !FileDescOk(newfd) {
469 return ErrInvalidFile
483 return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd)))
486 func (fs *fileStorage) Close() error {
492 // Clear the finalizer.
493 runtime.SetFinalizer(fs, nil)
496 fs.log(fmt.Sprintf("close: warning, %d files still open", fs.open))
502 return fs.flock.release()
505 type fileWrap struct {
512 func (fw *fileWrap) Sync() error {
513 if err := fw.File.Sync(); err != nil {
516 if fw.fd.Type == TypeManifest {
517 // Also sync parent directory if file type is manifest.
518 // See: https://code.google.com/p/leveldb/issues/detail?id=190.
519 if err := syncDir(fw.fs.path); err != nil {
520 fw.fs.log(fmt.Sprintf("syncDir: %v", err))
527 func (fw *fileWrap) Close() error {
529 defer fw.fs.mu.Unlock()
535 err := fw.File.Close()
537 fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err))
542 func fsGenName(fd FileDesc) string {
545 return fmt.Sprintf("MANIFEST-%06d", fd.Num)
547 return fmt.Sprintf("%06d.log", fd.Num)
549 return fmt.Sprintf("%06d.ldb", fd.Num)
551 return fmt.Sprintf("%06d.tmp", fd.Num)
553 panic("invalid file type")
557 func fsHasOldName(fd FileDesc) bool {
558 return fd.Type == TypeTable
561 func fsGenOldName(fd FileDesc) string {
564 return fmt.Sprintf("%06d.sst", fd.Num)
569 func fsParseName(name string) (fd FileDesc, ok bool) {
571 _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail)
575 fd.Type = TypeJournal
585 n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail)
587 fd.Type = TypeManifest
593 func fsParseNamePtr(name string, fd *FileDesc) bool {
594 _fd, ok := fsParseName(name)