OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / syndtr / goleveldb / leveldb / cache / cache.go
1 // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2 // All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE file.
6
7 // Package cache provides interface and implementation of a cache algorithms.
8 package cache
9
10 import (
11         "sync"
12         "sync/atomic"
13         "unsafe"
14
15         "github.com/syndtr/goleveldb/leveldb/util"
16 )
17
18 // Cacher provides interface to implements a caching functionality.
19 // An implementation must be safe for concurrent use.
20 type Cacher interface {
21         // Capacity returns cache capacity.
22         Capacity() int
23
24         // SetCapacity sets cache capacity.
25         SetCapacity(capacity int)
26
27         // Promote promotes the 'cache node'.
28         Promote(n *Node)
29
30         // Ban evicts the 'cache node' and prevent subsequent 'promote'.
31         Ban(n *Node)
32
33         // Evict evicts the 'cache node'.
34         Evict(n *Node)
35
36         // EvictNS evicts 'cache node' with the given namespace.
37         EvictNS(ns uint64)
38
39         // EvictAll evicts all 'cache node'.
40         EvictAll()
41
42         // Close closes the 'cache tree'
43         Close() error
44 }
45
46 // Value is a 'cacheable object'. It may implements util.Releaser, if
47 // so the the Release method will be called once object is released.
48 type Value interface{}
49
50 // NamespaceGetter provides convenient wrapper for namespace.
51 type NamespaceGetter struct {
52         Cache *Cache
53         NS    uint64
54 }
55
56 // Get simply calls Cache.Get() method.
57 func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
58         return g.Cache.Get(g.NS, key, setFunc)
59 }
60
61 // The hash tables implementation is based on:
62 // "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
63 // Kunlong Zhang, and Michael Spear.
64 // ACM Symposium on Principles of Distributed Computing, Jul 2014.
65
66 const (
67         mInitialSize           = 1 << 4
68         mOverflowThreshold     = 1 << 5
69         mOverflowGrowThreshold = 1 << 7
70 )
71
72 type mBucket struct {
73         mu     sync.Mutex
74         node   []*Node
75         frozen bool
76 }
77
78 func (b *mBucket) freeze() []*Node {
79         b.mu.Lock()
80         defer b.mu.Unlock()
81         if !b.frozen {
82                 b.frozen = true
83         }
84         return b.node
85 }
86
87 func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
88         b.mu.Lock()
89
90         if b.frozen {
91                 b.mu.Unlock()
92                 return
93         }
94
95         // Scan the node.
96         for _, n := range b.node {
97                 if n.hash == hash && n.ns == ns && n.key == key {
98                         atomic.AddInt32(&n.ref, 1)
99                         b.mu.Unlock()
100                         return true, false, n
101                 }
102         }
103
104         // Get only.
105         if noset {
106                 b.mu.Unlock()
107                 return true, false, nil
108         }
109
110         // Create node.
111         n = &Node{
112                 r:    r,
113                 hash: hash,
114                 ns:   ns,
115                 key:  key,
116                 ref:  1,
117         }
118         // Add node to bucket.
119         b.node = append(b.node, n)
120         bLen := len(b.node)
121         b.mu.Unlock()
122
123         // Update counter.
124         grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
125         if bLen > mOverflowThreshold {
126                 grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
127         }
128
129         // Grow.
130         if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
131                 nhLen := len(h.buckets) << 1
132                 nh := &mNode{
133                         buckets:         make([]unsafe.Pointer, nhLen),
134                         mask:            uint32(nhLen) - 1,
135                         pred:            unsafe.Pointer(h),
136                         growThreshold:   int32(nhLen * mOverflowThreshold),
137                         shrinkThreshold: int32(nhLen >> 1),
138                 }
139                 ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
140                 if !ok {
141                         panic("BUG: failed swapping head")
142                 }
143                 go nh.initBuckets()
144         }
145
146         return true, true, n
147 }
148
149 func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
150         b.mu.Lock()
151
152         if b.frozen {
153                 b.mu.Unlock()
154                 return
155         }
156
157         // Scan the node.
158         var (
159                 n    *Node
160                 bLen int
161         )
162         for i := range b.node {
163                 n = b.node[i]
164                 if n.ns == ns && n.key == key {
165                         if atomic.LoadInt32(&n.ref) == 0 {
166                                 deleted = true
167
168                                 // Call releaser.
169                                 if n.value != nil {
170                                         if r, ok := n.value.(util.Releaser); ok {
171                                                 r.Release()
172                                         }
173                                         n.value = nil
174                                 }
175
176                                 // Remove node from bucket.
177                                 b.node = append(b.node[:i], b.node[i+1:]...)
178                                 bLen = len(b.node)
179                         }
180                         break
181                 }
182         }
183         b.mu.Unlock()
184
185         if deleted {
186                 // Call OnDel.
187                 for _, f := range n.onDel {
188                         f()
189                 }
190
191                 // Update counter.
192                 atomic.AddInt32(&r.size, int32(n.size)*-1)
193                 shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
194                 if bLen >= mOverflowThreshold {
195                         atomic.AddInt32(&h.overflow, -1)
196                 }
197
198                 // Shrink.
199                 if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
200                         nhLen := len(h.buckets) >> 1
201                         nh := &mNode{
202                                 buckets:         make([]unsafe.Pointer, nhLen),
203                                 mask:            uint32(nhLen) - 1,
204                                 pred:            unsafe.Pointer(h),
205                                 growThreshold:   int32(nhLen * mOverflowThreshold),
206                                 shrinkThreshold: int32(nhLen >> 1),
207                         }
208                         ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
209                         if !ok {
210                                 panic("BUG: failed swapping head")
211                         }
212                         go nh.initBuckets()
213                 }
214         }
215
216         return true, deleted
217 }
218
219 type mNode struct {
220         buckets         []unsafe.Pointer // []*mBucket
221         mask            uint32
222         pred            unsafe.Pointer // *mNode
223         resizeInProgess int32
224
225         overflow        int32
226         growThreshold   int32
227         shrinkThreshold int32
228 }
229
230 func (n *mNode) initBucket(i uint32) *mBucket {
231         if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
232                 return b
233         }
234
235         p := (*mNode)(atomic.LoadPointer(&n.pred))
236         if p != nil {
237                 var node []*Node
238                 if n.mask > p.mask {
239                         // Grow.
240                         pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
241                         if pb == nil {
242                                 pb = p.initBucket(i & p.mask)
243                         }
244                         m := pb.freeze()
245                         // Split nodes.
246                         for _, x := range m {
247                                 if x.hash&n.mask == i {
248                                         node = append(node, x)
249                                 }
250                         }
251                 } else {
252                         // Shrink.
253                         pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
254                         if pb0 == nil {
255                                 pb0 = p.initBucket(i)
256                         }
257                         pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
258                         if pb1 == nil {
259                                 pb1 = p.initBucket(i + uint32(len(n.buckets)))
260                         }
261                         m0 := pb0.freeze()
262                         m1 := pb1.freeze()
263                         // Merge nodes.
264                         node = make([]*Node, 0, len(m0)+len(m1))
265                         node = append(node, m0...)
266                         node = append(node, m1...)
267                 }
268                 b := &mBucket{node: node}
269                 if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
270                         if len(node) > mOverflowThreshold {
271                                 atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
272                         }
273                         return b
274                 }
275         }
276
277         return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
278 }
279
280 func (n *mNode) initBuckets() {
281         for i := range n.buckets {
282                 n.initBucket(uint32(i))
283         }
284         atomic.StorePointer(&n.pred, nil)
285 }
286
287 // Cache is a 'cache map'.
288 type Cache struct {
289         mu     sync.RWMutex
290         mHead  unsafe.Pointer // *mNode
291         nodes  int32
292         size   int32
293         cacher Cacher
294         closed bool
295 }
296
297 // NewCache creates a new 'cache map'. The cacher is optional and
298 // may be nil.
299 func NewCache(cacher Cacher) *Cache {
300         h := &mNode{
301                 buckets:         make([]unsafe.Pointer, mInitialSize),
302                 mask:            mInitialSize - 1,
303                 growThreshold:   int32(mInitialSize * mOverflowThreshold),
304                 shrinkThreshold: 0,
305         }
306         for i := range h.buckets {
307                 h.buckets[i] = unsafe.Pointer(&mBucket{})
308         }
309         r := &Cache{
310                 mHead:  unsafe.Pointer(h),
311                 cacher: cacher,
312         }
313         return r
314 }
315
316 func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
317         h := (*mNode)(atomic.LoadPointer(&r.mHead))
318         i := hash & h.mask
319         b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
320         if b == nil {
321                 b = h.initBucket(i)
322         }
323         return h, b
324 }
325
326 func (r *Cache) delete(n *Node) bool {
327         for {
328                 h, b := r.getBucket(n.hash)
329                 done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
330                 if done {
331                         return deleted
332                 }
333         }
334         return false
335 }
336
337 // Nodes returns number of 'cache node' in the map.
338 func (r *Cache) Nodes() int {
339         return int(atomic.LoadInt32(&r.nodes))
340 }
341
342 // Size returns sums of 'cache node' size in the map.
343 func (r *Cache) Size() int {
344         return int(atomic.LoadInt32(&r.size))
345 }
346
347 // Capacity returns cache capacity.
348 func (r *Cache) Capacity() int {
349         if r.cacher == nil {
350                 return 0
351         }
352         return r.cacher.Capacity()
353 }
354
355 // SetCapacity sets cache capacity.
356 func (r *Cache) SetCapacity(capacity int) {
357         if r.cacher != nil {
358                 r.cacher.SetCapacity(capacity)
359         }
360 }
361
362 // Get gets 'cache node' with the given namespace and key.
363 // If cache node is not found and setFunc is not nil, Get will atomically creates
364 // the 'cache node' by calling setFunc. Otherwise Get will returns nil.
365 //
366 // The returned 'cache handle' should be released after use by calling Release
367 // method.
368 func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
369         r.mu.RLock()
370         defer r.mu.RUnlock()
371         if r.closed {
372                 return nil
373         }
374
375         hash := murmur32(ns, key, 0xf00)
376         for {
377                 h, b := r.getBucket(hash)
378                 done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
379                 if done {
380                         if n != nil {
381                                 n.mu.Lock()
382                                 if n.value == nil {
383                                         if setFunc == nil {
384                                                 n.mu.Unlock()
385                                                 n.unref()
386                                                 return nil
387                                         }
388
389                                         n.size, n.value = setFunc()
390                                         if n.value == nil {
391                                                 n.size = 0
392                                                 n.mu.Unlock()
393                                                 n.unref()
394                                                 return nil
395                                         }
396                                         atomic.AddInt32(&r.size, int32(n.size))
397                                 }
398                                 n.mu.Unlock()
399                                 if r.cacher != nil {
400                                         r.cacher.Promote(n)
401                                 }
402                                 return &Handle{unsafe.Pointer(n)}
403                         }
404
405                         break
406                 }
407         }
408         return nil
409 }
410
411 // Delete removes and ban 'cache node' with the given namespace and key.
412 // A banned 'cache node' will never inserted into the 'cache tree'. Ban
413 // only attributed to the particular 'cache node', so when a 'cache node'
414 // is recreated it will not be banned.
415 //
416 // If onDel is not nil, then it will be executed if such 'cache node'
417 // doesn't exist or once the 'cache node' is released.
418 //
419 // Delete return true is such 'cache node' exist.
420 func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
421         r.mu.RLock()
422         defer r.mu.RUnlock()
423         if r.closed {
424                 return false
425         }
426
427         hash := murmur32(ns, key, 0xf00)
428         for {
429                 h, b := r.getBucket(hash)
430                 done, _, n := b.get(r, h, hash, ns, key, true)
431                 if done {
432                         if n != nil {
433                                 if onDel != nil {
434                                         n.mu.Lock()
435                                         n.onDel = append(n.onDel, onDel)
436                                         n.mu.Unlock()
437                                 }
438                                 if r.cacher != nil {
439                                         r.cacher.Ban(n)
440                                 }
441                                 n.unref()
442                                 return true
443                         }
444
445                         break
446                 }
447         }
448
449         if onDel != nil {
450                 onDel()
451         }
452
453         return false
454 }
455
456 // Evict evicts 'cache node' with the given namespace and key. This will
457 // simply call Cacher.Evict.
458 //
459 // Evict return true is such 'cache node' exist.
460 func (r *Cache) Evict(ns, key uint64) bool {
461         r.mu.RLock()
462         defer r.mu.RUnlock()
463         if r.closed {
464                 return false
465         }
466
467         hash := murmur32(ns, key, 0xf00)
468         for {
469                 h, b := r.getBucket(hash)
470                 done, _, n := b.get(r, h, hash, ns, key, true)
471                 if done {
472                         if n != nil {
473                                 if r.cacher != nil {
474                                         r.cacher.Evict(n)
475                                 }
476                                 n.unref()
477                                 return true
478                         }
479
480                         break
481                 }
482         }
483
484         return false
485 }
486
487 // EvictNS evicts 'cache node' with the given namespace. This will
488 // simply call Cacher.EvictNS.
489 func (r *Cache) EvictNS(ns uint64) {
490         r.mu.RLock()
491         defer r.mu.RUnlock()
492         if r.closed {
493                 return
494         }
495
496         if r.cacher != nil {
497                 r.cacher.EvictNS(ns)
498         }
499 }
500
501 // EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll.
502 func (r *Cache) EvictAll() {
503         r.mu.RLock()
504         defer r.mu.RUnlock()
505         if r.closed {
506                 return
507         }
508
509         if r.cacher != nil {
510                 r.cacher.EvictAll()
511         }
512 }
513
514 // Close closes the 'cache map' and forcefully releases all 'cache node'.
515 func (r *Cache) Close() error {
516         r.mu.Lock()
517         if !r.closed {
518                 r.closed = true
519
520                 h := (*mNode)(r.mHead)
521                 h.initBuckets()
522
523                 for i := range h.buckets {
524                         b := (*mBucket)(h.buckets[i])
525                         for _, n := range b.node {
526                                 // Call releaser.
527                                 if n.value != nil {
528                                         if r, ok := n.value.(util.Releaser); ok {
529                                                 r.Release()
530                                         }
531                                         n.value = nil
532                                 }
533
534                                 // Call OnDel.
535                                 for _, f := range n.onDel {
536                                         f()
537                                 }
538                                 n.onDel = nil
539                         }
540                 }
541         }
542         r.mu.Unlock()
543
544         // Avoid deadlock.
545         if r.cacher != nil {
546                 if err := r.cacher.Close(); err != nil {
547                         return err
548                 }
549         }
550         return nil
551 }
552
553 // CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
554 // unlike Close it doesn't forcefully releases 'cache node'.
555 func (r *Cache) CloseWeak() error {
556         r.mu.Lock()
557         if !r.closed {
558                 r.closed = true
559         }
560         r.mu.Unlock()
561
562         // Avoid deadlock.
563         if r.cacher != nil {
564                 r.cacher.EvictAll()
565                 if err := r.cacher.Close(); err != nil {
566                         return err
567                 }
568         }
569         return nil
570 }
571
572 // Node is a 'cache node'.
573 type Node struct {
574         r *Cache
575
576         hash    uint32
577         ns, key uint64
578
579         mu    sync.Mutex
580         size  int
581         value Value
582
583         ref   int32
584         onDel []func()
585
586         CacheData unsafe.Pointer
587 }
588
589 // NS returns this 'cache node' namespace.
590 func (n *Node) NS() uint64 {
591         return n.ns
592 }
593
594 // Key returns this 'cache node' key.
595 func (n *Node) Key() uint64 {
596         return n.key
597 }
598
599 // Size returns this 'cache node' size.
600 func (n *Node) Size() int {
601         return n.size
602 }
603
604 // Value returns this 'cache node' value.
605 func (n *Node) Value() Value {
606         return n.value
607 }
608
609 // Ref returns this 'cache node' ref counter.
610 func (n *Node) Ref() int32 {
611         return atomic.LoadInt32(&n.ref)
612 }
613
614 // GetHandle returns an handle for this 'cache node'.
615 func (n *Node) GetHandle() *Handle {
616         if atomic.AddInt32(&n.ref, 1) <= 1 {
617                 panic("BUG: Node.GetHandle on zero ref")
618         }
619         return &Handle{unsafe.Pointer(n)}
620 }
621
622 func (n *Node) unref() {
623         if atomic.AddInt32(&n.ref, -1) == 0 {
624                 n.r.delete(n)
625         }
626 }
627
628 func (n *Node) unrefLocked() {
629         if atomic.AddInt32(&n.ref, -1) == 0 {
630                 n.r.mu.RLock()
631                 if !n.r.closed {
632                         n.r.delete(n)
633                 }
634                 n.r.mu.RUnlock()
635         }
636 }
637
638 // Handle is a 'cache handle' of a 'cache node'.
639 type Handle struct {
640         n unsafe.Pointer // *Node
641 }
642
643 // Value returns the value of the 'cache node'.
644 func (h *Handle) Value() Value {
645         n := (*Node)(atomic.LoadPointer(&h.n))
646         if n != nil {
647                 return n.value
648         }
649         return nil
650 }
651
652 // Release releases this 'cache handle'.
653 // It is safe to call release multiple times.
654 func (h *Handle) Release() {
655         nPtr := atomic.LoadPointer(&h.n)
656         if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
657                 n := (*Node)(nPtr)
658                 n.unrefLocked()
659         }
660 }
661
662 func murmur32(ns, key uint64, seed uint32) uint32 {
663         const (
664                 m = uint32(0x5bd1e995)
665                 r = 24
666         )
667
668         k1 := uint32(ns >> 32)
669         k2 := uint32(ns)
670         k3 := uint32(key >> 32)
671         k4 := uint32(key)
672
673         k1 *= m
674         k1 ^= k1 >> r
675         k1 *= m
676
677         k2 *= m
678         k2 ^= k2 >> r
679         k2 *= m
680
681         k3 *= m
682         k3 ^= k3 >> r
683         k3 *= m
684
685         k4 *= m
686         k4 ^= k4 >> r
687         k4 *= m
688
689         h := seed
690
691         h *= m
692         h ^= k1
693         h *= m
694         h ^= k2
695         h *= m
696         h ^= k3
697         h *= m
698         h ^= k4
699
700         h ^= h >> 13
701         h *= m
702         h ^= h >> 15
703
704         return h
705 }