OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / etcd / client.go
1 package etcd
2
3 import (
4         "context"
5         "crypto/tls"
6         "crypto/x509"
7         "errors"
8         "io/ioutil"
9         "net"
10         "net/http"
11         "time"
12
13         etcd "github.com/coreos/etcd/client"
14 )
15
16 var (
17         // ErrNoKey indicates a client method needs a key but receives none.
18         ErrNoKey = errors.New("no key provided")
19
20         // ErrNoValue indicates a client method needs a value but receives none.
21         ErrNoValue = errors.New("no value provided")
22 )
23
24 // Client is a wrapper around the etcd client.
25 type Client interface {
26         // GetEntries queries the given prefix in etcd and returns a slice
27         // containing the values of all keys found, recursively, underneath that
28         // prefix.
29         GetEntries(prefix string) ([]string, error)
30
31         // WatchPrefix watches the given prefix in etcd for changes. When a change
32         // is detected, it will signal on the passed channel. Clients are expected
33         // to call GetEntries to update themselves with the latest set of complete
34         // values. WatchPrefix will always send an initial sentinel value on the
35         // channel after establishing the watch, to ensure that clients always
36         // receive the latest set of values. WatchPrefix will block until the
37         // context passed to the NewClient constructor is terminated.
38         WatchPrefix(prefix string, ch chan struct{})
39
40         // Register a service with etcd.
41         Register(s Service) error
42
43         // Deregister a service with etcd.
44         Deregister(s Service) error
45 }
46
47 type client struct {
48         keysAPI etcd.KeysAPI
49         ctx     context.Context
50 }
51
52 // ClientOptions defines options for the etcd client. All values are optional.
53 // If any duration is not specified, a default of 3 seconds will be used.
54 type ClientOptions struct {
55         Cert                    string
56         Key                     string
57         CACert                  string
58         DialTimeout             time.Duration
59         DialKeepAlive           time.Duration
60         HeaderTimeoutPerRequest time.Duration
61 }
62
63 // NewClient returns Client with a connection to the named machines. It will
64 // return an error if a connection to the cluster cannot be made. The parameter
65 // machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
66 // will work, but "localhost:2379" will not.
67 func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
68         if options.DialTimeout == 0 {
69                 options.DialTimeout = 3 * time.Second
70         }
71         if options.DialKeepAlive == 0 {
72                 options.DialKeepAlive = 3 * time.Second
73         }
74         if options.HeaderTimeoutPerRequest == 0 {
75                 options.HeaderTimeoutPerRequest = 3 * time.Second
76         }
77
78         transport := etcd.DefaultTransport
79         if options.Cert != "" && options.Key != "" {
80                 tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
81                 if err != nil {
82                         return nil, err
83                 }
84                 caCertCt, err := ioutil.ReadFile(options.CACert)
85                 if err != nil {
86                         return nil, err
87                 }
88                 caCertPool := x509.NewCertPool()
89                 caCertPool.AppendCertsFromPEM(caCertCt)
90                 transport = &http.Transport{
91                         TLSClientConfig: &tls.Config{
92                                 Certificates: []tls.Certificate{tlsCert},
93                                 RootCAs:      caCertPool,
94                         },
95                         Dial: func(network, address string) (net.Conn, error) {
96                                 return (&net.Dialer{
97                                         Timeout:   options.DialTimeout,
98                                         KeepAlive: options.DialKeepAlive,
99                                 }).Dial(network, address)
100                         },
101                 }
102         }
103
104         ce, err := etcd.New(etcd.Config{
105                 Endpoints:               machines,
106                 Transport:               transport,
107                 HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
108         })
109         if err != nil {
110                 return nil, err
111         }
112
113         return &client{
114                 keysAPI: etcd.NewKeysAPI(ce),
115                 ctx:     ctx,
116         }, nil
117 }
118
119 // GetEntries implements the etcd Client interface.
120 func (c *client) GetEntries(key string) ([]string, error) {
121         resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true})
122         if err != nil {
123                 return nil, err
124         }
125
126         // Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
127         // resp.Node.Value is also empty, in which case the key is empty and we
128         // should not return any entries.
129         if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
130                 return []string{resp.Node.Value}, nil
131         }
132
133         entries := make([]string, len(resp.Node.Nodes))
134         for i, node := range resp.Node.Nodes {
135                 entries[i] = node.Value
136         }
137         return entries, nil
138 }
139
140 // WatchPrefix implements the etcd Client interface.
141 func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
142         watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
143         ch <- struct{}{} // make sure caller invokes GetEntries
144         for {
145                 if _, err := watch.Next(c.ctx); err != nil {
146                         return
147                 }
148                 ch <- struct{}{}
149         }
150 }
151
152 func (c *client) Register(s Service) error {
153         if s.Key == "" {
154                 return ErrNoKey
155         }
156         if s.Value == "" {
157                 return ErrNoValue
158         }
159         var err error
160         if s.TTL != nil {
161                 _, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{
162                         PrevExist: etcd.PrevIgnore,
163                         TTL:       s.TTL.ttl,
164                 })
165         } else {
166                 _, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
167         }
168         return err
169 }
170
171 func (c *client) Deregister(s Service) error {
172         if s.Key == "" {
173                 return ErrNoKey
174         }
175         _, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions)
176         return err
177 }