13 etcd "github.com/coreos/etcd/client"
17 // ErrNoKey indicates a client method needs a key but receives none.
18 ErrNoKey = errors.New("no key provided")
20 // ErrNoValue indicates a client method needs a value but receives none.
21 ErrNoValue = errors.New("no value provided")
24 // Client is a wrapper around the etcd client.
25 type Client interface {
26 // GetEntries queries the given prefix in etcd and returns a slice
27 // containing the values of all keys found, recursively, underneath that
29 GetEntries(prefix string) ([]string, error)
31 // WatchPrefix watches the given prefix in etcd for changes. When a change
32 // is detected, it will signal on the passed channel. Clients are expected
33 // to call GetEntries to update themselves with the latest set of complete
34 // values. WatchPrefix will always send an initial sentinel value on the
35 // channel after establishing the watch, to ensure that clients always
36 // receive the latest set of values. WatchPrefix will block until the
37 // context passed to the NewClient constructor is terminated.
38 WatchPrefix(prefix string, ch chan struct{})
40 // Register a service with etcd.
41 Register(s Service) error
43 // Deregister a service with etcd.
44 Deregister(s Service) error
52 // ClientOptions defines options for the etcd client. All values are optional.
53 // If any duration is not specified, a default of 3 seconds will be used.
54 type ClientOptions struct {
58 DialTimeout time.Duration
59 DialKeepAlive time.Duration
60 HeaderTimeoutPerRequest time.Duration
63 // NewClient returns Client with a connection to the named machines. It will
64 // return an error if a connection to the cluster cannot be made. The parameter
65 // machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
66 // will work, but "localhost:2379" will not.
67 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
68 if options.DialTimeout == 0 {
69 options.DialTimeout = 3 * time.Second
71 if options.DialKeepAlive == 0 {
72 options.DialKeepAlive = 3 * time.Second
74 if options.HeaderTimeoutPerRequest == 0 {
75 options.HeaderTimeoutPerRequest = 3 * time.Second
78 transport := etcd.DefaultTransport
79 if options.Cert != "" && options.Key != "" {
80 tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
84 caCertCt, err := ioutil.ReadFile(options.CACert)
88 caCertPool := x509.NewCertPool()
89 caCertPool.AppendCertsFromPEM(caCertCt)
90 transport = &http.Transport{
91 TLSClientConfig: &tls.Config{
92 Certificates: []tls.Certificate{tlsCert},
95 Dial: func(network, address string) (net.Conn, error) {
97 Timeout: options.DialTimeout,
98 KeepAlive: options.DialKeepAlive,
99 }).Dial(network, address)
104 ce, err := etcd.New(etcd.Config{
106 Transport: transport,
107 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
114 keysAPI: etcd.NewKeysAPI(ce),
119 // GetEntries implements the etcd Client interface.
120 func (c *client) GetEntries(key string) ([]string, error) {
121 resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true})
126 // Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
127 // resp.Node.Value is also empty, in which case the key is empty and we
128 // should not return any entries.
129 if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
130 return []string{resp.Node.Value}, nil
133 entries := make([]string, len(resp.Node.Nodes))
134 for i, node := range resp.Node.Nodes {
135 entries[i] = node.Value
140 // WatchPrefix implements the etcd Client interface.
141 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
142 watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
143 ch <- struct{}{} // make sure caller invokes GetEntries
145 if _, err := watch.Next(c.ctx); err != nil {
152 func (c *client) Register(s Service) error {
161 _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{
162 PrevExist: etcd.PrevIgnore,
166 _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
171 func (c *client) Deregister(s Service) error {
175 _, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions)