From 15e0ad3d97c7b5092277a93a41deccc2b1c374da Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 17:41:34 -0700 Subject: [PATCH 01/10] [rust-scheduler] Replace round-robin ClusterFeed with priority-queue scheduling Fixes #2315. The ClusterFeed previously iterated clusters with a fixed-step round-robin and applied lap-based 10ms/100ms/5s sleeps between rounds. Every cluster received equal air-time regardless of work, and a sleeping cluster added delay to neighboring clusters. Replace the Vec + AtomicUsize index with a BinaryHeap ordered by (next_eligible_at asc, last_dispatched_jobs desc). The feed now pops the highest-priority cluster, sleeps until eligible (preemptible via Notify), emits, and waits for a Done message that re-inserts the cluster with updated stats. Lap-based sleep tiers and the parallel sleep_map are removed; eligibility now lives on each heap entry. Cluster feed changes (cluster.rs): - Added Scheduled { cluster, next_eligible_at, last_dispatched_jobs } with Ord prioritizing earliest eligibility, then busiest cluster on ties (productivity bias). - ClusterFeed now holds Arc>> + Notify. - FeedMessage::Sleep / Stop() collapsed into FeedMessage::Done { cluster, processed_jobs, sleep } + unit-variant Stop. - stream() dispatch loop now pops, sleeps with preemption, and emits. - Control loop re-inserts on Done and notifies the dispatch loop. - Added ClusterFeed::load_from_clusters shim for tests. Consumer (pipeline/entrypoint.rs): - Always sends FeedMessage::Done with processed_jobs and optional back-off. - Preserves should_stop / empty_job_cycles logic. Config (config/mod.rs): - Added cluster_empty_back_off: Duration (default 3s), replacing hardcoded entrypoint back-off. - Added cluster_productivity_bias: bool (default true), toggling busy-cluster tie-breaking. Observability (metrics/mod.rs): - Added scheduler_cluster_polls_total {show_id, facility_id} CounterVec. - Added scheduler_cluster_rounds_total Counter (mirrors CLUSTER_ROUNDS). - Added scheduler_cluster_last_dispatched_jobs {show_id, facility_id} Gauge, updated from the Done handler to expose the productivity-bias signal. Tests: - Added 6 unit tests covering Scheduled ordering, sleep-back-off timing, Done round-trip, and a 500ms mixed-workload proportionality test asserting busy clusters receive >2x the polls of idle ones. - Updated tests/util.rs for the two new QueueConfig fields. --- rust/crates/scheduler/src/cluster.rs | 574 ++++++++++++++---- rust/crates/scheduler/src/config/mod.rs | 10 + rust/crates/scheduler/src/metrics/mod.rs | 54 +- .../scheduler/src/pipeline/entrypoint.rs | 59 +- rust/crates/scheduler/tests/util.rs | 2 + 5 files changed, 558 insertions(+), 141 deletions(-) diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index 71a93d7c5..b29bc88ce 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -11,19 +11,20 @@ // the License. use std::{ - collections::{BTreeSet, HashMap, HashSet}, + cmp::Ordering as CmpOrdering, + collections::{BTreeSet, BinaryHeap, HashMap, HashSet}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, RwLock, + Arc, Mutex, }, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use futures::StreamExt; use itertools::Itertools; use miette::{IntoDiagnostic, Result}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Notify}; use tracing::{debug, error, warn}; use uuid::Uuid; @@ -31,8 +32,14 @@ use crate::{ cluster_key::{Tag, TagType}, config::CONFIG, dao::{helpers::parse_uuid, ClusterDao}, + metrics, }; +/// Counts how many times a cluster has been emitted by the feed. +/// +/// Kept as a process-global atomic for source-level compatibility with smoke tests that +/// read it directly. Production observability is provided via the Prometheus +/// `scheduler_cluster_polls_total` counter (see [`metrics`]). pub static CLUSTER_ROUNDS: AtomicUsize = AtomicUsize::new(0); #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -72,28 +79,86 @@ impl Cluster { } } +/// A cluster scheduled for dispatch, ordered by eligibility time and recent productivity. +/// +/// `BinaryHeap` is a max-heap, so [`Scheduled::cmp`] inverts `next_eligible_at` +/// (earliest wins) and keeps `last_dispatched_jobs` in natural order (busier wins +/// as a tiebreaker when productivity bias is enabled). +#[derive(Debug, Clone)] +struct Scheduled { + cluster: Cluster, + /// When this cluster becomes eligible for dispatch. A past `Instant` means + /// the cluster is ready right now. + next_eligible_at: Instant, + /// Number of jobs processed in the most recent dispatch: used as the + /// productivity bias tiebreaker. + last_dispatched_jobs: usize, +} + +impl PartialEq for Scheduled { + fn eq(&self, other: &Self) -> bool { + self.next_eligible_at == other.next_eligible_at + && self.last_dispatched_jobs == other.last_dispatched_jobs + && self.cluster == other.cluster + } +} +impl Eq for Scheduled {} + +impl Ord for Scheduled { + fn cmp(&self, other: &Self) -> CmpOrdering { + // Earliest `next_eligible_at` is highest priority. Invert for max-heap. + let by_time = other.next_eligible_at.cmp(&self.next_eligible_at); + if by_time != CmpOrdering::Equal { + return by_time; + } + // Productivity bias: more jobs in the last dispatch ranks higher. + if CONFIG.queue.cluster_productivity_bias { + let by_jobs = self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs); + if by_jobs != CmpOrdering::Equal { + return by_jobs; + } + } + // Deterministic tiebreaker on the cluster identity. + self.cluster.cmp(&other.cluster) + } +} +impl PartialOrd for Scheduled { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Debug)] pub struct ClusterFeed { - pub clusters: Arc>>, - current_index: Arc, + /// Priority queue of clusters awaiting dispatch. + queue: Arc>>, stop_flag: Arc, - sleep_map: Arc>>, + /// Wakes the dispatch loop when an entry is pushed back or shutdown is signaled. + notify: Arc, } /// Control messages for the cluster feed stream. /// -/// These messages are sent to the control channel returned by `ClusterFeed::stream()` -/// to influence feed behavior during runtime. +/// Sent to the channel returned by [`ClusterFeed::stream`] to report back the +/// outcome of processing a cluster (so the priority queue can re-rank it) or +/// to signal a graceful shutdown. pub enum FeedMessage { /// Stops the cluster feed stream gracefully. - Stop(), - /// Puts a specific cluster to sleep for the given duration. + Stop, + /// Reports the result of processing a cluster and re-inserts it into the + /// priority queue. /// /// # Fields /// - /// * `Cluster` - The cluster to put to sleep - /// * `Duration` - How long to sleep before the cluster can be processed again - Sleep(Cluster, Duration), + /// * `cluster` - The cluster that was just processed. + /// * `processed_jobs` - Jobs dispatched on this cycle. Drives the productivity-bias tiebreaker. + /// * `sleep` - Optional back-off duration. `None` re-queues the cluster as eligible now; + /// `Some(d)` defers it for at least `d`. + Done { + cluster: Cluster, + processed_jobs: usize, + sleep: Option, + }, } /// Builder for constructing a [`ClusterFeed`]. @@ -159,11 +224,19 @@ impl ClusterFeedBuilder { } ClusterFeed::filter_clusters(clusters.into_iter().collect(), &self.ignore_tags) }; + let now = Instant::now(); + let mut heap = BinaryHeap::with_capacity(clusters.len().max(1)); + for cluster in clusters { + heap.push(Scheduled { + cluster, + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + } Ok(ClusterFeed { - clusters: Arc::new(RwLock::new(clusters)), - current_index: Arc::new(AtomicUsize::new(0)), + queue: Arc::new(Mutex::new(heap)), stop_flag: Arc::new(AtomicBool::new(false)), - sleep_map: Arc::new(Mutex::new(HashMap::new())), + notify: Arc::new(Notify::new()), }) } } @@ -189,6 +262,34 @@ impl ClusterFeed { } } + /// Builds a feed from a fixed list of clusters with optional tag filtering. + /// + /// Intended for tests and direct construction where the cluster set is already + /// known. Production code should prefer the builder ([`facility`](Self::facility) / + /// [`no_facility`](Self::no_facility)) which can also load clusters from the database. + /// + /// # Arguments + /// + /// * `clusters` - Explicit list of clusters to feed. + /// * `ignore_tags` - Tag names to drop; a cluster whose tag set becomes empty is excluded. + pub fn load_from_clusters(clusters: Vec, ignore_tags: &[String]) -> Self { + let filtered = Self::filter_clusters(clusters, ignore_tags); + let now = Instant::now(); + let mut heap = BinaryHeap::with_capacity(filtered.len().max(1)); + for cluster in filtered { + heap.push(Scheduled { + cluster, + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + } + ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + } + } + /// Loads all clusters from the database and organizes them by tag type. /// /// Loads allocation clusters (one per facility+show+tag), and chunks manual/hostname tags @@ -336,148 +437,152 @@ impl ClusterFeed { .collect() } - /// Streams clusters to a channel receiver with backpressure control. + /// Streams clusters from a priority queue to a channel receiver. /// - /// Creates a producer-consumer pattern where clusters are sent through a channel - /// to the provided sender. The stream can be controlled via the returned message - /// channel (for sleep/stop commands). + /// Creates a producer-consumer pattern backed by a `BinaryHeap` + /// keyed on `(next_eligible_at asc, last_dispatched_jobs desc)`. The consumer + /// must report each cluster back via [`FeedMessage::Done`] so it can be + /// re-inserted with updated stats; otherwise the cluster is lost. /// /// # Arguments /// - /// * `sender` - Channel sender for emitting clusters + /// * `sender` - Channel sender for emitting eligible clusters. /// /// # Returns /// - /// * `mpsc::Sender` - Control channel for sending sleep/stop messages + /// * `mpsc::Sender` - Channel to send `Done` (re-insert) and `Stop` messages. /// /// # Behavior /// - /// - Iterates through clusters in round-robin fashion - /// - Skips sleeping clusters until their wake time expires - /// - Applies backoff delays between rounds (varies based on sleeping cluster count) - /// - Stops when receiving a Stop message or when configured empty cycles limit is reached - /// - Automatically cleans up expired sleep entries + /// - Pops the highest-priority cluster (earliest eligible, busiest as tiebreaker). + /// - If the popped entry is not yet eligible, sleeps until it is, but wakes + /// immediately on `Notify` so a freshly inserted higher-priority entry can preempt. + /// - On `FeedMessage::Done` re-inserts the cluster with the reported sleep + /// deadline and `last_dispatched_jobs` for the productivity bias. + /// - On `FeedMessage::Stop` sets the stop flag and exits. pub async fn stream(self, sender: mpsc::Sender) -> mpsc::Sender { - // Use a small channel to ensure the producer waits for items to be consumed before - // generating more - let (cancel_sender, mut feed_receiver) = mpsc::channel(8); + // Backpressure: bounded channel keeps the consumer from queueing unbounded Done messages. + let (control_sender, mut control_receiver) = mpsc::channel(32); let stop_flag = self.stop_flag.clone(); - let sleep_map = self.sleep_map.clone(); + let queue = self.queue.clone(); + let notify = self.notify.clone(); - // Stream clusters on the caller channel + // Dispatch loop: pop highest-priority cluster, sleep until eligible, send. + let queue_dispatch = queue.clone(); + let notify_dispatch = notify.clone(); + let stop_flag_dispatch = stop_flag.clone(); tokio::spawn(async move { - let mut all_sleeping_rounds = 0; - let feed = self.clusters.clone(); - let current_index_atomic = self.current_index.clone(); - loop { - // Check stop flag - if stop_flag.load(Ordering::Relaxed) { - warn!("Cluster received a stop message. Stopping feed."); + if stop_flag_dispatch.load(Ordering::Relaxed) { + warn!("Cluster feed received stop signal. Exiting dispatch loop."); break; } - let (item, cluster_size, completed_round) = { - let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner()); - if clusters.is_empty() { - break; - } - - let current_index = current_index_atomic.load(Ordering::Relaxed); - let item = clusters[current_index].clone(); - let next_index = (current_index + 1) % clusters.len(); - let completed_round = next_index == 0; // Detect wrap-around - current_index_atomic.store(next_index, Ordering::Relaxed); - - (item, clusters.len(), completed_round) + let scheduled = { + let mut heap = + queue_dispatch.lock().unwrap_or_else(|p| p.into_inner()); + heap.pop() }; - // Skip cluster if it is marked as sleeping - let is_sleeping = { - let mut sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - if let Some(wake_up_time) = sleep_map_lock.get(&item) { - if *wake_up_time > SystemTime::now() { - // Still sleeping, skip it - true - } else { - // Remove expired entries - sleep_map_lock.remove(&item); - false + let scheduled = match scheduled { + Some(s) => s, + None => { + // No clusters available: all in flight. Wait for a re-insert + // or a stop signal. The short fallback sleep guarantees we + // re-check `stop_flag` if no notify ever arrives. + tokio::select! { + _ = notify_dispatch.notified() => {} + _ = tokio::time::sleep(Duration::from_millis(100)) => {} } - } else { - false + continue; } }; - if !is_sleeping && sender.send(item).await.is_err() { - warn!("Cluster receiver dropped. Stopping feed."); - break; + let now = Instant::now(); + if scheduled.next_eligible_at > now { + let sleep_dur = scheduled.next_eligible_at - now; + let preempted = tokio::select! { + _ = tokio::time::sleep(sleep_dur) => false, + _ = notify_dispatch.notified() => true, + }; + if preempted { + // A new entry was pushed: re-pop to pick the new best. + let mut heap = + queue_dispatch.lock().unwrap_or_else(|p| p.into_inner()); + heap.push(scheduled); + continue; + } + if stop_flag_dispatch.load(Ordering::Relaxed) { + break; + } } - // At end of round, add backoff sleep - if completed_round { - CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + metrics::increment_cluster_polls( + &scheduled.cluster.show_id, + &scheduled.cluster.facility_id, + ); - // Check if all/most clusters are sleeping - let sleeping_count = { - let sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.len() - }; - if sleeping_count >= cluster_size { - // Ensure this doesn't loop forever when there's a limit configured - all_sleeping_rounds += 1; - if let Some(max_empty_cycles) = CONFIG.queue.empty_job_cycles_before_quiting - { - if all_sleeping_rounds > max_empty_cycles { - warn!("All clusters have been sleeping for too long"); - break; - } - } - - // All clusters sleeping, sleep longer - tokio::time::sleep(Duration::from_secs(5)).await; - } else if sleeping_count > 0 { - // Some clusters sleeping, brief pause - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // Active work, minimal pause - tokio::time::sleep(Duration::from_millis(10)).await; - } + if sender.send(scheduled.cluster).await.is_err() { + warn!("Cluster receiver dropped. Stopping feed."); + break; } } }); - // Process messages on the receiving end - let sleep_map = self.sleep_map.clone(); + // Control loop: handle Done (re-insert) and Stop messages from the consumer. + let queue_ctrl = queue.clone(); + let notify_ctrl = notify.clone(); + let stop_flag_ctrl = stop_flag.clone(); tokio::spawn(async move { - while let Some(message) = feed_receiver.recv().await { + while let Some(message) = control_receiver.recv().await { match message { - FeedMessage::Sleep(cluster, duration) => { - if let Some(wake_up_time) = SystemTime::now().checked_add(duration) { - debug!("{:?} put to sleep for {}s", cluster, duration.as_secs()); - { - let mut sleep_map_lock = - sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.insert(cluster, wake_up_time); + FeedMessage::Done { + cluster, + processed_jobs, + sleep, + } => { + let next_eligible_at = match sleep { + Some(d) => { + debug!( + "{:?} re-queued with back-off of {}s ({} jobs processed)", + cluster, + d.as_secs(), + processed_jobs + ); + Instant::now() + .checked_add(d) + .unwrap_or_else(Instant::now) } - } else { - warn!( - "Sleep request ignored for {:?}. Invalid duration={}s", + None => Instant::now(), + }; + metrics::set_cluster_last_dispatched_jobs( + &cluster.show_id, + &cluster.facility_id, + processed_jobs, + ); + { + let mut heap = + queue_ctrl.lock().unwrap_or_else(|p| p.into_inner()); + heap.push(Scheduled { cluster, - duration.as_secs() - ); + next_eligible_at, + last_dispatched_jobs: processed_jobs, + }); } + notify_ctrl.notify_one(); } - FeedMessage::Stop() => { - self.stop_flag.store(true, Ordering::Relaxed); + FeedMessage::Stop => { + stop_flag_ctrl.store(true, Ordering::Relaxed); + notify_ctrl.notify_one(); break; } } } }); - cancel_sender + control_sender } } @@ -513,3 +618,242 @@ pub async fn get_show_id(show_name: &str) -> Result { let cluster_dao = ClusterDao::new().await?; cluster_dao.get_show_id(show_name).await.into_diagnostic() } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::BTreeSet; + + fn make_cluster(name: &str) -> Cluster { + Cluster { + facility_id: Uuid::new_v4(), + show_id: Uuid::new_v4(), + tags: BTreeSet::from([Tag { + name: name.to_string(), + ttype: TagType::Alloc, + }]), + } + } + + fn scheduled(cluster: Cluster, at: Instant, jobs: usize) -> Scheduled { + Scheduled { + cluster, + next_eligible_at: at, + last_dispatched_jobs: jobs, + } + } + + #[test] + fn earlier_next_eligible_at_pops_first() { + let now = Instant::now(); + let later = scheduled(make_cluster("a"), now + Duration::from_secs(10), 100); + let sooner = scheduled(make_cluster("b"), now, 0); + + let mut heap = BinaryHeap::new(); + heap.push(later); + heap.push(sooner); + + let popped = heap.pop().expect("heap not empty"); + assert_eq!(popped.last_dispatched_jobs, 0); + } + + #[test] + fn busier_cluster_pops_first_when_eligibility_ties() { + // This test relies on the default `cluster_productivity_bias = true`. + assert!( + CONFIG.queue.cluster_productivity_bias, + "test assumes default config: productivity bias enabled" + ); + + let now = Instant::now(); + let idle = scheduled(make_cluster("idle"), now, 0); + let busy = scheduled(make_cluster("busy"), now, 500); + + let mut heap = BinaryHeap::new(); + heap.push(idle); + heap.push(busy); + + let popped = heap.pop().expect("heap not empty"); + assert_eq!(popped.last_dispatched_jobs, 500); + } + + #[test] + fn sleeping_cluster_does_not_block_eligible_neighbor() { + let now = Instant::now(); + let sleeping = scheduled( + make_cluster("sleeping"), + now + Duration::from_secs(60), + 10, + ); + let ready = scheduled(make_cluster("ready"), now, 0); + + let mut heap = BinaryHeap::new(); + heap.push(sleeping); + heap.push(ready); + + let first = heap.pop().expect("heap not empty"); + let first_tag = first.cluster.tags.iter().next().unwrap().name.clone(); + assert_eq!(first_tag, "ready"); + } + + #[tokio::test] + async fn stream_round_trip_re_inserts_via_done() { + let cluster = make_cluster("solo"); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: cluster.clone(), + next_eligible_at: Instant::now(), + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(8); + let control = feed.stream(tx).await; + + // Receive the cluster, report it back as productive, receive it again. + let emitted = rx.recv().await.expect("feed emitted a cluster"); + assert_eq!(emitted, cluster); + + control + .send(FeedMessage::Done { + cluster: emitted.clone(), + processed_jobs: 7, + sleep: None, + }) + .await + .expect("control channel open"); + + let emitted_again = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("cluster re-emitted within 1s") + .expect("cluster present"); + assert_eq!(emitted_again, cluster); + + // Tidy up so the spawned tasks exit. + control.send(FeedMessage::Stop).await.ok(); + } + + #[tokio::test] + async fn busy_cluster_gets_more_polls_than_idle() { + // Two clusters share the feed. The "busy" cluster reports productive + // work with no back-off; the "idle" cluster reports zero jobs with a + // 100ms back-off. Over a 500ms window busy should be polled many more + // times than idle. + let busy = make_cluster("busy"); + let idle = make_cluster("idle"); + let now = Instant::now(); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: busy.clone(), + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + heap.push(Scheduled { + cluster: idle.clone(), + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(16); + let control = feed.stream(tx).await; + + let mut busy_polls = 0usize; + let mut idle_polls = 0usize; + let test_duration = Duration::from_millis(500); + let idle_back_off = Duration::from_millis(100); + let deadline = Instant::now() + test_duration; + + while Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + let next = tokio::time::timeout(remaining, rx.recv()).await; + let Ok(Some(cluster)) = next else { break }; + + let is_busy = cluster.tags.iter().next().unwrap().name == "busy"; + let (processed_jobs, sleep) = if is_busy { + busy_polls += 1; + (100, None) + } else { + idle_polls += 1; + (0, Some(idle_back_off)) + }; + control + .send(FeedMessage::Done { + cluster, + processed_jobs, + sleep, + }) + .await + .ok(); + } + + control.send(FeedMessage::Stop).await.ok(); + + // Idle is back-off-bound: in 500ms with a 100ms back-off it can be + // polled at most ~5 times. Busy has no back-off and is only limited + // by task scheduling, so it should run away. + assert!( + busy_polls > idle_polls * 2, + "expected busy polls (>2x) idle polls; busy={busy_polls} idle={idle_polls}" + ); + assert!(busy_polls > 0 && idle_polls > 0, "both clusters polled at least once"); + } + + #[tokio::test] + async fn stream_respects_sleep_back_off() { + let cluster = make_cluster("naps"); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: cluster.clone(), + next_eligible_at: Instant::now(), + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(8); + let control = feed.stream(tx).await; + + // First emission is immediate. + rx.recv().await.expect("first emission"); + + // Re-queue with a 250ms back-off. + let back_off = Duration::from_millis(250); + let started = Instant::now(); + control + .send(FeedMessage::Done { + cluster: cluster.clone(), + processed_jobs: 0, + sleep: Some(back_off), + }) + .await + .expect("control channel open"); + + // Should not arrive before the back-off, should arrive shortly after. + let _ = tokio::time::timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("cluster re-emitted within 2s") + .expect("cluster present"); + let elapsed = started.elapsed(); + assert!( + elapsed >= back_off, + "cluster re-emitted in {elapsed:?}, expected >= {back_off:?}" + ); + + control.send(FeedMessage::Stop).await.ok(); + } +} diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs index 3d46e1419..36eb7556f 100644 --- a/rust/crates/scheduler/src/config/mod.rs +++ b/rust/crates/scheduler/src/config/mod.rs @@ -82,6 +82,14 @@ pub struct QueueConfig { pub hostname_tags_chunk_size: usize, pub host_candidate_attempts_per_layer: usize, pub empty_job_cycles_before_quiting: Option, + /// Delay before an empty cluster (no eligible jobs) becomes eligible for dispatch again. + /// Used by the priority-queue cluster feed to back off without blocking neighbors. + #[serde(with = "humantime_serde")] + pub cluster_empty_back_off: Duration, + /// When `true`, the cluster priority queue biases toward clusters that produced more + /// jobs in the most recent dispatch (productivity bias). When `false`, all clusters + /// compete strictly on `next_eligible_at` with a stable tiebreaker. + pub cluster_productivity_bias: bool, pub mem_reserved_min: ByteSize, #[serde(with = "humantime_serde")] pub subscription_recalculation_interval: Duration, @@ -108,6 +116,8 @@ impl Default for QueueConfig { hostname_tags_chunk_size: 300, host_candidate_attempts_per_layer: 10, empty_job_cycles_before_quiting: None, + cluster_empty_back_off: Duration::from_secs(3), + cluster_productivity_bias: true, mem_reserved_min: ByteSize::mib(250), subscription_recalculation_interval: Duration::from_secs(3), resource_recalculation_interval: Duration::from_secs(10), diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs index f96eabbb1..9dfe9c94c 100644 --- a/rust/crates/scheduler/src/metrics/mod.rs +++ b/rust/crates/scheduler/src/metrics/mod.rs @@ -13,8 +13,8 @@ use axum::{response::IntoResponse, routing::get, Router}; use lazy_static::lazy_static; use prometheus::{ - register_counter, register_counter_vec, register_histogram, Counter, CounterVec, Encoder, - Histogram, TextEncoder, + register_counter, register_counter_vec, register_gauge_vec, register_histogram, Counter, + CounterVec, Encoder, GaugeVec, Histogram, TextEncoder, }; use std::time::Duration; use tracing::{error, info}; @@ -70,6 +70,32 @@ lazy_static! { vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0] ) .expect("Failed to register job_query_duration_seconds histogram"); + + // Cluster feed metrics from cluster.rs + pub static ref CLUSTER_POLLS_TOTAL: CounterVec = register_counter_vec!( + "scheduler_cluster_polls_total", + "Total number of times each cluster has been emitted by the priority-queue feed", + &["show_id", "facility_id"] + ) + .expect("Failed to register cluster_polls_total counter"); + + /// Global counter mirroring the in-memory `CLUSTER_ROUNDS` atomic. + /// Equivalent to `sum(scheduler_cluster_polls_total)` but exposed for cheap + /// querying / alerting on overall feed throughput. + pub static ref CLUSTER_ROUNDS_TOTAL: Counter = register_counter!( + "scheduler_cluster_rounds_total", + "Total number of cluster pops across all clusters" + ) + .expect("Failed to register cluster_rounds_total counter"); + + /// Most-recent job count for each cluster — useful to inspect the productivity + /// bias' effect on dispatch ordering. + pub static ref CLUSTER_LAST_DISPATCHED_JOBS: GaugeVec = register_gauge_vec!( + "scheduler_cluster_last_dispatched_jobs", + "Jobs dispatched in the most recent processing cycle for each cluster", + &["show_id", "facility_id"] + ) + .expect("Failed to register cluster_last_dispatched_jobs gauge"); } /// Handler for the /metrics endpoint @@ -166,3 +192,27 @@ pub fn observe_time_to_book(duration: Duration) { pub fn observe_job_query_duration(duration: Duration) { JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64()); } + +/// Helper function to increment cluster polls counter. +/// +/// Bumps both the per-cluster `scheduler_cluster_polls_total` and the global +/// `scheduler_cluster_rounds_total` so dashboards can use whichever is cheaper. +#[inline] +pub fn increment_cluster_polls(show_id: &uuid::Uuid, facility_id: &uuid::Uuid) { + CLUSTER_POLLS_TOTAL + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + .inc(); + CLUSTER_ROUNDS_TOTAL.inc(); +} + +/// Records the number of jobs dispatched in the most recent cycle for a cluster. +#[inline] +pub fn set_cluster_last_dispatched_jobs( + show_id: &uuid::Uuid, + facility_id: &uuid::Uuid, + count: usize, +) { + CLUSTER_LAST_DISPATCHED_JOBS + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + .set(count as f64); +} diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index ba7ef55e4..223f478ba 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -12,7 +12,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use futures::{stream, StreamExt}; use tokio::sync::mpsc; @@ -69,9 +68,8 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { ) .await; - match jobs { + let (processed_count, should_stop) = match jobs { Ok(jobs) => { - // Track number of jobs queried metrics::increment_jobs_queried(jobs.len()); let processed_jobs = AtomicUsize::new(0); @@ -87,33 +85,46 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { }, ) .await; - // If no jobs got processed, sleep to prevent hammering the database with - // queries with no outcome - if processed_jobs.load(Ordering::Relaxed) == 0 { - let _ = feed_sender - .send(FeedMessage::Sleep(cluster, Duration::from_secs(3))) - .await; - } - - // If empty_jobs_cycles_before_quiting is set, quit if nothing got processed - if let Some(limit) = CONFIG.queue.empty_job_cycles_before_quiting { - // Count cycles that couldn't find any job - if processed_jobs.load(Ordering::Relaxed) == 0 { - cycles_without_jobs.fetch_add(1, Ordering::Relaxed); - } else { - cycles_without_jobs.store(0, Ordering::Relaxed); - } - // Cancel stream processing after empty cycles - if cycles_without_jobs.load(Ordering::Relaxed) >= limit { - let _ = feed_sender.send(FeedMessage::Stop()).await; + let processed = processed_jobs.load(Ordering::Relaxed); + + let stop = match CONFIG.queue.empty_job_cycles_before_quiting { + Some(limit) => { + if processed == 0 { + cycles_without_jobs.fetch_add(1, Ordering::Relaxed); + } else { + cycles_without_jobs.store(0, Ordering::Relaxed); + } + cycles_without_jobs.load(Ordering::Relaxed) >= limit } - } + None => false, + }; + + (processed, stop) } Err(err) => { - let _ = feed_sender.send(FeedMessage::Stop()).await; error!("Failed to fetch job: {}", err); + (0, true) } + }; + + // Re-insert the cluster into the priority queue. An empty cycle gets a + // back-off so neighbors aren't starved while this cluster waits for work. + let sleep = if processed_count == 0 { + Some(CONFIG.queue.cluster_empty_back_off) + } else { + None + }; + let _ = feed_sender + .send(FeedMessage::Done { + cluster, + processed_jobs: processed_count, + sleep, + }) + .await; + + if should_stop { + let _ = feed_sender.send(FeedMessage::Stop).await; } } }) diff --git a/rust/crates/scheduler/tests/util.rs b/rust/crates/scheduler/tests/util.rs index a7bb6c29c..28475282b 100644 --- a/rust/crates/scheduler/tests/util.rs +++ b/rust/crates/scheduler/tests/util.rs @@ -97,6 +97,8 @@ pub fn create_test_config() -> Config { hostname_tags_chunk_size: 20, host_candidate_attempts_per_layer: 5, empty_job_cycles_before_quiting: Some(20), + cluster_empty_back_off: Duration::from_secs(3), + cluster_productivity_bias: true, mem_reserved_min: bytesize::ByteSize::mb(250), subscription_recalculation_interval: Duration::from_secs(3), resource_recalculation_interval: Duration::from_secs(10), From 3cdf885ebd2ea19e647242efc34126dc320ab09f Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 18:12:15 -0700 Subject: [PATCH 02/10] [rust-scheduler] Fix Ord/Eq contract and control-channel-close handling - Scheduled::cmp now always compares last_dispatched_jobs as a final tiebreaker so Ord agrees with PartialEq regardless of the cluster_productivity_bias toggle. Required by BinaryHeap. - Control loop now sets stop_flag and notifies the dispatcher when the control channel closes (e.g. consumer panic), preventing the dispatch loop from spinning on the empty-queue wake cycle. - Added ord_and_eq_stay_consistent_for_differing_jobs regression test. --- rust/crates/scheduler/src/cluster.rs | 33 ++++++++++++++++++- .../scheduler/src/pipeline/entrypoint.rs | 28 ++++++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index b29bc88ce..dab68d9de 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -119,7 +119,16 @@ impl Ord for Scheduled { } } // Deterministic tiebreaker on the cluster identity. - self.cluster.cmp(&other.cluster) + let by_cluster = self.cluster.cmp(&other.cluster); + if by_cluster != CmpOrdering::Equal { + return by_cluster; + } + // Keep Ord consistent with Eq even when productivity bias is disabled. + // Without this final comparison, two entries that differ only by + // `last_dispatched_jobs` would yield `Ordering::Equal` here while + // `PartialEq::eq` returns false, violating the Ord/Eq contract that + // BinaryHeap relies on. + self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs) } } impl PartialOrd for Scheduled { @@ -580,6 +589,11 @@ impl ClusterFeed { } } } + // Ensure the dispatch loop exits even if the control channel + // is dropped without an explicit Stop (e.g. consumer panics). + // Without this the dispatcher would spin on the empty-queue wake. + stop_flag_ctrl.store(true, Ordering::Relaxed); + notify_ctrl.notify_one(); }); control_sender @@ -643,6 +657,23 @@ mod tests { } } + /// Rust's `Ord` / `Eq` contract requires `cmp(a, b) == Equal` to imply + /// `a == b`. `BinaryHeap` relies on this. The bias toggle must not break it. + #[test] + fn ord_and_eq_stay_consistent_for_differing_jobs() { + let cluster = make_cluster("same"); + let now = Instant::now(); + let a = scheduled(cluster.clone(), now, 0); + let b = scheduled(cluster, now, 99); + + assert_ne!(a, b, "PartialEq sees them as distinct"); + assert_ne!( + a.cmp(&b), + CmpOrdering::Equal, + "Ord must not return Equal for two values PartialEq considers distinct" + ); + } + #[test] fn earlier_next_eligible_at_pops_first() { let now = Instant::now(); diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index 223f478ba..a5541ddc2 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -90,12 +90,17 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { let stop = match CONFIG.queue.empty_job_cycles_before_quiting { Some(limit) => { - if processed == 0 { - cycles_without_jobs.fetch_add(1, Ordering::Relaxed); + // Combine the update and read into a single atomic + // operation so concurrent clusters can't race between + // a fetch_add/store and a separate load. SeqCst gives + // us a strict total order across all clusters. + let new_count = if processed == 0 { + cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1 } else { - cycles_without_jobs.store(0, Ordering::Relaxed); - } - cycles_without_jobs.load(Ordering::Relaxed) >= limit + cycles_without_jobs.swap(0, Ordering::SeqCst); + 0 + }; + new_count >= limit } None => false, }; @@ -103,8 +108,17 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { (processed, stop) } Err(err) => { - error!("Failed to fetch job: {}", err); - (0, true) + // Don't halt the entire scheduler for a single cluster's + // fetch error (network blip, transient DB load). Treat + // the cluster as empty for this cycle so it gets the + // configured back-off and other clusters keep running. + // The `empty_job_cycles_before_quiting` safety net still + // applies if the error persists across all clusters. + error!( + "Failed to fetch jobs for cluster {}: {}", + cluster, err + ); + (0, false) } }; From 9cfefea0aa4bdec88a6d8c79a62bee297f3803a4 Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 18:58:23 -0700 Subject: [PATCH 03/10] [rust-scheduler] Wire HOSTS_ATTEMPTED and WASTED_ATTEMPTS to Prometheus Addresses issue #2318 (item 7). These atomic counters in pipeline/matcher.rs were incremented on every host candidate selection and every wasted job cycle, but never exposed through the /metrics endpoint, so they had no operational value. Add scheduler_hosts_attempted_total and scheduler_wasted_attempts_total Counters in metrics/mod.rs and bump them alongside the existing atomics. HOSTS_ATTEMPTED / WASTED_ATTEMPTS are kept for source-level compatibility with smoke tests that read them directly; CLUSTER_ROUNDS was already wired to Prometheus in #2315. --- rust/crates/scheduler/src/metrics/mod.rs | 25 +++++++++++++++++++ rust/crates/scheduler/src/pipeline/matcher.rs | 2 ++ 2 files changed, 27 insertions(+) diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs index 9dfe9c94c..75c76c889 100644 --- a/rust/crates/scheduler/src/metrics/mod.rs +++ b/rust/crates/scheduler/src/metrics/mod.rs @@ -96,6 +96,19 @@ lazy_static! { &["show_id", "facility_id"] ) .expect("Failed to register cluster_last_dispatched_jobs gauge"); + + // Matcher metrics mirroring the in-process atomics in pipeline/matcher.rs. + pub static ref HOSTS_ATTEMPTED_TOTAL: Counter = register_counter!( + "scheduler_hosts_attempted_total", + "Total host-candidate selection attempts across all layers" + ) + .expect("Failed to register hosts_attempted_total counter"); + + pub static ref WASTED_ATTEMPTS_TOTAL: Counter = register_counter!( + "scheduler_wasted_attempts_total", + "Jobs that processed zero layers (e.g. all locked by another scheduler)" + ) + .expect("Failed to register wasted_attempts_total counter"); } /// Handler for the /metrics endpoint @@ -216,3 +229,15 @@ pub fn set_cluster_last_dispatched_jobs( .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) .set(count as f64); } + +/// Records a host-candidate selection attempt. +#[inline] +pub fn increment_hosts_attempted() { + HOSTS_ATTEMPTED_TOTAL.inc(); +} + +/// Records a job that processed zero layers (e.g. all locked by another scheduler). +#[inline] +pub fn increment_wasted_attempts() { + WASTED_ATTEMPTS_TOTAL.inc(); +} diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index 3c2761e75..aeae9cfed 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -176,6 +176,7 @@ impl MatchingService { if processed_layers.load(Ordering::Relaxed) == 0 { WASTED_ATTEMPTS.fetch_add(1, Ordering::Relaxed); + metrics::increment_wasted_attempts(); debug!("Job {} didn't process any layer", job_disp); } } @@ -250,6 +251,7 @@ impl MatchingService { while try_again && attempts > 0 { attempts -= 1; HOSTS_ATTEMPTED.fetch_add(1, Ordering::Relaxed); + metrics::increment_hosts_attempted(); // Take ownership of the layer for this iteration let layer = current_layer_version From c86fa53c9f6e0e5328b4c8df0f265180b28ed5b1 Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 19:07:10 -0700 Subject: [PATCH 04/10] [rust-scheduler] Make host reservation TTL a safety net only Addresses issue #2318 (item 4). The 10s reservation TTL in HostCacheService is fragile: dispatches that take >10s (slow RQD, retries) silently let the reservation expire, opening the door for a second layer to grab the same host. The eager release path via check_in / Invalidate already exists; the timer was only meant to catch leaked reservations. Raise the TTL to 5 min via new HostCacheConfig::host_reservation_safety_ttl (humantime-serde configurable) and document its intent as a leak-recovery net rather than primary lifecycle. Explicit release on check_in / Invalidate remains unchanged. --- rust/crates/scheduler/src/config/mod.rs | 7 +++++++ rust/crates/scheduler/src/host_cache/actor.rs | 13 ++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs index 36eb7556f..2747b5a8b 100644 --- a/rust/crates/scheduler/src/config/mod.rs +++ b/rust/crates/scheduler/src/config/mod.rs @@ -232,6 +232,12 @@ pub struct HostCacheConfig { #[serde(with = "humantime_serde")] pub host_staleness_threshold: Duration, pub update_stat_on_book: bool, + /// Safety net for leaked host reservations. The reservation system relies + /// on explicit `check_in` / `Invalidate` calls; this TTL only fires when a + /// task crashes or otherwise drops a checked-out host without releasing it. + /// Should be larger than the longest plausible dispatch (slow RQD, retries). + #[serde(with = "humantime_serde")] + pub host_reservation_safety_ttl: Duration, } impl Default for HostCacheConfig { @@ -246,6 +252,7 @@ impl Default for HostCacheConfig { concurrent_fetch_permit: 4, host_staleness_threshold: Duration::from_secs(2 * 60), // 2 minutes update_stat_on_book: false, + host_reservation_safety_ttl: Duration::from_secs(5 * 60), // 5 minutes } } } diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs index 40d098850..ca2a56da3 100644 --- a/rust/crates/scheduler/src/host_cache/actor.rs +++ b/rust/crates/scheduler/src/host_cache/actor.rs @@ -48,8 +48,14 @@ pub struct HostCacheService { concurrency_semaphore: Arc, } -/// Use a reservation system to prevent race conditions when trying to book a host -/// that belongs to multiple groups. +/// Reservation guard preventing a host from being booked twice while it's +/// checked out. The primary lifecycle is explicit: `check_out` reserves and +/// `check_in` / `Invalidate` releases. [`expired`](Self::expired) is only a +/// safety net for leaked reservations (e.g. a task panics between check-out +/// and check-in). The TTL is configurable via +/// [`HostCacheConfig::host_reservation_safety_ttl`] and should be larger than +/// the longest plausible dispatch so a slow RQD call cannot accidentally +/// release the reservation mid-flight. struct HostReservation { reserved_time: SystemTime, } @@ -62,7 +68,8 @@ impl HostReservation { } pub fn expired(&self) -> bool { - self.reserved_time.elapsed().unwrap_or_default() > Duration::from_secs(10) + self.reserved_time.elapsed().unwrap_or_default() + > CONFIG.host_cache.host_reservation_safety_ttl } } From 50e0f3d956776ca35b97b87cf5606900630ffffe Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 22:16:44 -0700 Subject: [PATCH 05/10] [rust-scheduler] Replace host-cache panic with DB circuit breaker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses issue #2318 (item 6). matcher.rs previously called panic! when HostCacheService failed to query the database. A single transient hiccup (statement timeout, network blip, replica failover) brought the whole scheduler down and rolled back every in-flight dispatch. Add a DbCircuitBreaker in host_cache/actor.rs that wraps the DB query in fetch_group_data: - On success: reset the failure counter and close the breaker. - On failure: bump the counter and open the breaker for an exponentially growing window (base 500ms, capped at 30s). - Concurrent callers that hit an open breaker short-circuit without retrying the DB. - After CONFIG.host_cache.db_circuit_breaker.failure_threshold (10) consecutive failures the host cache logs and process::exit(1)s — the orchestration layer handles the restart, no panic backtrace. matcher.rs now logs the FailedToQueryHostCache arm at warn and lets the layer skip; other clusters keep dispatching. Added 3 unit tests for the breaker state machine. --- rust/crates/scheduler/src/config/mod.rs | 30 ++++ rust/crates/scheduler/src/host_cache/actor.rs | 164 +++++++++++++++++- rust/crates/scheduler/src/pipeline/matcher.rs | 36 ++-- 3 files changed, 203 insertions(+), 27 deletions(-) diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs index 2747b5a8b..634d520c0 100644 --- a/rust/crates/scheduler/src/config/mod.rs +++ b/rust/crates/scheduler/src/config/mod.rs @@ -238,6 +238,35 @@ pub struct HostCacheConfig { /// Should be larger than the longest plausible dispatch (slow RQD, retries). #[serde(with = "humantime_serde")] pub host_reservation_safety_ttl: Duration, + /// Circuit-breaker configuration for host-cache DB queries. + pub db_circuit_breaker: DbCircuitBreakerConfig, +} + +/// Circuit-breaker tuning for host-cache database queries. +/// +/// On consecutive failures the breaker opens for an exponentially growing +/// window (`base_backoff` → `max_backoff`), short-circuiting further queries +/// without hitting the DB. Once `failure_threshold` consecutive failures +/// accumulate, the scheduler exits with status 1 so the orchestrator restarts +/// it cleanly rather than letting it limp along. +#[derive(Debug, Deserialize, Clone)] +#[serde(default)] +pub struct DbCircuitBreakerConfig { + pub failure_threshold: u32, + #[serde(with = "humantime_serde")] + pub base_backoff: Duration, + #[serde(with = "humantime_serde")] + pub max_backoff: Duration, +} + +impl Default for DbCircuitBreakerConfig { + fn default() -> Self { + Self { + failure_threshold: 10, + base_backoff: Duration::from_millis(500), + max_backoff: Duration::from_secs(30), + } + } } impl Default for HostCacheConfig { @@ -253,6 +282,7 @@ impl Default for HostCacheConfig { host_staleness_threshold: Duration::from_secs(2 * 60), // 2 minutes update_stat_on_book: false, host_reservation_safety_ttl: Duration::from_secs(5 * 60), // 5 minutes + db_circuit_breaker: DbCircuitBreakerConfig::default(), } } } diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs index ca2a56da3..aebf4f536 100644 --- a/rust/crates/scheduler/src/host_cache/actor.rs +++ b/rust/crates/scheduler/src/host_cache/actor.rs @@ -19,16 +19,16 @@ use scc::{hash_map::OccupiedEntry, HashMap, HashSet}; use std::{ cmp::Ordering, sync::{ - atomic::{self, AtomicU64}, - Arc, + atomic::{self, AtomicU32, AtomicU64}, + Arc, Mutex, }, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use futures::{stream, StreamExt}; use miette::Result; use tokio::sync::Semaphore; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; use crate::{ cluster_key::{ClusterKey, Tag, TagType}, @@ -46,6 +46,80 @@ pub struct HostCacheService { cache_hit: Arc, cache_miss: Arc, concurrency_semaphore: Arc, + db_circuit: Arc, +} + +/// Circuit breaker around host-cache DB queries. +/// +/// A transient query failure (network blip, statement timeout) trips the +/// breaker for an exponentially growing window so concurrent callers fail +/// fast instead of all hitting the DB. On success the breaker resets. +/// After `failure_threshold` consecutive failures the scheduler exits with +/// status 1 — the orchestration layer (k8s, systemd) handles the restart. +pub(super) struct DbCircuitBreaker { + consecutive_failures: AtomicU32, + open_until: Mutex>, +} + +impl DbCircuitBreaker { + fn new() -> Self { + Self { + consecutive_failures: AtomicU32::new(0), + open_until: Mutex::new(None), + } + } + + /// Returns `Some(remaining)` if the breaker is currently open; the caller + /// should short-circuit. Returns `None` if it's safe to proceed. + fn check_open(&self) -> Option { + let guard = self.open_until.lock().unwrap_or_else(|p| p.into_inner()); + match *guard { + Some(until) => { + let now = Instant::now(); + if until > now { + Some(until - now) + } else { + None + } + } + None => None, + } + } + + fn record_success(&self) { + self.consecutive_failures + .store(0, atomic::Ordering::Relaxed); + *self + .open_until + .lock() + .unwrap_or_else(|p| p.into_inner()) = None; + } + + /// Records a failure and opens the breaker for an exponential backoff. + /// Returns `true` when the consecutive-failure threshold has been reached + /// and the caller should escalate to a clean process exit. + fn record_failure(&self) -> bool { + let cfg = &CONFIG.host_cache.db_circuit_breaker; + let count = self + .consecutive_failures + .fetch_add(1, atomic::Ordering::Relaxed) + + 1; + if count >= cfg.failure_threshold { + return true; + } + // Exponential backoff (capped). Clamp the shift so the multiplier + // can't overflow u64 even on huge failure counts. + let shift = count.min(20).saturating_sub(1); + let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX); + let base_ms = cfg.base_backoff.as_millis() as u64; + let backoff_ms = base_ms.saturating_mul(multiplier); + let backoff = Duration::from_millis(backoff_ms).min(cfg.max_backoff); + *self + .open_until + .lock() + .unwrap_or_else(|p| p.into_inner()) = Some(Instant::now() + backoff); + false + } } /// Reservation guard preventing a host from being booked twice while it's @@ -188,6 +262,7 @@ impl HostCacheService { CONFIG.host_cache.concurrent_fetch_permit, )), reserved_hosts: Arc::new(HashMap::new()), + db_circuit: Arc::new(DbCircuitBreaker::new()), }) } @@ -470,6 +545,20 @@ impl HostCacheService { &self, key: &ClusterKey, ) -> Result> { + // If a recent failure tripped the breaker, fail fast without hitting + // the DB. Callers (matcher) treat this as "no candidates available" + // and skip the current layer cleanly. + if let Some(remaining) = self.db_circuit.check_open() { + debug!( + "Host cache circuit open for {:?} more — short-circuiting fetch", + remaining + ); + return Err(miette::miette!( + "host cache DB circuit open (retry in {:?})", + remaining + )); + } + let _permit = self .concurrency_semaphore .acquire() @@ -477,11 +566,34 @@ impl HostCacheService { .into_diagnostic()?; let tag = key.tag.to_string(); - let hosts = self + let hosts = match self .host_dao .fetch_hosts_by_show_facility_tag(key.show_id, key.facility_id, &tag) .await - .into_diagnostic()?; + { + Ok(hosts) => { + self.db_circuit.record_success(); + hosts + } + Err(err) => { + let escalate = self.db_circuit.record_failure(); + if escalate { + error!( + "Host cache DB query failed past circuit-breaker threshold. \ + Exiting so the orchestrator can restart the scheduler. Last error: {}", + err + ); + // Clean exit (no panic backtrace). The orchestration layer + // (k8s, systemd, etc.) handles the restart. + std::process::exit(1); + } + warn!( + "Host cache DB query failed (transient, circuit will back off): {}", + err + ); + return Err(err).into_diagnostic(); + } + }; let cache = self.cluster_index.entry_sync(key.clone()).or_default(); @@ -500,3 +612,43 @@ impl HostCacheService { Ok(cache) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn breaker_closed_by_default() { + let cb = DbCircuitBreaker::new(); + assert!(cb.check_open().is_none()); + } + + #[test] + fn breaker_opens_after_failure_and_closes_after_success() { + let cb = DbCircuitBreaker::new(); + + let escalate = cb.record_failure(); + assert!(!escalate, "single failure shouldn't escalate"); + assert!( + cb.check_open().is_some(), + "breaker should open after a failure" + ); + + cb.record_success(); + assert!(cb.check_open().is_none(), "success should reset breaker"); + } + + #[test] + fn breaker_escalates_after_threshold() { + let cb = DbCircuitBreaker::new(); + let threshold = CONFIG.host_cache.db_circuit_breaker.failure_threshold; + // Trip up to (threshold - 1) without escalating, then the next call escalates. + for _ in 0..threshold - 1 { + assert!(!cb.record_failure()); + } + assert!( + cb.record_failure(), + "Nth consecutive failure should signal escalation" + ); + } +} diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index aeae9cfed..25f49cac7 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -41,7 +41,7 @@ use crate::{ use actix::Addr; use miette::{Context, Result}; use tokio::sync::Semaphore; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; pub static HOSTS_ATTEMPTED: AtomicUsize = AtomicUsize::new(0); pub static WASTED_ATTEMPTS: AtomicUsize = AtomicUsize::new(0); @@ -385,27 +385,21 @@ impl MatchingService { try_again = false; } crate::host_cache::HostCacheError::FailedToQueryHostCache(err) => { - // CRITICAL: Database connection failure in host cache query - // - // When the host cache cannot query the database, the matching service - // cannot reliably find hosts for job dispatch. This is a systemic - // failure that affects all job processing. - // - // We panic here rather than propagating an error because: - // 1. The entire service is compromised - no jobs can be matched - // 2. Graceful degradation is not possible without host candidates - // 3. The orchestration layer (e.g., Kubernetes) should restart the - // service to re-establish database connectivity - // 4. Bubbling the error up would add unnecessary complexity for a - // condition that always requires service restart - // - // This allows the orchestration layer to handle the failure through - // its standard restart policies rather than attempting partial recovery. - panic!( - "Host cache failed to query database - service is non-functional \ - and requires restart. Error: {}", + // Transient DB query failure. The host_cache circuit breaker + // applies exponential backoff and will short-circuit further + // queries until it recovers. After + // CONFIG.host_cache.db_circuit_breaker.failure_threshold + // consecutive failures the host cache exits the process cleanly + // so the orchestrator (k8s, systemd) restarts us. From the + // matcher's perspective we just skip this layer for now — + // other clusters keep running, and the next cycle may find + // candidates once the breaker closes again. + warn!( + "Host cache query failed for layer {} — skipping for now: {}", + current_layer_version.as_ref().unwrap(), err - ) + ); + try_again = false; } } } From 54a70c1fa9720eb1d460e6c8075d133fe45d4299 Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 22:22:13 -0700 Subject: [PATCH 06/10] [rust-scheduler] Remove redundant outer transaction in dispatcher Addresses issue #2318 (item 3). Handler previously opened an outer transaction purely to scope a session-level host advisory lock around dispatch. The outer tx did no DB writes of its own and held a pool connection across every per-frame transaction, gRPC call, and compensation step of the entire layer dispatch. - Switch the host lock from session-level pg_try_advisory_lock to transaction-scoped pg_try_advisory_xact_lock and acquire it inside each per-proc transaction in dispatch_virtual_proc. The lock now auto-releases on COMMIT/ROLLBACK, hold time is bounded to the small DB-only critical section, and no connection is held during gRPC. - Remove the outer transaction from Handler. - Drop the host_dao::lock / host_dao::unlock methods (no callers). - dispatch() no longer takes a Transaction parameter. Behavioral change: dispatch locking is now per-frame rather than per-layer. Two schedulers may interleave frames on the same host; each frame still has exclusive lock-protected DB updates (and the frame int_version optimistic check still gates frame ownership), so correctness is preserved while parallelism improves. --- rust/crates/scheduler/src/dao/host_dao.rs | 40 +++----- .../src/pipeline/dispatcher/actor.rs | 91 ++++++++----------- 2 files changed, 49 insertions(+), 82 deletions(-) diff --git a/rust/crates/scheduler/src/dao/host_dao.rs b/rust/crates/scheduler/src/dao/host_dao.rs index b517a340e..dda67b9bb 100644 --- a/rust/crates/scheduler/src/dao/host_dao.rs +++ b/rust/crates/scheduler/src/dao/host_dao.rs @@ -295,44 +295,30 @@ impl HostDao { /// * `Ok(true)` - Lock successfully acquired /// * `Ok(false)` - Lock already held by another process /// * `Err(miette::Error)` - Database operation failed - pub async fn lock( - &self, - transaction: &mut Transaction<'_, Postgres>, - host_id: &Uuid, - ) -> Result { - trace!("Locking {}", host_id); - sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock(hashtext($1))") - .bind(host_id.to_string()) - .fetch_one(&mut **transaction) - .await - .into_diagnostic() - .wrap_err("Failed to acquire advisory lock") - } - - /// Releases an advisory lock on a host after dispatch completion. + /// Acquires a transaction-scoped advisory lock on a host. /// - /// Releases the PostgreSQL advisory lock that was acquired during - /// the dispatch process, allowing other dispatchers to access the host. - /// - /// # Arguments - /// * `host_id` - The UUID of the host to unlock + /// Uses `pg_try_advisory_xact_lock`, which auto-releases on COMMIT or + /// ROLLBACK. This is the preferred lock variant for short critical + /// sections inside the per-proc transaction — callers don't need to issue + /// an explicit unlock and the lock cannot leak if the connection is + /// returned to the pool while still session-locked. /// /// # Returns - /// * `Ok(true)` - Lock successfully released - /// * `Ok(false)` - Lock was not held by this process - /// * `Err(miette::Error)` - Database operation failed - pub async fn unlock( + /// * `Ok(true)` — lock acquired (and will release at tx end) + /// * `Ok(false)` — another transaction holds the lock; caller should skip + /// * `Err(_)` — database error + pub async fn try_xact_lock( &self, transaction: &mut Transaction<'_, Postgres>, host_id: &Uuid, ) -> Result { - trace!("Unlocking {}", host_id); - sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock(hashtext($1))") + trace!("Acquiring xact advisory lock for {}", host_id); + sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock(hashtext($1))") .bind(host_id.to_string()) .fetch_one(&mut **transaction) .await .into_diagnostic() - .wrap_err("Failed to release advisory lock") + .wrap_err("Failed to acquire transaction-scoped advisory lock") } /// Updates a host's available resource counts after frame dispatch. diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs index e020644d0..ac5babc6f 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs @@ -86,38 +86,19 @@ impl Handler for RqdDispatcherService { layer.layer_name, host.name ); + // No outer transaction here: the host advisory lock now lives inside + // each per-proc transaction (see dispatch_virtual_proc) as a + // transaction-scoped lock that auto-releases on commit/rollback. The + // outer tx previously held an idle connection across all gRPC calls + // for no DB work of its own. Box::pin( async move { - // Note: In a real implementation, we would need to coordinate with a transaction manager - // or pass the transaction through the message. For now, we'll create a new transaction - // within the dispatcher's database operations. - - // Create a database transaction scope - let mut transaction = begin_transaction() - .await - .map_err(DispatchError::DbFailure)?; - - match dispatcher.dispatch(&layer, host, &mut transaction).await { - Ok((updated_host, updated_layer)) => { - // Commit the transaction - transaction - .commit() - .await - .map_err(DispatchError::DbFailure)?; - - Ok(DispatchResult { - updated_host, - updated_layer, - }) - } - Err(e) => { - // Rollback the transaction on error - if let Err(rollback_err) = transaction.rollback().await { - error!("Failed to rollback transaction: {}", rollback_err); - } - Err(e) - } - } + dispatcher.dispatch(&layer, host).await.map( + |(updated_host, updated_layer)| DispatchResult { + updated_host, + updated_layer, + }, + ) } .into_actor(self) .map(|result, _actor, _ctx| result), @@ -158,52 +139,34 @@ impl RqdDispatcherService { }) } - /// Dispatches a layer to a specific host with proper locking and error handling. + /// Dispatches a layer to a specific host. /// - /// The dispatch process: - /// 1. Acquires an exclusive lock on the target host - /// 2. Performs the actual dispatch operation - /// 3. Ensures the host lock is always released, even on panic or failure + /// The host advisory lock is acquired per-proc as a transaction-scoped + /// lock inside [`dispatch_virtual_proc`], not at the layer level. This + /// keeps the lock hold time bounded to the per-frame DB transaction + /// (committed before the gRPC call) instead of spanning the entire + /// layer's dispatch. /// /// # Arguments /// * `layer` - The layer containing frames to dispatch /// * `host` - The target host for frame execution /// /// # Returns - /// * `Ok(())` on successful dispatch + /// * `Ok((updated_host, remaining_layer))` on successful dispatch /// * `Err(DispatchError)` on various failure conditions async fn dispatch( &self, layer: &DispatchLayer, host: Host, - transaction: &mut Transaction<'_, Postgres>, ) -> Result<(Host, DispatchLayer), DispatchError> { let host_id = host.id; let host_disp = format!("{}", &host); let layer_disp = format!("{}", &layer); - // Acquire lock first - if !self - .host_dao - .lock(transaction, &host.id) - .await - .map_err(DispatchError::Failure)? - { - return Err(DispatchError::HostLock(host.name.clone())); - } - - // Ensure unlock is always called, regardless of panics or fails let result = std::panic::AssertUnwindSafe(self.dispatch_inner(layer, host)) .catch_unwind() .await; - // Always attempt to unlock, regardless of outcome. Failing to unlock here can be ignored as - // endint the transaction will automatically unlock. - if let Err(unlock_err) = self.host_dao.unlock(transaction, &host_id).await { - trace!("Failed to unlock host {}: {}", host_disp, unlock_err); - } - - // Handle the result from dispatch_inner match result { Ok(result) => { if result.is_ok() { @@ -481,6 +444,24 @@ impl RqdDispatcherService { DispatchVirtualProcError::FailedToStartOnDb(DispatchError::DbFailure(e)) })?; + // Acquire a transaction-scoped advisory lock on the host. Using + // pg_try_advisory_xact_lock so the lock is released automatically + // when this proc transaction commits or rolls back, and concurrent + // dispatchers fail fast on the same host instead of waiting. + let host_locked = self + .host_dao + .try_xact_lock(&mut proc_transaction, &host.id) + .await + .map_err(|e| { + DispatchVirtualProcError::FailedToStartOnDb(DispatchError::Failure(e)) + })?; + if !host_locked { + let _ = proc_transaction.rollback().await; + return Err(DispatchVirtualProcError::FailedToStartOnDb( + DispatchError::HostLock(host.name.clone()), + )); + } + // Confirm the layer still has limits before proceeding if !self .layer_dao From df117ad0f42e86b3a0c64e128d25ae77a7288888 Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 22:26:46 -0700 Subject: [PATCH 07/10] [rust-scheduler] Shard HostCache::hosts_index lock per CoreKey bucket Addresses issue #2318 (item 1). The hosts_index was a single RwLock> that every check_out / check_in acquired. check_out took the outer read lock to scan and then re-acquired the outer write lock to remove on a hit; check_in always took the outer write lock. All operations across all core buckets serialized through this one lock. Wrap each per-CoreKey bucket in its own Arc>: - The outer RwLock now guards only the (rarely changing) bucket directory and is read on every operation. - check_out collects bucket Arcs under a brief outer read, then scans each bucket independently with its own Mutex. - check_in fast-paths into the outer read lock and an existing bucket Mutex; only the rare new-CoreKey case upgrades to outer write. Disjoint core sizes now proceed in parallel, which is the common case under mixed workloads. Same-bucket contention still exists but is bounded to short critical sections instead of the entire dispatch. Added `concurrent_check_ins_on_disjoint_buckets` test exercising the sharded paths. All 19 host_cache::cache unit tests pass. --- rust/crates/scheduler/src/host_cache/cache.rs | 221 ++++++++++++------ 1 file changed, 149 insertions(+), 72 deletions(-) diff --git a/rust/crates/scheduler/src/host_cache/cache.rs b/rust/crates/scheduler/src/host_cache/cache.rs index d41252d34..8cf2030cc 100644 --- a/rust/crates/scheduler/src/host_cache/cache.rs +++ b/rust/crates/scheduler/src/host_cache/cache.rs @@ -35,7 +35,7 @@ use std::{ cell::RefCell, collections::{BTreeMap, HashSet}, - sync::RwLock, + sync::{Arc, Mutex, RwLock}, time::{Duration, SystemTime}, }; @@ -55,9 +55,19 @@ type MemoryKey = u64; /// A B-Tree of Hosts ordered by memory pub type MemoryBTree = BTreeMap>; +/// Per-CoreKey bucket. Wrapped in an `Arc` so concurrent check-outs and +/// check-ins on disjoint core sizes don't contend with each other — the outer +/// `RwLock` only protects the (rarely changing) bucket directory. +type MemoryBucket = Arc>; + pub struct HostCache { - /// B-Tree of host groups ordered by their number of available cores - hosts_index: RwLock>, + /// Bucket directory keyed by available cores. The outer `RwLock` is read + /// on every check-out / check-in to look up a bucket and almost never + /// upgraded to write (only when a brand-new `CoreKey` shows up). Once a + /// bucket Arc is obtained, the outer lock is released; the bucket's own + /// `Mutex` guards the inner `MemoryBTree`, so different buckets are fully + /// independent. + hosts_index: RwLock>, /// If a cache stops being queried for a certain amount of time, stop keeping it up to date last_queried: RwLock, /// Marks if the data on this cache have expired @@ -206,74 +216,83 @@ impl HostCache { let mut attempts = 5; loop { - // Step 1: Find a candidate host in the index - let candidate_info = { - let host_index_lock = self.hosts_index.read().unwrap_or_else(|p| p.into_inner()); - let mut iter: Box> = - if !self.strategy.core_saturation { - // Reverse order to find hosts with max amount of cores available - Box::new(host_index_lock.range(core_key..).rev()) - } else { - Box::new(host_index_lock.range(core_key..)) - }; - - iter.find_map(|(by_core_key, hosts_by_memory)| { - let find_fn = |(by_memory_key, hosts): (&u64, &HashSet)| { - hosts.iter().find_map(|host_id| { - HOST_STORE.get(host_id).and_then(|host| { - // Check validation and memory capacity - if host_validation(&host) { - Some(( - *by_core_key, - *by_memory_key, - *host_id, - host.last_updated, - )) - } else { - None - } - }) - }) - }; - - if self.strategy.memory_saturation { - // Search for hosts with at least the same amount of memory requested - hosts_by_memory.range(memory_key..).find_map(find_fn) - } else { - // Search for hosts with the most amount of memory available - hosts_by_memory.range(memory_key..).rev().find_map(find_fn) - } - }) + // Step 1: Snapshot the candidate bucket list under the outer read + // lock — this clones a handful of Arc> handles and drops + // the outer lock immediately, so check-ins on other buckets are + // not blocked while we scan. + let bucket_arcs: Vec<(CoreKey, MemoryBucket)> = { + let outer = self.hosts_index.read().unwrap_or_else(|p| p.into_inner()); + if !self.strategy.core_saturation { + // Reverse order: prefer hosts with the most cores + outer + .range(core_key..) + .rev() + .map(|(&k, v)| (k, v.clone())) + .collect() + } else { + outer + .range(core_key..) + .map(|(&k, v)| (k, v.clone())) + .collect() + } }; - // Step 2: Attempt atomic removal if we found a candidate - if let Some((by_core_key, by_memory_key, host_id, expected_last_updated)) = + // Step 2: Scan each bucket independently. Holding only one bucket + // mutex at a time keeps disjoint core groups concurrent. + let candidate_info = + bucket_arcs + .into_iter() + .find_map(|(by_core_key, bucket)| { + let bucket_lock = bucket.lock().unwrap_or_else(|p| p.into_inner()); + let find_fn = + |(by_memory_key, hosts): (&u64, &HashSet)| { + hosts.iter().find_map(|host_id| { + HOST_STORE.get(host_id).and_then(|host| { + if host_validation(&host) { + Some(( + *by_memory_key, + *host_id, + host.last_updated, + )) + } else { + None + } + }) + }) + }; + let inner = if self.strategy.memory_saturation { + bucket_lock.range(memory_key..).find_map(find_fn) + } else { + bucket_lock.range(memory_key..).rev().find_map(find_fn) + }; + drop(bucket_lock); + inner.map(|(mk, hid, ts)| { + (by_core_key, mk, hid, ts, bucket.clone()) + }) + }); + + // Step 3: Attempt atomic removal if we found a candidate + if let Some((_by_core_key, by_memory_key, host_id, expected_last_updated, bucket)) = candidate_info { - // Atomic check-and-remove from HOST_STORE - // Ensure host is still valid when it's time to remove it match HOST_STORE.atomic_remove_if_valid( &host_id, expected_last_updated, host_validation, ) { Ok(Some(removed_host)) => { - // Successfully removed from store, now remove from index - let mut host_index_lock = - self.hosts_index.write().unwrap_or_else(|p| p.into_inner()); - - // Remove from hosts_by_core_and_memory index - host_index_lock - .get_mut(&by_core_key) - .and_then(|hosts_by_memory| hosts_by_memory.get_mut(&by_memory_key)) - .map(|hosts| hosts.remove(&host_id)); - + // Remove from this single bucket's index. Other buckets + // are untouched. + let mut bucket_lock = + bucket.lock().unwrap_or_else(|p| p.into_inner()); + if let Some(hosts) = bucket_lock.get_mut(&by_memory_key) { + hosts.remove(&host_id); + } return Some(removed_host); } Ok(None) | Err(()) => { // Host was removed by another thread. Try another candidate attempts -= 1; - // Mark the failed candidate to avoid retrying it again. failed_candidates.borrow_mut().push(host_id); if attempts <= 0 { break; @@ -309,18 +328,30 @@ impl HostCache { let core_key = last_host_version.idle_cores.value() as CoreKey; let memory_key = Self::gen_memory_key(last_host_version.idle_memory); - let mut host_index = self - .hosts_index - .write() - .unwrap_or_else(|poisoned| poisoned.into_inner()); - - // Insert at the new location - host_index - .entry(core_key) - .or_default() - .entry(memory_key) - .or_default() - .insert(host_id); + // Double-checked locking: fast path takes only the outer read lock + // and grabs an existing bucket Arc. Slow path (rare) upgrades to the + // outer write lock only when a brand-new CoreKey appears. + let bucket: MemoryBucket = { + let outer = self + .hosts_index + .read() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + outer.get(&core_key).cloned() + } + .unwrap_or_else(|| { + let mut outer = self + .hosts_index + .write() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + outer + .entry(core_key) + .or_insert_with(|| Arc::new(Mutex::new(BTreeMap::new()))) + .clone() + }); + + // Per-bucket mutex insert. Disjoint core sizes proceed in parallel. + let mut bucket_lock = bucket.lock().unwrap_or_else(|p| p.into_inner()); + bucket_lock.entry(memory_key).or_default().insert(host_id); } /// Generates a memory key for cache indexing by bucketing memory values. @@ -474,10 +505,13 @@ mod tests { assert!(HOST_STORE.get(&host_id).is_none()); let hosts_index = cache.hosts_index.read().unwrap(); - let left_over_host = hosts_index - .get(&core_key) - .and_then(|hosts_by_memory| hosts_by_memory.get(&memory_key)) - .and_then(|hosts| hosts.get(&checked_out_host.id)); + let left_over_host = hosts_index.get(&core_key).and_then(|bucket| { + let bucket_lock = bucket.lock().unwrap(); + bucket_lock + .get(&memory_key) + .and_then(|hosts| hosts.get(&checked_out_host.id)) + .copied() + }); assert!(left_over_host.is_none()) } @@ -653,4 +687,47 @@ mod tests { // The hosts should be different assert_ne!(result1.unwrap().id, result2.unwrap().id); } + + /// Concurrent check-ins on disjoint core buckets must not interfere — this + /// is the whole point of the sharded design. All N inserts complete and + /// land in distinct buckets. + #[test] + fn concurrent_check_ins_on_disjoint_buckets() { + use std::sync::Arc as StdArc; + + let cache = StdArc::new(HostCache::default()); + let mut handles = Vec::new(); + // Each thread checks in a host with a unique core count → distinct + // bucket → no per-bucket contention. + for cores in 1i32..=8 { + let cache = cache.clone(); + handles.push(thread::spawn(move || { + let host_id = Uuid::new_v4(); + let host = create_test_host(host_id, cores, ByteSize::gb(8)); + cache.check_in(host, false); + host_id + })); + } + + let host_ids: Vec = handles + .into_iter() + .map(|h| h.join().expect("worker panicked")) + .collect(); + + // Every bucket should be populated. + let outer = cache.hosts_index.read().unwrap(); + assert_eq!(outer.len(), 8, "one bucket per core size"); + for (core_key, bucket) in outer.iter() { + let bucket_lock = bucket.lock().unwrap(); + let total_hosts: usize = bucket_lock.values().map(|set| set.len()).sum(); + assert_eq!( + total_hosts, 1, + "bucket at core_key={core_key} should contain exactly one host" + ); + } + // And every checked-in host id is reachable. + for id in host_ids { + assert!(HOST_STORE.get(&id).is_some(), "host {id} present in store"); + } + } } From acd41dac67a0a517019f9b4d779ea52cbd2613a3 Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 22:33:28 -0700 Subject: [PATCH 08/10] [rust-scheduler] Replace LayerPermitService with FOR UPDATE SKIP LOCKED MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses issue #2318 (item 2). The in-memory LayerPermitService was an Actix actor that issued time-limited permits keyed on layer ID to dedupe concurrent layer dispatches. The permit could only deduplicate within a single scheduler process, so multi-replica deployments would happily race each other on the same layer. Replace it with a per-layer row lock acquired via `SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED` held in a transaction for the duration of layer processing: - Multi-replica coordination is now handled by Postgres directly. - LayerLockGuard's transaction is committed (or implicitly rolled back) when processing finishes, releasing the row lock. - LayerPermitService and its Actix message types are deleted entirely. Each concurrent layer dispatch now holds two DB connections (the lock transaction plus the per-proc transaction), so the matcher's concurrency_semaphore is halved from (pool_size - 1) to (pool_size / 2 - 1) — at minimum 1 — to keep us inside the pool budget. --- rust/crates/scheduler/src/dao/layer_dao.rs | 48 ++++ .../scheduler/src/pipeline/layer_permit.rs | 226 ------------------ rust/crates/scheduler/src/pipeline/matcher.rs | 99 ++++---- rust/crates/scheduler/src/pipeline/mod.rs | 1 - 4 files changed, 93 insertions(+), 281 deletions(-) delete mode 100644 rust/crates/scheduler/src/pipeline/layer_permit.rs diff --git a/rust/crates/scheduler/src/dao/layer_dao.rs b/rust/crates/scheduler/src/dao/layer_dao.rs index 965a478a8..80979fef7 100644 --- a/rust/crates/scheduler/src/dao/layer_dao.rs +++ b/rust/crates/scheduler/src/dao/layer_dao.rs @@ -289,6 +289,23 @@ ORDER BY lf.int_layer_order "#; +/// Guard returned by [`LayerDao::try_lock_layer`] holding an open transaction +/// that has a row-level lock on the layer. Drop / commit the guard to release +/// the lock so other schedulers can pick the layer up. +pub struct LayerLockGuard { + transaction: sqlx::Transaction<'static, Postgres>, +} + +impl LayerLockGuard { + /// Releases the lock by committing the underlying empty transaction. + /// Dropping the guard without calling this also releases (via implicit + /// rollback) but that path logs an extra rollback failure on some + /// sqlx versions; prefer this explicit release on the happy path. + pub async fn release(self) -> Result<(), sqlx::Error> { + self.transaction.commit().await + } +} + impl LayerDao { /// Creates a new LayerDao from database configuration. /// @@ -339,6 +356,37 @@ impl LayerDao { Ok(self.group_layers_and_frames(combined_models)) } + /// Attempts to acquire a row-level lock on a layer via + /// `SELECT … FOR UPDATE SKIP LOCKED`. + /// + /// Used to deduplicate dispatch attempts across concurrent scheduler + /// processes / replicas — if another scheduler already holds the row's + /// lock, this returns `Ok(None)` immediately (no waiting) and the caller + /// should skip the layer. + /// + /// The lock is scoped to the returned [`LayerLockGuard`]'s transaction and + /// is released when the guard is committed or dropped. + pub async fn try_lock_layer( + &self, + layer_id: Uuid, + ) -> Result, sqlx::Error> { + let mut tx = self.connection_pool.begin().await?; + let locked: Option = sqlx::query_scalar( + "SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED", + ) + .bind(layer_id.to_string()) + .fetch_optional(&mut *tx) + .await?; + + if locked.is_some() { + Ok(Some(LayerLockGuard { transaction: tx })) + } else { + // No row returned means another transaction already locked it. + let _ = tx.rollback().await; + Ok(None) + } + } + /// Groups flat query results into layers with their associated frames. /// /// Transforms the denormalized query results into a structured hierarchy diff --git a/rust/crates/scheduler/src/pipeline/layer_permit.rs b/rust/crates/scheduler/src/pipeline/layer_permit.rs deleted file mode 100644 index 72db23f75..000000000 --- a/rust/crates/scheduler/src/pipeline/layer_permit.rs +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright Contributors to the OpenCue Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under -// the License. - -use actix::{Actor, AsyncContext, Handler, Message, WrapFuture}; -use scc::HashMap; -use std::{ - sync::Arc, - time::{Duration, SystemTime}, -}; -use tokio::sync::OnceCell; -use tracing::{debug, info}; -use uuid::Uuid; - -use miette::Result; - -/// Actor message to request a permit for a layer. -/// -/// Requests a permit to process a specific layer. If the layer is already -/// being processed by another task (permit hasn't expired), returns false. -/// Otherwise, grants the permit and returns true. -/// -/// # Fields -/// -/// * `id` - Unique identifier for the layer -/// * `duration` - How long the permit should be valid -/// -/// # Returns -/// -/// * `bool` - true if permit was granted, false if layer is already locked -#[derive(Message)] -#[rtype(result = "bool")] -pub struct Request { - pub id: Uuid, - pub duration: Duration, -} - -/// Actor message to release a permit for a layer. -/// -/// # Fields -/// -/// * `id` - Unique identifier for the layer -/// -/// # Returns -/// -/// * `bool` - true if permit was release, false if there wasn't a valid permit -#[derive(Message)] -#[rtype(result = "bool")] -pub struct Release { - pub id: Uuid, -} - -/// Internal representation of a layer permit. -/// -/// Tracks when a permit was issued and how long it's valid for. -struct LayerPermit { - granted_at: SystemTime, - duration: Duration, -} - -impl LayerPermit { - /// Creates a new permit with the specified duration. - fn new(duration: Duration) -> Self { - LayerPermit { - granted_at: SystemTime::now(), - duration, - } - } - - /// Checks if the permit has expired. - fn expired(&self) -> bool { - self.granted_at.elapsed().unwrap_or_default() > self.duration - } -} - -/// Service for managing layer processing permits using the Actor model. -/// -/// Prevents multiple tasks from processing the same layer concurrently by -/// issuing time-limited permits. Each layer ID can only have one active -/// permit at a time. -#[derive(Clone)] -pub struct LayerPermitService { - permits: Arc>, -} - -impl Actor for LayerPermitService { - type Context = actix::Context; - - fn started(&mut self, ctx: &mut Self::Context) { - let service = self.clone(); - - // Run cleanup every 5 minutes - ctx.run_interval(Duration::from_secs(5 * 60), move |_act, ctx| { - let service = service.clone(); - let actor_clone = service.clone(); - ctx.spawn( - async move { service.cleanup_expired_permits().await }.into_actor(&actor_clone), - ); - }); - - info!("LayerPermitService actor started"); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - info!("LayerPermitService actor stopped"); - } -} - -impl Handler for LayerPermitService { - type Result = bool; - - fn handle(&mut self, msg: Request, _ctx: &mut Self::Context) -> Self::Result { - let Request { id, duration } = msg; - - // Check if there's an existing permit - let existing = self.permits.read_sync(&id, |_, permit| { - if permit.expired() { - // Permit exists but has expired - None - } else { - // Permit exists and is still valid - Some(()) - } - }); - - match existing { - Some(Some(())) => { - // Valid permit already exists - deny request - debug!("Layer {} already has an active permit", id); - false - } - _ => { - // No valid permit exists - grant new permit - let new_permit = LayerPermit::new(duration); - let _ = self.permits.insert_sync(id, new_permit); - debug!("Granted permit for layer {} (duration: {:?})", id, duration); - true - } - } - } -} - -impl Handler for LayerPermitService { - type Result = bool; - - fn handle(&mut self, msg: Release, _ctx: &mut Self::Context) -> Self::Result { - let Release { id } = msg; - - // Check if there's an existing permit - let existing = self.permits.remove_sync(&id); - - match existing { - Some((_, permit)) if !permit.expired() => { - // Valid permit removed - true - } - _ => { - // No valid permit found - false - } - } - } -} - -impl LayerPermitService { - /// Creates a new LayerPermitService with an empty permit map. - pub fn new() -> Self { - LayerPermitService { - permits: Arc::new(HashMap::new()), - } - } - - /// Removes expired permits from the map. - /// - /// Runs periodically to prevent unbounded growth of the permit map. - async fn cleanup_expired_permits(&self) { - let mut expired_keys = Vec::new(); - - // Collect expired permit IDs - self.permits.iter_sync(|id, permit| { - if permit.expired() { - expired_keys.push(*id); - } - true - }); - - // Remove expired permits - for id in &expired_keys { - let _ = self.permits.remove_sync(id); - } - - if !expired_keys.is_empty() { - debug!("Cleaned up {} expired layer permits", expired_keys.len()); - } - } -} - -static LAYER_PERMIT_SERVICE: OnceCell> = OnceCell::const_new(); - -/// Gets or initializes the singleton layer permit service actor. -/// -/// Returns a shared reference to the LayerPermitService actor, creating it -/// if it doesn't exist. The service manages layer processing permits to -/// prevent concurrent processing of the same layer. -/// -/// # Returns -/// -/// * `Ok(Addr)` - Actor address for sending messages -/// * `Err(miette::Error)` - Failed to initialize the service -pub async fn layer_permit_service() -> Result> { - LAYER_PERMIT_SERVICE - .get_or_try_init(|| async { - let service = LayerPermitService::new().start(); - Ok(service) - }) - .await - .cloned() -} diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index 25f49cac7..b52f0738f 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -10,12 +10,9 @@ // or implied. See the License for the specific language governing permissions and limitations under // the License. -use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; use uuid::Uuid; @@ -28,13 +25,10 @@ use crate::{ host_cache::{host_cache_service, messages::*, HostCacheService}, metrics, models::{CoreSize, DispatchJob, DispatchLayer, Host}, - pipeline::{ - dispatcher::{ - error::DispatchError, - messages::{DispatchLayerMessage, DispatchResult}, - rqd_dispatcher_service, RqdDispatcherService, - }, - layer_permit::{layer_permit_service, LayerPermitService, Release, Request}, + pipeline::dispatcher::{ + error::DispatchError, + messages::{DispatchLayerMessage, DispatchResult}, + rqd_dispatcher_service, RqdDispatcherService, }, resource_accounting::{resource_accounting_service, ResourceAccountingService}, }; @@ -55,7 +49,6 @@ pub static WASTED_ATTEMPTS: AtomicUsize = AtomicUsize::new(0); /// - Dispatching frames to selected hosts via the RQD dispatcher pub struct MatchingService { host_service: Addr, - layer_permit_service: Addr, layer_dao: LayerDao, dispatcher_service: Addr, concurrency_semaphore: Arc, @@ -78,11 +71,13 @@ impl MatchingService { pub async fn new() -> Result { let layer_dao = LayerDao::new().await?; let host_service = host_cache_service().await?; - let layer_permit_service = layer_permit_service().await?; - // Limiting the concurrency here is necessary to avoid consuming the entire - // database connection pool - let max_concurrent_transactions = (CONFIG.database.pool_size as usize).saturating_sub(1); + // Each concurrent layer now holds two connections: one for the + // SKIP LOCKED layer lock (held for the whole dispatch) and one for + // its per-proc transaction. Halve the semaphore relative to the + // pool so we don't risk exhausting it. + let pool_size = CONFIG.database.pool_size as usize; + let max_concurrent_transactions = (pool_size / 2).saturating_sub(1).max(1); let dispatcher_service = rqd_dispatcher_service().await?; let resource_accounting_service = resource_accounting_service() @@ -91,7 +86,6 @@ impl MatchingService { Ok(MatchingService { host_service, - layer_permit_service, layer_dao, dispatcher_service, concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent_transactions)), @@ -128,8 +122,9 @@ impl MatchingService { // Stream elegible layers from this job and dispatch one by one for layer in layers { let layer_disp = format!("{}", layer); - // Limiting the concurrency here is necessary to avoid consuming the entire - // database connection pool + // Limit concurrent layer processing to stay within the DB + // connection budget — each layer holds a SKIP-LOCKED lock + // transaction plus its per-proc transactions. let _permit = self .concurrency_semaphore .acquire() @@ -138,40 +133,36 @@ impl MatchingService { let cluster = cluster.clone(); - // Holding a permit for a layer is intended to eliminate a race condition - // between concurrent cluster_rounds attempting to process the same layer. - // The race condition is mitigated, but not complitely avoided, as the permit - // is acquired after the layers and frames have been queried. Acquiring the - // permit before querying would require breaking 'query_layers' into separate - // queries, one per layer, which greatly impacts performance. The rare cases - // that race each other are controlled by the frame.int_version lock on - // frame_dao.lock_for_update - let layer_permit = self - .layer_permit_service - .send(Request { - id: layer.id, - duration: Duration::from_secs(2 * layer.frames.len() as u64), - }) - .await - .expect("Layer permit service is not available"); - - if layer_permit { - let layer_id = layer.id; - self.process_layer(layer, cluster).await; - debug!("{}: Processed layer", layer_disp); - - self.layer_permit_service - .send(Release { id: layer_id }) - .await - .expect("Layer permit service is not available"); - - processed_layers.fetch_add(1, Ordering::Relaxed); - } else { - debug!( - "Layer skipped. {} already being processed by another task.", - layer - ); + // Try to acquire a row-level SELECT … FOR UPDATE SKIP LOCKED + // on the layer. This both deduplicates dispatch across + // concurrent schedulers (single-process or multi-replica) + // and replaces the in-memory LayerPermitService. + let lock_guard = match self.layer_dao.try_lock_layer(layer.id).await { + Ok(Some(guard)) => guard, + Ok(None) => { + debug!( + "Layer skipped. {} already being processed by another scheduler.", + layer + ); + continue; + } + Err(err) => { + error!("Failed to lock layer {}: {:?}", layer_disp, err); + continue; + } + }; + + self.process_layer(layer, cluster).await; + debug!("{}: Processed layer", layer_disp); + + if let Err(err) = lock_guard.release().await { + // Releasing only fails if the connection is dead, in which + // case sqlx has already rolled back implicitly so the lock + // is gone — log and move on. + warn!("Failed to release layer lock for {}: {:?}", layer_disp, err); } + + processed_layers.fetch_add(1, Ordering::Relaxed); } if processed_layers.load(Ordering::Relaxed) == 0 { diff --git a/rust/crates/scheduler/src/pipeline/mod.rs b/rust/crates/scheduler/src/pipeline/mod.rs index 9a34c0db8..53c2ecd1d 100644 --- a/rust/crates/scheduler/src/pipeline/mod.rs +++ b/rust/crates/scheduler/src/pipeline/mod.rs @@ -12,7 +12,6 @@ mod dispatcher; pub mod entrypoint; -mod layer_permit; mod matcher; pub use entrypoint::run; From 505b55c7ab7b53ee43fd25ca1ebf620307711a2c Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 22:39:03 -0700 Subject: [PATCH 09/10] [rust-scheduler] Drop Actix actor layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses issue #2318 (item 5). HostCacheService and RqdDispatcherService were Actix actors whose Handler impls did nothing but await an async block — no actor-private state, no message ordering, no supervision. Every call paid for an extra mailbox round-trip, a heap-allocated message envelope, and a ResponseActFuture boxing per send. Refactor both to plain `#[derive(Clone)]` structs (already Arc-wrapped internally) with regular async methods: - HostCacheService now exposes `check_out`, `check_in_payload`, and `cache_ratio` as `pub` async / sync methods. The periodic refresh and cleanup loops live in `spawn_background_tasks`, started by the singleton initializer. - RqdDispatcherService exposes `dispatch_layer` as a pub async method. - The OnceCell singletons now hold the service value directly (cheap Clone) instead of an `Addr<...>`. Matcher and main.rs: - MatchingService fields lose their `Addr<...>` wrapper. - Matcher call sites become direct `.method(...).await` calls; the `Actor is unresponsive` expects are gone since no mailbox exists. - main.rs no longer constructs an actix::System; it just block_on's the Tokio runtime directly. The redundant `System::current().stop()` is removed. Cargo.toml: - Remove the `actix = "0.13"` dependency. Messages modules retain the data types that are still useful as parameter / response grouping (`CheckedOutHost`, `CheckInPayload`, `CacheRatioResponse`, `DispatchLayerMessage`, `DispatchResult`); the now-redundant `CheckOut`, `CheckIn`, and `CacheRatio` marker structs are deleted along with their actix `Message` / `MessageResponse` derives. stress_tests.rs swaps `#[actix::test]` for `#[tokio::test]` so the (pre-existing) feature-gated smoke tests at least target the current toolchain. --- rust/crates/scheduler/Cargo.toml | 1 - rust/crates/scheduler/src/host_cache/actor.rs | 103 +++++----------- .../scheduler/src/host_cache/messages.rs | 114 ++---------------- rust/crates/scheduler/src/host_cache/mod.rs | 40 ++---- rust/crates/scheduler/src/main.rs | 11 +- .../src/pipeline/dispatcher/actor.rs | 56 +++------ .../src/pipeline/dispatcher/messages.rs | 12 +- .../scheduler/src/pipeline/dispatcher/mod.rs | 44 ++----- rust/crates/scheduler/src/pipeline/matcher.rs | 44 +++---- rust/crates/scheduler/tests/stress_tests.rs | 2 +- 10 files changed, 99 insertions(+), 328 deletions(-) diff --git a/rust/crates/scheduler/Cargo.toml b/rust/crates/scheduler/Cargo.toml index b2247f8c0..5d416b17b 100644 --- a/rust/crates/scheduler/Cargo.toml +++ b/rust/crates/scheduler/Cargo.toml @@ -18,7 +18,6 @@ debug = true opencue-proto = { path = "../opencue-proto" } # External Dependencies -actix = "0.13" chrono = "0.4.38" futures = { workspace = true } scc = "3.1" diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs index aebf4f536..e96d2eb38 100644 --- a/rust/crates/scheduler/src/host_cache/actor.rs +++ b/rust/crates/scheduler/src/host_cache/actor.rs @@ -10,8 +10,6 @@ // or implied. See the License for the specific language governing permissions and limitations under // the License. -use actix::{Actor, ActorFutureExt, AsyncContext, Handler, ResponseActFuture, WrapFuture}; - use bytesize::ByteSize; use itertools::Itertools; use miette::IntoDiagnostic; @@ -147,93 +145,56 @@ impl HostReservation { } } -impl Actor for HostCacheService { - type Context = actix::Context; - - fn started(&mut self, ctx: &mut Self::Context) { - let service_for_monitor = self.clone(); - let service_for_clean_up = self.clone(); - - ctx.run_interval(CONFIG.host_cache.monitoring_interval, move |_act, ctx| { - let service = service_for_monitor.clone(); - let actor_clone = service.clone(); - ctx.spawn(async move { service.refresh_cache().await }.into_actor(&actor_clone)); +impl HostCacheService { + /// Spawns the background tasks that periodically refresh and prune the + /// cache. Returns immediately; the spawned tasks live for the lifetime + /// of the process. + pub fn spawn_background_tasks(&self) { + let monitor = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(CONFIG.host_cache.monitoring_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // First tick fires immediately; skip it to mirror Actix + // `run_interval` semantics which fire after the first interval. + interval.tick().await; + loop { + interval.tick().await; + monitor.refresh_cache().await; + } }); - ctx.run_interval(CONFIG.host_cache.clean_up_interval, move |_act, _ctx| { - let service = service_for_clean_up.clone(); - - // Clean up stale hosts from the host store - service.cleanup_stale_hosts(); + let cleaner = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(CONFIG.host_cache.clean_up_interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + interval.tick().await; + loop { + interval.tick().await; + cleaner.cleanup_stale_hosts(); + } }); - info!("HostCacheService actor started"); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - info!("HostCacheService actor stopped"); - } -} - -impl Handler> for HostCacheService -where - F: Fn(&Host) -> bool + 'static, -{ - type Result = ResponseActFuture>; - - fn handle(&mut self, msg: CheckOut, _ctx: &mut Self::Context) -> Self::Result { - let CheckOut { - facility_id, - show_id, - tags, - cores, - memory, - validation, - } = msg; - - let service = self.clone(); - - Box::pin( - async move { - let out = service - .check_out(facility_id, show_id, tags, cores, memory, validation) - .await; - if let Ok(host) = &out { - debug!("Checked out {}", host.1); - } - out - } - .into_actor(self) - .map(|result, _, _| result), - ) + info!("HostCacheService background tasks spawned"); } -} -impl Handler for HostCacheService { - type Result = (); - - fn handle(&mut self, msg: CheckIn, _ctx: &mut Self::Context) -> Self::Result { - let CheckIn(cluster_key, payload) = msg; + /// Public wrapper around the internal `check_in` that adds debug logging + /// and handles the Invalidate path uniformly. + pub fn check_in_payload(&self, cluster_key: ClusterKey, payload: CheckInPayload) { match payload { CheckInPayload::Host(host) => { let host_str = format!("{}", host); self.check_in(cluster_key, host); - debug!("Checked in {}", &host_str); } CheckInPayload::Invalidate(host_id) => { let _ = self.reserved_hosts.remove_sync(&host_id); - debug!("Checked in {} (invalid)", &host_id); } } } -} - -impl Handler for HostCacheService { - type Result = CacheRatioResponse; - fn handle(&mut self, _msg: CacheRatio, _ctx: &mut Self::Context) -> Self::Result { + /// Returns the current cache hit/miss snapshot. + pub fn cache_ratio(&self) -> CacheRatioResponse { CacheRatioResponse { hit: self.cache_hit.load(atomic::Ordering::Relaxed), miss: self.cache_miss.load(atomic::Ordering::Relaxed), @@ -285,7 +246,7 @@ impl HostCacheService { /// /// * `Ok(CheckedOutHost)` - Host with cluster key /// * `Err(HostCacheError)` - No suitable host found or database error - async fn check_out( + pub async fn check_out( &self, facility_id: Uuid, show_id: Uuid, diff --git a/rust/crates/scheduler/src/host_cache/messages.rs b/rust/crates/scheduler/src/host_cache/messages.rs index bb25ced14..c2bbafbcc 100644 --- a/rust/crates/scheduler/src/host_cache/messages.rs +++ b/rust/crates/scheduler/src/host_cache/messages.rs @@ -10,123 +10,25 @@ // or implied. See the License for the specific language governing permissions and limitations under // the License. -use actix::{Message, MessageResponse}; - -use bytesize::ByteSize; -use miette::Result; use uuid::Uuid; -use crate::{ - cluster_key::{ClusterKey, Tag}, - host_cache::HostCacheError, - models::{CoreSize, Host}, -}; +use crate::{cluster_key::ClusterKey, models::Host}; -/// Response containing a checked-out host and its associated cluster key. -/// -/// Returned when a host is successfully checked out from the cache. The cluster -/// key is needed to return the host to the correct cache group after use. +/// Result of a successful host check-out. /// -/// # Fields -/// -/// * `0` - ClusterKey identifying the cache group this host belongs to -/// * `1` - Host with reserved resources -#[derive(MessageResponse)] +/// The cluster key is needed to return the host to the correct cache group +/// after use. pub struct CheckedOutHost(pub ClusterKey, pub Host); -/// Actor message to check out a host from the cache. -/// -/// Requests a host that matches the specified resource requirements and passes -/// the validation function. The cache will search through groups for each tag -/// in priority order (MANUAL > HOSTNAME > ALLOC) until a suitable host is found. -/// -/// If not found in cache, the service will fetch from the database. The host -/// is removed from the cache and must be checked back in after use. -/// -/// # Fields -/// -/// * `facility_id` - Facility identifier for the cluster key -/// * `show_id` - Show identifier for the cluster key -/// * `tags` - List of tags to search (tried in priority order) -/// * `cores` - Minimum number of cores required -/// * `memory` - Minimum memory required -/// * `validation` - Function to validate additional host requirements -/// -/// # Returns -/// -/// * `Ok(CheckedOutHost)` - Successfully found and reserved a matching host -/// * `Err(HostCacheError::NoCandidateAvailable)` - No host meets requirements -/// * `Err(HostCacheError::FailedToQueryHostCache)` - Database query failed -#[derive(Message)] -#[rtype(result = "Result")] -pub struct CheckOut -where - F: Fn(&Host) -> bool, -{ - pub facility_id: Uuid, - pub show_id: Uuid, - pub tags: Vec, - pub cores: CoreSize, - pub memory: ByteSize, - pub validation: F, -} - -/// Payload for checking in a host or invalidating a host in the cache. -/// -/// Allows either returning a host with updated resources to the cache or -/// invalidating a host by its id, removing it from the cache entirely. -/// -/// # Variants -/// -/// * `Host(Host)` - Return a host with updated idle resource counts -/// * `Invalidate(Uuid)` - Invalidate and remove a host by id +/// Argument to [`HostCacheService::check_in_payload`] — either return a host +/// with updated resources, or invalidate a host by id (removing any +/// outstanding reservation but not re-adding the host to the cache). pub enum CheckInPayload { Host(Host), Invalidate(Uuid), } -/// Actor message to return a host to the cache or invalidate it. -/// -/// Returns a host back to its cache group with updated resource state, or -/// invalidates a host by id, removing it from the cache. When returning -/// a host, it is removed from the reservation list and becomes available for -/// checkout again. If the cache group has expired, the host is dropped. -/// -/// # Fields -/// -/// * `0` - ClusterKey identifying which cache group the host belongs to -/// * `1` - CheckInPayload specifying whether to return a host or invalidate by id -/// -/// # Returns -/// -/// * `()` - Operation completed successfully (host returned/invalidated or cache group expired) -#[derive(Message)] -#[rtype(result = "()")] -pub struct CheckIn(pub ClusterKey, pub CheckInPayload); - -/// Actor message to retrieve cache performance metrics. -/// -/// Requests the current cache hit/miss statistics from the HostCacheService. -/// Used for monitoring cache effectiveness. -/// -/// # Returns -/// -/// * `CacheRatioResponse` - Cache performance metrics -#[derive(Message)] -#[rtype(result = CacheRatioResponse)] -pub struct CacheRatio; - -/// Response containing cache performance statistics. -/// -/// Provides metrics about cache hit/miss rates for monitoring cache effectiveness. -/// A high hit ratio indicates the cache is effectively reducing database queries. -/// -/// # Fields -/// -/// * `hit` - Total number of cache hits (hosts found in cache) -/// * `miss` - Total number of cache misses (required database fetch) -/// * `hit_ratio` - Percentage of cache hits (0-100) -#[derive(MessageResponse)] +/// Snapshot of host-cache hit/miss statistics. pub struct CacheRatioResponse { #[allow(dead_code)] pub hit: u64, diff --git a/rust/crates/scheduler/src/host_cache/mod.rs b/rust/crates/scheduler/src/host_cache/mod.rs index e04e91a8d..04125bdb1 100644 --- a/rust/crates/scheduler/src/host_cache/mod.rs +++ b/rust/crates/scheduler/src/host_cache/mod.rs @@ -15,7 +15,6 @@ mod cache; pub mod messages; mod store; -use actix::{Actor, Addr}; pub use cache::HostCache; use miette::Diagnostic; @@ -25,29 +24,21 @@ use uuid::Uuid; use miette::Result; use tokio::sync::OnceCell; -use crate::host_cache::messages::CacheRatio; - pub use actor::HostCacheService; pub type HostId = Uuid; -static HOST_CACHE: OnceCell> = OnceCell::const_new(); +static HOST_CACHE: OnceCell = OnceCell::const_new(); -/// Gets or initializes the singleton host cache service actor. -/// -/// Returns a shared reference to the HostCacheService actor, creating it -/// if it doesn't exist. The service manages host availability caching and -/// checkout/checkin operations. -/// -/// # Returns +/// Gets or initializes the singleton host cache service. /// -/// * `Ok(Addr)` - Actor address for sending messages -/// * `Err(miette::Error)` - Failed to initialize the service -pub async fn host_cache_service() -> Result> { +/// Returns a cloned handle to the service. `HostCacheService` is `Clone` and +/// internally `Arc`-shared, so the returned value is cheap to pass around. +pub async fn host_cache_service() -> Result { HOST_CACHE .get_or_try_init(|| async { - let service = HostCacheService::new().await?.start(); - + let service = HostCacheService::new().await?; + service.spawn_background_tasks(); Ok(service) }) .await @@ -57,22 +48,11 @@ pub async fn host_cache_service() -> Result> { /// Retrieves the current cache hit ratio as a percentage. /// /// Returns the ratio of cache hits to total cache accesses (hits + misses) -/// as a percentage value between 0 and 100. -/// -/// # Returns -/// -/// * `usize` - Cache hit ratio percentage (0-100), or 0 if service unavailable +/// as a percentage value between 0 and 100, or 0 if the service is unavailable. #[allow(dead_code)] pub async fn hit_ratio() -> usize { - let host_cache = host_cache_service().await; - match host_cache { - Ok(cache) => { - cache - .send(CacheRatio) - .await - .expect("Actor is offline") - .hit_ratio - } + match host_cache_service().await { + Ok(cache) => cache.cache_ratio().hit_ratio, Err(_) => 0, } } diff --git a/rust/crates/scheduler/src/main.rs b/rust/crates/scheduler/src/main.rs index 842853c63..8844b15f5 100644 --- a/rust/crates/scheduler/src/main.rs +++ b/rust/crates/scheduler/src/main.rs @@ -263,10 +263,7 @@ fn main() -> miette::Result<()> { .build() .into_diagnostic()?; - // Spawn the actor system in the background - let actor_system = actix::System::with_tokio_rt(|| runtime); - - actor_system.block_on(async_main()) + runtime.block_on(async_main()) } async fn async_main() -> miette::Result<()> { @@ -363,9 +360,5 @@ async fn async_main() -> miette::Result<()> { }); let opts = JobQueueCli::from_args(); - let result = opts.run().await; - - actix::System::current().stop(); - - result + opts.run().await } diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs index ac5babc6f..cd1bb2aec 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs @@ -10,7 +10,6 @@ // or implied. See the License for the specific language governing permissions and limitations under // the License. -use actix::{Actor, ActorFutureExt, Handler, ResponseActFuture, WrapFuture}; use bytesize::{ByteSize, KIB, MIB}; use chrono::Utc; use futures::FutureExt; @@ -62,47 +61,28 @@ pub struct RqdDispatcherService { dry_run_mode: bool, } -impl Actor for RqdDispatcherService { - type Context = actix::Context; - - fn started(&mut self, _ctx: &mut Self::Context) { - info!("RqdDispatcherService actor started"); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - info!("RqdDispatcherService actor stopped"); - } -} - -impl Handler for RqdDispatcherService { - type Result = ResponseActFuture>; - - fn handle(&mut self, msg: DispatchLayerMessage, _ctx: &mut Self::Context) -> Self::Result { +impl RqdDispatcherService { + /// Dispatches a layer's frames to the given host. + /// + /// The host advisory lock is acquired inside the per-proc transaction + /// (see [`dispatch_virtual_proc`]), not at the layer level, so it + /// auto-releases on commit/rollback and doesn't pin a connection during + /// the gRPC call. + pub async fn dispatch_layer( + &self, + msg: DispatchLayerMessage, + ) -> Result { let DispatchLayerMessage { layer, host } = msg; - - let dispatcher = self.clone(); debug!( - "Received dispatch message for layer {} on host {}", + "Received dispatch request for layer {} on host {}", layer.layer_name, host.name ); - - // No outer transaction here: the host advisory lock now lives inside - // each per-proc transaction (see dispatch_virtual_proc) as a - // transaction-scoped lock that auto-releases on commit/rollback. The - // outer tx previously held an idle connection across all gRPC calls - // for no DB work of its own. - Box::pin( - async move { - dispatcher.dispatch(&layer, host).await.map( - |(updated_host, updated_layer)| DispatchResult { - updated_host, - updated_layer, - }, - ) - } - .into_actor(self) - .map(|result, _actor, _ctx| result), - ) + self.dispatch(&layer, host) + .await + .map(|(updated_host, updated_layer)| DispatchResult { + updated_host, + updated_layer, + }) } } diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs b/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs index 1c4415ece..fec8be56e 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs @@ -10,13 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under // the License. -use actix::{Message, MessageResponse}; -use miette::Result; - -use crate::{ - models::{DispatchLayer, Host}, - pipeline::dispatcher::error::DispatchError, -}; +use crate::models::{DispatchLayer, Host}; /// Actor message to dispatch a layer's frames to a specific host. /// @@ -37,8 +31,6 @@ use crate::{ /// /// * `Ok(DispatchResult)` - Successfully dispatched frames with updated state /// * `Err(DispatchError)` - Dispatch failed due to various errors -#[derive(Message)] -#[rtype(result = "Result")] pub struct DispatchLayerMessage { pub layer: DispatchLayer, pub host: Host, @@ -55,7 +47,7 @@ pub struct DispatchLayerMessage { /// * `updated_host` - Host with updated idle resource counts after dispatch /// * `updated_layer` - Layer with dispatched frames removed from the frames list /// * `dispatched_frames` - List of frame names that were successfully dispatched -#[derive(MessageResponse, Debug)] +#[derive(Debug)] pub struct DispatchResult { pub updated_host: Host, pub updated_layer: DispatchLayer, diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs b/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs index d524f5b93..529f2fece 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs @@ -18,46 +18,18 @@ pub mod messages; use miette::Result; use std::sync::Arc; -// Actor and singleton support -use actix::{Actor, Addr}; pub use actor::RqdDispatcherService; use tokio::sync::OnceCell; use crate::dao::{LayerDao, ProcDao}; -static RQD_DISPATCHER: OnceCell> = OnceCell::const_new(); +static RQD_DISPATCHER: OnceCell = OnceCell::const_new(); -/// Singleton getter for the RQD dispatcher service +/// Singleton getter for the RQD dispatcher service. /// -/// Creates and returns a singleton instance of the RqdDispatcherService actor. -/// The service is initialized with configuration from CONFIG on first access. -/// -/// # Usage Example -/// ```rust,ignore -/// use crate::pipeline::dispatcher::{rqd_dispatcher_service, messages::DispatchLayer}; -/// use crate::models::{DispatchLayer as ModelDispatchLayer, Host}; -/// -/// async fn dispatch_example() -> miette::Result<()> { -/// let dispatcher = rqd_dispatcher_service().await?; -/// -/// let message = DispatchLayer { -/// layer: my_layer, -/// host: my_host, -/// transaction_id: "tx-123".to_string(), -/// }; -/// -/// match dispatcher.send(message).await { -/// Ok(Ok(result)) => { -/// println!("Dispatched {} frames", result.dispatched_frames.len()); -/// } -/// Ok(Err(e)) => println!("Dispatch error: {}", e), -/// Err(e) => println!("Actor mailbox error: {}", e), -/// } -/// -/// Ok(()) -/// } -/// ``` -pub async fn rqd_dispatcher_service() -> Result, miette::Error> { +/// Returns a cloned handle to the service. `RqdDispatcherService` is `Clone` +/// and internally `Arc`-shared, so the returned value is cheap to pass around. +pub async fn rqd_dispatcher_service() -> Result { RQD_DISPATCHER .get_or_try_init(|| async { use crate::{ @@ -70,16 +42,14 @@ pub async fn rqd_dispatcher_service() -> Result, miet let host_dao = Arc::new(HostDao::new().await?); let proc_dao = Arc::new(ProcDao::new().await?); - let service = RqdDispatcherService::new( + RqdDispatcherService::new( frame_dao, layer_dao, host_dao, proc_dao, CONFIG.rqd.dry_run_mode, ) - .await?; - - Ok(service.start()) + .await }) .await .cloned() diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index b52f0738f..09f5c2e4c 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -32,7 +32,6 @@ use crate::{ }, resource_accounting::{resource_accounting_service, ResourceAccountingService}, }; -use actix::Addr; use miette::{Context, Result}; use tokio::sync::Semaphore; use tracing::{debug, error, info, trace, warn}; @@ -48,9 +47,9 @@ pub static WASTED_ATTEMPTS: AtomicUsize = AtomicUsize::new(0); /// - Matching layers to available host candidates /// - Dispatching frames to selected hosts via the RQD dispatcher pub struct MatchingService { - host_service: Addr, + host_service: HostCacheService, layer_dao: LayerDao, - dispatcher_service: Addr, + dispatcher_service: RqdDispatcherService, concurrency_semaphore: Arc, resource_accounting_service: Arc, } @@ -278,13 +277,13 @@ impl MatchingService { let host_candidate = self .host_service - .send(CheckOut { - facility_id: layer.facility_id, - show_id: layer.show_id, + .check_out( + layer.facility_id, + layer.show_id, tags, - cores: cores_requested, - memory: layer.mem_min, - validation: move |host| { + cores_requested, + layer.mem_min, + move |host| { Self::validate_match( host, &layer_id, @@ -294,9 +293,8 @@ impl MatchingService { os.as_deref(), ) }, - }) - .await - .expect("Host Cache actor is unresponsive"); + ) + .await; match host_candidate { Ok(CheckedOutHost(cluster_key, host)) => { @@ -307,21 +305,20 @@ impl MatchingService { match self .dispatcher_service - .send(DispatchLayerMessage { + .dispatch_layer(DispatchLayerMessage { layer, // Move ownership here host, }) .await - .expect("Dispatcher actor is unresponsive") { Ok(DispatchResult { updated_host, updated_layer, }) => { - self.host_service - .send(CheckIn(cluster_key, CheckInPayload::Host(updated_host))) - .await - .expect("Host Cache actor is unresponsive"); + self.host_service.check_in_payload( + cluster_key, + CheckInPayload::Host(updated_host), + ); if updated_layer.frames.is_empty() { // Stop on the first successful attempt @@ -350,13 +347,10 @@ impl MatchingService { &layer_job_id, &host_before_dispatch, ); - self.host_service - .send(CheckIn( - cluster_key, - CheckInPayload::Invalidate(host_before_dispatch.id), - )) - .await - .expect("Host Cache actor is unresponsive"); + self.host_service.check_in_payload( + cluster_key, + CheckInPayload::Invalidate(host_before_dispatch.id), + ); try_again = false; // Can't continue without the layer } }; diff --git a/rust/crates/scheduler/tests/stress_tests.rs b/rust/crates/scheduler/tests/stress_tests.rs index ad47917ea..74c4ef508 100644 --- a/rust/crates/scheduler/tests/stress_tests.rs +++ b/rust/crates/scheduler/tests/stress_tests.rs @@ -68,7 +68,7 @@ mod stress_test { clean_up_test_data(test_prefix).await } - #[actix::test] + #[tokio::test] // #[traced_test] async fn test_stress_small() { let desc = TestDescription { From 8fc4d65c3d1570fadb2e0ed32ad7d9b570394b0b Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Thu, 14 May 2026 00:32:39 -0700 Subject: [PATCH 10/10] [rust-scheduler] Address review feedback on perf/architecture PR Apply CodeRabbit fixes to the cue-scheduler optimizations: - cluster.rs: dedupe filtered clusters via HashSet before seeding the priority heap (filter_clusters can fold distinct inputs onto the same identity); make the dispatch publish path preemptible by racing sender.reserve() against the stop notify, and only bump CLUSTER_ROUNDS / cluster_polls_total once a Permit is in hand. - metrics: cache per-cluster Counter / Gauge handles in two scc::HashMap<(Uuid, Uuid), _> statics so Uuid::to_string() allocations happen at most once per (show, facility) pair instead of on every dispatch. Add scheduler_layers_skipped_by_lock_total and tighten the wasted_attempts description. - entrypoint.rs: fold fetch-error cycles into the same cycles_without_jobs atomic the success path uses so the empty_job_cycles_before_quiting safety net actually trips when the DB is persistently unhealthy. - matcher.rs: track peer-lock skips in a separate atomic and only fire WASTED_ATTEMPTS when no layer was processed and none were skipped by a peer lock. Prevents the metric from being dominated by expected multi-replica contention. - tests/stress_tests.rs: run the harness on Tokio's multi-thread runtime (worker_threads = 8) so the new contention paths are actually exercised. --- rust/crates/scheduler/src/cluster.rs | 49 +++++++++++++-- rust/crates/scheduler/src/metrics/mod.rs | 60 +++++++++++++++---- .../scheduler/src/pipeline/entrypoint.rs | 23 ++++--- rust/crates/scheduler/src/pipeline/matcher.rs | 13 +++- rust/crates/scheduler/tests/stress_tests.rs | 2 +- 5 files changed, 122 insertions(+), 25 deletions(-) diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index dab68d9de..bd748187d 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -233,6 +233,11 @@ impl ClusterFeedBuilder { } ClusterFeed::filter_clusters(clusters.into_iter().collect(), &self.ignore_tags) }; + // Stripping ignored tags can collapse different inputs into the same + // Cluster identity (same facility / show / surviving tag set), so + // dedupe via a HashSet before seeding the heap to avoid scheduling + // the same logical cluster twice. + let clusters: HashSet = clusters.into_iter().collect(); let now = Instant::now(); let mut heap = BinaryHeap::with_capacity(clusters.len().max(1)); for cluster in clusters { @@ -282,7 +287,12 @@ impl ClusterFeed { /// * `clusters` - Explicit list of clusters to feed. /// * `ignore_tags` - Tag names to drop; a cluster whose tag set becomes empty is excluded. pub fn load_from_clusters(clusters: Vec, ignore_tags: &[String]) -> Self { - let filtered = Self::filter_clusters(clusters, ignore_tags); + // Dedupe after ignore-tag filtering for the same reason as `build()`: + // stripping tags can fold distinct inputs onto the same Cluster + // identity, and we don't want the heap to schedule the duplicate. + let filtered: HashSet = Self::filter_clusters(clusters, ignore_tags) + .into_iter() + .collect(); let now = Instant::now(); let mut heap = BinaryHeap::with_capacity(filtered.len().max(1)); for cluster in filtered { @@ -527,16 +537,43 @@ impl ClusterFeed { } } + // Final shutdown gate before we publish. Without this, a stop + // arriving between the eligibility wait and the send below + // would still let us emit one more cluster (and a possibly + // long-blocked send couldn't be preempted at all). Race the + // channel reservation against a stop notification so we exit + // promptly if the consumer closed or shutdown is signalled. + if stop_flag_dispatch.load(Ordering::Relaxed) { + break; + } + let permit = loop { + tokio::select! { + reserve = sender.reserve() => { + match reserve { + Ok(p) => break Some(p), + Err(_) => { + warn!("Cluster receiver dropped. Stopping feed."); + break None; + } + } + } + _ = notify_dispatch.notified() => { + if stop_flag_dispatch.load(Ordering::Relaxed) { + break None; + } + // Notify but no stop yet — keep waiting for a slot. + } + } + }; + let Some(permit) = permit else { break }; + + // Only count a poll once we have a slot and are about to publish. CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); metrics::increment_cluster_polls( &scheduled.cluster.show_id, &scheduled.cluster.facility_id, ); - - if sender.send(scheduled.cluster).await.is_err() { - warn!("Cluster receiver dropped. Stopping feed."); - break; - } + permit.send(scheduled.cluster); } }); diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs index 75c76c889..45611999c 100644 --- a/rust/crates/scheduler/src/metrics/mod.rs +++ b/rust/crates/scheduler/src/metrics/mod.rs @@ -14,8 +14,9 @@ use axum::{response::IntoResponse, routing::get, Router}; use lazy_static::lazy_static; use prometheus::{ register_counter, register_counter_vec, register_gauge_vec, register_histogram, Counter, - CounterVec, Encoder, GaugeVec, Histogram, TextEncoder, + CounterVec, Encoder, Gauge, GaugeVec, Histogram, TextEncoder, }; +use scc::HashMap as SccHashMap; use std::time::Duration; use tracing::{error, info}; @@ -97,6 +98,17 @@ lazy_static! { ) .expect("Failed to register cluster_last_dispatched_jobs gauge"); + /// Per-cluster `Counter` handles cached so the per-call `Uuid::to_string()` + /// allocations into `with_label_values` happen at most once per + /// `(show_id, facility_id)` pair, instead of on every dispatch loop pop. + /// Subsequent increments just bump an `Arc`-backed counter. + static ref CLUSTER_POLL_COUNTERS: + SccHashMap<(uuid::Uuid, uuid::Uuid), Counter> = SccHashMap::new(); + + /// Same caching as above for the per-cluster `last_dispatched_jobs` gauge. + static ref CLUSTER_LAST_DISPATCHED_GAUGES: + SccHashMap<(uuid::Uuid, uuid::Uuid), Gauge> = SccHashMap::new(); + // Matcher metrics mirroring the in-process atomics in pipeline/matcher.rs. pub static ref HOSTS_ATTEMPTED_TOTAL: Counter = register_counter!( "scheduler_hosts_attempted_total", @@ -106,9 +118,15 @@ lazy_static! { pub static ref WASTED_ATTEMPTS_TOTAL: Counter = register_counter!( "scheduler_wasted_attempts_total", - "Jobs that processed zero layers (e.g. all locked by another scheduler)" + "Jobs that processed zero layers and where every layer was free (i.e. no host candidate found)" ) .expect("Failed to register wasted_attempts_total counter"); + + pub static ref LAYERS_SKIPPED_BY_LOCK_TOTAL: Counter = register_counter!( + "scheduler_layers_skipped_by_lock_total", + "Layers skipped because another scheduler held the row lock (expected in multi-replica deployments, not wasted work)" + ) + .expect("Failed to register layers_skipped_by_lock_total counter"); } /// Handler for the /metrics endpoint @@ -206,28 +224,42 @@ pub fn observe_job_query_duration(duration: Duration) { JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64()); } -/// Helper function to increment cluster polls counter. +/// Helper function to increment the cluster polls counter. /// /// Bumps both the per-cluster `scheduler_cluster_polls_total` and the global /// `scheduler_cluster_rounds_total` so dashboards can use whichever is cheaper. +/// +/// The per-cluster `Counter` is cached in [`CLUSTER_POLL_COUNTERS`] so the +/// `Uuid::to_string()` allocations only run on the first sighting of each +/// `(show_id, facility_id)` pair. #[inline] pub fn increment_cluster_polls(show_id: &uuid::Uuid, facility_id: &uuid::Uuid) { - CLUSTER_POLLS_TOTAL - .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) - .inc(); + let key = (*show_id, *facility_id); + let entry = CLUSTER_POLL_COUNTERS.entry_sync(key).or_insert_with(|| { + CLUSTER_POLLS_TOTAL.with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + }); + entry.get().inc(); CLUSTER_ROUNDS_TOTAL.inc(); } /// Records the number of jobs dispatched in the most recent cycle for a cluster. +/// +/// Uses the [`CLUSTER_LAST_DISPATCHED_GAUGES`] cache to avoid per-call +/// `Uuid::to_string()` allocations on the control-loop hot path. #[inline] pub fn set_cluster_last_dispatched_jobs( show_id: &uuid::Uuid, facility_id: &uuid::Uuid, count: usize, ) { - CLUSTER_LAST_DISPATCHED_JOBS - .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) - .set(count as f64); + let key = (*show_id, *facility_id); + let entry = CLUSTER_LAST_DISPATCHED_GAUGES + .entry_sync(key) + .or_insert_with(|| { + CLUSTER_LAST_DISPATCHED_JOBS + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + }); + entry.get().set(count as f64); } /// Records a host-candidate selection attempt. @@ -236,8 +268,16 @@ pub fn increment_hosts_attempted() { HOSTS_ATTEMPTED_TOTAL.inc(); } -/// Records a job that processed zero layers (e.g. all locked by another scheduler). +/// Records a job that processed zero layers AND none were skipped due to a +/// peer-scheduler lock, i.e. a real "no host candidate" miss, not contention. #[inline] pub fn increment_wasted_attempts() { WASTED_ATTEMPTS_TOTAL.inc(); } + +/// Records a layer skipped because another scheduler currently holds its row lock. +/// Expected in multi-replica deployments — the work is happening elsewhere, not lost. +#[inline] +pub fn increment_layers_skipped_by_lock() { + LAYERS_SKIPPED_BY_LOCK_TOTAL.inc(); +} diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index a5541ddc2..1b527486f 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -108,17 +108,26 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { (processed, stop) } Err(err) => { - // Don't halt the entire scheduler for a single cluster's - // fetch error (network blip, transient DB load). Treat - // the cluster as empty for this cycle so it gets the - // configured back-off and other clusters keep running. - // The `empty_job_cycles_before_quiting` safety net still - // applies if the error persists across all clusters. error!( "Failed to fetch jobs for cluster {}: {}", cluster, err ); - (0, false) + // Don't halt the scheduler for a single cluster's fetch + // error (network blip, transient DB load). Treat it as + // an empty cycle so the cluster gets the configured + // back-off and other clusters keep running, but feed + // the same atomic counter the success path uses so the + // `empty_job_cycles_before_quiting` safety net actually + // trips if fetches keep failing across the feed. + let stop = match CONFIG.queue.empty_job_cycles_before_quiting { + Some(limit) => { + let new_count = + cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1; + new_count >= limit + } + None => false, + }; + (0, stop) } }; diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index 09f5c2e4c..bf9e7cba1 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -117,6 +117,9 @@ impl MatchingService { match layers { Ok(layers) => { let processed_layers = AtomicUsize::new(0); + // Track peer-lock skips separately from real waste — see the + // wasted-attempts guard below. + let layers_skipped_by_lock = AtomicUsize::new(0); // Stream elegible layers from this job and dispatch one by one for layer in layers { @@ -143,6 +146,8 @@ impl MatchingService { "Layer skipped. {} already being processed by another scheduler.", layer ); + layers_skipped_by_lock.fetch_add(1, Ordering::Relaxed); + metrics::increment_layers_skipped_by_lock(); continue; } Err(err) => { @@ -164,7 +169,13 @@ impl MatchingService { processed_layers.fetch_add(1, Ordering::Relaxed); } - if processed_layers.load(Ordering::Relaxed) == 0 { + // Only flag a job as "wasted" when we actually attempted + // dispatch but found nothing to do. Peer-lock skips are not + // waste — the work is being done on another scheduler — so + // they don't count against this metric. + if processed_layers.load(Ordering::Relaxed) == 0 + && layers_skipped_by_lock.load(Ordering::Relaxed) == 0 + { WASTED_ATTEMPTS.fetch_add(1, Ordering::Relaxed); metrics::increment_wasted_attempts(); debug!("Job {} didn't process any layer", job_disp); diff --git a/rust/crates/scheduler/tests/stress_tests.rs b/rust/crates/scheduler/tests/stress_tests.rs index 74c4ef508..ffd0f32e9 100644 --- a/rust/crates/scheduler/tests/stress_tests.rs +++ b/rust/crates/scheduler/tests/stress_tests.rs @@ -68,7 +68,7 @@ mod stress_test { clean_up_test_data(test_prefix).await } - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] // #[traced_test] async fn test_stress_small() { let desc = TestDescription {