1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
14 "github.com/syndtr/goleveldb/leveldb/errors"
15 "github.com/syndtr/goleveldb/leveldb/memdb"
16 "github.com/syndtr/goleveldb/leveldb/storage"
// ErrBatchCorrupted records the reason for a batch corruption. This error
// will be wrapped with errors.ErrCorrupted.
21 type ErrBatchCorrupted struct {
25 func (e *ErrBatchCorrupted) Error() string {
26 return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
29 func newErrBatchCorrupted(reason string) error {
30 return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
34 batchHeaderLen = 8 + 4
39 // BatchReplay wraps basic batch operations.
40 type BatchReplay interface {
41 Put(key, value []byte)
45 type batchIndex struct {
48 valuePos, valueLen int
51 func (index batchIndex) k(data []byte) []byte {
52 return data[index.keyPos : index.keyPos+index.keyLen]
55 func (index batchIndex) v(data []byte) []byte {
56 if index.valueLen != 0 {
57 return data[index.valuePos : index.valuePos+index.valueLen]
62 func (index batchIndex) kv(data []byte) (key, value []byte) {
63 return index.k(data), index.v(data)
66 // Batch is a write batch.
// internalLen is the sum of all key and value lengths plus 8 bytes of internal-key overhead per record.
75 func (b *Batch) grow(n int) {
77 if cap(b.data)-o < n {
79 if len(b.index) > batchGrowRec {
80 div = len(b.index) / batchGrowRec
82 ndata := make([]byte, o, o+n+o/div)
88 func (b *Batch) appendRec(kt keyType, key, value []byte) {
89 n := 1 + binary.MaxVarintLen32 + len(key)
91 n += binary.MaxVarintLen32 + len(value)
94 index := batchIndex{keyType: kt}
99 o += binary.PutUvarint(data[o:], uint64(len(key)))
101 index.keyLen = len(key)
102 o += copy(data[o:], key)
103 if kt == keyTypeVal {
104 o += binary.PutUvarint(data[o:], uint64(len(value)))
106 index.valueLen = len(value)
107 o += copy(data[o:], value)
110 b.index = append(b.index, index)
111 b.internalLen += index.keyLen + index.valueLen + 8
114 // Put appends 'put operation' of the given key/value pair to the batch.
115 // It is safe to modify the contents of the argument after Put returns but not
117 func (b *Batch) Put(key, value []byte) {
118 b.appendRec(keyTypeVal, key, value)
121 // Delete appends 'delete operation' of the given key to the batch.
122 // It is safe to modify the contents of the argument after Delete returns but
124 func (b *Batch) Delete(key []byte) {
125 b.appendRec(keyTypeDel, key, nil)
128 // Dump dumps batch contents. The returned slice can be loaded into the
129 // batch using Load method.
130 // The returned slice is not its own copy, so the contents should not be
132 func (b *Batch) Dump() []byte {
136 // Load loads given slice into the batch. Previous contents of the batch
137 // will be discarded.
138 // The given slice will not be copied and will be used as batch buffer, so
139 // it is not safe to modify the contents of the slice.
140 func (b *Batch) Load(data []byte) error {
141 return b.decode(data, -1)
144 // Replay replays batch contents.
145 func (b *Batch) Replay(r BatchReplay) error {
146 for _, index := range b.index {
147 switch index.keyType {
149 r.Put(index.k(b.data), index.v(b.data))
151 r.Delete(index.k(b.data))
157 // Len returns number of records in the batch.
158 func (b *Batch) Len() int {
162 // Reset resets the batch.
163 func (b *Batch) Reset() {
165 b.index = b.index[:0]
169 func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
170 for i, index := range b.index {
171 if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
178 func (b *Batch) append(p *Batch) {
181 b.data = append(b.data, p.data...)
182 b.index = append(b.index, p.index...)
183 b.internalLen += p.internalLen
185 // Updating index offset.
187 for ; oi < len(b.index); oi++ {
188 index := &b.index[oi]
190 if index.valueLen != 0 {
197 func (b *Batch) decode(data []byte, expectedLen int) error {
199 b.index = b.index[:0]
201 err := decodeBatch(data, func(i int, index batchIndex) error {
202 b.index = append(b.index, index)
203 b.internalLen += index.keyLen + index.valueLen + 8
209 if expectedLen >= 0 && len(b.index) != expectedLen {
210 return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
215 func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
217 for i, index := range b.index {
218 ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
219 if err := mdb.Put(ik, index.v(b.data)); err != nil {
226 func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
228 for i, index := range b.index {
229 ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
230 if err := mdb.Delete(ik); err != nil {
237 func newBatch() interface{} {
241 func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
243 for i, o := 0, 0; o < len(data); i++ {
245 index.keyType = keyType(data[o])
246 if index.keyType > keyTypeVal {
247 return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
252 x, n := binary.Uvarint(data[o:])
254 if n <= 0 || o+int(x) > len(data) {
255 return newErrBatchCorrupted("bad record: invalid key length")
258 index.keyLen = int(x)
262 if index.keyType == keyTypeVal {
263 x, n = binary.Uvarint(data[o:])
265 if n <= 0 || o+int(x) > len(data) {
266 return newErrBatchCorrupted("bad record: invalid value length")
269 index.valueLen = int(x)
276 if err := fn(i, index); err != nil {
283 func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
284 seq, batchLen, err = decodeBatchHeader(data)
289 return 0, 0, newErrBatchCorrupted("invalid sequence number")
291 data = data[batchHeaderLen:]
294 err = decodeBatch(data, func(i int, index batchIndex) error {
296 return newErrBatchCorrupted("invalid records length")
298 ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
299 if err := mdb.Put(ik, index.v(data)); err != nil {
305 if err == nil && decodedLen != batchLen {
306 err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
311 func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
312 dst = ensureBuffer(dst, batchHeaderLen)
313 binary.LittleEndian.PutUint64(dst, seq)
314 binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
318 func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
319 if len(data) < batchHeaderLen {
320 return 0, 0, newErrBatchCorrupted("too short")
323 seq = binary.LittleEndian.Uint64(data)
324 batchLen = int(binary.LittleEndian.Uint32(data[8:]))
326 return 0, 0, newErrBatchCorrupted("invalid records length")
331 func batchesLen(batches []*Batch) int {
333 for _, batch := range batches {
334 batchLen += batch.Len()
339 func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
340 if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
343 for _, batch := range batches {
344 if _, err := wr.Write(batch.data); err != nil {