+++ /dev/null
-package zk
-
-import (
- "errors"
- "net"
- "strings"
- "time"
-
- "github.com/samuel/go-zookeeper/zk"
-
- "github.com/go-kit/kit/log"
-)
-
-// DefaultACL is the default ACL to use for creating znodes.
-var (
- DefaultACL = zk.WorldACL(zk.PermAll)
- ErrInvalidCredentials = errors.New("invalid credentials provided")
- ErrClientClosed = errors.New("client service closed")
- ErrNotRegistered = errors.New("not registered")
- ErrNodeNotFound = errors.New("node not found")
-)
-
-const (
- // DefaultConnectTimeout is the default timeout to establish a connection to
- // a ZooKeeper node.
- DefaultConnectTimeout = 2 * time.Second
- // DefaultSessionTimeout is the default timeout to keep the current
- // ZooKeeper session alive during a temporary disconnect.
- DefaultSessionTimeout = 5 * time.Second
-)
-
-// Client is a wrapper around a lower level ZooKeeper client implementation.
-type Client interface {
- // GetEntries should query the provided path in ZooKeeper, place a watch on
- // it and retrieve data from its current child nodes.
- GetEntries(path string) ([]string, <-chan zk.Event, error)
- // CreateParentNodes should try to create the path in case it does not exist
- // yet on ZooKeeper.
- CreateParentNodes(path string) error
- // Register a service with ZooKeeper.
- Register(s *Service) error
- // Deregister a service with ZooKeeper.
- Deregister(s *Service) error
- // Stop should properly shutdown the client implementation
- Stop()
-}
-
-type clientConfig struct {
- logger log.Logger
- acl []zk.ACL
- credentials []byte
- connectTimeout time.Duration
- sessionTimeout time.Duration
- rootNodePayload [][]byte
- eventHandler func(zk.Event)
-}
-
-// Option functions enable friendly APIs.
-type Option func(*clientConfig) error
-
-type client struct {
- *zk.Conn
- clientConfig
- active bool
- quit chan struct{}
-}
-
-// ACL returns an Option specifying a non-default ACL for creating parent nodes.
-func ACL(acl []zk.ACL) Option {
- return func(c *clientConfig) error {
- c.acl = acl
- return nil
- }
-}
-
-// Credentials returns an Option specifying a user/password combination which
-// the client will use to authenticate itself with.
-func Credentials(user, pass string) Option {
- return func(c *clientConfig) error {
- if user == "" || pass == "" {
- return ErrInvalidCredentials
- }
- c.credentials = []byte(user + ":" + pass)
- return nil
- }
-}
-
-// ConnectTimeout returns an Option specifying a non-default connection timeout
-// when we try to establish a connection to a ZooKeeper server.
-func ConnectTimeout(t time.Duration) Option {
- return func(c *clientConfig) error {
- if t.Seconds() < 1 {
- return errors.New("invalid connect timeout (minimum value is 1 second)")
- }
- c.connectTimeout = t
- return nil
- }
-}
-
-// SessionTimeout returns an Option specifying a non-default session timeout.
-func SessionTimeout(t time.Duration) Option {
- return func(c *clientConfig) error {
- if t.Seconds() < 1 {
- return errors.New("invalid session timeout (minimum value is 1 second)")
- }
- c.sessionTimeout = t
- return nil
- }
-}
-
-// Payload returns an Option specifying non-default data values for each znode
-// created by CreateParentNodes.
-func Payload(payload [][]byte) Option {
- return func(c *clientConfig) error {
- c.rootNodePayload = payload
- return nil
- }
-}
-
-// EventHandler returns an Option specifying a callback function to handle
-// incoming zk.Event payloads (ZooKeeper connection events).
-func EventHandler(handler func(zk.Event)) Option {
- return func(c *clientConfig) error {
- c.eventHandler = handler
- return nil
- }
-}
-
-// NewClient returns a ZooKeeper client with a connection to the server cluster.
-// It will return an error if the server cluster cannot be resolved.
-func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
- defaultEventHandler := func(event zk.Event) {
- logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
- }
- config := clientConfig{
- acl: DefaultACL,
- connectTimeout: DefaultConnectTimeout,
- sessionTimeout: DefaultSessionTimeout,
- eventHandler: defaultEventHandler,
- logger: logger,
- }
- for _, option := range options {
- if err := option(&config); err != nil {
- return nil, err
- }
- }
- // dialer overrides the default ZooKeeper library Dialer so we can configure
- // the connectTimeout. The current library has a hardcoded value of 1 second
- // and there are reports of race conditions, due to slow DNS resolvers and
- // other network latency issues.
- dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
- return net.DialTimeout(network, address, config.connectTimeout)
- }
- conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
-
- if err != nil {
- return nil, err
- }
-
- if len(config.credentials) > 0 {
- err = conn.AddAuth("digest", config.credentials)
- if err != nil {
- return nil, err
- }
- }
-
- c := &client{conn, config, true, make(chan struct{})}
-
- // Start listening for incoming Event payloads and callback the set
- // eventHandler.
- go func() {
- for {
- select {
- case event := <-eventc:
- config.eventHandler(event)
- case <-c.quit:
- return
- }
- }
- }()
- return c, nil
-}
-
-// CreateParentNodes implements the ZooKeeper Client interface.
-func (c *client) CreateParentNodes(path string) error {
- if !c.active {
- return ErrClientClosed
- }
- if path[0] != '/' {
- return zk.ErrInvalidPath
- }
- payload := []byte("")
- pathString := ""
- pathNodes := strings.Split(path, "/")
- for i := 1; i < len(pathNodes); i++ {
- if i <= len(c.rootNodePayload) {
- payload = c.rootNodePayload[i-1]
- } else {
- payload = []byte("")
- }
- pathString += "/" + pathNodes[i]
- _, err := c.Create(pathString, payload, 0, c.acl)
- // not being able to create the node because it exists or not having
- // sufficient rights is not an issue. It is ok for the node to already
- // exist and/or us to only have read rights
- if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
- return err
- }
- }
- return nil
-}
-
-// GetEntries implements the ZooKeeper Client interface.
-func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
- // retrieve list of child nodes for given path and add watch to path
- znodes, _, eventc, err := c.ChildrenW(path)
-
- if err != nil {
- return nil, eventc, err
- }
-
- var resp []string
- for _, znode := range znodes {
- // retrieve payload for child znode and add to response array
- if data, _, err := c.Get(path + "/" + znode); err == nil {
- resp = append(resp, string(data))
- }
- }
- return resp, eventc, nil
-}
-
-// Register implements the ZooKeeper Client interface.
-func (c *client) Register(s *Service) error {
- if s.Path[len(s.Path)-1] != '/' {
- s.Path += "/"
- }
- path := s.Path + s.Name
- if err := c.CreateParentNodes(path); err != nil {
- return err
- }
- node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
- if err != nil {
- return err
- }
- s.node = node
- return nil
-}
-
-// Deregister implements the ZooKeeper Client interface.
-func (c *client) Deregister(s *Service) error {
- if s.node == "" {
- return ErrNotRegistered
- }
- path := s.Path + s.Name
- found, stat, err := c.Exists(path)
- if err != nil {
- return err
- }
- if !found {
- return ErrNodeNotFound
- }
- if err := c.Delete(path, stat.Version); err != nil {
- return err
- }
- return nil
-}
-
-// Stop implements the ZooKeeper Client interface.
-func (c *client) Stop() {
- c.active = false
- close(c.quit)
- c.Close()
-}