9 "github.com/samuel/go-zookeeper/zk"
11 "github.com/go-kit/kit/log"
14 // DefaultACL is the default ACL to use for creating znodes.
// It is built from zk.WorldACL(zk.PermAll).
16 DefaultACL = zk.WorldACL(zk.PermAll)
// ErrInvalidCredentials is returned by the Credentials option when the user
// name or the password is empty.
17 ErrInvalidCredentials = errors.New("invalid credentials provided")
// ErrClientClosed is returned by CreateParentNodes.
// NOTE(review): presumably when the client has already been stopped — confirm.
18 ErrClientClosed = errors.New("client service closed")
// ErrNotRegistered is returned by Deregister.
// NOTE(review): presumably when the service was never registered — confirm.
19 ErrNotRegistered = errors.New("not registered")
// ErrNodeNotFound is returned by Deregister after it has queried the path.
// NOTE(review): presumably when the znode does not exist — confirm.
20 ErrNodeNotFound = errors.New("node not found")
24 // DefaultConnectTimeout is the default timeout to establish a connection to
// a ZooKeeper server. NewClient uses it unless the ConnectTimeout option
// overrides it.
26 DefaultConnectTimeout = 2 * time.Second
27 // DefaultSessionTimeout is the default timeout to keep the current
28 // ZooKeeper session alive during a temporary disconnect.
// NewClient uses it unless the SessionTimeout option overrides it.
29 DefaultSessionTimeout = 5 * time.Second
32 // Client is a wrapper around a lower level ZooKeeper client implementation.
33 type Client interface {
34 // GetEntries should query the provided path in ZooKeeper, place a watch on
35 // it and retrieve data from its current child nodes.
// It returns the child payloads, the channel on which the watch event is
// delivered, and any error encountered.
36 GetEntries(path string) ([]string, <-chan zk.Event, error)
37 // CreateParentNodes should try to create the path in case it does not exist
// yet in ZooKeeper.
39 CreateParentNodes(path string) error
40 // Register should register a service with ZooKeeper.
41 Register(s *Service) error
42 // Deregister should remove a service's registration from ZooKeeper.
43 Deregister(s *Service) error
44 // Stop should properly shut down the client implementation.
// clientConfig holds the settings that Option functions modify and that
// NewClient reads when it connects.
48 type clientConfig struct {
// connectTimeout is the timeout NewClient's dialer passes to net.DialTimeout.
52 connectTimeout time.Duration
// sessionTimeout is the session timeout NewClient passes to zk.Connect.
53 sessionTimeout time.Duration
// rootNodePayload holds the per-level znode data set by the Payload option.
54 rootNodePayload [][]byte
// eventHandler is invoked by NewClient's listener for each received zk.Event.
55 eventHandler func(zk.Event)
58 // Option functions enable friendly APIs.
// An Option receives the clientConfig being built and may return an error
// when the value it carries is invalid. NewClient calls every supplied Option
// in order and checks the returned error.
59 type Option func(*clientConfig) error
68 // ACL returns an Option specifying a non-default ACL for creating parent nodes.
// NOTE(review): the closure body is not visible in this excerpt; presumably it
// stores acl on the config, replacing DefaultACL — confirm.
69 func ACL(acl []zk.ACL) Option {
70 return func(c *clientConfig) error {
76 // Credentials returns an Option specifying a user/password combination which
77 // the client will use to authenticate itself with.
// The Option returns ErrInvalidCredentials when user or pass is empty.
// Otherwise it stores the bytes of "user:pass", which NewClient passes to
// conn.AddAuth with the "digest" scheme.
78 func Credentials(user, pass string) Option {
79 return func(c *clientConfig) error {
// Both parts are mandatory.
80 if user == "" || pass == "" {
81 return ErrInvalidCredentials
83 c.credentials = []byte(user + ":" + pass)
88 // ConnectTimeout returns an Option specifying a non-default connection timeout
89 // when we try to establish a connection to a ZooKeeper server.
// Without this Option NewClient uses DefaultConnectTimeout. The Option returns
// an error for values it rejects; per the error text the minimum is 1 second.
90 func ConnectTimeout(t time.Duration) Option {
91 return func(c *clientConfig) error {
// NOTE(review): the condition guarding this return is not visible here.
93 return errors.New("invalid connect timeout (minimum value is 1 second)")
100 // SessionTimeout returns an Option specifying a non-default session timeout.
// Without this Option NewClient uses DefaultSessionTimeout. The Option returns
// an error for values it rejects; per the error text the minimum is 1 second.
101 func SessionTimeout(t time.Duration) Option {
102 return func(c *clientConfig) error {
// NOTE(review): the condition guarding this return is not visible here.
104 return errors.New("invalid session timeout (minimum value is 1 second)")
111 // Payload returns an Option specifying non-default data values for each znode
112 // created by CreateParentNodes.
// CreateParentNodes uses payload[0] for the first path element, payload[1]
// for the second, and so on.
113 func Payload(payload [][]byte) Option {
114 return func(c *clientConfig) error {
// The slice is stored as-is (not copied).
115 c.rootNodePayload = payload
120 // EventHandler returns an Option specifying a callback function to handle
121 // incoming zk.Event payloads (ZooKeeper connection events).
// It replaces NewClient's default handler, which logs each event through the
// logger given to NewClient.
122 func EventHandler(handler func(zk.Event)) Option {
123 return func(c *clientConfig) error {
124 c.eventHandler = handler
129 // NewClient returns a ZooKeeper client with a connection to the server cluster.
130 // It will return an error if the server cluster cannot be resolved.
// The supplied options are applied, in order, on top of the default
// configuration. logger is used by the default event handler and is also
// handed to zk.Connect through withLogger.
131 func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
// The default handler logs each event's type, server, state and error.
132 defaultEventHandler := func(event zk.Event) {
133 logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
// Start from the package defaults; options may override them below.
135 config := clientConfig{
137 connectTimeout: DefaultConnectTimeout,
138 sessionTimeout: DefaultSessionTimeout,
139 eventHandler: defaultEventHandler,
// Let every option adjust the configuration and check the error it returns.
142 for _, option := range options {
143 if err := option(&config); err != nil {
147 // dialer overrides the default ZooKeeper library Dialer so we can configure
148 // the connectTimeout. The current library has a hardcoded value of 1 second
149 // and there are reports of race conditions, due to slow DNS resolvers and
150 // other network latency issues.
// The timeout argument supplied by the library is deliberately ignored.
151 dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
152 return net.DialTimeout(network, address, config.connectTimeout)
// Connect with the configured session timeout and the custom dialer; eventc
// carries the events consumed by the listener further down.
154 conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
// Authenticate only when the Credentials option supplied credentials.
160 if len(config.credentials) > 0 {
161 err = conn.AddAuth("digest", config.credentials)
// NOTE(review): positional fields — presumably the connection, the config, an
// "active" flag and a quit channel; confirm against the client struct.
167 c := &client{conn, config, true, make(chan struct{})}
169 // Start listening for incoming Event payloads and callback the set
// event handler.
174 case event := <-eventc:
175 config.eventHandler(event)
184 // CreateParentNodes implements the ZooKeeper Client interface.
// It walks the slash-separated elements of path and issues a Create for each
// cumulative prefix, using c.acl and the payload selected for that level.
185 func (c *client) CreateParentNodes(path string) error {
// NOTE(review): the condition guarding this return is not visible in this
// excerpt; presumably the client is no longer active — confirm.
187 return ErrClientClosed
// NOTE(review): guard not visible here; presumably path must start with '/'.
190 return zk.ErrInvalidPath
// The payload starts out empty.
192 payload := []byte("")
194 pathNodes := strings.Split(path, "/")
// Index 0 is skipped: it is whatever precedes the first "/".
195 for i := 1; i < len(pathNodes); i++ {
// Level i uses rootNodePayload[i-1] when such an entry exists.
196 if i <= len(c.rootNodePayload) {
197 payload = c.rootNodePayload[i-1]
// Extend the prefix by one element and try to create that znode.
201 pathString += "/" + pathNodes[i]
202 _, err := c.Create(pathString, payload, 0, c.acl)
203 // not being able to create the node because it exists or not having
204 // sufficient rights is not an issue. It is ok for the node to already
205 // exist and/or us to only have read rights
206 if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
213 // GetEntries implements the ZooKeeper Client interface.
// It returns the data of the children of path that could be read, together
// with the event channel returned by ChildrenW.
214 func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
215 // retrieve list of child nodes for given path and add watch to path
216 znodes, _, eventc, err := c.ChildrenW(path)
// Failure path: no entries, but the channel and the error are still returned.
219 return nil, eventc, err
223 for _, znode := range znodes {
224 // retrieve payload for child znode and add to response array
// Only successful reads are appended here.
225 if data, _, err := c.Get(path + "/" + znode); err == nil {
226 resp = append(resp, string(data))
229 return resp, eventc, nil
232 // Register implements the ZooKeeper Client interface.
// It creates the parent nodes for s.Path + s.Name and then a protected
// ephemeral sequential znode at that path carrying s.Data, using c.acl.
233 func (c *client) Register(s *Service) error {
// NOTE(review): indexing s.Path[len(s.Path)-1] panics when s.Path is empty.
// The branch body is not visible here; presumably it appends the missing
// trailing '/' — confirm.
234 if s.Path[len(s.Path)-1] != '/' {
237 path := s.Path + s.Name
238 if err := c.CreateParentNodes(path); err != nil {
241 node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
249 // Deregister implements the ZooKeeper Client interface.
// It looks up s.Path + s.Name and deletes it, passing the version reported by
// Exists to Delete.
250 func (c *client) Deregister(s *Service) error {
// NOTE(review): guard not visible in this excerpt; presumably taken when s
// has not been registered — confirm.
252 return ErrNotRegistered
254 path := s.Path + s.Name
255 found, stat, err := c.Exists(path)
// NOTE(review): presumably returned when found is false — confirm.
260 return ErrNodeNotFound
// Delete is given the version obtained from Exists above.
262 if err := c.Delete(path, stat.Version); err != nil {
268 // Stop implements the ZooKeeper Client interface.
// NOTE(review): the method body is not visible in this excerpt, so its exact
// shutdown steps are not documented here.
269 func (c *client) Stop() {