Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 115 additions & 2 deletions src/relay/switch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use bb8_redis::{
RedisConnectionManager,
};
use queue::Queue;
use rand::Rng;
pub use session::{MessageID, SessionID};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::{Mutex, Semaphore};
use tokio::time::Duration;
use tokio_util::sync::{CancellationToken, DropGuard};
Expand Down Expand Up @@ -67,7 +69,9 @@ pub const DEFAULT_USERS: usize = 100_1000;
const READ_COUNT: usize = 100;
const RETRY_DELAY: Duration = Duration::from_secs(2);
const QUEUE_MAXLEN: usize = 10000;
const QUEUE_EXPIRE: usize = 3600; // queues can live max of 1 hour
const QUEUE_EXPIRE: usize = 1800; // idle queues can live max of 30 minutes (matches MAX_TTL)
const ENTRY_MAX_AGE_MS: u64 = 1800 * 1000; // entries older than 30 minutes in busy queues are evicted when housekeeping (XTRIM) is triggered
const XTRIM_SAMPLE_RATE: u32 = 10; // 10% of sends trigger housekeeping (XTRIM)

#[derive(thiserror::Error, Debug)]
pub enum SwitchError {
Expand Down Expand Up @@ -315,6 +319,12 @@ impl Sink {
}
}

/// Decides whether a probabilistic action should fire on this call.
///
/// `sample_rate` is treated as a percentage: a value is drawn uniformly from
/// `0..100` and the function returns `true` when that draw is strictly below
/// `sample_rate`. A rate of `0` therefore never fires, and any rate of `100`
/// or more always fires.
fn passes_chance(sample_rate: u32) -> bool {
    // One uniform draw in [0, 100); e.g. a rate of 10 wins on draws 0..=9.
    let roll: u32 = rand::thread_rng().gen_range(0..100);
    sample_rate > roll
}

async fn send(
stream_id: &SessionID,
pool: &Pool<RedisConnectionManager>,
Expand Down Expand Up @@ -344,6 +354,39 @@ async fn send(
.query_async(&mut *con)
.await?;

// The cleanup strategy is multi-layered, which is necessary to ensure
// the system doesn't grow indefinitely.
//
// Without both EXPIRE and XTRIM the cleanup would be incomplete:
// EXPIRE alone can't clean active streams, and XTRIM alone wouldn't remove entries from abandoned streams.
// In short:
// - EXPIRE: the garbage collector for abandoned or idle streams; the whole stream is removed.
// - XTRIM MINID: the housekeeper for busy streams; it prevents them from becoming bloated with expired messages by removing old entries.
//
// Probabilistic time-based trimming (10% of sends):
// this removes messages older than ENTRY_MAX_AGE_MS, which is the maximum time a message is allowed to stay in the queue.
// Running XTRIM on every send would needlessly tax the system for no meaningful gain.
// The 10% probabilistic approach remains the better trade-off and adapts automatically: more sends = more XTRIM runs.
if passes_chance(XTRIM_SAMPLE_RATE) {
let now_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

let cutoff_ms = now_ms.saturating_sub(ENTRY_MAX_AGE_MS);

// XTRIM is best-effort; don't fail send if it errors
if let Err(e) = cmd("XTRIM")
.arg(stream_id)
.arg("MINID")
.arg(cutoff_ms)
.query_async::<i64>(&mut *con)
.await
{
log::debug!("XTRIM failed for {}: {}", stream_id, e);
}
}

MESSAGE_RX.inc();
MESSAGE_RX_BYTES.inc_by(msg.len() as u64);

Expand All @@ -359,7 +402,7 @@ mod test {

use crate::{redis, relay::switch::SessionID};

use super::{ConnectionSender, MessageID, Switch};
use super::{passes_chance, ConnectionSender, MessageID, Switch};

#[derive(Clone)]
struct MessageSender {
Expand Down Expand Up @@ -464,4 +507,74 @@ mod test {

assert_eq!(count, expected_count);
}

/// Checks that `passes_chance` fires at roughly the requested rate over a
/// large number of independent draws.
#[test]
fn test_passes_chance_probability() {
    const TOTAL_RUNS: u32 = 100_000;
    const SAMPLE_RATE: u32 = 10; // 10%

    // Tally how many of the draws came back `true`.
    let hits = (0..TOTAL_RUNS)
        .filter(|_| passes_chance(SAMPLE_RATE))
        .count();

    let expected_percentage = SAMPLE_RATE as f64;
    let actual_percentage = (hits as f64 / TOTAL_RUNS as f64) * 100.0;

    // The observed rate only has to land within a fixed band (2 percentage
    // points) around the expected rate, which keeps the test from being
    // flaky despite the randomness involved.
    let margin = 2.0;
    let deviation = (actual_percentage - expected_percentage).abs();
    assert!(
        deviation < margin,
        "Actual percentage {:.2}% was not within {:.2}% of expected {}%",
        actual_percentage,
        margin,
        expected_percentage
    );
}

/// Checks that the share of triggers a sender receives tracks its share of
/// the sends: a group responsible for 70% of the calls should collect about
/// 70% of the hits.
#[test]
fn test_passes_chance_scales_with_activity() {
    const ROUNDS: u32 = 10000;
    const HIGH_ACTIVITY_PER_ROUND: u32 = 7;
    const LOW_ACTIVITY_PER_ROUND: u32 = 3;
    const SAMPLE_RATE: u32 = 10; // 10%

    // Number of hits produced by `sends` consecutive calls.
    let hits_in = |sends: u32| (0..sends).filter(|_| passes_chance(SAMPLE_RATE)).count();

    // Each round simulates 7 high-activity sends followed by 3 low-activity
    // sends, accumulating the hits of each group separately.
    let (high_activity_hits, low_activity_hits) =
        (0..ROUNDS).fold((0usize, 0usize), |(high, low), _| {
            let high = high + hits_in(HIGH_ACTIVITY_PER_ROUND);
            let low = low + hits_in(LOW_ACTIVITY_PER_ROUND);
            (high, low)
        });

    // Guard the division below: zero hits is very unlikely but possible.
    let total_hits = high_activity_hits + low_activity_hits;
    assert!(total_hits > 0, "total hits is zero");

    let high_activity_percentage = (high_activity_hits as f64 / total_hits as f64) * 100.0;

    // The high-activity group issued 70% of the sends (70000 out of 100000),
    // so it should receive roughly 70% of the cleanup triggers.
    let expected_percentage = 70.0;
    let margin = 5.0; // Allow a 5% margin for randomness

    let deviation = (high_activity_percentage - expected_percentage).abs();
    assert!(
        deviation < margin,
        "High-activity group received {:.2}% of cleanups, expected around {:.2}%",
        high_activity_percentage,
        expected_percentage
    );
}
}
Loading