1 // Copyright (c) 2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
16 // maxFailedAttempts is the maximum number of successive failed connection
17 // attempts after which network failure is assumed and new connections will
18 // be delayed by the configured retry duration.
19 const maxFailedAttempts = 25
22 // ErrDialNil is used to indicate that Dial cannot be nil in the configuration.
23 ErrDialNil = errors.New("Config: Dial cannot be nil")
25 // maxRetryDuration is the maximum duration that the retry interval of a
26 // persistent connection is allowed to grow to. This cap is necessary since
27 // the retry logic uses a backoff mechanism which sets the interval to the
28 // base retry duration multiplied by the number of retries already done.
29 maxRetryDuration = time.Minute * 5
31 // defaultRetryDuration is the default duration of time for retrying
32 // persistent connections.
33 defaultRetryDuration = time.Second * 5
35 // defaultTargetOutbound is the default number of outbound connections to
37 defaultTargetOutbound = uint32(8)
40 // ConnState represents the state of the requested connection.
43 // ConnState can be one of pending, established, disconnected or failed. When
44 // a new connection is requested, it is attempted and categorized as
45 // established or failed depending on the connection result. An established
46 // connection which was disconnected is categorized as disconnected.
48 ConnPending ConnState = iota
54 // ConnReq is the connection request to a network address. If permanent, the
55 // connection will be retried on disconnection.
57 // The following variables must only be used atomically.
69 // updateState updates the state of the connection request.
70 func (c *ConnReq) updateState(state ConnState) {
76 // ID returns a unique identifier for the connection request.
77 func (c *ConnReq) ID() uint64 {
78 return atomic.LoadUint64(&c.id)
81 // State returns the connection state of the requested connection.
82 func (c *ConnReq) State() ConnState {
89 // String returns a human-readable string for the connection request.
90 func (c *ConnReq) String() string {
91 if c.Addr.String() == "" {
92 return fmt.Sprintf("reqid %d", atomic.LoadUint64(&c.id))
94 return fmt.Sprintf("%s (reqid %d)", c.Addr, atomic.LoadUint64(&c.id))
97 // Config holds the configuration options related to the connection manager.
99 // Listeners defines a slice of listeners that the connection manager
100 // will take ownership of and accept connections on. When a
101 // connection is accepted, the OnAccept handler will be invoked with the
102 // connection. Since the connection manager takes ownership of these
103 // listeners, they will be closed when the connection manager is
106 // This field will not have any effect if the OnAccept field is not
107 // also specified. It may be nil if the caller does not wish to listen
108 // for incoming connections.
109 Listeners []net.Listener
111 // OnAccept is a callback that is fired when an inbound connection is
112 // accepted. It is the caller's responsibility to close the connection.
113 // Failure to close the connection will result in the connection manager
114 // believing the connection is still active and thus have undesirable
115 // side effects such as still counting toward maximum connection limits.
117 // This field will not have any effect if the Listeners field is not
118 // also specified since there couldn't possibly be any accepted
119 // connections in that case.
120 OnAccept func(net.Conn)
122 // TargetOutbound is the number of outbound network connections to
123 // maintain. Defaults to 8.
124 TargetOutbound uint32
126 // RetryDuration is the duration to wait before retrying connection
127 // requests. Defaults to 5s.
128 RetryDuration time.Duration
130 // OnConnection is a callback that is fired when a new outbound
131 // connection is established.
132 OnConnection func(*ConnReq, net.Conn)
134 // OnDisconnection is a callback that is fired when an outbound
135 // connection is disconnected.
136 OnDisconnection func(*ConnReq)
138 // GetNewAddress is a way to get an address to make a network connection
139 // to. If nil, no new connections will be made automatically.
140 GetNewAddress func() (net.Addr, error)
142 // Dial connects to the given network address. It cannot be nil.
143 Dial func(net.Addr) (net.Conn, error)
146 // handleConnected is used to queue a successful connection.
147 type handleConnected struct {
152 // handleDisconnected is used to remove a connection.
153 type handleDisconnected struct {
158 // handleFailed is used to remove a pending connection.
159 type handleFailed struct {
164 // ConnManager provides a manager to handle network connections.
165 type ConnManager struct {
166 // The following variables must only be used atomically.
173 failedAttempts uint64
174 requests chan interface{}
178 // handleFailedConn handles a connection that failed due to a disconnect or
179 // any other failure. If permanent, it retries the connection after a backoff
180 // based on the configured retry duration. Otherwise, if required, it makes a
181 // new connection request. Once maxFailedAttempts successive attempts have
182 // failed, new connections are delayed by the configured retry duration.
183 func (cm *ConnManager) handleFailedConn(c *ConnReq) {
184 if atomic.LoadInt32(&cm.stop) != 0 {
189 d := time.Duration(c.retryCount) * cm.cfg.RetryDuration
190 if d > maxRetryDuration {
193 log.Debugf("Retrying connection to %v in %v", c, d)
194 time.AfterFunc(d, func() {
197 } else if cm.cfg.GetNewAddress != nil {
199 if cm.failedAttempts >= maxFailedAttempts {
200 log.Debugf("Max failed connection attempts reached: [%d] "+
201 "-- retrying connection in: %v", maxFailedAttempts,
202 cm.cfg.RetryDuration)
203 time.AfterFunc(cm.cfg.RetryDuration, func() {
212 // connHandler handles all connection related requests. It must be run as a
215 // The connection handler makes sure that we maintain a pool of active outbound
216 // connections so that we remain connected to the network. Connection requests
217 // are processed and mapped by their assigned ids.
218 func (cm *ConnManager) connHandler() {
219 conns := make(map[uint64]*ConnReq, cm.cfg.TargetOutbound)
223 case req := <-cm.requests:
224 switch msg := req.(type) {
226 case handleConnected:
228 connReq.updateState(ConnEstablished)
229 connReq.conn = msg.conn
230 conns[connReq.id] = connReq
231 log.Debugf("Connected to %v", connReq)
232 connReq.retryCount = 0
233 cm.failedAttempts = 0
235 if cm.cfg.OnConnection != nil {
236 go cm.cfg.OnConnection(connReq, msg.conn)
239 case handleDisconnected:
240 if connReq, ok := conns[msg.id]; ok {
241 connReq.updateState(ConnDisconnected)
242 if connReq.conn != nil {
245 log.Debugf("Disconnected from %v", connReq)
246 delete(conns, msg.id)
248 if cm.cfg.OnDisconnection != nil {
249 go cm.cfg.OnDisconnection(connReq)
252 if uint32(len(conns)) < cm.cfg.TargetOutbound && msg.retry {
253 cm.handleFailedConn(connReq)
256 log.Errorf("Unknown connection: %d", msg.id)
261 connReq.updateState(ConnFailed)
262 log.Debugf("Failed to connect to %v: %v", connReq, msg.err)
263 cm.handleFailedConn(connReq)
272 log.Trace("Connection handler done")
275 // NewConnReq creates a new connection request and connects to the
276 // corresponding address.
277 func (cm *ConnManager) NewConnReq() {
278 if atomic.LoadInt32(&cm.stop) != 0 {
281 if cm.cfg.GetNewAddress == nil {
286 atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
288 addr, err := cm.cfg.GetNewAddress()
290 cm.requests <- handleFailed{c, err}
299 // Connect assigns an id and dials a connection to the address of the
300 // connection request.
301 func (cm *ConnManager) Connect(c *ConnReq) {
302 if atomic.LoadInt32(&cm.stop) != 0 {
305 if atomic.LoadUint64(&c.id) == 0 {
306 atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
308 log.Debugf("Attempting to connect to %v", c)
309 conn, err := cm.cfg.Dial(c.Addr)
311 cm.requests <- handleFailed{c, err}
313 cm.requests <- handleConnected{c, conn}
317 // Disconnect disconnects the connection corresponding to the given connection
318 // id. If permanent, the connection will be retried with an increasing backoff
320 func (cm *ConnManager) Disconnect(id uint64) {
321 if atomic.LoadInt32(&cm.stop) != 0 {
324 cm.requests <- handleDisconnected{id, true}
327 // Remove removes the connection corresponding to the given connection
328 // id from known connections.
329 func (cm *ConnManager) Remove(id uint64) {
330 if atomic.LoadInt32(&cm.stop) != 0 {
333 cm.requests <- handleDisconnected{id, false}
336 // listenHandler accepts incoming connections on a given listener. It must be
337 // run as a goroutine.
338 func (cm *ConnManager) listenHandler(listener net.Listener) {
339 log.Infof("Server listening on %s", listener.Addr())
340 for atomic.LoadInt32(&cm.stop) == 0 {
341 conn, err := listener.Accept()
343 // Only log the error if not forcibly shutting down.
344 if atomic.LoadInt32(&cm.stop) == 0 {
345 log.Errorf("Can't accept connection: %v", err)
349 go cm.cfg.OnAccept(conn)
353 log.Tracef("Listener handler done for %s", listener.Addr())
356 // Start launches the connection manager and begins connecting to the network.
357 func (cm *ConnManager) Start() {
359 if atomic.AddInt32(&cm.start, 1) != 1 {
363 log.Trace("Connection manager started")
367 // Start all the listeners so long as the caller requested them and
368 // provided a callback to be invoked when connections are accepted.
369 if cm.cfg.OnAccept != nil {
370 for _, listner := range cm.cfg.Listeners {
372 go cm.listenHandler(listner)
376 for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
381 // Wait blocks until the connection manager halts gracefully.
382 func (cm *ConnManager) Wait() {
386 // Stop gracefully shuts down the connection manager.
387 func (cm *ConnManager) Stop() {
388 if atomic.AddInt32(&cm.stop, 1) != 1 {
389 log.Warnf("Connection manager already stopped")
393 // Stop all the listeners. There will not be any listeners if
394 // listening is disabled.
395 for _, listener := range cm.cfg.Listeners {
396 // Ignore the error since this is shutdown and there is no way
397 // to recover anyway.
402 log.Trace("Connection manager stopped")
405 // New returns a new connection manager.
406 // Use Start to start connecting to the network.
407 func New(cfg *Config) (*ConnManager, error) {
409 return nil, ErrDialNil
411 // Default to sane values
412 if cfg.RetryDuration <= 0 {
413 cfg.RetryDuration = defaultRetryDuration
415 if cfg.TargetOutbound == 0 {
416 cfg.TargetOutbound = defaultTargetOutbound
419 cfg: *cfg, // Copy so caller can't mutate
420 requests: make(chan interface{}),
421 quit: make(chan struct{}),