From a74cb2cd7385435e1e03f52bde8a8779fa09e29b Mon Sep 17 00:00:00 2001 From: Sameh Abouelsaad Date: Wed, 19 Nov 2025 02:24:47 +0200 Subject: [PATCH 1/4] feat: add probabilistic time-based stream trimming - Implemented 10% probabilistic XTRIM on message sends to remove expired messages older than 30 minutes - Reduced QUEUE_EXPIRE from 1 hour to 30 minutes to align with MAX_TTL - Added tests verifying probability distribution and activity-based scaling --- src/relay/switch/mod.rs | 119 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 2 deletions(-) diff --git a/src/relay/switch/mod.rs b/src/relay/switch/mod.rs index 1538f2a..f9a4891 100644 --- a/src/relay/switch/mod.rs +++ b/src/relay/switch/mod.rs @@ -19,9 +19,11 @@ use bb8_redis::{ RedisConnectionManager, }; use queue::Queue; +use rand::Rng; pub use session::{MessageID, SessionID}; use std::collections::HashMap; use std::sync::Arc; +use std::time::SystemTime; use tokio::sync::{Mutex, Semaphore}; use tokio::time::Duration; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -67,7 +69,9 @@ pub const DEFAULT_USERS: usize = 100_1000; const READ_COUNT: usize = 100; const RETRY_DELAY: Duration = Duration::from_secs(2); const QUEUE_MAXLEN: usize = 10000; -const QUEUE_EXPIRE: usize = 3600; // queues can live max of 1 hour +const QUEUE_EXPIRE: usize = 1800; // 30 minutes (matches MAX_TTL) +const MAX_TTL_MS: u64 = 1800 * 1000; // 30 minutes in milliseconds +const XTRIM_SAMPLE_RATE: u32 = 10; // 10% of sends trigger XTRIM #[derive(thiserror::Error, Debug)] pub enum SwitchError { @@ -315,6 +319,12 @@ impl Sink { } } +/// A helper function to decide if a probabilistic action should be triggered. +fn passes_chance(sample_rate: u32) -> bool { + // If sample_rate is 10, this creates a 10% chance of being true. 
+ rand::thread_rng().gen_range(0..100) < sample_rate +} + async fn send( stream_id: &SessionID, pool: &Pool, @@ -344,6 +354,39 @@ async fn send( .query_async(&mut *con) .await?; + // The cleanup strategy is multi-layered but it's necessary to ensure + // the system doesn't grow indefinitely. + // + // Without both EXPIRE and XTRIM, the system would be incomplete + // EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove streams for clients that have disconnected permanently. + // so basically: + // - EXPIRE: It's the garbage collector for abandoned or idle streams. + // - XTRIM MINID: It's the housekeeper for busy streams, preventing them from becoming bloated with expired messages. + // + // Probabilistic time-based trimming (10% of sends) + // This remove messages older than MAX_TTL_MS which is the max time allowed for a messages in transit. + // running XTRIM on every send would needlessly tax the system for no meaningful gain. + // The 10% probabilistic approach remains superior, automatically adapts, moore sends = more XTRIM + if passes_chance(XTRIM_SAMPLE_RATE) { + let now_ms = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let cutoff_ms = now_ms.saturating_sub(MAX_TTL_MS); + + // XTRIM is best-effort; don't fail send if it errors + if let Err(e) = cmd("XTRIM") + .arg(stream_id) + .arg("MINID") + .arg(cutoff_ms) + .query_async::(&mut *con) + .await + { + log::debug!("XTRIM failed for {}: {}", stream_id, e); + } + } + MESSAGE_RX.inc(); MESSAGE_RX_BYTES.inc_by(msg.len() as u64); @@ -359,7 +402,7 @@ mod test { use crate::{redis, relay::switch::SessionID}; - use super::{ConnectionSender, MessageID, Switch}; + use super::{passes_chance, ConnectionSender, MessageID, Switch}; #[derive(Clone)] struct MessageSender { @@ -464,4 +507,76 @@ mod test { assert_eq!(count, expected_count); } + + #[test] + fn test_passes_chance_probability() { + const TOTAL_RUNS: u32 = 100_000; + const SAMPLE_RATE: u32 = 
10; // 10% + let mut true_count = 0; + + for _ in 0..TOTAL_RUNS { + if passes_chance(SAMPLE_RATE) { + true_count += 1; + } + } + + let actual_percentage = (true_count as f64 / TOTAL_RUNS as f64) * 100.0; + let expected_percentage = SAMPLE_RATE as f64; + + // We assert that the actual percentage is within a reasonable margin (e.g., 2%) + // of the expected percentage. This avoids flaky tests due to randomness. + let margin = 2.0; + assert!( + (actual_percentage - expected_percentage).abs() < margin, + "Actual percentage {:.2}% was not within {:.2}% of expected {}%", + actual_percentage, + margin, + expected_percentage + ); + } + + #[test] + fn test_passes_chance_scales_with_activity() { + const ROUNDS: u32 = 10000; + const HIGH_ACTIVITY_PER_ROUND: u32 = 7; + const LOW_ACTIVITY_PER_ROUND: u32 = 3; + const SAMPLE_RATE: u32 = 10; // 10% + + let mut high_activity_hits = 0; + let mut low_activity_hits = 0; + + for _ in 0..ROUNDS { + // In each round, simulate 7 high-activity and 3 low-activity sends + for _ in 0..HIGH_ACTIVITY_PER_ROUND { + if passes_chance(SAMPLE_RATE) { + high_activity_hits += 1; + } + } + for _ in 0..LOW_ACTIVITY_PER_ROUND { + if passes_chance(SAMPLE_RATE) { + low_activity_hits += 1; + } + } + } + + let total_hits = high_activity_hits + low_activity_hits; + // Avoid division by zero if no hits occurred (very unlikely but possible) + if total_hits == 0 { + return; + } + + let high_activity_proportion = high_activity_hits as f64 / total_hits as f64; + + // The high-activity group was responsible for 70% of the sends (70000 out of 100000). + // We expect it to receive roughly 70% of the cleanup triggers. 
+ let expected_proportion = 0.7; + let margin = 0.05; // Allow a 5% margin for randomness + + assert!( + (high_activity_proportion - expected_proportion).abs() < margin, + "High-activity group received {:.2}% of cleanups, expected around {:.2}%", + high_activity_proportion * 100.0, + expected_proportion * 100.0 + ); + } } From aed8fbd42766a93a0d914f8abb732d0f93cbcd69 Mon Sep 17 00:00:00 2001 From: Sameh Abouelsaad Date: Wed, 19 Nov 2025 03:03:12 +0200 Subject: [PATCH 2/4] test: convert test proportions to percentages for clarity --- src/relay/switch/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/relay/switch/mod.rs b/src/relay/switch/mod.rs index f9a4891..b20bd95 100644 --- a/src/relay/switch/mod.rs +++ b/src/relay/switch/mod.rs @@ -565,18 +565,18 @@ mod test { return; } - let high_activity_proportion = high_activity_hits as f64 / total_hits as f64; + let high_activity_percentage = (high_activity_hits as f64 / total_hits as f64) * 100.0; // The high-activity group was responsible for 70% of the sends (70000 out of 100000). // We expect it to receive roughly 70% of the cleanup triggers. 
- let expected_proportion = 0.7; - let margin = 0.05; // Allow a 5% margin for randomness + let expected_percentage = 70.0; + let margin = 5.0; // Allow a 5% margin for randomness assert!( - (high_activity_proportion - expected_proportion).abs() < margin, + (high_activity_percentage - expected_percentage).abs() < margin, "High-activity group received {:.2}% of cleanups, expected around {:.2}%", - high_activity_proportion * 100.0, - expected_proportion * 100.0 + high_activity_percentage, + expected_percentage ); } } From 5516d2abac39c7c9c0860ab93e1e2848792b8ae4 Mon Sep 17 00:00:00 2001 From: Sameh Abouelsaad Date: Wed, 19 Nov 2025 05:11:11 +0200 Subject: [PATCH 3/4] docs: clarify queue management comments and constants --- src/relay/switch/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/relay/switch/mod.rs b/src/relay/switch/mod.rs index b20bd95..6bc577f 100644 --- a/src/relay/switch/mod.rs +++ b/src/relay/switch/mod.rs @@ -69,9 +69,9 @@ pub const DEFAULT_USERS: usize = 100_1000; const READ_COUNT: usize = 100; const RETRY_DELAY: Duration = Duration::from_secs(2); const QUEUE_MAXLEN: usize = 10000; -const QUEUE_EXPIRE: usize = 1800; // 30 minutes (matches MAX_TTL) -const MAX_TTL_MS: u64 = 1800 * 1000; // 30 minutes in milliseconds -const XTRIM_SAMPLE_RATE: u32 = 10; // 10% of sends trigger XTRIM +const QUEUE_EXPIRE: usize = 1800; // idle queues can live max of 30 minutes (matches MAX_TTL) +const ENTRY_MAX_AGE_MS: u64 = 1800 * 1000; // entries older than 30 minutes in busy queues are evicted when housekeeping (XTRIM) is triggered +const XTRIM_SAMPLE_RATE: u32 = 10; // 10% of sends trigger housekeeping (XTRIM) #[derive(thiserror::Error, Debug)] pub enum SwitchError { @@ -358,13 +358,13 @@ async fn send( // the system doesn't grow indefinitely. 
// // Without both EXPIRE and XTRIM, the system would be incomplete - // EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove streams for clients that have disconnected permanently. + // EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove entries from abandoned streams. // so basically: - // - EXPIRE: It's the garbage collector for abandoned or idle streams. - // - XTRIM MINID: It's the housekeeper for busy streams, preventing them from becoming bloated with expired messages. + // - EXPIRE: It's the garbage collector for abandoned or idle streams; the whole stream is removed. + // - XTRIM MINID: It's the housekeeper for busy streams, preventing them from becoming bloated with expired messages by removing old entries. // // Probabilistic time-based trimming (10% of sends) - // This remove messages older than MAX_TTL_MS which is the max time allowed for a messages in transit. + // This removes messages older than ENTRY_MAX_AGE_MS, which is the max time allowed for a message in the queue. // running XTRIM on every send would needlessly tax the system for no meaningful gain. 
// The 10% probabilistic approach remains superior, automatically adapts, moore sends = more XTRIM if passes_chance(XTRIM_SAMPLE_RATE) { @@ -373,7 +373,7 @@ async fn send( .unwrap() .as_millis() as u64; - let cutoff_ms = now_ms.saturating_sub(MAX_TTL_MS); + let cutoff_ms = now_ms.saturating_sub(ENTRY_MAX_AGE_MS); // XTRIM is best-effort; don't fail send if it errors if let Err(e) = cmd("XTRIM") From 55bb279439434b94474fe62b73a76e289c17905e Mon Sep 17 00:00:00 2001 From: Sameh Abouelsaad Date: Wed, 19 Nov 2025 05:23:49 +0200 Subject: [PATCH 4/4] test: replace early return with assertion --- src/relay/switch/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/relay/switch/mod.rs b/src/relay/switch/mod.rs index 6bc577f..f07f26e 100644 --- a/src/relay/switch/mod.rs +++ b/src/relay/switch/mod.rs @@ -561,9 +561,7 @@ mod test { let total_hits = high_activity_hits + low_activity_hits; // Avoid division by zero if no hits occurred (very unlikely but possible) - if total_hits == 0 { - return; - } + assert!(total_hits > 0, "total hits is zero"); let high_activity_percentage = (high_activity_hits as f64 / total_hits as f64) * 100.0;