// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package cache provides interface and implementation of a caching algorithm.
package cache

import (
	"sync/atomic"
	"unsafe"

	"github.com/syndtr/goleveldb/leveldb/util"
)

// Cacher provides an interface to implement caching functionality.
// An implementation must be safe for concurrent use.
type Cacher interface {
	// Capacity returns cache capacity.
	Capacity() int

	// SetCapacity sets cache capacity.
	SetCapacity(capacity int)

	// Promote promotes the 'cache node'.
	Promote(n *Node)

	// Ban evicts the 'cache node' and prevents subsequent 'promote'.
	Ban(n *Node)

	// Evict evicts the 'cache node'.
	Evict(n *Node)

	// EvictNS evicts 'cache nodes' with the given namespace.
	EvictNS(ns uint64)

	// EvictAll evicts all 'cache nodes'.
	EvictAll()

	// Close closes the 'cache tree'.
	Close() error
}
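
// A Cacher supplies only the replacement policy; the Cache type below owns
// the node table itself. The LRU policy built by NewLRU elsewhere in this
// package is the usual implementation handed to NewCache.
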
// Value is a 'cacheable object'. It may implement util.Releaser; if
// so, the Release method will be called once the object is released.
type Value interface{}

// NamespaceGetter provides a convenient wrapper for a namespace.
type NamespaceGetter struct {
	Cache *Cache
	NS    uint64
}

// Get simply calls Cache.Get() method.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
	return g.Cache.Get(g.NS, key, setFunc)
}
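
// Usage sketch (illustrative; c is a *Cache from NewCache, and the namespace
// 1 and key 42 are arbitrary):
//
//	ng := &NamespaceGetter{Cache: c, NS: 1}
//	if h := ng.Get(42, func() (int, Value) { return 1, "v" }); h != nil {
//		defer h.Release()
//		_ = h.Value()
//	}
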
// The hash table implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
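//
// Concretely, Cache keeps a pointer (mHead) to the current table (mNode), a
// power-of-two slice of buckets (mBucket), each holding a small slice of
// nodes. A resize publishes a new table whose buckets are filled lazily from
// its predecessor, so the whole table is never rebuilt up front.
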
const (
	mInitialSize           = 1 << 4
	mOverflowThreshold     = 1 << 5
	mOverflowGrowThreshold = 1 << 7
)

func (b *mBucket) freeze() []*Node {

func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
	for _, n := range b.node {
		if n.hash == hash && n.ns == ns && n.key == key {
			atomic.AddInt32(&n.ref, 1)
			return true, false, n

		return true, false, nil

	// Add node to bucket.
	b.node = append(b.node, n)

	grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
	if bLen > mOverflowThreshold {
		grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
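
	// At most one goroutine resizes at a time: whoever wins the
	// resizeInProgress CAS below publishes a doubled table by swapping mHead,
	// and the new table's buckets are then filled lazily from the old one
	// (see initBucket).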
	if grow && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
		nhLen := len(h.buckets) << 1
		nh := &mNode{
			buckets:         make([]unsafe.Pointer, nhLen),
			mask:            uint32(nhLen) - 1,
			pred:            unsafe.Pointer(h),
			growThreshold:   int32(nhLen * mOverflowThreshold),
			shrinkThreshold: int32(nhLen >> 1),
		}
		ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
		if !ok {
			panic("BUG: failed swapping head")
		}
		go nh.initBuckets()
	}

func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
	for i := range b.node {
		if n.ns == ns && n.key == key {
			if atomic.LoadInt32(&n.ref) == 0 {
				if r, ok := n.value.(util.Releaser); ok {

				// Remove node from bucket.
				b.node = append(b.node[:i], b.node[i+1:]...)

		for _, f := range n.onDel {

		atomic.AddInt32(&r.size, int32(n.size)*-1)
		shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
		if bLen >= mOverflowThreshold {
			atomic.AddInt32(&h.overflow, -1)
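
		// Shrinking mirrors the grow path above: when the node count falls
		// below shrinkThreshold the table is halved (but never below
		// mInitialSize), guarded by the same resizeInProgress CAS.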
		if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
			nhLen := len(h.buckets) >> 1
			nh := &mNode{
				buckets:         make([]unsafe.Pointer, nhLen),
				mask:            uint32(nhLen) - 1,
				pred:            unsafe.Pointer(h),
				growThreshold:   int32(nhLen * mOverflowThreshold),
				shrinkThreshold: int32(nhLen >> 1),
			}
			ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
			if !ok {
				panic("BUG: failed swapping head")
			}
			go nh.initBuckets()
		}

type mNode struct {
	buckets          []unsafe.Pointer // []*mBucket
	pred             unsafe.Pointer   // *mNode
	resizeInProgress int32

	shrinkThreshold int32
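
// initBucket lazily initializes bucket i of this table: after a grow it
// freezes and splits the matching predecessor bucket by hash, after a shrink
// it freezes and merges two predecessor buckets, and it publishes the result
// with a compare-and-swap so concurrent initializers agree on a single bucket.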
func (n *mNode) initBucket(i uint32) *mBucket {
	if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {

	p := (*mNode)(atomic.LoadPointer(&n.pred))

			pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
				pb = p.initBucket(i & p.mask)

			for _, x := range m {
				if x.hash&n.mask == i {
					node = append(node, x)

			pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
				pb0 = p.initBucket(i)
			pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
				pb1 = p.initBucket(i + uint32(len(n.buckets)))

			node = make([]*Node, 0, len(m0)+len(m1))
			node = append(node, m0...)
			node = append(node, m1...)

		b := &mBucket{node: node}
		if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
			if len(node) > mOverflowThreshold {
				atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))

	return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))

func (n *mNode) initBuckets() {
	for i := range n.buckets {
		n.initBucket(uint32(i))
	}
	atomic.StorePointer(&n.pred, nil)
}

// Cache is a 'cache map'.
type Cache struct {
	mHead unsafe.Pointer // *mNode

// NewCache creates a new 'cache map'. The cacher is optional and can be nil.
func NewCache(cacher Cacher) *Cache {
	h := &mNode{
		buckets:       make([]unsafe.Pointer, mInitialSize),
		mask:          mInitialSize - 1,
		growThreshold: int32(mInitialSize * mOverflowThreshold),
	}
	for i := range h.buckets {
		h.buckets[i] = unsafe.Pointer(&mBucket{})
	}
	r := &Cache{
		mHead:  unsafe.Pointer(h),
		cacher: cacher,
	}
	return r
}
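
// exampleCacheUsage is an illustrative sketch (not a definitive recipe) of the
// intended call pattern for NewCache, Get, Delete and Handle.Release. The
// namespace/key values and the capacity are arbitrary; NewLRU is the LRU
// Cacher constructor provided elsewhere in this package.
func exampleCacheUsage() {
	c := NewCache(NewLRU(16 * 1024 * 1024))

	// Get returns the existing 'cache node' or atomically creates one by
	// calling setFunc; the returned handle must be released after use.
	h := c.Get(1, 42, func() (size int, value Value) {
		return 8, "cached value"
	})
	if h != nil {
		_ = h.Value()
		h.Release()
	}

	// Delete bans the node so it will not re-enter the 'cache tree' until it
	// is recreated; Evict/EvictAll only drop nodes from the Cacher.
	c.Delete(1, 42, nil)
	c.EvictAll()
	c.Close()
}
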
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
	h := (*mNode)(atomic.LoadPointer(&r.mHead))
	i := hash & h.mask
	b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))

func (r *Cache) delete(n *Node) bool {
	h, b := r.getBucket(n.hash)
	done, deleted := b.delete(r, h, n.hash, n.ns, n.key)

// Nodes returns the number of 'cache nodes' in the map.
func (r *Cache) Nodes() int {
	return int(atomic.LoadInt32(&r.nodes))
}

// Size returns the total size of the 'cache nodes' in the map.
func (r *Cache) Size() int {
	return int(atomic.LoadInt32(&r.size))
}

// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
	if r.cacher != nil {
		return r.cacher.Capacity()
	}
	return 0
}

// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
	if r.cacher != nil {
		r.cacher.SetCapacity(capacity)
	}
}

// Get gets 'cache node' with the given namespace and key.
// If the cache node is not found and setFunc is not nil, Get will atomically
// create the 'cache node' by calling setFunc. Otherwise Get returns nil.
//
// The returned 'cache handle' should be released after use by calling the
// Release method.
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
	hash := murmur32(ns, key, 0xf00)

		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)

					n.size, n.value = setFunc()
					atomic.AddInt32(&r.size, int32(n.size))

				return &Handle{unsafe.Pointer(n)}

// Delete removes and bans 'cache node' with the given namespace and key.
// A banned 'cache node' will never be inserted into the 'cache tree'. Ban
// is only attributed to the particular 'cache node', so when a 'cache node'
// is recreated it will not be banned.
//
// If onDel is not nil, then it will be executed if such 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete returns true if such 'cache node' exists.
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
	hash := murmur32(ns, key, 0xf00)

		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)

					n.onDel = append(n.onDel, onDel)

// Evict evicts 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict returns true if such 'cache node' exists.
func (r *Cache) Evict(ns, key uint64) bool {
	hash := murmur32(ns, key, 0xf00)

		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)

// EvictNS evicts 'cache nodes' with the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {

// EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {

// Close closes the 'cache map' and forcefully releases all 'cache nodes'.
func (r *Cache) Close() error {
		h := (*mNode)(r.mHead)

		for i := range h.buckets {
			b := (*mBucket)(h.buckets[i])
			for _, n := range b.node {
					if r, ok := n.value.(util.Releaser); ok {

				for _, f := range n.onDel {

	if err := r.cacher.Close(); err != nil {

// CloseWeak closes the 'cache map' and evicts all 'cache nodes' from the
// cacher, but unlike Close it doesn't forcefully release the 'cache nodes'.
func (r *Cache) CloseWeak() error {
		if err := r.cacher.Close(); err != nil {

// Node is a 'cache node'.
type Node struct {
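	// CacheData is free for use by the Cacher implementation to attach its
	// own per-node state (the LRU cacher in this package, for example, keeps
	// its list entry here).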
	CacheData unsafe.Pointer

// NS returns this 'cache node' namespace.
func (n *Node) NS() uint64 {
	return n.ns
}

// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
	return n.key
}

// Size returns this 'cache node' size.
func (n *Node) Size() int {
	return n.size
}

// Value returns this 'cache node' value.
func (n *Node) Value() Value {
	return n.value
}

// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
	return atomic.LoadInt32(&n.ref)
}

// GetHandle returns a handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
	if atomic.AddInt32(&n.ref, 1) <= 1 {
		panic("BUG: Node.GetHandle on zero ref")
	}
	return &Handle{unsafe.Pointer(n)}
}

func (n *Node) unref() {
	if atomic.AddInt32(&n.ref, -1) == 0 {

func (n *Node) unrefLocked() {
	if atomic.AddInt32(&n.ref, -1) == 0 {

// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
	n unsafe.Pointer // *Node
}

// Value returns the value of the 'cache node'.
func (h *Handle) Value() Value {
	n := (*Node)(atomic.LoadPointer(&h.n))
	if n != nil {
		return n.value

// Release releases this 'cache handle'.
// It is safe to call Release multiple times.
func (h *Handle) Release() {
	nPtr := atomic.LoadPointer(&h.n)
	if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
		n := (*Node)(nPtr)
		n.unrefLocked()
	}
}
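
// murmur32 mixes the namespace and key into a 32-bit hash; the multiplier
// below (0x5bd1e995) is the standard MurmurHash2 mixing constant.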
func murmur32(ns, key uint64, seed uint32) uint32 {
	const (
		m = uint32(0x5bd1e995)

	k1 := uint32(ns >> 32)

	k3 := uint32(key >> 32)