1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
	"encoding/binary"
	"errors"
	"fmt"
	"io"

	"github.com/golang/snappy"

	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
// sharedPrefixLen returns the length of the common prefix of a and b,
// bounded by the shorter of the two slices.
func sharedPrefixLen(a, b []byte) int {
	i, n := 0, len(a)
	if n > len(b) {
		n = len(b)
	}
	for i < n && a[i] == b[i] {
		i++
	}
	return i
}
34 type blockWriter struct {
43 func (w *blockWriter) append(key, value []byte) {
45 if w.nEntries%w.restartInterval == 0 {
46 w.restarts = append(w.restarts, uint32(w.buf.Len()))
48 nShared = sharedPrefixLen(w.prevKey, key)
50 n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
51 n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
52 n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
53 w.buf.Write(w.scratch[:n])
54 w.buf.Write(key[nShared:])
56 w.prevKey = append(w.prevKey[:0], key...)
60 func (w *blockWriter) finish() {
61 // Write restarts entry.
63 // Must have at least one restart entry.
64 w.restarts = append(w.restarts, 0)
66 w.restarts = append(w.restarts, uint32(len(w.restarts)))
67 for _, x := range w.restarts {
68 buf4 := w.buf.Alloc(4)
69 binary.LittleEndian.PutUint32(buf4, x)
73 func (w *blockWriter) reset() {
76 w.restarts = w.restarts[:0]
79 func (w *blockWriter) bytesLen() int {
80 restartsLen := len(w.restarts)
84 return w.buf.Len() + 4*restartsLen + 4
87 type filterWriter struct {
88 generator filter.FilterGenerator
94 func (w *filterWriter) add(key []byte) {
95 if w.generator == nil {
102 func (w *filterWriter) flush(offset uint64) {
103 if w.generator == nil {
106 for x := int(offset / filterBase); x > len(w.offsets); {
111 func (w *filterWriter) finish() {
112 if w.generator == nil {
115 // Generate last keys.
120 w.offsets = append(w.offsets, uint32(w.buf.Len()))
121 for _, x := range w.offsets {
122 buf4 := w.buf.Alloc(4)
123 binary.LittleEndian.PutUint32(buf4, x)
125 w.buf.WriteByte(filterBaseLg)
128 func (w *filterWriter) generate() {
130 w.offsets = append(w.offsets, uint32(w.buf.Len()))
133 w.generator.Generate(&w.buf)
138 // Writer is a table writer.
143 cmp comparer.Comparer
145 compression opt.Compression
148 dataBlock blockWriter
149 indexBlock blockWriter
150 filterBlock filterWriter
151 pendingBH blockHandle
154 // Scratch allocated enough for 5 uvarint. Block writer should not use
155 // first 20-bytes since it will be used to encode block handle, which
156 // then passed to the block writer itself.
158 comparerScratch []byte
159 compressionScratch []byte
162 func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
163 // Compress the buffer if necessary.
165 if compression == opt.SnappyCompression {
166 // Allocate scratch enough for compression and block trailer.
167 if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
168 w.compressionScratch = make([]byte, n)
170 compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
172 b = compressed[:n+blockTrailerLen]
173 b[n] = blockTypeSnappyCompression
175 tmp := buf.Alloc(blockTrailerLen)
176 tmp[0] = blockTypeNoCompression
180 // Calculate the checksum.
182 checksum := util.NewCRC(b[:n]).Value()
183 binary.LittleEndian.PutUint32(b[n:], checksum)
185 // Write the buffer to the file.
186 _, err = w.writer.Write(b)
190 bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
191 w.offset += uint64(len(b))
195 func (w *Writer) flushPendingBH(key []byte) {
196 if w.pendingBH.length == 0 {
201 separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
203 separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
205 if separator == nil {
206 separator = w.dataBlock.prevKey
208 w.comparerScratch = separator
210 n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
211 // Append the block handle to the index block.
212 w.indexBlock.append(separator, w.scratch[:n])
213 // Reset prev key of the data block.
214 w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
215 // Clear pending block handle.
216 w.pendingBH = blockHandle{}
219 func (w *Writer) finishBlock() error {
221 bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
226 // Reset the data block.
228 // Flush the filter block.
229 w.filterBlock.flush(w.offset)
233 // Append appends key/value pair to the table. The keys passed must
234 // be in increasing order.
236 // It is safe to modify the contents of the arguments after Append returns.
237 func (w *Writer) Append(key, value []byte) error {
241 if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
242 w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
246 w.flushPendingBH(key)
247 // Append key/value pair to the data block.
248 w.dataBlock.append(key, value)
249 // Add key to the filter block.
250 w.filterBlock.add(key)
252 // Finish the data block if block size target reached.
253 if w.dataBlock.bytesLen() >= w.blockSize {
254 if err := w.finishBlock(); err != nil {
263 // BlocksLen returns number of blocks written so far.
264 func (w *Writer) BlocksLen() int {
265 n := w.indexBlock.nEntries
266 if w.pendingBH.length > 0 {
267 // Includes the pending block.
273 // EntriesLen returns number of entries added so far.
274 func (w *Writer) EntriesLen() int {
278 // BytesLen returns number of bytes written so far.
279 func (w *Writer) BytesLen() int {
283 // Close will finalize the table. Calling Append is not possible
284 // after Close, but calling BlocksLen, EntriesLen and BytesLen
285 // is still possible.
286 func (w *Writer) Close() error {
291 // Write the last data block. Or empty data block if there
292 // aren't any data blocks at all.
293 if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
294 if err := w.finishBlock(); err != nil {
299 w.flushPendingBH(nil)
301 // Write the filter block.
302 var filterBH blockHandle
303 w.filterBlock.finish()
304 if buf := &w.filterBlock.buf; buf.Len() > 0 {
305 filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
311 // Write the metaindex block.
312 if filterBH.length > 0 {
313 key := []byte("filter." + w.filter.Name())
314 n := encodeBlockHandle(w.scratch[:20], filterBH)
315 w.dataBlock.append(key, w.scratch[:n])
318 metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
324 // Write the index block.
325 w.indexBlock.finish()
326 indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
332 // Write the table footer.
333 footer := w.scratch[:footerLen]
334 for i := range footer {
337 n := encodeBlockHandle(footer, metaindexBH)
338 encodeBlockHandle(footer[n:], indexBH)
339 copy(footer[footerLen-len(magic):], magic)
340 if _, err := w.writer.Write(footer); err != nil {
344 w.offset += footerLen
346 w.err = errors.New("leveldb/table: writer is closed")
350 // NewWriter creates a new initialized table writer for the file.
352 // Table writer is not safe for concurrent use.
353 func NewWriter(f io.Writer, o *opt.Options) *Writer {
356 cmp: o.GetComparer(),
357 filter: o.GetFilter(),
358 compression: o.GetCompression(),
359 blockSize: o.GetBlockSize(),
360 comparerScratch: make([]byte, 0),
363 w.dataBlock.restartInterval = o.GetBlockRestartInterval()
364 // The first 20-bytes are used for encoding block handle.
365 w.dataBlock.scratch = w.scratch[20:]
367 w.indexBlock.restartInterval = 1
368 w.indexBlock.scratch = w.scratch[20:]
371 w.filterBlock.generator = w.filter.NewGenerator()
372 w.filterBlock.flush(0)