OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / batch.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 package leveldb
8
9 import (
10         "encoding/binary"
11         "fmt"
12         "io"
13
14         "github.com/syndtr/goleveldb/leveldb/errors"
15         "github.com/syndtr/goleveldb/leveldb/memdb"
16         "github.com/syndtr/goleveldb/leveldb/storage"
17 )
18
19 // ErrBatchCorrupted records reason of batch corruption. This error will be
20 // wrapped with errors.ErrCorrupted.
21 type ErrBatchCorrupted struct {
22         Reason string
23 }
24
25 func (e *ErrBatchCorrupted) Error() string {
26         return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
27 }
28
29 func newErrBatchCorrupted(reason string) error {
30         return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
31 }
32
33 const (
34         batchHeaderLen = 8 + 4
35         batchGrowRec   = 3000
36         batchBufioSize = 16
37 )
38
39 // BatchReplay wraps basic batch operations.
40 type BatchReplay interface {
41         Put(key, value []byte)
42         Delete(key []byte)
43 }
44
45 type batchIndex struct {
46         keyType            keyType
47         keyPos, keyLen     int
48         valuePos, valueLen int
49 }
50
51 func (index batchIndex) k(data []byte) []byte {
52         return data[index.keyPos : index.keyPos+index.keyLen]
53 }
54
55 func (index batchIndex) v(data []byte) []byte {
56         if index.valueLen != 0 {
57                 return data[index.valuePos : index.valuePos+index.valueLen]
58         }
59         return nil
60 }
61
62 func (index batchIndex) kv(data []byte) (key, value []byte) {
63         return index.k(data), index.v(data)
64 }
65
66 // Batch is a write batch.
67 type Batch struct {
68         data  []byte
69         index []batchIndex
70
71         // internalLen is sums of key/value pair length plus 8-bytes internal key.
72         internalLen int
73 }
74
75 func (b *Batch) grow(n int) {
76         o := len(b.data)
77         if cap(b.data)-o < n {
78                 div := 1
79                 if len(b.index) > batchGrowRec {
80                         div = len(b.index) / batchGrowRec
81                 }
82                 ndata := make([]byte, o, o+n+o/div)
83                 copy(ndata, b.data)
84                 b.data = ndata
85         }
86 }
87
88 func (b *Batch) appendRec(kt keyType, key, value []byte) {
89         n := 1 + binary.MaxVarintLen32 + len(key)
90         if kt == keyTypeVal {
91                 n += binary.MaxVarintLen32 + len(value)
92         }
93         b.grow(n)
94         index := batchIndex{keyType: kt}
95         o := len(b.data)
96         data := b.data[:o+n]
97         data[o] = byte(kt)
98         o++
99         o += binary.PutUvarint(data[o:], uint64(len(key)))
100         index.keyPos = o
101         index.keyLen = len(key)
102         o += copy(data[o:], key)
103         if kt == keyTypeVal {
104                 o += binary.PutUvarint(data[o:], uint64(len(value)))
105                 index.valuePos = o
106                 index.valueLen = len(value)
107                 o += copy(data[o:], value)
108         }
109         b.data = data[:o]
110         b.index = append(b.index, index)
111         b.internalLen += index.keyLen + index.valueLen + 8
112 }
113
114 // Put appends 'put operation' of the given key/value pair to the batch.
115 // It is safe to modify the contents of the argument after Put returns but not
116 // before.
117 func (b *Batch) Put(key, value []byte) {
118         b.appendRec(keyTypeVal, key, value)
119 }
120
121 // Delete appends 'delete operation' of the given key to the batch.
122 // It is safe to modify the contents of the argument after Delete returns but
123 // not before.
124 func (b *Batch) Delete(key []byte) {
125         b.appendRec(keyTypeDel, key, nil)
126 }
127
128 // Dump dumps batch contents. The returned slice can be loaded into the
129 // batch using Load method.
130 // The returned slice is not its own copy, so the contents should not be
131 // modified.
132 func (b *Batch) Dump() []byte {
133         return b.data
134 }
135
136 // Load loads given slice into the batch. Previous contents of the batch
137 // will be discarded.
138 // The given slice will not be copied and will be used as batch buffer, so
139 // it is not safe to modify the contents of the slice.
140 func (b *Batch) Load(data []byte) error {
141         return b.decode(data, -1)
142 }
143
144 // Replay replays batch contents.
145 func (b *Batch) Replay(r BatchReplay) error {
146         for _, index := range b.index {
147                 switch index.keyType {
148                 case keyTypeVal:
149                         r.Put(index.k(b.data), index.v(b.data))
150                 case keyTypeDel:
151                         r.Delete(index.k(b.data))
152                 }
153         }
154         return nil
155 }
156
157 // Len returns number of records in the batch.
158 func (b *Batch) Len() int {
159         return len(b.index)
160 }
161
162 // Reset resets the batch.
163 func (b *Batch) Reset() {
164         b.data = b.data[:0]
165         b.index = b.index[:0]
166         b.internalLen = 0
167 }
168
169 func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
170         for i, index := range b.index {
171                 if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
172                         return err
173                 }
174         }
175         return nil
176 }
177
178 func (b *Batch) append(p *Batch) {
179         ob := len(b.data)
180         oi := len(b.index)
181         b.data = append(b.data, p.data...)
182         b.index = append(b.index, p.index...)
183         b.internalLen += p.internalLen
184
185         // Updating index offset.
186         if ob != 0 {
187                 for ; oi < len(b.index); oi++ {
188                         index := &b.index[oi]
189                         index.keyPos += ob
190                         if index.valueLen != 0 {
191                                 index.valuePos += ob
192                         }
193                 }
194         }
195 }
196
197 func (b *Batch) decode(data []byte, expectedLen int) error {
198         b.data = data
199         b.index = b.index[:0]
200         b.internalLen = 0
201         err := decodeBatch(data, func(i int, index batchIndex) error {
202                 b.index = append(b.index, index)
203                 b.internalLen += index.keyLen + index.valueLen + 8
204                 return nil
205         })
206         if err != nil {
207                 return err
208         }
209         if expectedLen >= 0 && len(b.index) != expectedLen {
210                 return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
211         }
212         return nil
213 }
214
215 func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
216         var ik []byte
217         for i, index := range b.index {
218                 ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
219                 if err := mdb.Put(ik, index.v(b.data)); err != nil {
220                         return err
221                 }
222         }
223         return nil
224 }
225
226 func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
227         var ik []byte
228         for i, index := range b.index {
229                 ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
230                 if err := mdb.Delete(ik); err != nil {
231                         return err
232                 }
233         }
234         return nil
235 }
236
237 func newBatch() interface{} {
238         return &Batch{}
239 }
240
241 func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
242         var index batchIndex
243         for i, o := 0, 0; o < len(data); i++ {
244                 // Key type.
245                 index.keyType = keyType(data[o])
246                 if index.keyType > keyTypeVal {
247                         return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
248                 }
249                 o++
250
251                 // Key.
252                 x, n := binary.Uvarint(data[o:])
253                 o += n
254                 if n <= 0 || o+int(x) > len(data) {
255                         return newErrBatchCorrupted("bad record: invalid key length")
256                 }
257                 index.keyPos = o
258                 index.keyLen = int(x)
259                 o += index.keyLen
260
261                 // Value.
262                 if index.keyType == keyTypeVal {
263                         x, n = binary.Uvarint(data[o:])
264                         o += n
265                         if n <= 0 || o+int(x) > len(data) {
266                                 return newErrBatchCorrupted("bad record: invalid value length")
267                         }
268                         index.valuePos = o
269                         index.valueLen = int(x)
270                         o += index.valueLen
271                 } else {
272                         index.valuePos = 0
273                         index.valueLen = 0
274                 }
275
276                 if err := fn(i, index); err != nil {
277                         return err
278                 }
279         }
280         return nil
281 }
282
283 func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
284         seq, batchLen, err = decodeBatchHeader(data)
285         if err != nil {
286                 return 0, 0, err
287         }
288         if seq < expectSeq {
289                 return 0, 0, newErrBatchCorrupted("invalid sequence number")
290         }
291         data = data[batchHeaderLen:]
292         var ik []byte
293         var decodedLen int
294         err = decodeBatch(data, func(i int, index batchIndex) error {
295                 if i >= batchLen {
296                         return newErrBatchCorrupted("invalid records length")
297                 }
298                 ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
299                 if err := mdb.Put(ik, index.v(data)); err != nil {
300                         return err
301                 }
302                 decodedLen++
303                 return nil
304         })
305         if err == nil && decodedLen != batchLen {
306                 err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
307         }
308         return
309 }
310
311 func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
312         dst = ensureBuffer(dst, batchHeaderLen)
313         binary.LittleEndian.PutUint64(dst, seq)
314         binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
315         return dst
316 }
317
318 func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
319         if len(data) < batchHeaderLen {
320                 return 0, 0, newErrBatchCorrupted("too short")
321         }
322
323         seq = binary.LittleEndian.Uint64(data)
324         batchLen = int(binary.LittleEndian.Uint32(data[8:]))
325         if batchLen < 0 {
326                 return 0, 0, newErrBatchCorrupted("invalid records length")
327         }
328         return
329 }
330
331 func batchesLen(batches []*Batch) int {
332         batchLen := 0
333         for _, batch := range batches {
334                 batchLen += batch.Len()
335         }
336         return batchLen
337 }
338
339 func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
340         if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
341                 return err
342         }
343         for _, batch := range batches {
344                 if _, err := wr.Write(batch.data); err != nil {
345                         return err
346                 }
347         }
348         return nil
349 }