diff --git a/src/relay/switch/mod.rs b/src/relay/switch/mod.rs index 1538f2a..f07f26e 100644 --- a/src/relay/switch/mod.rs +++ b/src/relay/switch/mod.rs @@ -19,9 +19,11 @@ use bb8_redis::{ RedisConnectionManager, }; use queue::Queue; +use rand::Rng; pub use session::{MessageID, SessionID}; use std::collections::HashMap; use std::sync::Arc; +use std::time::SystemTime; use tokio::sync::{Mutex, Semaphore}; use tokio::time::Duration; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -67,7 +69,9 @@ pub const DEFAULT_USERS: usize = 100_1000; const READ_COUNT: usize = 100; const RETRY_DELAY: Duration = Duration::from_secs(2); const QUEUE_MAXLEN: usize = 10000; -const QUEUE_EXPIRE: usize = 3600; // queues can live max of 1 hour +const QUEUE_EXPIRE: usize = 1800; // idle queues can live max of 30 minutes (matches MAX_TTL) +const ENTRY_MAX_AGE_MS: u64 = 1800 * 1000; // entries older than 30 minutes in busy queues are evicted when housekeeping (XTRIM) is triggered +const XTRIM_SAMPLE_RATE: u32 = 10; // 10% of sends trigger housekeeping (XTRIM) #[derive(thiserror::Error, Debug)] pub enum SwitchError { @@ -315,6 +319,12 @@ impl Sink { } } +/// A helper function to decide if a probabilistic action should be triggered. +fn passes_chance(sample_rate: u32) -> bool { + // If sample_rate is 10, this creates a 10% chance of being true. + rand::thread_rng().gen_range(0..100) < sample_rate +} + async fn send( stream_id: &SessionID, pool: &Pool, @@ -344,6 +354,39 @@ async fn send( .query_async(&mut *con) .await?; + // The cleanup strategy is multi-layered but it's necessary to ensure + // the system doesn't grow indefinitely. + // + // Without both EXPIRE and XTRIM, the system would be incomplete + // EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove entries from abandoned streams. + // so basically: + // - EXPIRE: It's the garbage collector for abandoned or idle streams. whole stream is removed. + // - XTRIM MINID: It's the housekeeper for busy streams preventing them from becoming bloated with expired messages by removing old entries. + // + // Probabilistic time-based trimming (10% of sends) + // This remove messages older than ENTRY_MAX_AGE_MS which is the max time allowed for a messages in the queue. + // running XTRIM on every send would needlessly tax the system for no meaningful gain. + // The 10% probabilistic approach remains superior, automatically adapts, moore sends = more XTRIM + if passes_chance(XTRIM_SAMPLE_RATE) { + let now_ms = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let cutoff_ms = now_ms.saturating_sub(ENTRY_MAX_AGE_MS); + + // XTRIM is best-effort; don't fail send if it errors + if let Err(e) = cmd("XTRIM") + .arg(stream_id) + .arg("MINID") + .arg(cutoff_ms) + .query_async::(&mut *con) + .await + { + log::debug!("XTRIM failed for {}: {}", stream_id, e); + } + } + MESSAGE_RX.inc(); MESSAGE_RX_BYTES.inc_by(msg.len() as u64); @@ -359,7 +402,7 @@ mod test { use crate::{redis, relay::switch::SessionID}; - use super::{ConnectionSender, MessageID, Switch}; + use super::{passes_chance, ConnectionSender, MessageID, Switch}; #[derive(Clone)] struct MessageSender { @@ -464,4 +507,74 @@ mod test { assert_eq!(count, expected_count); } + + #[test] + fn test_passes_chance_probability() { + const TOTAL_RUNS: u32 = 100_000; + const SAMPLE_RATE: u32 = 10; // 10% + let mut true_count = 0; + + for _ in 0..TOTAL_RUNS { + if passes_chance(SAMPLE_RATE) { + true_count += 1; + } + } + + let actual_percentage = (true_count as f64 / TOTAL_RUNS as f64) * 100.0; + let expected_percentage = SAMPLE_RATE as f64; + + // We assert that the actual percentage is within a reasonable margin (e.g., 2%) + // of the expected percentage. This avoids flaky tests due to randomness. + let margin = 2.0; + assert!( + (actual_percentage - expected_percentage).abs() < margin, + "Actual percentage {:.2}% was not within {:.2}% of expected {}%", + actual_percentage, + margin, + expected_percentage + ); + } + + #[test] + fn test_passes_chance_scales_with_activity() { + const ROUNDS: u32 = 10000; + const HIGH_ACTIVITY_PER_ROUND: u32 = 7; + const LOW_ACTIVITY_PER_ROUND: u32 = 3; + const SAMPLE_RATE: u32 = 10; // 10% + + let mut high_activity_hits = 0; + let mut low_activity_hits = 0; + + for _ in 0..ROUNDS { + // In each round, simulate 7 high-activity and 3 low-activity sends + for _ in 0..HIGH_ACTIVITY_PER_ROUND { + if passes_chance(SAMPLE_RATE) { + high_activity_hits += 1; + } + } + for _ in 0..LOW_ACTIVITY_PER_ROUND { + if passes_chance(SAMPLE_RATE) { + low_activity_hits += 1; + } + } + } + + let total_hits = high_activity_hits + low_activity_hits; + // Avoid division by zero if no hits occurred (very unlikely but possible) + assert!(total_hits > 0, "total hits is zero"); + + let high_activity_percentage = (high_activity_hits as f64 / total_hits as f64) * 100.0; + + // The high-activity group was responsible for 70% of the sends (70000 out of 100000). + // We expect it to receive roughly 70% of the cleanup triggers. + let expected_percentage = 70.0; + let margin = 5.0; // Allow a 5% margin for randomness + + assert!( + (high_activity_percentage - expected_percentage).abs() < margin, + "High-activity group received {:.2}% of cleanups, expected around {:.2}%", + high_activity_percentage, + expected_percentage + ); + } }