1 // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
6 // License, authors and contributors information can be found at the URLs below, respectively:
7 // https://code.google.com/p/leveldb-go/source/browse/LICENSE
8 // https://code.google.com/p/leveldb-go/source/browse/AUTHORS
9 // https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
11 // Package journal reads and writes sequences of journals. Each journal is a stream
12 // of bytes that completes before the next journal starts.
14 // When reading, call Next to obtain an io.Reader for the next journal. Next will
15 // return io.EOF when there are no more journals. It is valid to call Next
16 // without reading the current journal to exhaustion.
18 // When writing, call Next to obtain an io.Writer for the next journal. Calling
19 // Next finishes the current journal. Call Close to finish the final journal.
21 // Optionally, call Flush to finish the current journal and flush the underlying
22 // writer without starting a new journal. To start a new journal after flushing,
25 // Neither Readers nor Writers are safe to use concurrently.
28 // func read(r io.Reader) ([]string, error) {
30 // journals := journal.NewReader(r, nil, true, true)
32 // j, err := journals.Next()
39 // s, err := ioutil.ReadAll(j)
43 // ss = append(ss, string(s))
48 // func write(w io.Writer, ss []string) error {
49 // journals := journal.NewWriter(w)
50 // for _, s := range ss {
51 // j, err := journals.Next()
55 // if _, err := j.Write([]byte(s)); err != nil {
59 // return journals.Close()
62 // The wire format is that the stream is divided into 32KiB blocks, and each
63 // block contains a number of tightly packed chunks. Chunks cannot cross block
64 // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
65 // block must be zero.
67 // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
68 // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
69 // followed by a payload. The checksum is over the chunk type and the payload.
71 // There are four chunk types: whether the chunk is the full journal, or the
72 // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
73 // has one first chunk, zero or more middle chunks, and one last chunk.
75 // The wire format allows for limited recovery in the face of data corruption:
76 // on a format error (such as a checksum mismatch), the reader moves to the
77 // next block and looks for the next full or first chunk.
85 "github.com/syndtr/goleveldb/leveldb/errors"
86 "github.com/syndtr/goleveldb/leveldb/storage"
87 "github.com/syndtr/goleveldb/leveldb/util"
90 // These constants are part of the wire format and should not be changed.
103 type flusher interface {
107 // ErrCorrupted is the error type that is generated by a corrupted block or chunk.
108 type ErrCorrupted struct {
113 func (e *ErrCorrupted) Error() string {
114 return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
117 // Dropper is the interface that wraps the simple Drop method. The Drop
118 // method will be called when the journal reader drops a block or chunk.
119 type Dropper interface {
123 // Reader reads journals from an underlying io.Reader.
125 // r is the underlying reader.
133 // seq is the sequence number of the current journal.
135 // buf[i:j] is the unread portion of the current chunk's payload.
136 // The low bound, i, excludes the chunk header.
138 // n is the number of bytes of buf that are valid. Once reading has started,
139 // only the final block can have n < blockSize.
141 // last is whether the current chunk is the last chunk of the journal.
143 // err is any accumulated error.
145 // buf is the buffer.
149 // NewReader returns a new reader. The dropper may be nil, and if
150 // strict is true then a corrupted or invalid chunk will halt the journal
152 func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
162 var errSkip = errors.New("leveldb/journal: skipped")
164 func (r *Reader) corrupt(n int, reason string, skip bool) error {
165 if r.dropper != nil {
166 r.dropper.Drop(&ErrCorrupted{n, reason})
168 if r.strict && !skip {
169 r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
175 // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
176 // next block into the buffer if necessary.
177 func (r *Reader) nextChunk(first bool) error {
179 if r.j+headerSize <= r.n {
180 checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
181 length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
182 chunkType := r.buf[r.j+6]
183 unprocBlock := r.n - r.j
184 if checksum == 0 && length == 0 && chunkType == 0 {
185 // Drop entire block.
188 return r.corrupt(unprocBlock, "zero header", false)
190 if chunkType < fullChunkType || chunkType > lastChunkType {
191 // Drop entire block.
194 return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
196 r.i = r.j + headerSize
197 r.j = r.j + headerSize + int(length)
199 // Drop entire block.
202 return r.corrupt(unprocBlock, "chunk length overflows block", false)
203 } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
204 // Drop entire block.
207 return r.corrupt(unprocBlock, "checksum mismatch", false)
209 if first && chunkType != fullChunkType && chunkType != firstChunkType {
210 chunkLength := (r.j - r.i) + headerSize
212 // Report the error, but skip it.
213 return r.corrupt(chunkLength, "orphan chunk", true)
215 r.last = chunkType == fullChunkType || chunkType == lastChunkType
220 if r.n < blockSize && r.n > 0 {
222 return r.corrupt(0, "missing chunk part", false)
229 n, err := io.ReadFull(r.r, r.buf[:])
230 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
235 return r.corrupt(0, "missing chunk part", false)
240 r.i, r.j, r.n = 0, 0, n
244 // Next returns a reader for the next journal. It returns io.EOF if there are no
245 // more journals. The reader returned becomes stale after the next Next call,
246 // and should no longer be used. If strict is false, the reader will return an
247 // io.ErrUnexpectedEOF error when it encounters a corrupted journal.
248 func (r *Reader) Next() (io.Reader, error) {
255 if err := r.nextChunk(true); err == nil {
257 } else if err != errSkip {
261 return &singleReader{r, r.seq, nil}, nil
264 // Reset resets the journal reader, allowing it to be reused. Reset returns
265 // the last accumulated error.
266 func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
272 r.checksum = checksum
281 type singleReader struct {
287 func (x *singleReader) Read(p []byte) (int, error) {
290 return 0, errors.New("leveldb/journal: stale reader")
302 x.err = r.nextChunk(false)
304 if x.err == errSkip {
305 x.err = io.ErrUnexpectedEOF
310 n := copy(p, r.buf[r.i:r.j])
315 func (x *singleReader) ReadByte() (byte, error) {
318 return 0, errors.New("leveldb/journal: stale reader")
330 x.err = r.nextChunk(false)
332 if x.err == errSkip {
333 x.err = io.ErrUnexpectedEOF
343 // Writer writes journals to an underlying io.Writer.
345 // w is the underlying writer.
347 // seq is the sequence number of the current journal.
349 // f is w as a flusher.
351 // buf[i:j] are the bytes that will become the current chunk.
352 // The low bound, i, includes the chunk header.
354 // buf[:written] has already been written to w.
355 // written is zero unless Flush has been called.
357 // first is whether the current chunk is the first chunk of the journal.
359 // pending is whether a chunk is buffered but not yet written.
361 // err is any accumulated error.
363 // buf is the buffer.
367 // NewWriter returns a new Writer.
368 func NewWriter(w io.Writer) *Writer {
376 // fillHeader fills in the header for the pending chunk.
377 func (w *Writer) fillHeader(last bool) {
378 if w.i+headerSize > w.j || w.j > blockSize {
379 panic("leveldb/journal: bad writer state")
383 w.buf[w.i+6] = fullChunkType
385 w.buf[w.i+6] = lastChunkType
389 w.buf[w.i+6] = firstChunkType
391 w.buf[w.i+6] = middleChunkType
394 binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
395 binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
398 // writeBlock writes the buffered block to the underlying writer, and reserves
399 // space for the next chunk's header.
400 func (w *Writer) writeBlock() {
401 _, w.err = w.w.Write(w.buf[w.written:])
407 // writePending finishes the current journal and writes the buffer to the
408 // underlying writer.
409 func (w *Writer) writePending() {
417 _, w.err = w.w.Write(w.buf[w.written:w.j])
421 // Close finishes the current journal and closes the writer.
422 func (w *Writer) Close() error {
428 w.err = errors.New("leveldb/journal: closed Writer")
432 // Flush finishes the current journal, writes to the underlying writer, and
433 // flushes it if that writer implements interface{ Flush() error }.
434 func (w *Writer) Flush() error {
447 // Reset resets the journal writer, allowing it to be reused. Reset
448 // will also close the journal writer if it is not already closed.
449 func (w *Writer) Reset(writer io.Writer) (err error) {
456 w.f, _ = writer.(flusher)
466 // Next returns a writer for the next journal. The writer returned becomes stale
467 // after the next Close, Flush or Next call, and should no longer be used.
468 func (w *Writer) Next() (io.Writer, error) {
477 w.j = w.j + headerSize
478 // Check if there is room in the block for the header.
480 // Fill in the rest of the block with zeroes.
481 for k := w.i; k < blockSize; k++ {
491 return singleWriter{w, w.seq}, nil
494 type singleWriter struct {
499 func (x singleWriter) Write(p []byte) (int, error) {
502 return 0, errors.New("leveldb/journal: stale writer")
509 // Write a block, if it is full.
510 if w.j == blockSize {
518 // Copy bytes into the buffer.
519 n := copy(w.buf[w.j:], p)