2 Copyright 2013 Google Inc.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
28 "github.com/golang/groupcache/consistenthash"
29 pb "github.com/golang/groupcache/groupcachepb"
30 "github.com/golang/protobuf/proto"
// Fallback values applied by NewHTTPPoolOpts when the corresponding
// HTTPPoolOptions field is left at its zero value.
const (
	// defaultBasePath is the URL path prefix used when BasePath is empty.
	defaultBasePath = "/_groupcache/"

	// defaultReplicas is the consistent-hash replica count used when
	// Replicas is zero.
	defaultReplicas = 50
)
37 // HTTPPool implements PeerPicker for a pool of HTTP peers.
38 type HTTPPool struct {
39 // Context optionally specifies a context for the server to use when it
40 // receives a request.
41 // If nil, the server uses a nil Context.
42 Context func(*http.Request) Context
44 // Transport optionally specifies an http.RoundTripper for the client
45 // to use when it makes a request.
46 // If nil, the client uses http.DefaultTransport.
47 Transport func(Context) http.RoundTripper
49 // this peer's base URL, e.g. "https://example.net:8000"
52 // opts specifies the options.
55 mu sync.Mutex // guards peers and httpGetters
56 peers *consistenthash.Map
57 httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
60 // HTTPPoolOptions are the configurations of a HTTPPool.
61 type HTTPPoolOptions struct {
62 // BasePath specifies the HTTP path that will serve groupcache requests.
63 // If blank, it defaults to "/_groupcache/".
66 // Replicas specifies the number of key replicas on the consistent hash.
67 // If blank, it defaults to 50.
70 // HashFn specifies the hash function of the consistent hash.
71 // If blank, it defaults to crc32.ChecksumIEEE.
72 HashFn consistenthash.Hash
75 // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
76 // For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
77 // The self argument should be a valid base URL that points to the current server,
78 // for example "http://example.net:8000".
79 func NewHTTPPool(self string) *HTTPPool {
80 p := NewHTTPPoolOpts(self, nil)
81 http.Handle(p.opts.BasePath, p)
87 // NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
88 // Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
89 // The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
90 func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
92 panic("groupcache: NewHTTPPool must be called only once")
98 httpGetters: make(map[string]*httpGetter),
103 if p.opts.BasePath == "" {
104 p.opts.BasePath = defaultBasePath
106 if p.opts.Replicas == 0 {
107 p.opts.Replicas = defaultReplicas
109 p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
111 RegisterPeerPicker(func() PeerPicker { return p })
115 // Set updates the pool's list of peers.
116 // Each peer value should be a valid base URL,
117 // for example "http://example.net:8000".
118 func (p *HTTPPool) Set(peers ...string) {
121 p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
122 p.peers.Add(peers...)
123 p.httpGetters = make(map[string]*httpGetter, len(peers))
124 for _, peer := range peers {
125 p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
129 func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
132 if p.peers.IsEmpty() {
135 if peer := p.peers.Get(key); peer != p.self {
136 return p.httpGetters[peer], true
141 func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
143 if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
144 panic("HTTPPool serving unexpected path: " + r.URL.Path)
146 parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
148 http.Error(w, "bad request", http.StatusBadRequest)
151 groupName := parts[0]
154 // Fetch the value for this group/key.
155 group := GetGroup(groupName)
157 http.Error(w, "no such group: "+groupName, http.StatusNotFound)
161 if p.Context != nil {
165 group.Stats.ServerRequests.Add(1)
167 err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
169 http.Error(w, err.Error(), http.StatusInternalServerError)
173 // Write the value to the response body as a proto message.
174 body, err := proto.Marshal(&pb.GetResponse{Value: value})
176 http.Error(w, err.Error(), http.StatusInternalServerError)
179 w.Header().Set("Content-Type", "application/x-protobuf")
183 type httpGetter struct {
184 transport func(Context) http.RoundTripper
// bufferPool recycles the *bytes.Buffer values used to read peer response
// bodies. Buffers taken from the pool may still hold old data and must be
// reset before use.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
}
192 func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
196 url.QueryEscape(in.GetGroup()),
197 url.QueryEscape(in.GetKey()),
199 req, err := http.NewRequest("GET", u, nil)
203 tr := http.DefaultTransport
204 if h.transport != nil {
205 tr = h.transport(context)
207 res, err := tr.RoundTrip(req)
211 defer res.Body.Close()
212 if res.StatusCode != http.StatusOK {
213 return fmt.Errorf("server returned: %v", res.Status)
215 b := bufferPool.Get().(*bytes.Buffer)
217 defer bufferPool.Put(b)
218 _, err = io.Copy(b, res.Body)
220 return fmt.Errorf("reading response body: %v", err)
222 err = proto.Unmarshal(b.Bytes(), out)
224 return fmt.Errorf("decoding response body: %v", err)