OSDN Git Service

Hulk did something
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / journal / journal.go
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go
new file mode 100644 (file)
index 0000000..d094c3d
--- /dev/null
@@ -0,0 +1,524 @@
+// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
+// License, authors and contributors informations can be found at bellow URLs respectively:
+//     https://code.google.com/p/leveldb-go/source/browse/LICENSE
+//     https://code.google.com/p/leveldb-go/source/browse/AUTHORS
+//  https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
+
+// Package journal reads and writes sequences of journals. Each journal is a stream
+// of bytes that completes before the next journal starts.
+//
+// When reading, call Next to obtain an io.Reader for the next journal. Next will
+// return io.EOF when there are no more journals. It is valid to call Next
+// without reading the current journal to exhaustion.
+//
+// When writing, call Next to obtain an io.Writer for the next journal. Calling
+// Next finishes the current journal. Call Close to finish the final journal.
+//
+// Optionally, call Flush to finish the current journal and flush the underlying
+// writer without starting a new journal. To start a new journal after flushing,
+// call Next.
+//
+// Neither Readers or Writers are safe to use concurrently.
+//
+// Example code:
+//     func read(r io.Reader) ([]string, error) {
+//             var ss []string
+//             journals := journal.NewReader(r, nil, true, true)
+//             for {
+//                     j, err := journals.Next()
+//                     if err == io.EOF {
+//                             break
+//                     }
+//                     if err != nil {
+//                             return nil, err
+//                     }
+//                     s, err := ioutil.ReadAll(j)
+//                     if err != nil {
+//                             return nil, err
+//                     }
+//                     ss = append(ss, string(s))
+//             }
+//             return ss, nil
+//     }
+//
+//     func write(w io.Writer, ss []string) error {
+//             journals := journal.NewWriter(w)
+//             for _, s := range ss {
+//                     j, err := journals.Next()
+//                     if err != nil {
+//                             return err
+//                     }
+//                     if _, err := j.Write([]byte(s)), err != nil {
+//                             return err
+//                     }
+//             }
+//             return journals.Close()
+//     }
+//
+// The wire format is that the stream is divided into 32KiB blocks, and each
+// block contains a number of tightly packed chunks. Chunks cannot cross block
+// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
+// block must be zero.
+//
+// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
+// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
+// followed by a payload. The checksum is over the chunk type and the payload.
+//
+// There are four chunk types: whether the chunk is the full journal, or the
+// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
+// has one first chunk, zero or more middle chunks, and one last chunk.
+//
+// The wire format allows for limited recovery in the face of data corruption:
+// on a format error (such as a checksum mismatch), the reader moves to the
+// next block and looks for the next full or first chunk.
+package journal
+
+import (
+       "encoding/binary"
+       "fmt"
+       "io"
+
+       "github.com/syndtr/goleveldb/leveldb/errors"
+       "github.com/syndtr/goleveldb/leveldb/storage"
+       "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+// These constants are part of the wire format and should not be changed.
+const (
+       fullChunkType   = 1
+       firstChunkType  = 2
+       middleChunkType = 3
+       lastChunkType   = 4
+)
+
+const (
+       blockSize  = 32 * 1024
+       headerSize = 7
+)
+
+type flusher interface {
+       Flush() error
+}
+
+// ErrCorrupted is the error type that generated by corrupted block or chunk.
+type ErrCorrupted struct {
+       Size   int
+       Reason string
+}
+
+func (e *ErrCorrupted) Error() string {
+       return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
+}
+
+// Dropper is the interface that wrap simple Drop method. The Drop
+// method will be called when the journal reader dropping a block or chunk.
+type Dropper interface {
+       Drop(err error)
+}
+
+// Reader reads journals from an underlying io.Reader.
+type Reader struct {
+       // r is the underlying reader.
+       r io.Reader
+       // the dropper.
+       dropper Dropper
+       // strict flag.
+       strict bool
+       // checksum flag.
+       checksum bool
+       // seq is the sequence number of the current journal.
+       seq int
+       // buf[i:j] is the unread portion of the current chunk's payload.
+       // The low bound, i, excludes the chunk header.
+       i, j int
+       // n is the number of bytes of buf that are valid. Once reading has started,
+       // only the final block can have n < blockSize.
+       n int
+       // last is whether the current chunk is the last chunk of the journal.
+       last bool
+       // err is any accumulated error.
+       err error
+       // buf is the buffer.
+       buf [blockSize]byte
+}
+
+// NewReader returns a new reader. The dropper may be nil, and if
+// strict is true then corrupted or invalid chunk will halt the journal
+// reader entirely.
+func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
+       return &Reader{
+               r:        r,
+               dropper:  dropper,
+               strict:   strict,
+               checksum: checksum,
+               last:     true,
+       }
+}
+
+var errSkip = errors.New("leveldb/journal: skipped")
+
+func (r *Reader) corrupt(n int, reason string, skip bool) error {
+       if r.dropper != nil {
+               r.dropper.Drop(&ErrCorrupted{n, reason})
+       }
+       if r.strict && !skip {
+               r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
+               return r.err
+       }
+       return errSkip
+}
+
+// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
+// next block into the buffer if necessary.
+func (r *Reader) nextChunk(first bool) error {
+       for {
+               if r.j+headerSize <= r.n {
+                       checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
+                       length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
+                       chunkType := r.buf[r.j+6]
+                       unprocBlock := r.n - r.j
+                       if checksum == 0 && length == 0 && chunkType == 0 {
+                               // Drop entire block.
+                               r.i = r.n
+                               r.j = r.n
+                               return r.corrupt(unprocBlock, "zero header", false)
+                       }
+                       if chunkType < fullChunkType || chunkType > lastChunkType {
+                               // Drop entire block.
+                               r.i = r.n
+                               r.j = r.n
+                               return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
+                       }
+                       r.i = r.j + headerSize
+                       r.j = r.j + headerSize + int(length)
+                       if r.j > r.n {
+                               // Drop entire block.
+                               r.i = r.n
+                               r.j = r.n
+                               return r.corrupt(unprocBlock, "chunk length overflows block", false)
+                       } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
+                               // Drop entire block.
+                               r.i = r.n
+                               r.j = r.n
+                               return r.corrupt(unprocBlock, "checksum mismatch", false)
+                       }
+                       if first && chunkType != fullChunkType && chunkType != firstChunkType {
+                               chunkLength := (r.j - r.i) + headerSize
+                               r.i = r.j
+                               // Report the error, but skip it.
+                               return r.corrupt(chunkLength, "orphan chunk", true)
+                       }
+                       r.last = chunkType == fullChunkType || chunkType == lastChunkType
+                       return nil
+               }
+
+               // The last block.
+               if r.n < blockSize && r.n > 0 {
+                       if !first {
+                               return r.corrupt(0, "missing chunk part", false)
+                       }
+                       r.err = io.EOF
+                       return r.err
+               }
+
+               // Read block.
+               n, err := io.ReadFull(r.r, r.buf[:])
+               if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+                       return err
+               }
+               if n == 0 {
+                       if !first {
+                               return r.corrupt(0, "missing chunk part", false)
+                       }
+                       r.err = io.EOF
+                       return r.err
+               }
+               r.i, r.j, r.n = 0, 0, n
+       }
+}
+
+// Next returns a reader for the next journal. It returns io.EOF if there are no
+// more journals. The reader returned becomes stale after the next Next call,
+// and should no longer be used. If strict is false, the reader will returns
+// io.ErrUnexpectedEOF error when found corrupted journal.
+func (r *Reader) Next() (io.Reader, error) {
+       r.seq++
+       if r.err != nil {
+               return nil, r.err
+       }
+       r.i = r.j
+       for {
+               if err := r.nextChunk(true); err == nil {
+                       break
+               } else if err != errSkip {
+                       return nil, err
+               }
+       }
+       return &singleReader{r, r.seq, nil}, nil
+}
+
+// Reset resets the journal reader, allows reuse of the journal reader. Reset returns
+// last accumulated error.
+func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
+       r.seq++
+       err := r.err
+       r.r = reader
+       r.dropper = dropper
+       r.strict = strict
+       r.checksum = checksum
+       r.i = 0
+       r.j = 0
+       r.n = 0
+       r.last = true
+       r.err = nil
+       return err
+}
+
+type singleReader struct {
+       r   *Reader
+       seq int
+       err error
+}
+
+func (x *singleReader) Read(p []byte) (int, error) {
+       r := x.r
+       if r.seq != x.seq {
+               return 0, errors.New("leveldb/journal: stale reader")
+       }
+       if x.err != nil {
+               return 0, x.err
+       }
+       if r.err != nil {
+               return 0, r.err
+       }
+       for r.i == r.j {
+               if r.last {
+                       return 0, io.EOF
+               }
+               x.err = r.nextChunk(false)
+               if x.err != nil {
+                       if x.err == errSkip {
+                               x.err = io.ErrUnexpectedEOF
+                       }
+                       return 0, x.err
+               }
+       }
+       n := copy(p, r.buf[r.i:r.j])
+       r.i += n
+       return n, nil
+}
+
+func (x *singleReader) ReadByte() (byte, error) {
+       r := x.r
+       if r.seq != x.seq {
+               return 0, errors.New("leveldb/journal: stale reader")
+       }
+       if x.err != nil {
+               return 0, x.err
+       }
+       if r.err != nil {
+               return 0, r.err
+       }
+       for r.i == r.j {
+               if r.last {
+                       return 0, io.EOF
+               }
+               x.err = r.nextChunk(false)
+               if x.err != nil {
+                       if x.err == errSkip {
+                               x.err = io.ErrUnexpectedEOF
+                       }
+                       return 0, x.err
+               }
+       }
+       c := r.buf[r.i]
+       r.i++
+       return c, nil
+}
+
+// Writer writes journals to an underlying io.Writer.
+type Writer struct {
+       // w is the underlying writer.
+       w io.Writer
+       // seq is the sequence number of the current journal.
+       seq int
+       // f is w as a flusher.
+       f flusher
+       // buf[i:j] is the bytes that will become the current chunk.
+       // The low bound, i, includes the chunk header.
+       i, j int
+       // buf[:written] has already been written to w.
+       // written is zero unless Flush has been called.
+       written int
+       // first is whether the current chunk is the first chunk of the journal.
+       first bool
+       // pending is whether a chunk is buffered but not yet written.
+       pending bool
+       // err is any accumulated error.
+       err error
+       // buf is the buffer.
+       buf [blockSize]byte
+}
+
+// NewWriter returns a new Writer.
+func NewWriter(w io.Writer) *Writer {
+       f, _ := w.(flusher)
+       return &Writer{
+               w: w,
+               f: f,
+       }
+}
+
+// fillHeader fills in the header for the pending chunk.
+func (w *Writer) fillHeader(last bool) {
+       if w.i+headerSize > w.j || w.j > blockSize {
+               panic("leveldb/journal: bad writer state")
+       }
+       if last {
+               if w.first {
+                       w.buf[w.i+6] = fullChunkType
+               } else {
+                       w.buf[w.i+6] = lastChunkType
+               }
+       } else {
+               if w.first {
+                       w.buf[w.i+6] = firstChunkType
+               } else {
+                       w.buf[w.i+6] = middleChunkType
+               }
+       }
+       binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
+       binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
+}
+
+// writeBlock writes the buffered block to the underlying writer, and reserves
+// space for the next chunk's header.
+func (w *Writer) writeBlock() {
+       _, w.err = w.w.Write(w.buf[w.written:])
+       w.i = 0
+       w.j = headerSize
+       w.written = 0
+}
+
+// writePending finishes the current journal and writes the buffer to the
+// underlying writer.
+func (w *Writer) writePending() {
+       if w.err != nil {
+               return
+       }
+       if w.pending {
+               w.fillHeader(true)
+               w.pending = false
+       }
+       _, w.err = w.w.Write(w.buf[w.written:w.j])
+       w.written = w.j
+}
+
+// Close finishes the current journal and closes the writer.
+func (w *Writer) Close() error {
+       w.seq++
+       w.writePending()
+       if w.err != nil {
+               return w.err
+       }
+       w.err = errors.New("leveldb/journal: closed Writer")
+       return nil
+}
+
+// Flush finishes the current journal, writes to the underlying writer, and
+// flushes it if that writer implements interface{ Flush() error }.
+func (w *Writer) Flush() error {
+       w.seq++
+       w.writePending()
+       if w.err != nil {
+               return w.err
+       }
+       if w.f != nil {
+               w.err = w.f.Flush()
+               return w.err
+       }
+       return nil
+}
+
+// Reset resets the journal writer, allows reuse of the journal writer. Reset
+// will also closes the journal writer if not already.
+func (w *Writer) Reset(writer io.Writer) (err error) {
+       w.seq++
+       if w.err == nil {
+               w.writePending()
+               err = w.err
+       }
+       w.w = writer
+       w.f, _ = writer.(flusher)
+       w.i = 0
+       w.j = 0
+       w.written = 0
+       w.first = false
+       w.pending = false
+       w.err = nil
+       return
+}
+
+// Next returns a writer for the next journal. The writer returned becomes stale
+// after the next Close, Flush or Next call, and should no longer be used.
+func (w *Writer) Next() (io.Writer, error) {
+       w.seq++
+       if w.err != nil {
+               return nil, w.err
+       }
+       if w.pending {
+               w.fillHeader(true)
+       }
+       w.i = w.j
+       w.j = w.j + headerSize
+       // Check if there is room in the block for the header.
+       if w.j > blockSize {
+               // Fill in the rest of the block with zeroes.
+               for k := w.i; k < blockSize; k++ {
+                       w.buf[k] = 0
+               }
+               w.writeBlock()
+               if w.err != nil {
+                       return nil, w.err
+               }
+       }
+       w.first = true
+       w.pending = true
+       return singleWriter{w, w.seq}, nil
+}
+
+type singleWriter struct {
+       w   *Writer
+       seq int
+}
+
+func (x singleWriter) Write(p []byte) (int, error) {
+       w := x.w
+       if w.seq != x.seq {
+               return 0, errors.New("leveldb/journal: stale writer")
+       }
+       if w.err != nil {
+               return 0, w.err
+       }
+       n0 := len(p)
+       for len(p) > 0 {
+               // Write a block, if it is full.
+               if w.j == blockSize {
+                       w.fillHeader(false)
+                       w.writeBlock()
+                       if w.err != nil {
+                               return 0, w.err
+                       }
+                       w.first = false
+               }
+               // Copy bytes into the buffer.
+               n := copy(w.buf[w.j:], p)
+               w.j += n
+               p = p[n:]
+       }
+       return n0, nil
+}