Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct ConnectionRetryOptions {
pub connection_timeout: Duration,
/// keep-alive interval for each broker connection
pub keep_alive: Duration,
/// maximum idle time before a connection is eligible for cleanup
pub connection_max_idle: Duration,
}

impl Default for ConnectionRetryOptions {
Expand All @@ -46,6 +48,7 @@ impl Default for ConnectionRetryOptions {
max_retries: 12u32,
connection_timeout: Duration::from_secs(10),
keep_alive: Duration::from_secs(60),
connection_max_idle: Duration::from_secs(120),
}
}
}
Expand Down Expand Up @@ -156,7 +159,7 @@ impl Default for TlsOptions {
enum ConnectionStatus<Exe: Executor> {
Connected {
conn: Arc<Connection<Exe>>,
created_at: Instant,
last_used: Instant,
},
Connecting(Vec<oneshot::Sender<Result<Arc<Connection<Exe>>, ConnectionError>>>),
}
Expand Down Expand Up @@ -292,9 +295,11 @@ impl<Exe: Executor> ConnectionManager<Exe> {
trace!("[] no connection for {}", broker.url);
None
}
Some(ConnectionStatus::Connected { conn, .. }) => {
Some(ConnectionStatus::Connected { conn, last_used }) => {
if conn.is_valid() {
trace!("[connected] returning valid connection for {}", broker.url);
// Update last_used timestamp to prevent premature cleanup
*last_used = Instant::now();
return Ok(conn.clone());
} else {
warn!("[connected] invalid connection for {}", broker.url);
Expand Down Expand Up @@ -524,7 +529,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
broker,
ConnectionStatus::Connected {
conn: c.clone(),
created_at: Instant::now(),
last_used: Instant::now(),
},
);
match old {
Expand Down Expand Up @@ -557,35 +562,35 @@ impl<Exe: Executor> ConnectionManager<Exe> {
trace!("Retaining connection in `Connecting` state");
true
}
ConnectionStatus::Connected { conn, created_at } => {
// Grace period of 5 seconds for newly created connections
// to allow time for consumers/producers to grab a reference
let grace_period = Duration::from_secs(5);
let age = created_at.elapsed();
let in_grace_period = age < grace_period;
ConnectionStatus::Connected { conn, last_used } => {
let max_idle = self.connection_retry_options.connection_max_idle;
let idle_time = last_used.elapsed();
let recently_used = idle_time < max_idle;
let strong_count = Arc::strong_count(conn);
let is_valid = conn.is_valid();

// Keep connection if valid AND (someone is using it OR it's new)
let should_retain = is_valid && (strong_count > 1 || in_grace_period);
// Keep connection if valid AND (actively held OR recently used)
// This allows periodic use (like topic refresh) while cleaning up truly abandoned connections
let should_retain = is_valid && (strong_count > 1 || recently_used);

trace!(
"checking broker {} connection {}, is_valid: {}, strong_count: {}, age: {:?}, in_grace_period: {}",
"checking broker {} connection {}, is_valid: {}, strong_count: {}, idle_time: {:?}, max_idle: {:?}, recently_used: {}",
broker.url,
conn.id(),
is_valid,
strong_count,
age,
in_grace_period
idle_time,
max_idle,
recently_used
);
if !should_retain {
info!(
"Removing {} connection {} to {} (strong_count: {}, age: {:?})",
"Removing {} connection {} to {} (max_idle: {:?}, idle_time: {:?})",
if is_valid { "unused" } else { "invalid" },
conn.id(),
broker.url,
strong_count,
age
max_idle,
idle_time
);
}
should_retain
Expand Down