OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / table / writer.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package table
8
9 import (
10         "encoding/binary"
11         "errors"
12         "fmt"
13         "io"
14
15         "github.com/golang/snappy"
16
17         "github.com/syndtr/goleveldb/leveldb/comparer"
18         "github.com/syndtr/goleveldb/leveldb/filter"
19         "github.com/syndtr/goleveldb/leveldb/opt"
20         "github.com/syndtr/goleveldb/leveldb/util"
21 )
22
23 func sharedPrefixLen(a, b []byte) int {
24         i, n := 0, len(a)
25         if n > len(b) {
26                 n = len(b)
27         }
28         for i < n && a[i] == b[i] {
29                 i++
30         }
31         return i
32 }
33
34 type blockWriter struct {
35         restartInterval int
36         buf             util.Buffer
37         nEntries        int
38         prevKey         []byte
39         restarts        []uint32
40         scratch         []byte
41 }
42
43 func (w *blockWriter) append(key, value []byte) {
44         nShared := 0
45         if w.nEntries%w.restartInterval == 0 {
46                 w.restarts = append(w.restarts, uint32(w.buf.Len()))
47         } else {
48                 nShared = sharedPrefixLen(w.prevKey, key)
49         }
50         n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
51         n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
52         n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
53         w.buf.Write(w.scratch[:n])
54         w.buf.Write(key[nShared:])
55         w.buf.Write(value)
56         w.prevKey = append(w.prevKey[:0], key...)
57         w.nEntries++
58 }
59
60 func (w *blockWriter) finish() {
61         // Write restarts entry.
62         if w.nEntries == 0 {
63                 // Must have at least one restart entry.
64                 w.restarts = append(w.restarts, 0)
65         }
66         w.restarts = append(w.restarts, uint32(len(w.restarts)))
67         for _, x := range w.restarts {
68                 buf4 := w.buf.Alloc(4)
69                 binary.LittleEndian.PutUint32(buf4, x)
70         }
71 }
72
73 func (w *blockWriter) reset() {
74         w.buf.Reset()
75         w.nEntries = 0
76         w.restarts = w.restarts[:0]
77 }
78
79 func (w *blockWriter) bytesLen() int {
80         restartsLen := len(w.restarts)
81         if restartsLen == 0 {
82                 restartsLen = 1
83         }
84         return w.buf.Len() + 4*restartsLen + 4
85 }
86
87 type filterWriter struct {
88         generator filter.FilterGenerator
89         buf       util.Buffer
90         nKeys     int
91         offsets   []uint32
92 }
93
94 func (w *filterWriter) add(key []byte) {
95         if w.generator == nil {
96                 return
97         }
98         w.generator.Add(key)
99         w.nKeys++
100 }
101
102 func (w *filterWriter) flush(offset uint64) {
103         if w.generator == nil {
104                 return
105         }
106         for x := int(offset / filterBase); x > len(w.offsets); {
107                 w.generate()
108         }
109 }
110
111 func (w *filterWriter) finish() {
112         if w.generator == nil {
113                 return
114         }
115         // Generate last keys.
116
117         if w.nKeys > 0 {
118                 w.generate()
119         }
120         w.offsets = append(w.offsets, uint32(w.buf.Len()))
121         for _, x := range w.offsets {
122                 buf4 := w.buf.Alloc(4)
123                 binary.LittleEndian.PutUint32(buf4, x)
124         }
125         w.buf.WriteByte(filterBaseLg)
126 }
127
128 func (w *filterWriter) generate() {
129         // Record offset.
130         w.offsets = append(w.offsets, uint32(w.buf.Len()))
131         // Generate filters.
132         if w.nKeys > 0 {
133                 w.generator.Generate(&w.buf)
134                 w.nKeys = 0
135         }
136 }
137
138 // Writer is a table writer.
139 type Writer struct {
140         writer io.Writer
141         err    error
142         // Options
143         cmp         comparer.Comparer
144         filter      filter.Filter
145         compression opt.Compression
146         blockSize   int
147
148         dataBlock   blockWriter
149         indexBlock  blockWriter
150         filterBlock filterWriter
151         pendingBH   blockHandle
152         offset      uint64
153         nEntries    int
154         // Scratch allocated enough for 5 uvarint. Block writer should not use
155         // first 20-bytes since it will be used to encode block handle, which
156         // then passed to the block writer itself.
157         scratch            [50]byte
158         comparerScratch    []byte
159         compressionScratch []byte
160 }
161
162 func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
163         // Compress the buffer if necessary.
164         var b []byte
165         if compression == opt.SnappyCompression {
166                 // Allocate scratch enough for compression and block trailer.
167                 if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
168                         w.compressionScratch = make([]byte, n)
169                 }
170                 compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
171                 n := len(compressed)
172                 b = compressed[:n+blockTrailerLen]
173                 b[n] = blockTypeSnappyCompression
174         } else {
175                 tmp := buf.Alloc(blockTrailerLen)
176                 tmp[0] = blockTypeNoCompression
177                 b = buf.Bytes()
178         }
179
180         // Calculate the checksum.
181         n := len(b) - 4
182         checksum := util.NewCRC(b[:n]).Value()
183         binary.LittleEndian.PutUint32(b[n:], checksum)
184
185         // Write the buffer to the file.
186         _, err = w.writer.Write(b)
187         if err != nil {
188                 return
189         }
190         bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
191         w.offset += uint64(len(b))
192         return
193 }
194
195 func (w *Writer) flushPendingBH(key []byte) {
196         if w.pendingBH.length == 0 {
197                 return
198         }
199         var separator []byte
200         if len(key) == 0 {
201                 separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
202         } else {
203                 separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
204         }
205         if separator == nil {
206                 separator = w.dataBlock.prevKey
207         } else {
208                 w.comparerScratch = separator
209         }
210         n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
211         // Append the block handle to the index block.
212         w.indexBlock.append(separator, w.scratch[:n])
213         // Reset prev key of the data block.
214         w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
215         // Clear pending block handle.
216         w.pendingBH = blockHandle{}
217 }
218
219 func (w *Writer) finishBlock() error {
220         w.dataBlock.finish()
221         bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
222         if err != nil {
223                 return err
224         }
225         w.pendingBH = bh
226         // Reset the data block.
227         w.dataBlock.reset()
228         // Flush the filter block.
229         w.filterBlock.flush(w.offset)
230         return nil
231 }
232
233 // Append appends key/value pair to the table. The keys passed must
234 // be in increasing order.
235 //
236 // It is safe to modify the contents of the arguments after Append returns.
237 func (w *Writer) Append(key, value []byte) error {
238         if w.err != nil {
239                 return w.err
240         }
241         if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
242                 w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
243                 return w.err
244         }
245
246         w.flushPendingBH(key)
247         // Append key/value pair to the data block.
248         w.dataBlock.append(key, value)
249         // Add key to the filter block.
250         w.filterBlock.add(key)
251
252         // Finish the data block if block size target reached.
253         if w.dataBlock.bytesLen() >= w.blockSize {
254                 if err := w.finishBlock(); err != nil {
255                         w.err = err
256                         return w.err
257                 }
258         }
259         w.nEntries++
260         return nil
261 }
262
263 // BlocksLen returns number of blocks written so far.
264 func (w *Writer) BlocksLen() int {
265         n := w.indexBlock.nEntries
266         if w.pendingBH.length > 0 {
267                 // Includes the pending block.
268                 n++
269         }
270         return n
271 }
272
273 // EntriesLen returns number of entries added so far.
274 func (w *Writer) EntriesLen() int {
275         return w.nEntries
276 }
277
278 // BytesLen returns number of bytes written so far.
279 func (w *Writer) BytesLen() int {
280         return int(w.offset)
281 }
282
283 // Close will finalize the table. Calling Append is not possible
284 // after Close, but calling BlocksLen, EntriesLen and BytesLen
285 // is still possible.
286 func (w *Writer) Close() error {
287         if w.err != nil {
288                 return w.err
289         }
290
291         // Write the last data block. Or empty data block if there
292         // aren't any data blocks at all.
293         if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
294                 if err := w.finishBlock(); err != nil {
295                         w.err = err
296                         return w.err
297                 }
298         }
299         w.flushPendingBH(nil)
300
301         // Write the filter block.
302         var filterBH blockHandle
303         w.filterBlock.finish()
304         if buf := &w.filterBlock.buf; buf.Len() > 0 {
305                 filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
306                 if w.err != nil {
307                         return w.err
308                 }
309         }
310
311         // Write the metaindex block.
312         if filterBH.length > 0 {
313                 key := []byte("filter." + w.filter.Name())
314                 n := encodeBlockHandle(w.scratch[:20], filterBH)
315                 w.dataBlock.append(key, w.scratch[:n])
316         }
317         w.dataBlock.finish()
318         metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
319         if err != nil {
320                 w.err = err
321                 return w.err
322         }
323
324         // Write the index block.
325         w.indexBlock.finish()
326         indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
327         if err != nil {
328                 w.err = err
329                 return w.err
330         }
331
332         // Write the table footer.
333         footer := w.scratch[:footerLen]
334         for i := range footer {
335                 footer[i] = 0
336         }
337         n := encodeBlockHandle(footer, metaindexBH)
338         encodeBlockHandle(footer[n:], indexBH)
339         copy(footer[footerLen-len(magic):], magic)
340         if _, err := w.writer.Write(footer); err != nil {
341                 w.err = err
342                 return w.err
343         }
344         w.offset += footerLen
345
346         w.err = errors.New("leveldb/table: writer is closed")
347         return nil
348 }
349
350 // NewWriter creates a new initialized table writer for the file.
351 //
352 // Table writer is not safe for concurrent use.
353 func NewWriter(f io.Writer, o *opt.Options) *Writer {
354         w := &Writer{
355                 writer:          f,
356                 cmp:             o.GetComparer(),
357                 filter:          o.GetFilter(),
358                 compression:     o.GetCompression(),
359                 blockSize:       o.GetBlockSize(),
360                 comparerScratch: make([]byte, 0),
361         }
362         // data block
363         w.dataBlock.restartInterval = o.GetBlockRestartInterval()
364         // The first 20-bytes are used for encoding block handle.
365         w.dataBlock.scratch = w.scratch[20:]
366         // index block
367         w.indexBlock.restartInterval = 1
368         w.indexBlock.scratch = w.scratch[20:]
369         // filter block
370         if w.filter != nil {
371                 w.filterBlock.generator = w.filter.NewGenerator()
372                 w.filterBlock.flush(0)
373         }
374         return w
375 }