From 15e0ad3d97c7b5092277a93a41deccc2b1c374da Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 17:41:34 -0700 Subject: [PATCH 1/2] [rust-scheduler] Replace round-robin ClusterFeed with priority-queue scheduling Fixes #2315. The ClusterFeed previously iterated clusters with a fixed-step round-robin and applied lap-based 10ms/100ms/5s sleeps between rounds. Every cluster received equal air-time regardless of work, and a sleeping cluster added delay to neighboring clusters. Replace the Vec + AtomicUsize index with a BinaryHeap ordered by (next_eligible_at asc, last_dispatched_jobs desc). The feed now pops the highest-priority cluster, sleeps until eligible (preemptible via Notify), emits, and waits for a Done message that re-inserts the cluster with updated stats. Lap-based sleep tiers and the parallel sleep_map are removed; eligibility now lives on each heap entry. Cluster feed changes (cluster.rs): - Added Scheduled { cluster, next_eligible_at, last_dispatched_jobs } with Ord prioritizing earliest eligibility, then busiest cluster on ties (productivity bias). - ClusterFeed now holds Arc>> + Notify. - FeedMessage::Sleep / Stop() collapsed into FeedMessage::Done { cluster, processed_jobs, sleep } + unit-variant Stop. - stream() dispatch loop now pops, sleeps with preemption, and emits. - Control loop re-inserts on Done and notifies the dispatch loop. - Added ClusterFeed::load_from_clusters shim for tests. Consumer (pipeline/entrypoint.rs): - Always sends FeedMessage::Done with processed_jobs and optional back-off. - Preserves should_stop / empty_job_cycles logic. Config (config/mod.rs): - Added cluster_empty_back_off: Duration (default 3s), replacing hardcoded entrypoint back-off. - Added cluster_productivity_bias: bool (default true), toggling busy-cluster tie-breaking. Observability (metrics/mod.rs): - Added scheduler_cluster_polls_total {show_id, facility_id} CounterVec. - Added scheduler_cluster_rounds_total Counter (mirrors CLUSTER_ROUNDS). - Added scheduler_cluster_last_dispatched_jobs {show_id, facility_id} Gauge, updated from the Done handler to expose the productivity-bias signal. Tests: - Added 6 unit tests covering Scheduled ordering, sleep-back-off timing, Done round-trip, and a 500ms mixed-workload proportionality test asserting busy clusters receive >2x the polls of idle ones. - Updated tests/util.rs for the two new QueueConfig fields. --- rust/crates/scheduler/src/cluster.rs | 574 ++++++++++++++---- rust/crates/scheduler/src/config/mod.rs | 10 + rust/crates/scheduler/src/metrics/mod.rs | 54 +- .../scheduler/src/pipeline/entrypoint.rs | 59 +- rust/crates/scheduler/tests/util.rs | 2 + 5 files changed, 558 insertions(+), 141 deletions(-) diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index 71a93d7c5..b29bc88ce 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -11,19 +11,20 @@ // the License. use std::{ - collections::{BTreeSet, HashMap, HashSet}, + cmp::Ordering as CmpOrdering, + collections::{BTreeSet, BinaryHeap, HashMap, HashSet}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, RwLock, + Arc, Mutex, }, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; use futures::StreamExt; use itertools::Itertools; use miette::{IntoDiagnostic, Result}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Notify}; use tracing::{debug, error, warn}; use uuid::Uuid; @@ -31,8 +32,14 @@ use crate::{ cluster_key::{Tag, TagType}, config::CONFIG, dao::{helpers::parse_uuid, ClusterDao}, + metrics, }; +/// Counts how many times a cluster has been emitted by the feed. +/// +/// Kept as a process-global atomic for source-level compatibility with smoke tests that +/// read it directly. Production observability is provided via the Prometheus +/// `scheduler_cluster_polls_total` counter (see [`metrics`]). pub static CLUSTER_ROUNDS: AtomicUsize = AtomicUsize::new(0); #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -72,28 +79,86 @@ impl Cluster { } } +/// A cluster scheduled for dispatch, ordered by eligibility time and recent productivity. +/// +/// `BinaryHeap` is a max-heap, so [`Scheduled::cmp`] inverts `next_eligible_at` +/// (earliest wins) and keeps `last_dispatched_jobs` in natural order (busier wins +/// as a tiebreaker when productivity bias is enabled). +#[derive(Debug, Clone)] +struct Scheduled { + cluster: Cluster, + /// When this cluster becomes eligible for dispatch. A past `Instant` means + /// the cluster is ready right now. + next_eligible_at: Instant, + /// Number of jobs processed in the most recent dispatch: used as the + /// productivity bias tiebreaker. + last_dispatched_jobs: usize, +} + +impl PartialEq for Scheduled { + fn eq(&self, other: &Self) -> bool { + self.next_eligible_at == other.next_eligible_at + && self.last_dispatched_jobs == other.last_dispatched_jobs + && self.cluster == other.cluster + } +} +impl Eq for Scheduled {} + +impl Ord for Scheduled { + fn cmp(&self, other: &Self) -> CmpOrdering { + // Earliest `next_eligible_at` is highest priority. Invert for max-heap. + let by_time = other.next_eligible_at.cmp(&self.next_eligible_at); + if by_time != CmpOrdering::Equal { + return by_time; + } + // Productivity bias: more jobs in the last dispatch ranks higher. + if CONFIG.queue.cluster_productivity_bias { + let by_jobs = self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs); + if by_jobs != CmpOrdering::Equal { + return by_jobs; + } + } + // Deterministic tiebreaker on the cluster identity. + self.cluster.cmp(&other.cluster) + } +} +impl PartialOrd for Scheduled { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Debug)] pub struct ClusterFeed { - pub clusters: Arc>>, - current_index: Arc, + /// Priority queue of clusters awaiting dispatch. + queue: Arc>>, stop_flag: Arc, - sleep_map: Arc>>, + /// Wakes the dispatch loop when an entry is pushed back or shutdown is signaled. + notify: Arc, } /// Control messages for the cluster feed stream. /// -/// These messages are sent to the control channel returned by `ClusterFeed::stream()` -/// to influence feed behavior during runtime. +/// Sent to the channel returned by [`ClusterFeed::stream`] to report back the +/// outcome of processing a cluster (so the priority queue can re-rank it) or +/// to signal a graceful shutdown. pub enum FeedMessage { /// Stops the cluster feed stream gracefully. - Stop(), - /// Puts a specific cluster to sleep for the given duration. + Stop, + /// Reports the result of processing a cluster and re-inserts it into the + /// priority queue. /// /// # Fields /// - /// * `Cluster` - The cluster to put to sleep - /// * `Duration` - How long to sleep before the cluster can be processed again - Sleep(Cluster, Duration), + /// * `cluster` - The cluster that was just processed. + /// * `processed_jobs` - Jobs dispatched on this cycle. Drives the productivity-bias tiebreaker. + /// * `sleep` - Optional back-off duration. `None` re-queues the cluster as eligible now; + /// `Some(d)` defers it for at least `d`. + Done { + cluster: Cluster, + processed_jobs: usize, + sleep: Option, + }, } /// Builder for constructing a [`ClusterFeed`]. @@ -159,11 +224,19 @@ impl ClusterFeedBuilder { } ClusterFeed::filter_clusters(clusters.into_iter().collect(), &self.ignore_tags) }; + let now = Instant::now(); + let mut heap = BinaryHeap::with_capacity(clusters.len().max(1)); + for cluster in clusters { + heap.push(Scheduled { + cluster, + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + } Ok(ClusterFeed { - clusters: Arc::new(RwLock::new(clusters)), - current_index: Arc::new(AtomicUsize::new(0)), + queue: Arc::new(Mutex::new(heap)), stop_flag: Arc::new(AtomicBool::new(false)), - sleep_map: Arc::new(Mutex::new(HashMap::new())), + notify: Arc::new(Notify::new()), }) } } @@ -189,6 +262,34 @@ impl ClusterFeed { } } + /// Builds a feed from a fixed list of clusters with optional tag filtering. + /// + /// Intended for tests and direct construction where the cluster set is already + /// known. Production code should prefer the builder ([`facility`](Self::facility) / + /// [`no_facility`](Self::no_facility)) which can also load clusters from the database. + /// + /// # Arguments + /// + /// * `clusters` - Explicit list of clusters to feed. + /// * `ignore_tags` - Tag names to drop; a cluster whose tag set becomes empty is excluded. + pub fn load_from_clusters(clusters: Vec, ignore_tags: &[String]) -> Self { + let filtered = Self::filter_clusters(clusters, ignore_tags); + let now = Instant::now(); + let mut heap = BinaryHeap::with_capacity(filtered.len().max(1)); + for cluster in filtered { + heap.push(Scheduled { + cluster, + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + } + ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + } + } + /// Loads all clusters from the database and organizes them by tag type. /// /// Loads allocation clusters (one per facility+show+tag), and chunks manual/hostname tags @@ -336,148 +437,152 @@ impl ClusterFeed { .collect() } - /// Streams clusters to a channel receiver with backpressure control. + /// Streams clusters from a priority queue to a channel receiver. /// - /// Creates a producer-consumer pattern where clusters are sent through a channel - /// to the provided sender. The stream can be controlled via the returned message - /// channel (for sleep/stop commands). + /// Creates a producer-consumer pattern backed by a `BinaryHeap` + /// keyed on `(next_eligible_at asc, last_dispatched_jobs desc)`. The consumer + /// must report each cluster back via [`FeedMessage::Done`] so it can be + /// re-inserted with updated stats; otherwise the cluster is lost. /// /// # Arguments /// - /// * `sender` - Channel sender for emitting clusters + /// * `sender` - Channel sender for emitting eligible clusters. /// /// # Returns /// - /// * `mpsc::Sender` - Control channel for sending sleep/stop messages + /// * `mpsc::Sender` - Channel to send `Done` (re-insert) and `Stop` messages. /// /// # Behavior /// - /// - Iterates through clusters in round-robin fashion - /// - Skips sleeping clusters until their wake time expires - /// - Applies backoff delays between rounds (varies based on sleeping cluster count) - /// - Stops when receiving a Stop message or when configured empty cycles limit is reached - /// - Automatically cleans up expired sleep entries + /// - Pops the highest-priority cluster (earliest eligible, busiest as tiebreaker). + /// - If the popped entry is not yet eligible, sleeps until it is, but wakes + /// immediately on `Notify` so a freshly inserted higher-priority entry can preempt. + /// - On `FeedMessage::Done` re-inserts the cluster with the reported sleep + /// deadline and `last_dispatched_jobs` for the productivity bias. + /// - On `FeedMessage::Stop` sets the stop flag and exits. pub async fn stream(self, sender: mpsc::Sender) -> mpsc::Sender { - // Use a small channel to ensure the producer waits for items to be consumed before - // generating more - let (cancel_sender, mut feed_receiver) = mpsc::channel(8); + // Backpressure: bounded channel keeps the consumer from queueing unbounded Done messages. + let (control_sender, mut control_receiver) = mpsc::channel(32); let stop_flag = self.stop_flag.clone(); - let sleep_map = self.sleep_map.clone(); + let queue = self.queue.clone(); + let notify = self.notify.clone(); - // Stream clusters on the caller channel + // Dispatch loop: pop highest-priority cluster, sleep until eligible, send. + let queue_dispatch = queue.clone(); + let notify_dispatch = notify.clone(); + let stop_flag_dispatch = stop_flag.clone(); tokio::spawn(async move { - let mut all_sleeping_rounds = 0; - let feed = self.clusters.clone(); - let current_index_atomic = self.current_index.clone(); - loop { - // Check stop flag - if stop_flag.load(Ordering::Relaxed) { - warn!("Cluster received a stop message. Stopping feed."); + if stop_flag_dispatch.load(Ordering::Relaxed) { + warn!("Cluster feed received stop signal. Exiting dispatch loop."); break; } - let (item, cluster_size, completed_round) = { - let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner()); - if clusters.is_empty() { - break; - } - - let current_index = current_index_atomic.load(Ordering::Relaxed); - let item = clusters[current_index].clone(); - let next_index = (current_index + 1) % clusters.len(); - let completed_round = next_index == 0; // Detect wrap-around - current_index_atomic.store(next_index, Ordering::Relaxed); - - (item, clusters.len(), completed_round) + let scheduled = { + let mut heap = + queue_dispatch.lock().unwrap_or_else(|p| p.into_inner()); + heap.pop() }; - // Skip cluster if it is marked as sleeping - let is_sleeping = { - let mut sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - if let Some(wake_up_time) = sleep_map_lock.get(&item) { - if *wake_up_time > SystemTime::now() { - // Still sleeping, skip it - true - } else { - // Remove expired entries - sleep_map_lock.remove(&item); - false + let scheduled = match scheduled { + Some(s) => s, + None => { + // No clusters available: all in flight. Wait for a re-insert + // or a stop signal. The short fallback sleep guarantees we + // re-check `stop_flag` if no notify ever arrives. + tokio::select! { + _ = notify_dispatch.notified() => {} + _ = tokio::time::sleep(Duration::from_millis(100)) => {} } - } else { - false + continue; } }; - if !is_sleeping && sender.send(item).await.is_err() { - warn!("Cluster receiver dropped. Stopping feed."); - break; + let now = Instant::now(); + if scheduled.next_eligible_at > now { + let sleep_dur = scheduled.next_eligible_at - now; + let preempted = tokio::select! { + _ = tokio::time::sleep(sleep_dur) => false, + _ = notify_dispatch.notified() => true, + }; + if preempted { + // A new entry was pushed: re-pop to pick the new best. + let mut heap = + queue_dispatch.lock().unwrap_or_else(|p| p.into_inner()); + heap.push(scheduled); + continue; + } + if stop_flag_dispatch.load(Ordering::Relaxed) { + break; + } } - // At end of round, add backoff sleep - if completed_round { - CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + metrics::increment_cluster_polls( + &scheduled.cluster.show_id, + &scheduled.cluster.facility_id, + ); - // Check if all/most clusters are sleeping - let sleeping_count = { - let sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.len() - }; - if sleeping_count >= cluster_size { - // Ensure this doesn't loop forever when there's a limit configured - all_sleeping_rounds += 1; - if let Some(max_empty_cycles) = CONFIG.queue.empty_job_cycles_before_quiting - { - if all_sleeping_rounds > max_empty_cycles { - warn!("All clusters have been sleeping for too long"); - break; - } - } - - // All clusters sleeping, sleep longer - tokio::time::sleep(Duration::from_secs(5)).await; - } else if sleeping_count > 0 { - // Some clusters sleeping, brief pause - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // Active work, minimal pause - tokio::time::sleep(Duration::from_millis(10)).await; - } + if sender.send(scheduled.cluster).await.is_err() { + warn!("Cluster receiver dropped. Stopping feed."); + break; } } }); - // Process messages on the receiving end - let sleep_map = self.sleep_map.clone(); + // Control loop: handle Done (re-insert) and Stop messages from the consumer. + let queue_ctrl = queue.clone(); + let notify_ctrl = notify.clone(); + let stop_flag_ctrl = stop_flag.clone(); tokio::spawn(async move { - while let Some(message) = feed_receiver.recv().await { + while let Some(message) = control_receiver.recv().await { match message { - FeedMessage::Sleep(cluster, duration) => { - if let Some(wake_up_time) = SystemTime::now().checked_add(duration) { - debug!("{:?} put to sleep for {}s", cluster, duration.as_secs()); - { - let mut sleep_map_lock = - sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.insert(cluster, wake_up_time); + FeedMessage::Done { + cluster, + processed_jobs, + sleep, + } => { + let next_eligible_at = match sleep { + Some(d) => { + debug!( + "{:?} re-queued with back-off of {}s ({} jobs processed)", + cluster, + d.as_secs(), + processed_jobs + ); + Instant::now() + .checked_add(d) + .unwrap_or_else(Instant::now) } - } else { - warn!( - "Sleep request ignored for {:?}. Invalid duration={}s", + None => Instant::now(), + }; + metrics::set_cluster_last_dispatched_jobs( + &cluster.show_id, + &cluster.facility_id, + processed_jobs, + ); + { + let mut heap = + queue_ctrl.lock().unwrap_or_else(|p| p.into_inner()); + heap.push(Scheduled { cluster, - duration.as_secs() - ); + next_eligible_at, + last_dispatched_jobs: processed_jobs, + }); } + notify_ctrl.notify_one(); } - FeedMessage::Stop() => { - self.stop_flag.store(true, Ordering::Relaxed); + FeedMessage::Stop => { + stop_flag_ctrl.store(true, Ordering::Relaxed); + notify_ctrl.notify_one(); break; } } } }); - cancel_sender + control_sender } } @@ -513,3 +618,242 @@ pub async fn get_show_id(show_name: &str) -> Result { let cluster_dao = ClusterDao::new().await?; cluster_dao.get_show_id(show_name).await.into_diagnostic() } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::BTreeSet; + + fn make_cluster(name: &str) -> Cluster { + Cluster { + facility_id: Uuid::new_v4(), + show_id: Uuid::new_v4(), + tags: BTreeSet::from([Tag { + name: name.to_string(), + ttype: TagType::Alloc, + }]), + } + } + + fn scheduled(cluster: Cluster, at: Instant, jobs: usize) -> Scheduled { + Scheduled { + cluster, + next_eligible_at: at, + last_dispatched_jobs: jobs, + } + } + + #[test] + fn earlier_next_eligible_at_pops_first() { + let now = Instant::now(); + let later = scheduled(make_cluster("a"), now + Duration::from_secs(10), 100); + let sooner = scheduled(make_cluster("b"), now, 0); + + let mut heap = BinaryHeap::new(); + heap.push(later); + heap.push(sooner); + + let popped = heap.pop().expect("heap not empty"); + assert_eq!(popped.last_dispatched_jobs, 0); + } + + #[test] + fn busier_cluster_pops_first_when_eligibility_ties() { + // This test relies on the default `cluster_productivity_bias = true`. + assert!( + CONFIG.queue.cluster_productivity_bias, + "test assumes default config: productivity bias enabled" + ); + + let now = Instant::now(); + let idle = scheduled(make_cluster("idle"), now, 0); + let busy = scheduled(make_cluster("busy"), now, 500); + + let mut heap = BinaryHeap::new(); + heap.push(idle); + heap.push(busy); + + let popped = heap.pop().expect("heap not empty"); + assert_eq!(popped.last_dispatched_jobs, 500); + } + + #[test] + fn sleeping_cluster_does_not_block_eligible_neighbor() { + let now = Instant::now(); + let sleeping = scheduled( + make_cluster("sleeping"), + now + Duration::from_secs(60), + 10, + ); + let ready = scheduled(make_cluster("ready"), now, 0); + + let mut heap = BinaryHeap::new(); + heap.push(sleeping); + heap.push(ready); + + let first = heap.pop().expect("heap not empty"); + let first_tag = first.cluster.tags.iter().next().unwrap().name.clone(); + assert_eq!(first_tag, "ready"); + } + + #[tokio::test] + async fn stream_round_trip_re_inserts_via_done() { + let cluster = make_cluster("solo"); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: cluster.clone(), + next_eligible_at: Instant::now(), + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(8); + let control = feed.stream(tx).await; + + // Receive the cluster, report it back as productive, receive it again. + let emitted = rx.recv().await.expect("feed emitted a cluster"); + assert_eq!(emitted, cluster); + + control + .send(FeedMessage::Done { + cluster: emitted.clone(), + processed_jobs: 7, + sleep: None, + }) + .await + .expect("control channel open"); + + let emitted_again = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("cluster re-emitted within 1s") + .expect("cluster present"); + assert_eq!(emitted_again, cluster); + + // Tidy up so the spawned tasks exit. + control.send(FeedMessage::Stop).await.ok(); + } + + #[tokio::test] + async fn busy_cluster_gets_more_polls_than_idle() { + // Two clusters share the feed. The "busy" cluster reports productive + // work with no back-off; the "idle" cluster reports zero jobs with a + // 100ms back-off. Over a 500ms window busy should be polled many more + // times than idle. + let busy = make_cluster("busy"); + let idle = make_cluster("idle"); + let now = Instant::now(); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: busy.clone(), + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + heap.push(Scheduled { + cluster: idle.clone(), + next_eligible_at: now, + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(16); + let control = feed.stream(tx).await; + + let mut busy_polls = 0usize; + let mut idle_polls = 0usize; + let test_duration = Duration::from_millis(500); + let idle_back_off = Duration::from_millis(100); + let deadline = Instant::now() + test_duration; + + while Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + let next = tokio::time::timeout(remaining, rx.recv()).await; + let Ok(Some(cluster)) = next else { break }; + + let is_busy = cluster.tags.iter().next().unwrap().name == "busy"; + let (processed_jobs, sleep) = if is_busy { + busy_polls += 1; + (100, None) + } else { + idle_polls += 1; + (0, Some(idle_back_off)) + }; + control + .send(FeedMessage::Done { + cluster, + processed_jobs, + sleep, + }) + .await + .ok(); + } + + control.send(FeedMessage::Stop).await.ok(); + + // Idle is back-off-bound: in 500ms with a 100ms back-off it can be + // polled at most ~5 times. Busy has no back-off and is only limited + // by task scheduling, so it should run away. + assert!( + busy_polls > idle_polls * 2, + "expected busy polls (>2x) idle polls; busy={busy_polls} idle={idle_polls}" + ); + assert!(busy_polls > 0 && idle_polls > 0, "both clusters polled at least once"); + } + + #[tokio::test] + async fn stream_respects_sleep_back_off() { + let cluster = make_cluster("naps"); + let mut heap = BinaryHeap::new(); + heap.push(Scheduled { + cluster: cluster.clone(), + next_eligible_at: Instant::now(), + last_dispatched_jobs: 0, + }); + + let feed = ClusterFeed { + queue: Arc::new(Mutex::new(heap)), + stop_flag: Arc::new(AtomicBool::new(false)), + notify: Arc::new(Notify::new()), + }; + + let (tx, mut rx) = mpsc::channel::(8); + let control = feed.stream(tx).await; + + // First emission is immediate. + rx.recv().await.expect("first emission"); + + // Re-queue with a 250ms back-off. + let back_off = Duration::from_millis(250); + let started = Instant::now(); + control + .send(FeedMessage::Done { + cluster: cluster.clone(), + processed_jobs: 0, + sleep: Some(back_off), + }) + .await + .expect("control channel open"); + + // Should not arrive before the back-off, should arrive shortly after. + let _ = tokio::time::timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("cluster re-emitted within 2s") + .expect("cluster present"); + let elapsed = started.elapsed(); + assert!( + elapsed >= back_off, + "cluster re-emitted in {elapsed:?}, expected >= {back_off:?}" + ); + + control.send(FeedMessage::Stop).await.ok(); + } +} diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs index 3d46e1419..36eb7556f 100644 --- a/rust/crates/scheduler/src/config/mod.rs +++ b/rust/crates/scheduler/src/config/mod.rs @@ -82,6 +82,14 @@ pub struct QueueConfig { pub hostname_tags_chunk_size: usize, pub host_candidate_attempts_per_layer: usize, pub empty_job_cycles_before_quiting: Option, + /// Delay before an empty cluster (no eligible jobs) becomes eligible for dispatch again. + /// Used by the priority-queue cluster feed to back off without blocking neighbors. + #[serde(with = "humantime_serde")] + pub cluster_empty_back_off: Duration, + /// When `true`, the cluster priority queue biases toward clusters that produced more + /// jobs in the most recent dispatch (productivity bias). When `false`, all clusters + /// compete strictly on `next_eligible_at` with a stable tiebreaker. + pub cluster_productivity_bias: bool, pub mem_reserved_min: ByteSize, #[serde(with = "humantime_serde")] pub subscription_recalculation_interval: Duration, @@ -108,6 +116,8 @@ impl Default for QueueConfig { hostname_tags_chunk_size: 300, host_candidate_attempts_per_layer: 10, empty_job_cycles_before_quiting: None, + cluster_empty_back_off: Duration::from_secs(3), + cluster_productivity_bias: true, mem_reserved_min: ByteSize::mib(250), subscription_recalculation_interval: Duration::from_secs(3), resource_recalculation_interval: Duration::from_secs(10), diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs index f96eabbb1..9dfe9c94c 100644 --- a/rust/crates/scheduler/src/metrics/mod.rs +++ b/rust/crates/scheduler/src/metrics/mod.rs @@ -13,8 +13,8 @@ use axum::{response::IntoResponse, routing::get, Router}; use lazy_static::lazy_static; use prometheus::{ - register_counter, register_counter_vec, register_histogram, Counter, CounterVec, Encoder, - Histogram, TextEncoder, + register_counter, register_counter_vec, register_gauge_vec, register_histogram, Counter, + CounterVec, Encoder, GaugeVec, Histogram, TextEncoder, }; use std::time::Duration; use tracing::{error, info}; @@ -70,6 +70,32 @@ lazy_static! { vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0] ) .expect("Failed to register job_query_duration_seconds histogram"); + + // Cluster feed metrics from cluster.rs + pub static ref CLUSTER_POLLS_TOTAL: CounterVec = register_counter_vec!( + "scheduler_cluster_polls_total", + "Total number of times each cluster has been emitted by the priority-queue feed", + &["show_id", "facility_id"] + ) + .expect("Failed to register cluster_polls_total counter"); + + /// Global counter mirroring the in-memory `CLUSTER_ROUNDS` atomic. + /// Equivalent to `sum(scheduler_cluster_polls_total)` but exposed for cheap + /// querying / alerting on overall feed throughput. + pub static ref CLUSTER_ROUNDS_TOTAL: Counter = register_counter!( + "scheduler_cluster_rounds_total", + "Total number of cluster pops across all clusters" + ) + .expect("Failed to register cluster_rounds_total counter"); + + /// Most-recent job count for each cluster — useful to inspect the productivity + /// bias' effect on dispatch ordering. + pub static ref CLUSTER_LAST_DISPATCHED_JOBS: GaugeVec = register_gauge_vec!( + "scheduler_cluster_last_dispatched_jobs", + "Jobs dispatched in the most recent processing cycle for each cluster", + &["show_id", "facility_id"] + ) + .expect("Failed to register cluster_last_dispatched_jobs gauge"); } /// Handler for the /metrics endpoint @@ -166,3 +192,27 @@ pub fn observe_time_to_book(duration: Duration) { pub fn observe_job_query_duration(duration: Duration) { JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64()); } + +/// Helper function to increment cluster polls counter. +/// +/// Bumps both the per-cluster `scheduler_cluster_polls_total` and the global +/// `scheduler_cluster_rounds_total` so dashboards can use whichever is cheaper. +#[inline] +pub fn increment_cluster_polls(show_id: &uuid::Uuid, facility_id: &uuid::Uuid) { + CLUSTER_POLLS_TOTAL + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + .inc(); + CLUSTER_ROUNDS_TOTAL.inc(); +} + +/// Records the number of jobs dispatched in the most recent cycle for a cluster. +#[inline] +pub fn set_cluster_last_dispatched_jobs( + show_id: &uuid::Uuid, + facility_id: &uuid::Uuid, + count: usize, +) { + CLUSTER_LAST_DISPATCHED_JOBS + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + .set(count as f64); +} diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index ba7ef55e4..223f478ba 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -12,7 +12,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use futures::{stream, StreamExt}; use tokio::sync::mpsc; @@ -69,9 +68,8 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { ) .await; - match jobs { + let (processed_count, should_stop) = match jobs { Ok(jobs) => { - // Track number of jobs queried metrics::increment_jobs_queried(jobs.len()); let processed_jobs = AtomicUsize::new(0); @@ -87,33 +85,46 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { }, ) .await; - // If no jobs got processed, sleep to prevent hammering the database with - // queries with no outcome - if processed_jobs.load(Ordering::Relaxed) == 0 { - let _ = feed_sender - .send(FeedMessage::Sleep(cluster, Duration::from_secs(3))) - .await; - } - - // If empty_jobs_cycles_before_quiting is set, quit if nothing got processed - if let Some(limit) = CONFIG.queue.empty_job_cycles_before_quiting { - // Count cycles that couldn't find any job - if processed_jobs.load(Ordering::Relaxed) == 0 { - cycles_without_jobs.fetch_add(1, Ordering::Relaxed); - } else { - cycles_without_jobs.store(0, Ordering::Relaxed); - } - // Cancel stream processing after empty cycles - if cycles_without_jobs.load(Ordering::Relaxed) >= limit { - let _ = feed_sender.send(FeedMessage::Stop()).await; + let processed = processed_jobs.load(Ordering::Relaxed); + + let stop = match CONFIG.queue.empty_job_cycles_before_quiting { + Some(limit) => { + if processed == 0 { + cycles_without_jobs.fetch_add(1, Ordering::Relaxed); + } else { + cycles_without_jobs.store(0, Ordering::Relaxed); + } + cycles_without_jobs.load(Ordering::Relaxed) >= limit } - } + None => false, + }; + + (processed, stop) } Err(err) => { - let _ = feed_sender.send(FeedMessage::Stop()).await; error!("Failed to fetch job: {}", err); + (0, true) } + }; + + // Re-insert the cluster into the priority queue. An empty cycle gets a + // back-off so neighbors aren't starved while this cluster waits for work. + let sleep = if processed_count == 0 { + Some(CONFIG.queue.cluster_empty_back_off) + } else { + None + }; + let _ = feed_sender + .send(FeedMessage::Done { + cluster, + processed_jobs: processed_count, + sleep, + }) + .await; + + if should_stop { + let _ = feed_sender.send(FeedMessage::Stop).await; } } }) diff --git a/rust/crates/scheduler/tests/util.rs b/rust/crates/scheduler/tests/util.rs index a7bb6c29c..28475282b 100644 --- a/rust/crates/scheduler/tests/util.rs +++ b/rust/crates/scheduler/tests/util.rs @@ -97,6 +97,8 @@ pub fn create_test_config() -> Config { hostname_tags_chunk_size: 20, host_candidate_attempts_per_layer: 5, empty_job_cycles_before_quiting: Some(20), + cluster_empty_back_off: Duration::from_secs(3), + cluster_productivity_bias: true, mem_reserved_min: bytesize::ByteSize::mb(250), subscription_recalculation_interval: Duration::from_secs(3), resource_recalculation_interval: Duration::from_secs(10), From 3cdf885ebd2ea19e647242efc34126dc320ab09f Mon Sep 17 00:00:00 2001 From: Ramon Figueiredo Date: Wed, 13 May 2026 18:12:15 -0700 Subject: [PATCH 2/2] [rust-scheduler] Fix Ord/Eq contract and control-channel-close handling - Scheduled::cmp now always compares last_dispatched_jobs as a final tiebreaker so Ord agrees with PartialEq regardless of the cluster_productivity_bias toggle. Required by BinaryHeap. - Control loop now sets stop_flag and notifies the dispatcher when the control channel closes (e.g. consumer panic), preventing the dispatch loop from spinning on the empty-queue wake cycle. - Added ord_and_eq_stay_consistent_for_differing_jobs regression test. --- rust/crates/scheduler/src/cluster.rs | 33 ++++++++++++++++++- .../scheduler/src/pipeline/entrypoint.rs | 28 ++++++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index b29bc88ce..dab68d9de 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -119,7 +119,16 @@ impl Ord for Scheduled { } } // Deterministic tiebreaker on the cluster identity. - self.cluster.cmp(&other.cluster) + let by_cluster = self.cluster.cmp(&other.cluster); + if by_cluster != CmpOrdering::Equal { + return by_cluster; + } + // Keep Ord consistent with Eq even when productivity bias is disabled. + // Without this final comparison, two entries that differ only by + // `last_dispatched_jobs` would yield `Ordering::Equal` here while + // `PartialEq::eq` returns false, violating the Ord/Eq contract that + // BinaryHeap relies on. + self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs) } } impl PartialOrd for Scheduled { @@ -580,6 +589,11 @@ impl ClusterFeed { } } } + // Ensure the dispatch loop exits even if the control channel + // is dropped without an explicit Stop (e.g. consumer panics). + // Without this the dispatcher would spin on the empty-queue wake. + stop_flag_ctrl.store(true, Ordering::Relaxed); + notify_ctrl.notify_one(); }); control_sender @@ -643,6 +657,23 @@ mod tests { } } + /// Rust's `Ord` / `Eq` contract requires `cmp(a, b) == Equal` to imply + /// `a == b`. `BinaryHeap` relies on this. The bias toggle must not break it. + #[test] + fn ord_and_eq_stay_consistent_for_differing_jobs() { + let cluster = make_cluster("same"); + let now = Instant::now(); + let a = scheduled(cluster.clone(), now, 0); + let b = scheduled(cluster, now, 99); + + assert_ne!(a, b, "PartialEq sees them as distinct"); + assert_ne!( + a.cmp(&b), + CmpOrdering::Equal, + "Ord must not return Equal for two values PartialEq considers distinct" + ); + } + #[test] fn earlier_next_eligible_at_pops_first() { let now = Instant::now(); diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index 223f478ba..a5541ddc2 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -90,12 +90,17 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { let stop = match CONFIG.queue.empty_job_cycles_before_quiting { Some(limit) => { - if processed == 0 { - cycles_without_jobs.fetch_add(1, Ordering::Relaxed); + // Combine the update and read into a single atomic + // operation so concurrent clusters can't race between + // a fetch_add/store and a separate load. SeqCst gives + // us a strict total order across all clusters. + let new_count = if processed == 0 { + cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1 } else { - cycles_without_jobs.store(0, Ordering::Relaxed); - } - cycles_without_jobs.load(Ordering::Relaxed) >= limit + cycles_without_jobs.swap(0, Ordering::SeqCst); + 0 + }; + new_count >= limit } None => false, }; @@ -103,8 +108,17 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { (processed, stop) } Err(err) => { - error!("Failed to fetch job: {}", err); - (0, true) + // Don't halt the entire scheduler for a single cluster's + // fetch error (network blip, transient DB load). Treat + // the cluster as empty for this cycle so it gets the + // configured back-off and other clusters keep running. + // The `empty_job_cycles_before_quiting` safety net still + // applies if the error persists across all clusters. + error!( + "Failed to fetch jobs for cluster {}: {}", + cluster, err + ); + (0, false) } };