OSDN Git Service

delete miner
[bytom/vapor.git] / vendor / github.com / go-kit / kit / sd / zk / client.go
1 package zk
2
3 import (
4         "errors"
5         "net"
6         "strings"
7         "time"
8
9         "github.com/samuel/go-zookeeper/zk"
10
11         "github.com/go-kit/kit/log"
12 )
13
14 // DefaultACL is the default ACL to use for creating znodes.
15 var (
16         DefaultACL            = zk.WorldACL(zk.PermAll)
17         ErrInvalidCredentials = errors.New("invalid credentials provided")
18         ErrClientClosed       = errors.New("client service closed")
19         ErrNotRegistered      = errors.New("not registered")
20         ErrNodeNotFound       = errors.New("node not found")
21 )
22
23 const (
24         // DefaultConnectTimeout is the default timeout to establish a connection to
25         // a ZooKeeper node.
26         DefaultConnectTimeout = 2 * time.Second
27         // DefaultSessionTimeout is the default timeout to keep the current
28         // ZooKeeper session alive during a temporary disconnect.
29         DefaultSessionTimeout = 5 * time.Second
30 )
31
32 // Client is a wrapper around a lower level ZooKeeper client implementation.
33 type Client interface {
34         // GetEntries should query the provided path in ZooKeeper, place a watch on
35         // it and retrieve data from its current child nodes.
36         GetEntries(path string) ([]string, <-chan zk.Event, error)
37         // CreateParentNodes should try to create the path in case it does not exist
38         // yet on ZooKeeper.
39         CreateParentNodes(path string) error
40         // Register a service with ZooKeeper.
41         Register(s *Service) error
42         // Deregister a service with ZooKeeper.
43         Deregister(s *Service) error
44         // Stop should properly shutdown the client implementation
45         Stop()
46 }
47
48 type clientConfig struct {
49         logger          log.Logger
50         acl             []zk.ACL
51         credentials     []byte
52         connectTimeout  time.Duration
53         sessionTimeout  time.Duration
54         rootNodePayload [][]byte
55         eventHandler    func(zk.Event)
56 }
57
58 // Option functions enable friendly APIs.
59 type Option func(*clientConfig) error
60
61 type client struct {
62         *zk.Conn
63         clientConfig
64         active bool
65         quit   chan struct{}
66 }
67
68 // ACL returns an Option specifying a non-default ACL for creating parent nodes.
69 func ACL(acl []zk.ACL) Option {
70         return func(c *clientConfig) error {
71                 c.acl = acl
72                 return nil
73         }
74 }
75
76 // Credentials returns an Option specifying a user/password combination which
77 // the client will use to authenticate itself with.
78 func Credentials(user, pass string) Option {
79         return func(c *clientConfig) error {
80                 if user == "" || pass == "" {
81                         return ErrInvalidCredentials
82                 }
83                 c.credentials = []byte(user + ":" + pass)
84                 return nil
85         }
86 }
87
88 // ConnectTimeout returns an Option specifying a non-default connection timeout
89 // when we try to establish a connection to a ZooKeeper server.
90 func ConnectTimeout(t time.Duration) Option {
91         return func(c *clientConfig) error {
92                 if t.Seconds() < 1 {
93                         return errors.New("invalid connect timeout (minimum value is 1 second)")
94                 }
95                 c.connectTimeout = t
96                 return nil
97         }
98 }
99
100 // SessionTimeout returns an Option specifying a non-default session timeout.
101 func SessionTimeout(t time.Duration) Option {
102         return func(c *clientConfig) error {
103                 if t.Seconds() < 1 {
104                         return errors.New("invalid session timeout (minimum value is 1 second)")
105                 }
106                 c.sessionTimeout = t
107                 return nil
108         }
109 }
110
111 // Payload returns an Option specifying non-default data values for each znode
112 // created by CreateParentNodes.
113 func Payload(payload [][]byte) Option {
114         return func(c *clientConfig) error {
115                 c.rootNodePayload = payload
116                 return nil
117         }
118 }
119
120 // EventHandler returns an Option specifying a callback function to handle
121 // incoming zk.Event payloads (ZooKeeper connection events).
122 func EventHandler(handler func(zk.Event)) Option {
123         return func(c *clientConfig) error {
124                 c.eventHandler = handler
125                 return nil
126         }
127 }
128
129 // NewClient returns a ZooKeeper client with a connection to the server cluster.
130 // It will return an error if the server cluster cannot be resolved.
131 func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
132         defaultEventHandler := func(event zk.Event) {
133                 logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
134         }
135         config := clientConfig{
136                 acl:            DefaultACL,
137                 connectTimeout: DefaultConnectTimeout,
138                 sessionTimeout: DefaultSessionTimeout,
139                 eventHandler:   defaultEventHandler,
140                 logger:         logger,
141         }
142         for _, option := range options {
143                 if err := option(&config); err != nil {
144                         return nil, err
145                 }
146         }
147         // dialer overrides the default ZooKeeper library Dialer so we can configure
148         // the connectTimeout. The current library has a hardcoded value of 1 second
149         // and there are reports of race conditions, due to slow DNS resolvers and
150         // other network latency issues.
151         dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
152                 return net.DialTimeout(network, address, config.connectTimeout)
153         }
154         conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
155
156         if err != nil {
157                 return nil, err
158         }
159
160         if len(config.credentials) > 0 {
161                 err = conn.AddAuth("digest", config.credentials)
162                 if err != nil {
163                         return nil, err
164                 }
165         }
166
167         c := &client{conn, config, true, make(chan struct{})}
168
169         // Start listening for incoming Event payloads and callback the set
170         // eventHandler.
171         go func() {
172                 for {
173                         select {
174                         case event := <-eventc:
175                                 config.eventHandler(event)
176                         case <-c.quit:
177                                 return
178                         }
179                 }
180         }()
181         return c, nil
182 }
183
184 // CreateParentNodes implements the ZooKeeper Client interface.
185 func (c *client) CreateParentNodes(path string) error {
186         if !c.active {
187                 return ErrClientClosed
188         }
189         if path[0] != '/' {
190                 return zk.ErrInvalidPath
191         }
192         payload := []byte("")
193         pathString := ""
194         pathNodes := strings.Split(path, "/")
195         for i := 1; i < len(pathNodes); i++ {
196                 if i <= len(c.rootNodePayload) {
197                         payload = c.rootNodePayload[i-1]
198                 } else {
199                         payload = []byte("")
200                 }
201                 pathString += "/" + pathNodes[i]
202                 _, err := c.Create(pathString, payload, 0, c.acl)
203                 // not being able to create the node because it exists or not having
204                 // sufficient rights is not an issue. It is ok for the node to already
205                 // exist and/or us to only have read rights
206                 if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
207                         return err
208                 }
209         }
210         return nil
211 }
212
213 // GetEntries implements the ZooKeeper Client interface.
214 func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
215         // retrieve list of child nodes for given path and add watch to path
216         znodes, _, eventc, err := c.ChildrenW(path)
217
218         if err != nil {
219                 return nil, eventc, err
220         }
221
222         var resp []string
223         for _, znode := range znodes {
224                 // retrieve payload for child znode and add to response array
225                 if data, _, err := c.Get(path + "/" + znode); err == nil {
226                         resp = append(resp, string(data))
227                 }
228         }
229         return resp, eventc, nil
230 }
231
232 // Register implements the ZooKeeper Client interface.
233 func (c *client) Register(s *Service) error {
234         if s.Path[len(s.Path)-1] != '/' {
235                 s.Path += "/"
236         }
237         path := s.Path + s.Name
238         if err := c.CreateParentNodes(path); err != nil {
239                 return err
240         }
241         node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
242         if err != nil {
243                 return err
244         }
245         s.node = node
246         return nil
247 }
248
249 // Deregister implements the ZooKeeper Client interface.
250 func (c *client) Deregister(s *Service) error {
251         if s.node == "" {
252                 return ErrNotRegistered
253         }
254         path := s.Path + s.Name
255         found, stat, err := c.Exists(path)
256         if err != nil {
257                 return err
258         }
259         if !found {
260                 return ErrNodeNotFound
261         }
262         if err := c.Delete(path, stat.Version); err != nil {
263                 return err
264         }
265         return nil
266 }
267
268 // Stop implements the ZooKeeper Client interface.
269 func (c *client) Stop() {
270         c.active = false
271         close(c.quit)
272         c.Close()
273 }