/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data.  In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)
39 // A Getter loads data for a key.
40 type Getter interface {
41 // Get returns the value identified by key, populating dest.
43 // The returned data must be unversioned. That is, key must
44 // uniquely describe the loaded data, without an implicit
45 // current time, and without relying on cache expiration
47 Get(ctx Context, key string, dest Sink) error
50 // A GetterFunc implements Getter with a function.
51 type GetterFunc func(ctx Context, key string, dest Sink) error
53 func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
54 return f(ctx, key, dest)
59 groups = make(map[string]*Group)
61 initPeerServerOnce sync.Once
65 // GetGroup returns the named group previously created with NewGroup, or
66 // nil if there's no such group.
67 func GetGroup(name string) *Group {
74 // NewGroup creates a coordinated group-aware Getter from a Getter.
76 // The returned Getter tries (but does not guarantee) to run only one
77 // Get call at once for a given key across an entire set of peer
78 // processes. Concurrent callers both in the local process and in
79 // other processes receive copies of the answer once the original Get
82 // The group name must be unique for each getter.
83 func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
84 return newGroup(name, cacheBytes, getter, nil)
87 // If peers is nil, the peerPicker is called via a sync.Once to initialize it.
88 func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
94 initPeerServerOnce.Do(callInitPeerServer)
95 if _, dup := groups[name]; dup {
96 panic("duplicate registration of group " + name)
102 cacheBytes: cacheBytes,
103 loadGroup: &singleflight.Group{},
105 if fn := newGroupHook; fn != nil {
// newGroupHook, if non-nil, is called right after a new group is created.
// It is set at most once, via RegisterNewGroupHook.
var newGroupHook func(*Group)
115 // RegisterNewGroupHook registers a hook that is run each time
116 // a group is created.
117 func RegisterNewGroupHook(fn func(*Group)) {
118 if newGroupHook != nil {
119 panic("RegisterNewGroupHook called more than once")
124 // RegisterServerStart registers a hook that is run when the first
126 func RegisterServerStart(fn func()) {
127 if initPeerServer != nil {
128 panic("RegisterServerStart called more than once")
133 func callInitPeerServer() {
134 if initPeerServer != nil {
139 // A Group is a cache namespace and associated data loaded spread over
140 // a group of 1 or more machines.
146 cacheBytes int64 // limit for sum of mainCache and hotCache size
148 // mainCache is a cache of the keys for which this process
149 // (amongst its peers) is authoritative. That is, this cache
150 // contains keys which consistent hash on to this process's
154 // hotCache contains keys/values for which this peer is not
155 // authoritative (otherwise they would be in mainCache), but
156 // are popular enough to warrant mirroring in this process to
157 // avoid going over the network to fetch from a peer. Having
158 // a hotCache avoids network hotspotting, where a peer's
159 // network card could become the bottleneck on a popular key.
160 // This cache is used sparingly to maximize the total number
161 // of key/value pairs that can be stored globally.
164 // loadGroup ensures that each key is only fetched once
165 // (either locally or remotely), regardless of the number of
166 // concurrent callers.
167 loadGroup flightGroup
169 _ int32 // force Stats to be 8-byte aligned on 32-bit platforms
171 // Stats are statistics on the group.
// flightGroup is defined as an interface which flightgroup.Group
// satisfies.  We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
	// Do runs fn for key, deduplicating concurrent calls.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}
183 // Stats are per-group statistics.
185 Gets AtomicInt // any Get request, including from peers
186 CacheHits AtomicInt // either cache was good
187 PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
189 Loads AtomicInt // (gets - cacheHits)
190 LoadsDeduped AtomicInt // after singleflight
191 LocalLoads AtomicInt // total good local loads
192 LocalLoadErrs AtomicInt // total bad local loads
193 ServerRequests AtomicInt // gets that came over the network from peers
196 // Name returns the name of the group.
197 func (g *Group) Name() string {
201 func (g *Group) initPeers() {
203 g.peers = getPeers(g.name)
207 func (g *Group) Get(ctx Context, key string, dest Sink) error {
208 g.peersOnce.Do(g.initPeers)
211 return errors.New("groupcache: nil dest Sink")
213 value, cacheHit := g.lookupCache(key)
216 g.Stats.CacheHits.Add(1)
217 return setSinkView(dest, value)
220 // Optimization to avoid double unmarshalling or copying: keep
221 // track of whether the dest was already populated. One caller
222 // (if local) will set this; the losers will not. The common
223 // case will likely be one caller.
224 destPopulated := false
225 value, destPopulated, err := g.load(ctx, key, dest)
232 return setSinkView(dest, value)
235 // load loads key either by invoking the getter locally or by sending it to another machine.
236 func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
238 viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
239 // Check the cache again because singleflight can only dedup calls
240 // that overlap concurrently. It's possible for 2 concurrent
241 // requests to miss the cache, resulting in 2 load() calls. An
242 // unfortunate goroutine scheduling would result in this callback
243 // being run twice, serially. If we don't check the cache again,
244 // cache.nbytes would be incremented below even though there will
245 // be only one entry for this key.
247 // Consider the following serialized event ordering for two
248 // goroutines in which this callback gets called twice for hte
252 // 1: lookupCache("key")
253 // 2: lookupCache("key")
256 // 1: loadGroup.Do("key", fn)
258 // 2: loadGroup.Do("key", fn)
260 if value, cacheHit := g.lookupCache(key); cacheHit {
261 g.Stats.CacheHits.Add(1)
264 g.Stats.LoadsDeduped.Add(1)
267 if peer, ok := g.peers.PickPeer(key); ok {
268 value, err = g.getFromPeer(ctx, peer, key)
270 g.Stats.PeerLoads.Add(1)
273 g.Stats.PeerErrors.Add(1)
274 // TODO(bradfitz): log the peer's error? keep
275 // log of the past few for /groupcachez? It's
276 // probably boring (normal task movement), so not
277 // worth logging I imagine.
279 value, err = g.getLocally(ctx, key, dest)
281 g.Stats.LocalLoadErrs.Add(1)
284 g.Stats.LocalLoads.Add(1)
285 destPopulated = true // only one caller of load gets this return value
286 g.populateCache(key, value, &g.mainCache)
290 value = viewi.(ByteView)
295 func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
296 err := g.getter.Get(ctx, key, dest)
298 return ByteView{}, err
303 func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
304 req := &pb.GetRequest{
308 res := &pb.GetResponse{}
309 err := peer.Get(ctx, req, res)
311 return ByteView{}, err
313 value := ByteView{b: res.Value}
314 // TODO(bradfitz): use res.MinuteQps or something smart to
315 // conditionally populate hotCache. For now just do it some
316 // percentage of the time.
317 if rand.Intn(10) == 0 {
318 g.populateCache(key, value, &g.hotCache)
323 func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
324 if g.cacheBytes <= 0 {
327 value, ok = g.mainCache.get(key)
331 value, ok = g.hotCache.get(key)
335 func (g *Group) populateCache(key string, value ByteView, cache *cache) {
336 if g.cacheBytes <= 0 {
339 cache.add(key, value)
341 // Evict items from cache(s) if necessary.
343 mainBytes := g.mainCache.bytes()
344 hotBytes := g.hotCache.bytes()
345 if mainBytes+hotBytes <= g.cacheBytes {
349 // TODO(bradfitz): this is good-enough-for-now logic.
350 // It should be something based on measurements and/or
351 // respecting the costs of different resources.
352 victim := &g.mainCache
353 if hotBytes > mainBytes/8 {
356 victim.removeOldest()
// CacheType represents a type of cache.
type CacheType int

const (
	// The MainCache is the cache for items that this peer is the
	// owner for.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)
374 // CacheStats returns stats about the provided cache within the group.
375 func (g *Group) CacheStats(which CacheType) CacheStats {
378 return g.mainCache.stats()
380 return g.hotCache.stats()
386 // cache is a wrapper around an *lru.Cache that adds synchronization,
387 // makes values always be ByteView, and counts the size of all keys and
391 nbytes int64 // of all keys and values
394 nevict int64 // number of evictions
397 func (c *cache) stats() CacheStats {
402 Items: c.itemsLocked(),
409 func (c *cache) add(key string, value ByteView) {
414 OnEvicted: func(key lru.Key, value interface{}) {
415 val := value.(ByteView)
416 c.nbytes -= int64(len(key.(string))) + int64(val.Len())
421 c.lru.Add(key, value)
422 c.nbytes += int64(len(key)) + int64(value.Len())
425 func (c *cache) get(key string) (value ByteView, ok bool) {
432 vi, ok := c.lru.Get(key)
437 return vi.(ByteView), true
440 func (c *cache) removeOldest() {
448 func (c *cache) bytes() int64 {
454 func (c *cache) items() int64 {
457 return c.itemsLocked()
460 func (c *cache) itemsLocked() int64 {
464 return int64(c.lru.Len())
// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64((*int64)(i))
}

// String implements fmt.Stringer, formatting the current value in base 10.
func (i *AtomicInt) String() string {
	return strconv.FormatInt(i.Get(), 10)
}
484 // CacheStats are returned by stats accessors on Group.
485 type CacheStats struct {