OSDN Git Service

Merge pull request #41 from Bytom/dev
[bytom/vapor.git] / vendor / github.com / golang / groupcache / groupcache.go
1 /*
2 Copyright 2012 Google Inc.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8      http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // Package groupcache provides a data loading mechanism with caching
18 // and de-duplication that works across a set of peer processes.
19 //
20 // Each data Get first consults its local cache, otherwise delegates
21 // to the requested key's canonical owner, which then checks its cache
22 // or finally gets the data.  In the common case, many concurrent
23 // cache misses across a set of peers for the same key result in just
24 // one cache fill.
25 package groupcache
26
27 import (
28         "errors"
29         "math/rand"
30         "strconv"
31         "sync"
32         "sync/atomic"
33
34         pb "github.com/golang/groupcache/groupcachepb"
35         "github.com/golang/groupcache/lru"
36         "github.com/golang/groupcache/singleflight"
37 )
38
39 // A Getter loads data for a key.
40 type Getter interface {
41         // Get returns the value identified by key, populating dest.
42         //
43         // The returned data must be unversioned. That is, key must
44         // uniquely describe the loaded data, without an implicit
45         // current time, and without relying on cache expiration
46         // mechanisms.
47         Get(ctx Context, key string, dest Sink) error
48 }
49
50 // A GetterFunc implements Getter with a function.
51 type GetterFunc func(ctx Context, key string, dest Sink) error
52
53 func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
54         return f(ctx, key, dest)
55 }
56
57 var (
58         mu     sync.RWMutex
59         groups = make(map[string]*Group)
60
61         initPeerServerOnce sync.Once
62         initPeerServer     func()
63 )
64
65 // GetGroup returns the named group previously created with NewGroup, or
66 // nil if there's no such group.
67 func GetGroup(name string) *Group {
68         mu.RLock()
69         g := groups[name]
70         mu.RUnlock()
71         return g
72 }
73
74 // NewGroup creates a coordinated group-aware Getter from a Getter.
75 //
76 // The returned Getter tries (but does not guarantee) to run only one
77 // Get call at once for a given key across an entire set of peer
78 // processes. Concurrent callers both in the local process and in
79 // other processes receive copies of the answer once the original Get
80 // completes.
81 //
82 // The group name must be unique for each getter.
83 func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
84         return newGroup(name, cacheBytes, getter, nil)
85 }
86
87 // If peers is nil, the peerPicker is called via a sync.Once to initialize it.
88 func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
89         if getter == nil {
90                 panic("nil Getter")
91         }
92         mu.Lock()
93         defer mu.Unlock()
94         initPeerServerOnce.Do(callInitPeerServer)
95         if _, dup := groups[name]; dup {
96                 panic("duplicate registration of group " + name)
97         }
98         g := &Group{
99                 name:       name,
100                 getter:     getter,
101                 peers:      peers,
102                 cacheBytes: cacheBytes,
103                 loadGroup:  &singleflight.Group{},
104         }
105         if fn := newGroupHook; fn != nil {
106                 fn(g)
107         }
108         groups[name] = g
109         return g
110 }
111
112 // newGroupHook, if non-nil, is called right after a new group is created.
113 var newGroupHook func(*Group)
114
115 // RegisterNewGroupHook registers a hook that is run each time
116 // a group is created.
117 func RegisterNewGroupHook(fn func(*Group)) {
118         if newGroupHook != nil {
119                 panic("RegisterNewGroupHook called more than once")
120         }
121         newGroupHook = fn
122 }
123
124 // RegisterServerStart registers a hook that is run when the first
125 // group is created.
126 func RegisterServerStart(fn func()) {
127         if initPeerServer != nil {
128                 panic("RegisterServerStart called more than once")
129         }
130         initPeerServer = fn
131 }
132
133 func callInitPeerServer() {
134         if initPeerServer != nil {
135                 initPeerServer()
136         }
137 }
138
139 // A Group is a cache namespace and associated data loaded spread over
140 // a group of 1 or more machines.
141 type Group struct {
142         name       string
143         getter     Getter
144         peersOnce  sync.Once
145         peers      PeerPicker
146         cacheBytes int64 // limit for sum of mainCache and hotCache size
147
148         // mainCache is a cache of the keys for which this process
149         // (amongst its peers) is authoritative. That is, this cache
150         // contains keys which consistent hash on to this process's
151         // peer number.
152         mainCache cache
153
154         // hotCache contains keys/values for which this peer is not
155         // authoritative (otherwise they would be in mainCache), but
156         // are popular enough to warrant mirroring in this process to
157         // avoid going over the network to fetch from a peer.  Having
158         // a hotCache avoids network hotspotting, where a peer's
159         // network card could become the bottleneck on a popular key.
160         // This cache is used sparingly to maximize the total number
161         // of key/value pairs that can be stored globally.
162         hotCache cache
163
164         // loadGroup ensures that each key is only fetched once
165         // (either locally or remotely), regardless of the number of
166         // concurrent callers.
167         loadGroup flightGroup
168
169         _ int32 // force Stats to be 8-byte aligned on 32-bit platforms
170
171         // Stats are statistics on the group.
172         Stats Stats
173 }
174
175 // flightGroup is defined as an interface which flightgroup.Group
176 // satisfies.  We define this so that we may test with an alternate
177 // implementation.
178 type flightGroup interface {
179         // Done is called when Do is done.
180         Do(key string, fn func() (interface{}, error)) (interface{}, error)
181 }
182
183 // Stats are per-group statistics.
184 type Stats struct {
185         Gets           AtomicInt // any Get request, including from peers
186         CacheHits      AtomicInt // either cache was good
187         PeerLoads      AtomicInt // either remote load or remote cache hit (not an error)
188         PeerErrors     AtomicInt
189         Loads          AtomicInt // (gets - cacheHits)
190         LoadsDeduped   AtomicInt // after singleflight
191         LocalLoads     AtomicInt // total good local loads
192         LocalLoadErrs  AtomicInt // total bad local loads
193         ServerRequests AtomicInt // gets that came over the network from peers
194 }
195
196 // Name returns the name of the group.
197 func (g *Group) Name() string {
198         return g.name
199 }
200
201 func (g *Group) initPeers() {
202         if g.peers == nil {
203                 g.peers = getPeers(g.name)
204         }
205 }
206
207 func (g *Group) Get(ctx Context, key string, dest Sink) error {
208         g.peersOnce.Do(g.initPeers)
209         g.Stats.Gets.Add(1)
210         if dest == nil {
211                 return errors.New("groupcache: nil dest Sink")
212         }
213         value, cacheHit := g.lookupCache(key)
214
215         if cacheHit {
216                 g.Stats.CacheHits.Add(1)
217                 return setSinkView(dest, value)
218         }
219
220         // Optimization to avoid double unmarshalling or copying: keep
221         // track of whether the dest was already populated. One caller
222         // (if local) will set this; the losers will not. The common
223         // case will likely be one caller.
224         destPopulated := false
225         value, destPopulated, err := g.load(ctx, key, dest)
226         if err != nil {
227                 return err
228         }
229         if destPopulated {
230                 return nil
231         }
232         return setSinkView(dest, value)
233 }
234
235 // load loads key either by invoking the getter locally or by sending it to another machine.
236 func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
237         g.Stats.Loads.Add(1)
238         viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
239                 // Check the cache again because singleflight can only dedup calls
240                 // that overlap concurrently.  It's possible for 2 concurrent
241                 // requests to miss the cache, resulting in 2 load() calls.  An
242                 // unfortunate goroutine scheduling would result in this callback
243                 // being run twice, serially.  If we don't check the cache again,
244                 // cache.nbytes would be incremented below even though there will
245                 // be only one entry for this key.
246                 //
247                 // Consider the following serialized event ordering for two
248                 // goroutines in which this callback gets called twice for hte
249                 // same key:
250                 // 1: Get("key")
251                 // 2: Get("key")
252                 // 1: lookupCache("key")
253                 // 2: lookupCache("key")
254                 // 1: load("key")
255                 // 2: load("key")
256                 // 1: loadGroup.Do("key", fn)
257                 // 1: fn()
258                 // 2: loadGroup.Do("key", fn)
259                 // 2: fn()
260                 if value, cacheHit := g.lookupCache(key); cacheHit {
261                         g.Stats.CacheHits.Add(1)
262                         return value, nil
263                 }
264                 g.Stats.LoadsDeduped.Add(1)
265                 var value ByteView
266                 var err error
267                 if peer, ok := g.peers.PickPeer(key); ok {
268                         value, err = g.getFromPeer(ctx, peer, key)
269                         if err == nil {
270                                 g.Stats.PeerLoads.Add(1)
271                                 return value, nil
272                         }
273                         g.Stats.PeerErrors.Add(1)
274                         // TODO(bradfitz): log the peer's error? keep
275                         // log of the past few for /groupcachez?  It's
276                         // probably boring (normal task movement), so not
277                         // worth logging I imagine.
278                 }
279                 value, err = g.getLocally(ctx, key, dest)
280                 if err != nil {
281                         g.Stats.LocalLoadErrs.Add(1)
282                         return nil, err
283                 }
284                 g.Stats.LocalLoads.Add(1)
285                 destPopulated = true // only one caller of load gets this return value
286                 g.populateCache(key, value, &g.mainCache)
287                 return value, nil
288         })
289         if err == nil {
290                 value = viewi.(ByteView)
291         }
292         return
293 }
294
295 func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
296         err := g.getter.Get(ctx, key, dest)
297         if err != nil {
298                 return ByteView{}, err
299         }
300         return dest.view()
301 }
302
303 func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
304         req := &pb.GetRequest{
305                 Group: &g.name,
306                 Key:   &key,
307         }
308         res := &pb.GetResponse{}
309         err := peer.Get(ctx, req, res)
310         if err != nil {
311                 return ByteView{}, err
312         }
313         value := ByteView{b: res.Value}
314         // TODO(bradfitz): use res.MinuteQps or something smart to
315         // conditionally populate hotCache.  For now just do it some
316         // percentage of the time.
317         if rand.Intn(10) == 0 {
318                 g.populateCache(key, value, &g.hotCache)
319         }
320         return value, nil
321 }
322
323 func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
324         if g.cacheBytes <= 0 {
325                 return
326         }
327         value, ok = g.mainCache.get(key)
328         if ok {
329                 return
330         }
331         value, ok = g.hotCache.get(key)
332         return
333 }
334
335 func (g *Group) populateCache(key string, value ByteView, cache *cache) {
336         if g.cacheBytes <= 0 {
337                 return
338         }
339         cache.add(key, value)
340
341         // Evict items from cache(s) if necessary.
342         for {
343                 mainBytes := g.mainCache.bytes()
344                 hotBytes := g.hotCache.bytes()
345                 if mainBytes+hotBytes <= g.cacheBytes {
346                         return
347                 }
348
349                 // TODO(bradfitz): this is good-enough-for-now logic.
350                 // It should be something based on measurements and/or
351                 // respecting the costs of different resources.
352                 victim := &g.mainCache
353                 if hotBytes > mainBytes/8 {
354                         victim = &g.hotCache
355                 }
356                 victim.removeOldest()
357         }
358 }
359
360 // CacheType represents a type of cache.
361 type CacheType int
362
363 const (
364         // The MainCache is the cache for items that this peer is the
365         // owner for.
366         MainCache CacheType = iota + 1
367
368         // The HotCache is the cache for items that seem popular
369         // enough to replicate to this node, even though it's not the
370         // owner.
371         HotCache
372 )
373
374 // CacheStats returns stats about the provided cache within the group.
375 func (g *Group) CacheStats(which CacheType) CacheStats {
376         switch which {
377         case MainCache:
378                 return g.mainCache.stats()
379         case HotCache:
380                 return g.hotCache.stats()
381         default:
382                 return CacheStats{}
383         }
384 }
385
386 // cache is a wrapper around an *lru.Cache that adds synchronization,
387 // makes values always be ByteView, and counts the size of all keys and
388 // values.
389 type cache struct {
390         mu         sync.RWMutex
391         nbytes     int64 // of all keys and values
392         lru        *lru.Cache
393         nhit, nget int64
394         nevict     int64 // number of evictions
395 }
396
397 func (c *cache) stats() CacheStats {
398         c.mu.RLock()
399         defer c.mu.RUnlock()
400         return CacheStats{
401                 Bytes:     c.nbytes,
402                 Items:     c.itemsLocked(),
403                 Gets:      c.nget,
404                 Hits:      c.nhit,
405                 Evictions: c.nevict,
406         }
407 }
408
409 func (c *cache) add(key string, value ByteView) {
410         c.mu.Lock()
411         defer c.mu.Unlock()
412         if c.lru == nil {
413                 c.lru = &lru.Cache{
414                         OnEvicted: func(key lru.Key, value interface{}) {
415                                 val := value.(ByteView)
416                                 c.nbytes -= int64(len(key.(string))) + int64(val.Len())
417                                 c.nevict++
418                         },
419                 }
420         }
421         c.lru.Add(key, value)
422         c.nbytes += int64(len(key)) + int64(value.Len())
423 }
424
425 func (c *cache) get(key string) (value ByteView, ok bool) {
426         c.mu.Lock()
427         defer c.mu.Unlock()
428         c.nget++
429         if c.lru == nil {
430                 return
431         }
432         vi, ok := c.lru.Get(key)
433         if !ok {
434                 return
435         }
436         c.nhit++
437         return vi.(ByteView), true
438 }
439
440 func (c *cache) removeOldest() {
441         c.mu.Lock()
442         defer c.mu.Unlock()
443         if c.lru != nil {
444                 c.lru.RemoveOldest()
445         }
446 }
447
448 func (c *cache) bytes() int64 {
449         c.mu.RLock()
450         defer c.mu.RUnlock()
451         return c.nbytes
452 }
453
454 func (c *cache) items() int64 {
455         c.mu.RLock()
456         defer c.mu.RUnlock()
457         return c.itemsLocked()
458 }
459
460 func (c *cache) itemsLocked() int64 {
461         if c.lru == nil {
462                 return 0
463         }
464         return int64(c.lru.Len())
465 }
466
467 // An AtomicInt is an int64 to be accessed atomically.
468 type AtomicInt int64
469
470 // Add atomically adds n to i.
471 func (i *AtomicInt) Add(n int64) {
472         atomic.AddInt64((*int64)(i), n)
473 }
474
475 // Get atomically gets the value of i.
476 func (i *AtomicInt) Get() int64 {
477         return atomic.LoadInt64((*int64)(i))
478 }
479
480 func (i *AtomicInt) String() string {
481         return strconv.FormatInt(i.Get(), 10)
482 }
483
484 // CacheStats are returned by stats accessors on Group.
485 type CacheStats struct {
486         Bytes     int64
487         Items     int64
488         Gets      int64
489         Hits      int64
490         Evictions int64
491 }