OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / journal / journal.go
1 // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
6 // License, authors and contributors informations can be found at bellow URLs respectively:
7 //      https://code.google.com/p/leveldb-go/source/browse/LICENSE
8 //      https://code.google.com/p/leveldb-go/source/browse/AUTHORS
9 //  https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
10
11 // Package journal reads and writes sequences of journals. Each journal is a stream
12 // of bytes that completes before the next journal starts.
13 //
14 // When reading, call Next to obtain an io.Reader for the next journal. Next will
15 // return io.EOF when there are no more journals. It is valid to call Next
16 // without reading the current journal to exhaustion.
17 //
18 // When writing, call Next to obtain an io.Writer for the next journal. Calling
19 // Next finishes the current journal. Call Close to finish the final journal.
20 //
21 // Optionally, call Flush to finish the current journal and flush the underlying
22 // writer without starting a new journal. To start a new journal after flushing,
23 // call Next.
24 //
25 // Neither Readers or Writers are safe to use concurrently.
26 //
27 // Example code:
28 //      func read(r io.Reader) ([]string, error) {
29 //              var ss []string
30 //              journals := journal.NewReader(r, nil, true, true)
31 //              for {
32 //                      j, err := journals.Next()
33 //                      if err == io.EOF {
34 //                              break
35 //                      }
36 //                      if err != nil {
37 //                              return nil, err
38 //                      }
39 //                      s, err := ioutil.ReadAll(j)
40 //                      if err != nil {
41 //                              return nil, err
42 //                      }
43 //                      ss = append(ss, string(s))
44 //              }
45 //              return ss, nil
46 //      }
47 //
48 //      func write(w io.Writer, ss []string) error {
49 //              journals := journal.NewWriter(w)
50 //              for _, s := range ss {
51 //                      j, err := journals.Next()
52 //                      if err != nil {
53 //                              return err
54 //                      }
55 //                      if _, err := j.Write([]byte(s)), err != nil {
56 //                              return err
57 //                      }
58 //              }
59 //              return journals.Close()
60 //      }
61 //
62 // The wire format is that the stream is divided into 32KiB blocks, and each
63 // block contains a number of tightly packed chunks. Chunks cannot cross block
64 // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
65 // block must be zero.
66 //
67 // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
68 // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
69 // followed by a payload. The checksum is over the chunk type and the payload.
70 //
71 // There are four chunk types: whether the chunk is the full journal, or the
72 // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
73 // has one first chunk, zero or more middle chunks, and one last chunk.
74 //
75 // The wire format allows for limited recovery in the face of data corruption:
76 // on a format error (such as a checksum mismatch), the reader moves to the
77 // next block and looks for the next full or first chunk.
78 package journal
79
80 import (
81         "encoding/binary"
82         "fmt"
83         "io"
84
85         "github.com/syndtr/goleveldb/leveldb/errors"
86         "github.com/syndtr/goleveldb/leveldb/storage"
87         "github.com/syndtr/goleveldb/leveldb/util"
88 )
89
90 // These constants are part of the wire format and should not be changed.
91 const (
92         fullChunkType   = 1
93         firstChunkType  = 2
94         middleChunkType = 3
95         lastChunkType   = 4
96 )
97
98 const (
99         blockSize  = 32 * 1024
100         headerSize = 7
101 )
102
103 type flusher interface {
104         Flush() error
105 }
106
107 // ErrCorrupted is the error type that generated by corrupted block or chunk.
108 type ErrCorrupted struct {
109         Size   int
110         Reason string
111 }
112
113 func (e *ErrCorrupted) Error() string {
114         return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
115 }
116
117 // Dropper is the interface that wrap simple Drop method. The Drop
118 // method will be called when the journal reader dropping a block or chunk.
119 type Dropper interface {
120         Drop(err error)
121 }
122
123 // Reader reads journals from an underlying io.Reader.
124 type Reader struct {
125         // r is the underlying reader.
126         r io.Reader
127         // the dropper.
128         dropper Dropper
129         // strict flag.
130         strict bool
131         // checksum flag.
132         checksum bool
133         // seq is the sequence number of the current journal.
134         seq int
135         // buf[i:j] is the unread portion of the current chunk's payload.
136         // The low bound, i, excludes the chunk header.
137         i, j int
138         // n is the number of bytes of buf that are valid. Once reading has started,
139         // only the final block can have n < blockSize.
140         n int
141         // last is whether the current chunk is the last chunk of the journal.
142         last bool
143         // err is any accumulated error.
144         err error
145         // buf is the buffer.
146         buf [blockSize]byte
147 }
148
149 // NewReader returns a new reader. The dropper may be nil, and if
150 // strict is true then corrupted or invalid chunk will halt the journal
151 // reader entirely.
152 func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
153         return &Reader{
154                 r:        r,
155                 dropper:  dropper,
156                 strict:   strict,
157                 checksum: checksum,
158                 last:     true,
159         }
160 }
161
162 var errSkip = errors.New("leveldb/journal: skipped")
163
164 func (r *Reader) corrupt(n int, reason string, skip bool) error {
165         if r.dropper != nil {
166                 r.dropper.Drop(&ErrCorrupted{n, reason})
167         }
168         if r.strict && !skip {
169                 r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
170                 return r.err
171         }
172         return errSkip
173 }
174
175 // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
176 // next block into the buffer if necessary.
177 func (r *Reader) nextChunk(first bool) error {
178         for {
179                 if r.j+headerSize <= r.n {
180                         checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
181                         length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
182                         chunkType := r.buf[r.j+6]
183                         unprocBlock := r.n - r.j
184                         if checksum == 0 && length == 0 && chunkType == 0 {
185                                 // Drop entire block.
186                                 r.i = r.n
187                                 r.j = r.n
188                                 return r.corrupt(unprocBlock, "zero header", false)
189                         }
190                         if chunkType < fullChunkType || chunkType > lastChunkType {
191                                 // Drop entire block.
192                                 r.i = r.n
193                                 r.j = r.n
194                                 return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
195                         }
196                         r.i = r.j + headerSize
197                         r.j = r.j + headerSize + int(length)
198                         if r.j > r.n {
199                                 // Drop entire block.
200                                 r.i = r.n
201                                 r.j = r.n
202                                 return r.corrupt(unprocBlock, "chunk length overflows block", false)
203                         } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
204                                 // Drop entire block.
205                                 r.i = r.n
206                                 r.j = r.n
207                                 return r.corrupt(unprocBlock, "checksum mismatch", false)
208                         }
209                         if first && chunkType != fullChunkType && chunkType != firstChunkType {
210                                 chunkLength := (r.j - r.i) + headerSize
211                                 r.i = r.j
212                                 // Report the error, but skip it.
213                                 return r.corrupt(chunkLength, "orphan chunk", true)
214                         }
215                         r.last = chunkType == fullChunkType || chunkType == lastChunkType
216                         return nil
217                 }
218
219                 // The last block.
220                 if r.n < blockSize && r.n > 0 {
221                         if !first {
222                                 return r.corrupt(0, "missing chunk part", false)
223                         }
224                         r.err = io.EOF
225                         return r.err
226                 }
227
228                 // Read block.
229                 n, err := io.ReadFull(r.r, r.buf[:])
230                 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
231                         return err
232                 }
233                 if n == 0 {
234                         if !first {
235                                 return r.corrupt(0, "missing chunk part", false)
236                         }
237                         r.err = io.EOF
238                         return r.err
239                 }
240                 r.i, r.j, r.n = 0, 0, n
241         }
242 }
243
244 // Next returns a reader for the next journal. It returns io.EOF if there are no
245 // more journals. The reader returned becomes stale after the next Next call,
246 // and should no longer be used. If strict is false, the reader will returns
247 // io.ErrUnexpectedEOF error when found corrupted journal.
248 func (r *Reader) Next() (io.Reader, error) {
249         r.seq++
250         if r.err != nil {
251                 return nil, r.err
252         }
253         r.i = r.j
254         for {
255                 if err := r.nextChunk(true); err == nil {
256                         break
257                 } else if err != errSkip {
258                         return nil, err
259                 }
260         }
261         return &singleReader{r, r.seq, nil}, nil
262 }
263
264 // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
265 // last accumulated error.
266 func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
267         r.seq++
268         err := r.err
269         r.r = reader
270         r.dropper = dropper
271         r.strict = strict
272         r.checksum = checksum
273         r.i = 0
274         r.j = 0
275         r.n = 0
276         r.last = true
277         r.err = nil
278         return err
279 }
280
281 type singleReader struct {
282         r   *Reader
283         seq int
284         err error
285 }
286
287 func (x *singleReader) Read(p []byte) (int, error) {
288         r := x.r
289         if r.seq != x.seq {
290                 return 0, errors.New("leveldb/journal: stale reader")
291         }
292         if x.err != nil {
293                 return 0, x.err
294         }
295         if r.err != nil {
296                 return 0, r.err
297         }
298         for r.i == r.j {
299                 if r.last {
300                         return 0, io.EOF
301                 }
302                 x.err = r.nextChunk(false)
303                 if x.err != nil {
304                         if x.err == errSkip {
305                                 x.err = io.ErrUnexpectedEOF
306                         }
307                         return 0, x.err
308                 }
309         }
310         n := copy(p, r.buf[r.i:r.j])
311         r.i += n
312         return n, nil
313 }
314
315 func (x *singleReader) ReadByte() (byte, error) {
316         r := x.r
317         if r.seq != x.seq {
318                 return 0, errors.New("leveldb/journal: stale reader")
319         }
320         if x.err != nil {
321                 return 0, x.err
322         }
323         if r.err != nil {
324                 return 0, r.err
325         }
326         for r.i == r.j {
327                 if r.last {
328                         return 0, io.EOF
329                 }
330                 x.err = r.nextChunk(false)
331                 if x.err != nil {
332                         if x.err == errSkip {
333                                 x.err = io.ErrUnexpectedEOF
334                         }
335                         return 0, x.err
336                 }
337         }
338         c := r.buf[r.i]
339         r.i++
340         return c, nil
341 }
342
343 // Writer writes journals to an underlying io.Writer.
344 type Writer struct {
345         // w is the underlying writer.
346         w io.Writer
347         // seq is the sequence number of the current journal.
348         seq int
349         // f is w as a flusher.
350         f flusher
351         // buf[i:j] is the bytes that will become the current chunk.
352         // The low bound, i, includes the chunk header.
353         i, j int
354         // buf[:written] has already been written to w.
355         // written is zero unless Flush has been called.
356         written int
357         // first is whether the current chunk is the first chunk of the journal.
358         first bool
359         // pending is whether a chunk is buffered but not yet written.
360         pending bool
361         // err is any accumulated error.
362         err error
363         // buf is the buffer.
364         buf [blockSize]byte
365 }
366
367 // NewWriter returns a new Writer.
368 func NewWriter(w io.Writer) *Writer {
369         f, _ := w.(flusher)
370         return &Writer{
371                 w: w,
372                 f: f,
373         }
374 }
375
376 // fillHeader fills in the header for the pending chunk.
377 func (w *Writer) fillHeader(last bool) {
378         if w.i+headerSize > w.j || w.j > blockSize {
379                 panic("leveldb/journal: bad writer state")
380         }
381         if last {
382                 if w.first {
383                         w.buf[w.i+6] = fullChunkType
384                 } else {
385                         w.buf[w.i+6] = lastChunkType
386                 }
387         } else {
388                 if w.first {
389                         w.buf[w.i+6] = firstChunkType
390                 } else {
391                         w.buf[w.i+6] = middleChunkType
392                 }
393         }
394         binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
395         binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
396 }
397
398 // writeBlock writes the buffered block to the underlying writer, and reserves
399 // space for the next chunk's header.
400 func (w *Writer) writeBlock() {
401         _, w.err = w.w.Write(w.buf[w.written:])
402         w.i = 0
403         w.j = headerSize
404         w.written = 0
405 }
406
407 // writePending finishes the current journal and writes the buffer to the
408 // underlying writer.
409 func (w *Writer) writePending() {
410         if w.err != nil {
411                 return
412         }
413         if w.pending {
414                 w.fillHeader(true)
415                 w.pending = false
416         }
417         _, w.err = w.w.Write(w.buf[w.written:w.j])
418         w.written = w.j
419 }
420
421 // Close finishes the current journal and closes the writer.
422 func (w *Writer) Close() error {
423         w.seq++
424         w.writePending()
425         if w.err != nil {
426                 return w.err
427         }
428         w.err = errors.New("leveldb/journal: closed Writer")
429         return nil
430 }
431
432 // Flush finishes the current journal, writes to the underlying writer, and
433 // flushes it if that writer implements interface{ Flush() error }.
434 func (w *Writer) Flush() error {
435         w.seq++
436         w.writePending()
437         if w.err != nil {
438                 return w.err
439         }
440         if w.f != nil {
441                 w.err = w.f.Flush()
442                 return w.err
443         }
444         return nil
445 }
446
447 // Reset resets the journal writer, allows reuse of the journal writer. Reset
448 // will also closes the journal writer if not already.
449 func (w *Writer) Reset(writer io.Writer) (err error) {
450         w.seq++
451         if w.err == nil {
452                 w.writePending()
453                 err = w.err
454         }
455         w.w = writer
456         w.f, _ = writer.(flusher)
457         w.i = 0
458         w.j = 0
459         w.written = 0
460         w.first = false
461         w.pending = false
462         w.err = nil
463         return
464 }
465
466 // Next returns a writer for the next journal. The writer returned becomes stale
467 // after the next Close, Flush or Next call, and should no longer be used.
468 func (w *Writer) Next() (io.Writer, error) {
469         w.seq++
470         if w.err != nil {
471                 return nil, w.err
472         }
473         if w.pending {
474                 w.fillHeader(true)
475         }
476         w.i = w.j
477         w.j = w.j + headerSize
478         // Check if there is room in the block for the header.
479         if w.j > blockSize {
480                 // Fill in the rest of the block with zeroes.
481                 for k := w.i; k < blockSize; k++ {
482                         w.buf[k] = 0
483                 }
484                 w.writeBlock()
485                 if w.err != nil {
486                         return nil, w.err
487                 }
488         }
489         w.first = true
490         w.pending = true
491         return singleWriter{w, w.seq}, nil
492 }
493
494 type singleWriter struct {
495         w   *Writer
496         seq int
497 }
498
499 func (x singleWriter) Write(p []byte) (int, error) {
500         w := x.w
501         if w.seq != x.seq {
502                 return 0, errors.New("leveldb/journal: stale writer")
503         }
504         if w.err != nil {
505                 return 0, w.err
506         }
507         n0 := len(p)
508         for len(p) > 0 {
509                 // Write a block, if it is full.
510                 if w.j == blockSize {
511                         w.fillHeader(false)
512                         w.writeBlock()
513                         if w.err != nil {
514                                 return 0, w.err
515                         }
516                         w.first = false
517                 }
518                 // Copy bytes into the buffer.
519                 n := copy(w.buf[w.j:], p)
520                 w.j += n
521                 p = p[n:]
522         }
523         return n0, nil
524 }