8 "github.com/go-kit/kit/log"
11 // Dialer imitates net.Dial. Dialer is assumed to yield connections that are
12 // safe for use by multiple concurrent goroutines.
13 type Dialer func(network, address string) (net.Conn, error)
15 // AfterFunc imitates time.After.
16 type AfterFunc func(time.Duration) <-chan time.Time
18 // Manager manages a net.Conn.
20 // Clients provide a way to create the connection with a Dialer, network, and
21 // address. Clients should Take the connection when they want to use it, and Put
22 // back whatever error they receive from its use. When a non-nil error is Put,
23 // the connection is invalidated, and a new connection is established.
24 // Connection failures are retried after an exponential backoff.
36 // NewManager returns a connection manager using the passed Dialer, network, and
37 // address. The AfterFunc is used to control exponential backoff and retries.
38 // The logger is used to log errors; pass a log.NopLogger if you don't care to
39 // receive them. For normal use, prefer NewDefaultManager.
40 func NewManager(d Dialer, network, address string, after AfterFunc, logger log.Logger) *Manager {
48 takec: make(chan net.Conn),
49 putc: make(chan error),
55 // NewDefaultManager is a helper constructor, suitable for most normal use in
56 // real (non-test) code. It uses the real net.Dial and time.After functions.
57 func NewDefaultManager(network, address string, logger log.Logger) *Manager {
58 return NewManager(net.Dial, network, address, time.After, logger)
61 // Take yields the current connection. It may be nil.
62 func (m *Manager) Take() net.Conn {
66 // Put accepts an error that came from a previously yielded connection. If the
67 // error is non-nil, the manager will invalidate the current connection and try
68 // to reconnect, with exponential backoff. Putting a nil error is a no-op.
69 func (m *Manager) Put(err error) {
73 // Write writes the passed data to the connection in a single Take/Put cycle.
74 func (m *Manager) Write(b []byte) (int, error) {
77 return 0, ErrConnectionUnavailable
79 n, err := conn.Write(b)
84 func (m *Manager) loop() {
86 conn = dial(m.dialer, m.network, m.address, m.logger) // may block slightly
87 connc = make(chan net.Conn, 1)
88 reconnectc <-chan time.Time // initially nil
92 // If the initial dial fails, we need to trigger a reconnect via the loop
93 // body, below. If we did this in a goroutine, we would race on the conn
94 // variable. So we use a buffered chan instead.
100 reconnectc = nil // one-shot
101 go func() { connc <- dial(m.dialer, m.network, m.address, m.logger) }()
106 backoff = exponential(backoff) // wait longer
107 reconnectc = m.after(backoff) // try again
110 backoff = time.Second // reset wait time
111 reconnectc = nil // no retry necessary
114 case m.takec <- conn:
116 case err := <-m.putc:
117 if err != nil && conn != nil {
118 m.logger.Log("err", err)
119 conn = nil // connection is bad
120 reconnectc = m.after(time.Nanosecond) // trigger immediately
126 func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
127 conn, err := d(network, address)
129 logger.Log("err", err)
130 conn = nil // just to be sure
135 func exponential(d time.Duration) time.Duration {
143 // ErrConnectionUnavailable is returned by the Manager's Write method when the
144 // manager cannot yield a good connection.
145 var ErrConnectionUnavailable = errors.New("connection unavailable")