diff --git a/rust/crates/scheduler/Cargo.toml b/rust/crates/scheduler/Cargo.toml
index b2247f8c0..5d416b17b 100644
--- a/rust/crates/scheduler/Cargo.toml
+++ b/rust/crates/scheduler/Cargo.toml
@@ -18,7 +18,6 @@ debug = true
 opencue-proto = { path = "../opencue-proto" }
 
 # External Dependencies
-actix = "0.13"
 chrono = "0.4.38"
 futures = { workspace = true }
 scc = "3.1"
diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs
index 71a93d7c5..bd748187d 100644
--- a/rust/crates/scheduler/src/cluster.rs
+++ b/rust/crates/scheduler/src/cluster.rs
@@ -11,19 +11,20 @@
 // the License.
 
 use std::{
-    collections::{BTreeSet, HashMap, HashSet},
+    cmp::Ordering as CmpOrdering,
+    collections::{BTreeSet, BinaryHeap, HashMap, HashSet},
     sync::{
         atomic::{AtomicBool, AtomicUsize, Ordering},
-        Arc, Mutex, RwLock,
+        Arc, Mutex,
     },
-    time::{Duration, SystemTime},
+    time::{Duration, Instant},
 };
 
 use futures::StreamExt;
 use itertools::Itertools;
 use miette::{IntoDiagnostic, Result};
 use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};
 use tracing::{debug, error, warn};
 use uuid::Uuid;
 
@@ -31,8 +32,14 @@ use crate::{
     cluster_key::{Tag, TagType},
     config::CONFIG,
     dao::{helpers::parse_uuid, ClusterDao},
+    metrics,
 };
 
+/// Counts how many times a cluster has been emitted by the feed.
+///
+/// Kept as a process-global atomic for source-level compatibility with smoke tests that
+/// read it directly. Production observability is provided via the Prometheus
+/// `scheduler_cluster_polls_total` counter (see [`metrics`]).
 pub static CLUSTER_ROUNDS: AtomicUsize = AtomicUsize::new(0);
 
 #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
@@ -72,28 +79,95 @@ impl Cluster {
     }
 }
 
+/// A cluster scheduled for dispatch, ordered by eligibility time and recent productivity.
+///
+/// `BinaryHeap` is a max-heap, so [`Scheduled::cmp`] inverts `next_eligible_at`
+/// (earliest wins) and keeps `last_dispatched_jobs` in natural order (busier wins
+/// as a tiebreaker when productivity bias is enabled).
+#[derive(Debug, Clone)]
+struct Scheduled {
+    cluster: Cluster,
+    /// When this cluster becomes eligible for dispatch. A past `Instant` means
+    /// the cluster is ready right now.
+    next_eligible_at: Instant,
+    /// Number of jobs processed in the most recent dispatch; used as the
+    /// productivity-bias tiebreaker.
+    last_dispatched_jobs: usize,
+}
+
+impl PartialEq for Scheduled {
+    fn eq(&self, other: &Self) -> bool {
+        self.next_eligible_at == other.next_eligible_at
+            && self.last_dispatched_jobs == other.last_dispatched_jobs
+            && self.cluster == other.cluster
+    }
+}
+impl Eq for Scheduled {}
+
+impl Ord for Scheduled {
+    fn cmp(&self, other: &Self) -> CmpOrdering {
+        // Earliest `next_eligible_at` is highest priority. Invert for max-heap.
+        let by_time = other.next_eligible_at.cmp(&self.next_eligible_at);
+        if by_time != CmpOrdering::Equal {
+            return by_time;
+        }
+        // Productivity bias: more jobs in the last dispatch ranks higher.
+        if CONFIG.queue.cluster_productivity_bias {
+            let by_jobs = self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs);
+            if by_jobs != CmpOrdering::Equal {
+                return by_jobs;
+            }
+        }
+        // Deterministic tiebreaker on the cluster identity.
+        let by_cluster = self.cluster.cmp(&other.cluster);
+        if by_cluster != CmpOrdering::Equal {
+            return by_cluster;
+        }
+        // Keep Ord consistent with Eq even when productivity bias is disabled.
+        // Without this final comparison, two entries that differ only by
+        // `last_dispatched_jobs` would yield `Ordering::Equal` here while
+        // `PartialEq::eq` returns false, violating the Ord/Eq contract that
+        // `BinaryHeap` relies on.
+        self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs)
+    }
+}
+impl PartialOrd for Scheduled {
+    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
+        Some(self.cmp(other))
+    }
+}
+
 #[derive(Debug)]
 pub struct ClusterFeed {
-    pub clusters: Arc<RwLock<Vec<Cluster>>>,
-    current_index: Arc<AtomicUsize>,
+    /// Priority queue of clusters awaiting dispatch.
+    queue: Arc<Mutex<BinaryHeap<Scheduled>>>,
     stop_flag: Arc<AtomicBool>,
-    sleep_map: Arc<Mutex<HashMap<Cluster, SystemTime>>>,
+    /// Wakes the dispatch loop when an entry is pushed back or shutdown is signaled.
+    notify: Arc<Notify>,
 }
 
 /// Control messages for the cluster feed stream.
 ///
-/// These messages are sent to the control channel returned by `ClusterFeed::stream()`
-/// to influence feed behavior during runtime.
+/// Sent to the channel returned by [`ClusterFeed::stream`] to report back the
+/// outcome of processing a cluster (so the priority queue can re-rank it) or
+/// to signal a graceful shutdown.
 pub enum FeedMessage {
     /// Stops the cluster feed stream gracefully.
-    Stop(),
-    /// Puts a specific cluster to sleep for the given duration.
+    Stop,
+    /// Reports the result of processing a cluster and re-inserts it into the
+    /// priority queue.
     ///
     /// # Fields
     ///
-    /// * `Cluster` - The cluster to put to sleep
-    /// * `Duration` - How long to sleep before the cluster can be processed again
-    Sleep(Cluster, Duration),
+    /// * `cluster` - The cluster that was just processed.
+    /// * `processed_jobs` - Jobs dispatched on this cycle. Drives the productivity-bias tiebreaker.
+    /// * `sleep` - Optional back-off duration. `None` re-queues the cluster as eligible now;
+    ///   `Some(d)` defers it for at least `d`.
+    Done {
+        cluster: Cluster,
+        processed_jobs: usize,
+        sleep: Option<Duration>,
+    },
 }
 
 /// Builder for constructing a [`ClusterFeed`].
@@ -159,11 +233,24 @@ impl ClusterFeedBuilder {
             }
             ClusterFeed::filter_clusters(clusters.into_iter().collect(), &self.ignore_tags)
         };
+        // Stripping ignored tags can collapse different inputs into the same
+        // Cluster identity (same facility / show / surviving tag set), so
+        // dedupe via a HashSet before seeding the heap to avoid scheduling
+        // the same logical cluster twice.
+        let clusters: HashSet<Cluster> = clusters.into_iter().collect();
+        let now = Instant::now();
+        let mut heap = BinaryHeap::with_capacity(clusters.len().max(1));
+        for cluster in clusters {
+            heap.push(Scheduled {
+                cluster,
+                next_eligible_at: now,
+                last_dispatched_jobs: 0,
+            });
+        }
         Ok(ClusterFeed {
-            clusters: Arc::new(RwLock::new(clusters)),
-            current_index: Arc::new(AtomicUsize::new(0)),
+            queue: Arc::new(Mutex::new(heap)),
             stop_flag: Arc::new(AtomicBool::new(false)),
-            sleep_map: Arc::new(Mutex::new(HashMap::new())),
+            notify: Arc::new(Notify::new()),
         })
     }
 }
@@ -189,6 +276,39 @@ impl ClusterFeed {
         }
     }
 
+    /// Builds a feed from a fixed list of clusters with optional tag filtering.
+    ///
+    /// Intended for tests and direct construction where the cluster set is already
+    /// known. Production code should prefer the builder ([`facility`](Self::facility) /
+    /// [`no_facility`](Self::no_facility)), which can also load clusters from the database.
+    ///
+    /// # Arguments
+    ///
+    /// * `clusters` - Explicit list of clusters to feed.
+    /// * `ignore_tags` - Tag names to drop; a cluster whose tag set becomes empty is excluded.
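+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (bindings are illustrative; see the tests at the
+    /// bottom of this file for full usage):
+    ///
+    /// ```ignore
+    /// let feed = ClusterFeed::load_from_clusters(vec![cluster], &["desktop".to_string()]);
+    /// let (tx, mut rx) = tokio::sync::mpsc::channel(8);
+    /// let control = feed.stream(tx).await;
+    /// ```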
+    pub fn load_from_clusters(clusters: Vec<Cluster>, ignore_tags: &[String]) -> Self {
+        // Dedupe after ignore-tag filtering for the same reason as `build()`:
+        // stripping tags can fold distinct inputs onto the same Cluster
+        // identity, and we don't want the heap to schedule the duplicate.
+        let filtered: HashSet<Cluster> = Self::filter_clusters(clusters, ignore_tags)
+            .into_iter()
+            .collect();
+        let now = Instant::now();
+        let mut heap = BinaryHeap::with_capacity(filtered.len().max(1));
+        for cluster in filtered {
+            heap.push(Scheduled {
+                cluster,
+                next_eligible_at: now,
+                last_dispatched_jobs: 0,
+            });
+        }
+        ClusterFeed {
+            queue: Arc::new(Mutex::new(heap)),
+            stop_flag: Arc::new(AtomicBool::new(false)),
+            notify: Arc::new(Notify::new()),
+        }
+    }
+
     /// Loads all clusters from the database and organizes them by tag type.
     ///
     /// Loads allocation clusters (one per facility+show+tag), and chunks manual/hostname tags
@@ -336,148 +456,184 @@ impl ClusterFeed {
             .collect()
     }
 
-    /// Streams clusters to a channel receiver with backpressure control.
+    /// Streams clusters from a priority queue to a channel receiver.
     ///
-    /// Creates a producer-consumer pattern where clusters are sent through a channel
-    /// to the provided sender. The stream can be controlled via the returned message
-    /// channel (for sleep/stop commands).
+    /// Creates a producer-consumer pattern backed by a `BinaryHeap`
+    /// keyed on `(next_eligible_at asc, last_dispatched_jobs desc)`. The consumer
+    /// must report each cluster back via [`FeedMessage::Done`] so it can be
+    /// re-inserted with updated stats; otherwise the cluster is lost.
     ///
     /// # Arguments
     ///
-    /// * `sender` - Channel sender for emitting clusters
+    /// * `sender` - Channel sender for emitting eligible clusters.
     ///
     /// # Returns
     ///
-    /// * `mpsc::Sender<FeedMessage>` - Control channel for sending sleep/stop messages
+    /// * `mpsc::Sender<FeedMessage>` - Channel to send `Done` (re-insert) and `Stop` messages.
     ///
     /// # Behavior
     ///
-    /// - Iterates through clusters in round-robin fashion
-    /// - Skips sleeping clusters until their wake time expires
-    /// - Applies backoff delays between rounds (varies based on sleeping cluster count)
-    /// - Stops when receiving a Stop message or when configured empty cycles limit is reached
-    /// - Automatically cleans up expired sleep entries
+    /// - Pops the highest-priority cluster (earliest eligible, busiest as tiebreaker).
+    /// - If the popped entry is not yet eligible, sleeps until it is, but wakes
+    ///   immediately on `Notify` so a freshly inserted higher-priority entry can preempt.
+    /// - On `FeedMessage::Done` re-inserts the cluster with the reported sleep
+    ///   deadline and `last_dispatched_jobs` for the productivity bias.
+    /// - On `FeedMessage::Stop` sets the stop flag and exits.
     pub async fn stream(self, sender: mpsc::Sender<Cluster>) -> mpsc::Sender<FeedMessage> {
-        // Use a small channel to ensure the producer waits for items to be consumed before
-        // generating more
-        let (cancel_sender, mut feed_receiver) = mpsc::channel(8);
+        // Backpressure: bounded channel keeps the consumer from queueing unbounded Done messages.
+        let (control_sender, mut control_receiver) = mpsc::channel(32);
         let stop_flag = self.stop_flag.clone();
-        let sleep_map = self.sleep_map.clone();
+        let queue = self.queue.clone();
+        let notify = self.notify.clone();
 
-        // Stream clusters on the caller channel
+        // Dispatch loop: pop highest-priority cluster, sleep until eligible, send.
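+        // Invariant: every cluster is either in the heap or checked out by
+        // the consumer. `Done` is the only path back into the heap, so a
+        // consumer that drops a cluster without reporting it shrinks the
+        // feed permanently.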
+        let queue_dispatch = queue.clone();
+        let notify_dispatch = notify.clone();
+        let stop_flag_dispatch = stop_flag.clone();
         tokio::spawn(async move {
-            let mut all_sleeping_rounds = 0;
-            let feed = self.clusters.clone();
-            let current_index_atomic = self.current_index.clone();
-
             loop {
-                // Check stop flag
-                if stop_flag.load(Ordering::Relaxed) {
-                    warn!("Cluster received a stop message. Stopping feed.");
+                if stop_flag_dispatch.load(Ordering::Relaxed) {
+                    warn!("Cluster feed received stop signal. Exiting dispatch loop.");
                     break;
                 }
 
-                let (item, cluster_size, completed_round) = {
-                    let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner());
-                    if clusters.is_empty() {
-                        break;
-                    }
-
-                    let current_index = current_index_atomic.load(Ordering::Relaxed);
-                    let item = clusters[current_index].clone();
-                    let next_index = (current_index + 1) % clusters.len();
-                    let completed_round = next_index == 0; // Detect wrap-around
-                    current_index_atomic.store(next_index, Ordering::Relaxed);
-
-                    (item, clusters.len(), completed_round)
+                let scheduled = {
+                    let mut heap =
+                        queue_dispatch.lock().unwrap_or_else(|p| p.into_inner());
+                    heap.pop()
                 };
 
-                // Skip cluster if it is marked as sleeping
-                let is_sleeping = {
-                    let mut sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner());
-                    if let Some(wake_up_time) = sleep_map_lock.get(&item) {
-                        if *wake_up_time > SystemTime::now() {
-                            // Still sleeping, skip it
-                            true
-                        } else {
-                            // Remove expired entries
-                            sleep_map_lock.remove(&item);
-                            false
+                let scheduled = match scheduled {
+                    Some(s) => s,
+                    None => {
+                        // No clusters available: all in flight. Wait for a re-insert
+                        // or a stop signal. The short fallback sleep guarantees we
+                        // re-check `stop_flag` if no notify ever arrives.
+                        tokio::select! {
+                            _ = notify_dispatch.notified() => {}
+                            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                         }
-                    } else {
-                        false
+                        continue;
                     }
                 };
 
-                if !is_sleeping && sender.send(item).await.is_err() {
-                    warn!("Cluster receiver dropped. Stopping feed.");
-                    break;
+                let now = Instant::now();
+                if scheduled.next_eligible_at > now {
+                    let sleep_dur = scheduled.next_eligible_at - now;
+                    let preempted = tokio::select! {
+                        _ = tokio::time::sleep(sleep_dur) => false,
+                        _ = notify_dispatch.notified() => true,
+                    };
+                    if preempted {
+                        // A new entry was pushed: re-pop to pick the new best.
+                        let mut heap =
+                            queue_dispatch.lock().unwrap_or_else(|p| p.into_inner());
+                        heap.push(scheduled);
+                        continue;
+                    }
+                    if stop_flag_dispatch.load(Ordering::Relaxed) {
+                        break;
+                    }
                 }
 
-                // At end of round, add backoff sleep
-                if completed_round {
-                    CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed);
-
-                    // Check if all/most clusters are sleeping
-                    let sleeping_count = {
-                        let sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner());
-                        sleep_map_lock.len()
-                    };
-                    if sleeping_count >= cluster_size {
-                        // Ensure this doesn't loop forever when there's a limit configured
-                        all_sleeping_rounds += 1;
-                        if let Some(max_empty_cycles) = CONFIG.queue.empty_job_cycles_before_quiting
-                        {
-                            if all_sleeping_rounds > max_empty_cycles {
-                                warn!("All clusters have been sleeping for too long");
-                                break;
+                // Final shutdown gate before we publish. Without this, a stop
+                // arriving between the eligibility wait and the send below
+                // would still let us emit one more cluster (and a possibly
+                // long-blocked send couldn't be preempted at all). Race the
+                // channel reservation against a stop notification so we exit
+                // promptly if the consumer closed or shutdown is signalled.
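+                // Reserving a permit (rather than calling `send` directly)
+                // also means the publish below cannot fail or block: once
+                // `reserve()` succeeds, `Permit::send` is infallible, so the
+                // popped cluster is never lost on a full channel.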
+                if stop_flag_dispatch.load(Ordering::Relaxed) {
+                    break;
+                }
+                let permit = loop {
+                    tokio::select! {
+                        reserve = sender.reserve() => {
+                            match reserve {
+                                Ok(p) => break Some(p),
+                                Err(_) => {
+                                    warn!("Cluster receiver dropped. Stopping feed.");
+                                    break None;
+                                }
+                            }
+                        }
-
-                        // All clusters sleeping, sleep longer
-                        tokio::time::sleep(Duration::from_secs(5)).await;
-                    } else if sleeping_count > 0 {
-                        // Some clusters sleeping, brief pause
-                        tokio::time::sleep(Duration::from_millis(100)).await;
-                    } else {
-                        // Active work, minimal pause
-                        tokio::time::sleep(Duration::from_millis(10)).await;
+                        _ = notify_dispatch.notified() => {
+                            if stop_flag_dispatch.load(Ordering::Relaxed) {
+                                break None;
+                            }
+                            // Notify but no stop yet — keep waiting for a slot.
+                        }
                     }
-                }
+                };
+                let Some(permit) = permit else { break };
+
+                // Only count a poll once we have a slot and are about to publish.
+                CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed);
+                metrics::increment_cluster_polls(
+                    &scheduled.cluster.show_id,
+                    &scheduled.cluster.facility_id,
+                );
+                permit.send(scheduled.cluster);
             }
         });
 
-        // Process messages on the receiving end
-        let sleep_map = self.sleep_map.clone();
+        // Control loop: handle Done (re-insert) and Stop messages from the consumer.
+        let queue_ctrl = queue.clone();
+        let notify_ctrl = notify.clone();
+        let stop_flag_ctrl = stop_flag.clone();
         tokio::spawn(async move {
-            while let Some(message) = feed_receiver.recv().await {
+            while let Some(message) = control_receiver.recv().await {
                 match message {
-                    FeedMessage::Sleep(cluster, duration) => {
-                        if let Some(wake_up_time) = SystemTime::now().checked_add(duration) {
-                            debug!("{:?} put to sleep for {}s", cluster, duration.as_secs());
-                            {
-                                let mut sleep_map_lock =
-                                    sleep_map.lock().unwrap_or_else(|p| p.into_inner());
-                                sleep_map_lock.insert(cluster, wake_up_time);
+                    FeedMessage::Done {
+                        cluster,
+                        processed_jobs,
+                        sleep,
+                    } => {
+                        let next_eligible_at = match sleep {
+                            Some(d) => {
+                                debug!(
+                                    "{:?} re-queued with back-off of {}s ({} jobs processed)",
+                                    cluster,
+                                    d.as_secs(),
+                                    processed_jobs
+                                );
+                                Instant::now()
+                                    .checked_add(d)
+                                    .unwrap_or_else(Instant::now)
                             }
-                        } else {
-                            warn!(
-                                "Sleep request ignored for {:?}. Invalid duration={}s",
+                            None => Instant::now(),
+                        };
+                        metrics::set_cluster_last_dispatched_jobs(
+                            &cluster.show_id,
+                            &cluster.facility_id,
+                            processed_jobs,
+                        );
+                        {
+                            let mut heap =
+                                queue_ctrl.lock().unwrap_or_else(|p| p.into_inner());
+                            heap.push(Scheduled {
                                 cluster,
-                                duration.as_secs()
-                            );
+                                next_eligible_at,
+                                last_dispatched_jobs: processed_jobs,
+                            });
                         }
+                        notify_ctrl.notify_one();
                     }
-                    FeedMessage::Stop() => {
-                        self.stop_flag.store(true, Ordering::Relaxed);
+                    FeedMessage::Stop => {
+                        stop_flag_ctrl.store(true, Ordering::Relaxed);
+                        notify_ctrl.notify_one();
                         break;
                     }
                 }
             }
+            // Ensure the dispatch loop exits even if the control channel
+            // is dropped without an explicit Stop (e.g. consumer panics).
+            // Without this the dispatcher would spin on the empty-queue wake.
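+            // (Reached both after an explicit Stop and when every control
+            // sender is dropped; the duplicate store/notify is idempotent.)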
+            stop_flag_ctrl.store(true, Ordering::Relaxed);
+            notify_ctrl.notify_one();
         });
 
-        cancel_sender
+        control_sender
     }
 }
 
@@ -513,3 +669,259 @@ pub async fn get_show_id(show_name: &str) -> Result<Uuid> {
     let cluster_dao = ClusterDao::new().await?;
     cluster_dao.get_show_id(show_name).await.into_diagnostic()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::BTreeSet;
+
+    fn make_cluster(name: &str) -> Cluster {
+        Cluster {
+            facility_id: Uuid::new_v4(),
+            show_id: Uuid::new_v4(),
+            tags: BTreeSet::from([Tag {
+                name: name.to_string(),
+                ttype: TagType::Alloc,
+            }]),
+        }
+    }
+
+    fn scheduled(cluster: Cluster, at: Instant, jobs: usize) -> Scheduled {
+        Scheduled {
+            cluster,
+            next_eligible_at: at,
+            last_dispatched_jobs: jobs,
+        }
+    }
+
+    /// Rust's `Ord` / `Eq` contract requires `cmp(a, b) == Equal` to imply
+    /// `a == b`. `BinaryHeap` relies on this. The bias toggle must not break it.
+    #[test]
+    fn ord_and_eq_stay_consistent_for_differing_jobs() {
+        let cluster = make_cluster("same");
+        let now = Instant::now();
+        let a = scheduled(cluster.clone(), now, 0);
+        let b = scheduled(cluster, now, 99);
+
+        assert_ne!(a, b, "PartialEq sees them as distinct");
+        assert_ne!(
+            a.cmp(&b),
+            CmpOrdering::Equal,
+            "Ord must not return Equal for two values PartialEq considers distinct"
+        );
+    }
+
+    #[test]
+    fn earlier_next_eligible_at_pops_first() {
+        let now = Instant::now();
+        let later = scheduled(make_cluster("a"), now + Duration::from_secs(10), 100);
+        let sooner = scheduled(make_cluster("b"), now, 0);
+
+        let mut heap = BinaryHeap::new();
+        heap.push(later);
+        heap.push(sooner);
+
+        let popped = heap.pop().expect("heap not empty");
+        assert_eq!(popped.last_dispatched_jobs, 0);
+    }
+
+    #[test]
+    fn busier_cluster_pops_first_when_eligibility_ties() {
+        // This test relies on the default `cluster_productivity_bias = true`.
+        assert!(
+            CONFIG.queue.cluster_productivity_bias,
+            "test assumes default config: productivity bias enabled"
+        );
+
+        let now = Instant::now();
+        let idle = scheduled(make_cluster("idle"), now, 0);
+        let busy = scheduled(make_cluster("busy"), now, 500);
+
+        let mut heap = BinaryHeap::new();
+        heap.push(idle);
+        heap.push(busy);
+
+        let popped = heap.pop().expect("heap not empty");
+        assert_eq!(popped.last_dispatched_jobs, 500);
+    }
+
+    #[test]
+    fn sleeping_cluster_does_not_block_eligible_neighbor() {
+        let now = Instant::now();
+        let sleeping = scheduled(
+            make_cluster("sleeping"),
+            now + Duration::from_secs(60),
+            10,
+        );
+        let ready = scheduled(make_cluster("ready"), now, 0);
+
+        let mut heap = BinaryHeap::new();
+        heap.push(sleeping);
+        heap.push(ready);
+
+        let first = heap.pop().expect("heap not empty");
+        let first_tag = first.cluster.tags.iter().next().unwrap().name.clone();
+        assert_eq!(first_tag, "ready");
+    }
+
+    #[tokio::test]
+    async fn stream_round_trip_re_inserts_via_done() {
+        let cluster = make_cluster("solo");
+        let mut heap = BinaryHeap::new();
+        heap.push(Scheduled {
+            cluster: cluster.clone(),
+            next_eligible_at: Instant::now(),
+            last_dispatched_jobs: 0,
+        });
+
+        let feed = ClusterFeed {
+            queue: Arc::new(Mutex::new(heap)),
+            stop_flag: Arc::new(AtomicBool::new(false)),
+            notify: Arc::new(Notify::new()),
+        };
+
+        let (tx, mut rx) = mpsc::channel::<Cluster>(8);
+        let control = feed.stream(tx).await;
+
+        // Receive the cluster, report it back as productive, receive it again.
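+        // (`Done` with `sleep: None` re-queues the cluster as immediately
+        // eligible, which is what lets it come back on the next recv.)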
+        let emitted = rx.recv().await.expect("feed emitted a cluster");
+        assert_eq!(emitted, cluster);
+
+        control
+            .send(FeedMessage::Done {
+                cluster: emitted.clone(),
+                processed_jobs: 7,
+                sleep: None,
+            })
+            .await
+            .expect("control channel open");
+
+        let emitted_again = tokio::time::timeout(Duration::from_secs(1), rx.recv())
+            .await
+            .expect("cluster re-emitted within 1s")
+            .expect("cluster present");
+        assert_eq!(emitted_again, cluster);
+
+        // Tidy up so the spawned tasks exit.
+        control.send(FeedMessage::Stop).await.ok();
+    }
+
+    #[tokio::test]
+    async fn busy_cluster_gets_more_polls_than_idle() {
+        // Two clusters share the feed. The "busy" cluster reports productive
+        // work with no back-off; the "idle" cluster reports zero jobs with a
+        // 100ms back-off. Over a 500ms window busy should be polled many more
+        // times than idle.
+        let busy = make_cluster("busy");
+        let idle = make_cluster("idle");
+        let now = Instant::now();
+        let mut heap = BinaryHeap::new();
+        heap.push(Scheduled {
+            cluster: busy.clone(),
+            next_eligible_at: now,
+            last_dispatched_jobs: 0,
+        });
+        heap.push(Scheduled {
+            cluster: idle.clone(),
+            next_eligible_at: now,
+            last_dispatched_jobs: 0,
+        });
+
+        let feed = ClusterFeed {
+            queue: Arc::new(Mutex::new(heap)),
+            stop_flag: Arc::new(AtomicBool::new(false)),
+            notify: Arc::new(Notify::new()),
+        };
+
+        let (tx, mut rx) = mpsc::channel::<Cluster>(16);
+        let control = feed.stream(tx).await;
+
+        let mut busy_polls = 0usize;
+        let mut idle_polls = 0usize;
+        let test_duration = Duration::from_millis(500);
+        let idle_back_off = Duration::from_millis(100);
+        let deadline = Instant::now() + test_duration;
+
+        while Instant::now() < deadline {
+            let remaining = deadline.saturating_duration_since(Instant::now());
+            let next = tokio::time::timeout(remaining, rx.recv()).await;
+            let Ok(Some(cluster)) = next else { break };
+
+            let is_busy = cluster.tags.iter().next().unwrap().name == "busy";
+            let (processed_jobs, sleep) = if is_busy {
+                busy_polls += 1;
+                (100, None)
+            } else {
+                idle_polls += 1;
+                (0, Some(idle_back_off))
+            };
+            control
+                .send(FeedMessage::Done {
+                    cluster,
+                    processed_jobs,
+                    sleep,
+                })
+                .await
+                .ok();
+        }
+
+        control.send(FeedMessage::Stop).await.ok();
+
+        // Idle is back-off-bound: in 500ms with a 100ms back-off it can be
+        // polled at most ~5 times. Busy has no back-off and is only limited
+        // by task scheduling, so it should run away.
+        assert!(
+            busy_polls > idle_polls * 2,
+            "expected busy polls to exceed 2x idle polls; busy={busy_polls} idle={idle_polls}"
+        );
+        assert!(busy_polls > 0 && idle_polls > 0, "both clusters polled at least once");
+    }
+
+    #[tokio::test]
+    async fn stream_respects_sleep_back_off() {
+        let cluster = make_cluster("naps");
+        let mut heap = BinaryHeap::new();
+        heap.push(Scheduled {
+            cluster: cluster.clone(),
+            next_eligible_at: Instant::now(),
+            last_dispatched_jobs: 0,
+        });
+
+        let feed = ClusterFeed {
+            queue: Arc::new(Mutex::new(heap)),
+            stop_flag: Arc::new(AtomicBool::new(false)),
+            notify: Arc::new(Notify::new()),
+        };
+
+        let (tx, mut rx) = mpsc::channel::<Cluster>(8);
+        let control = feed.stream(tx).await;
+
+        // First emission is immediate.
+        rx.recv().await.expect("first emission");
+
+        // Re-queue with a 250ms back-off.
+        let back_off = Duration::from_millis(250);
+        let started = Instant::now();
+        control
+            .send(FeedMessage::Done {
+                cluster: cluster.clone(),
+                processed_jobs: 0,
+                sleep: Some(back_off),
+            })
+            .await
+            .expect("control channel open");
+
+        // Should not arrive before the back-off, should arrive shortly after.
+        let _ = tokio::time::timeout(Duration::from_secs(2), rx.recv())
+            .await
+            .expect("cluster re-emitted within 2s")
+            .expect("cluster present");
+        let elapsed = started.elapsed();
+        assert!(
+            elapsed >= back_off,
+            "cluster re-emitted in {elapsed:?}, expected >= {back_off:?}"
+        );
+
+        control.send(FeedMessage::Stop).await.ok();
+    }
+}
diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs
index 3d46e1419..634d520c0 100644
--- a/rust/crates/scheduler/src/config/mod.rs
+++ b/rust/crates/scheduler/src/config/mod.rs
@@ -82,6 +82,14 @@ pub struct QueueConfig {
     pub hostname_tags_chunk_size: usize,
     pub host_candidate_attempts_per_layer: usize,
     pub empty_job_cycles_before_quiting: Option<usize>,
+    /// Delay before an empty cluster (no eligible jobs) becomes eligible for dispatch again.
+    /// Used by the priority-queue cluster feed to back off without blocking neighbors.
+    #[serde(with = "humantime_serde")]
+    pub cluster_empty_back_off: Duration,
+    /// When `true`, the cluster priority queue biases toward clusters that produced more
+    /// jobs in the most recent dispatch (productivity bias). When `false`, all clusters
+    /// compete strictly on `next_eligible_at` with a stable tiebreaker.
+    pub cluster_productivity_bias: bool,
     pub mem_reserved_min: ByteSize,
     #[serde(with = "humantime_serde")]
     pub subscription_recalculation_interval: Duration,
@@ -108,6 +116,8 @@ impl Default for QueueConfig {
             hostname_tags_chunk_size: 300,
             host_candidate_attempts_per_layer: 10,
             empty_job_cycles_before_quiting: None,
+            cluster_empty_back_off: Duration::from_secs(3),
+            cluster_productivity_bias: true,
             mem_reserved_min: ByteSize::mib(250),
             subscription_recalculation_interval: Duration::from_secs(3),
             resource_recalculation_interval: Duration::from_secs(10),
@@ -222,6 +232,41 @@ pub struct HostCacheConfig {
     #[serde(with = "humantime_serde")]
     pub host_staleness_threshold: Duration,
     pub update_stat_on_book: bool,
+    /// Safety net for leaked host reservations. The reservation system relies
+    /// on explicit `check_in` / `Invalidate` calls; this TTL only fires when a
+    /// task crashes or otherwise drops a checked-out host without releasing it.
+    /// Should be larger than the longest plausible dispatch (slow RQD, retries).
+    #[serde(with = "humantime_serde")]
+    pub host_reservation_safety_ttl: Duration,
+    /// Circuit-breaker configuration for host-cache DB queries.
+    pub db_circuit_breaker: DbCircuitBreakerConfig,
+}
+
+/// Circuit-breaker tuning for host-cache database queries.
+///
+/// On consecutive failures the breaker opens for an exponentially growing
+/// window (`base_backoff` → `max_backoff`), short-circuiting further queries
+/// without hitting the DB. Once `failure_threshold` consecutive failures
+/// accumulate, the scheduler exits with status 1 so the orchestrator restarts
+/// it cleanly rather than letting it limp along.
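+///
+/// With the defaults below (`failure_threshold = 10`, `base_backoff = 500ms`,
+/// `max_backoff = 30s`), the open window grows 500ms → 1s → 2s → … and caps
+/// at 30s; the 10th consecutive failure triggers the clean exit.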
+#[derive(Debug, Deserialize, Clone)]
+#[serde(default)]
+pub struct DbCircuitBreakerConfig {
+    pub failure_threshold: u32,
+    #[serde(with = "humantime_serde")]
+    pub base_backoff: Duration,
+    #[serde(with = "humantime_serde")]
+    pub max_backoff: Duration,
+}
+
+impl Default for DbCircuitBreakerConfig {
+    fn default() -> Self {
+        Self {
+            failure_threshold: 10,
+            base_backoff: Duration::from_millis(500),
+            max_backoff: Duration::from_secs(30),
+        }
+    }
 }
 
 impl Default for HostCacheConfig {
@@ -236,6 +281,8 @@ impl Default for HostCacheConfig {
             concurrent_fetch_permit: 4,
             host_staleness_threshold: Duration::from_secs(2 * 60), // 2 minutes
             update_stat_on_book: false,
+            host_reservation_safety_ttl: Duration::from_secs(5 * 60), // 5 minutes
+            db_circuit_breaker: DbCircuitBreakerConfig::default(),
         }
     }
 }
diff --git a/rust/crates/scheduler/src/dao/host_dao.rs b/rust/crates/scheduler/src/dao/host_dao.rs
index b517a340e..dda67b9bb 100644
--- a/rust/crates/scheduler/src/dao/host_dao.rs
+++ b/rust/crates/scheduler/src/dao/host_dao.rs
@@ -295,44 +295,30 @@ impl HostDao {
     /// * `Ok(true)` - Lock successfully acquired
     /// * `Ok(false)` - Lock already held by another process
     /// * `Err(miette::Error)` - Database operation failed
-    pub async fn lock(
-        &self,
-        transaction: &mut Transaction<'_, Postgres>,
-        host_id: &Uuid,
-    ) -> Result<bool> {
-        trace!("Locking {}", host_id);
-        sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock(hashtext($1))")
-            .bind(host_id.to_string())
-            .fetch_one(&mut **transaction)
-            .await
-            .into_diagnostic()
-            .wrap_err("Failed to acquire advisory lock")
-    }
-
-    /// Releases an advisory lock on a host after dispatch completion.
+    /// Acquires a transaction-scoped advisory lock on a host.
     ///
-    /// Releases the PostgreSQL advisory lock that was acquired during
-    /// the dispatch process, allowing other dispatchers to access the host.
-    ///
-    /// # Arguments
-    /// * `host_id` - The UUID of the host to unlock
+    /// Uses `pg_try_advisory_xact_lock`, which auto-releases on COMMIT or
+    /// ROLLBACK. This is the preferred lock variant for short critical
+    /// sections inside the per-proc transaction — callers don't need to issue
+    /// an explicit unlock and the lock cannot leak if the connection is
+    /// returned to the pool while still session-locked.
     ///
     /// # Returns
-    /// * `Ok(true)` - Lock successfully released
-    /// * `Ok(false)` - Lock was not held by this process
-    /// * `Err(miette::Error)` - Database operation failed
-    pub async fn unlock(
+    /// * `Ok(true)` — lock acquired (and will release at tx end)
+    /// * `Ok(false)` — another transaction holds the lock; caller should skip
+    /// * `Err(_)` — database error
+    pub async fn try_xact_lock(
         &self,
         transaction: &mut Transaction<'_, Postgres>,
         host_id: &Uuid,
     ) -> Result<bool> {
-        trace!("Unlocking {}", host_id);
-        sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock(hashtext($1))")
+        trace!("Acquiring xact advisory lock for {}", host_id);
+        sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock(hashtext($1))")
             .bind(host_id.to_string())
             .fetch_one(&mut **transaction)
             .await
             .into_diagnostic()
-            .wrap_err("Failed to release advisory lock")
+            .wrap_err("Failed to acquire transaction-scoped advisory lock")
     }
 
     /// Updates a host's available resource counts after frame dispatch.
diff --git a/rust/crates/scheduler/src/dao/layer_dao.rs b/rust/crates/scheduler/src/dao/layer_dao.rs
index 965a478a8..80979fef7 100644
--- a/rust/crates/scheduler/src/dao/layer_dao.rs
+++ b/rust/crates/scheduler/src/dao/layer_dao.rs
@@ -289,6 +289,23 @@ ORDER BY lf.int_layer_order
 "#;
 
+/// Guard returned by [`LayerDao::try_lock_layer`] holding an open transaction
+/// that has a row-level lock on the layer. Drop / commit the guard to release
+/// the lock so other schedulers can pick the layer up.
+pub struct LayerLockGuard {
+    transaction: sqlx::Transaction<'static, Postgres>,
+}
+
+impl LayerLockGuard {
+    /// Releases the lock by committing the underlying empty transaction.
+    /// Dropping the guard without calling this also releases (via implicit
+    /// rollback) but that path logs an extra rollback failure on some
+    /// sqlx versions; prefer this explicit release on the happy path.
+    pub async fn release(self) -> Result<(), sqlx::Error> {
+        self.transaction.commit().await
+    }
+}
+
 impl LayerDao {
     /// Creates a new LayerDao from database configuration.
     ///
@@ -339,6 +356,37 @@ impl LayerDao {
         Ok(self.group_layers_and_frames(combined_models))
     }
 
+    /// Attempts to acquire a row-level lock on a layer via
+    /// `SELECT … FOR UPDATE SKIP LOCKED`.
+    ///
+    /// Used to deduplicate dispatch attempts across concurrent scheduler
+    /// processes / replicas — if another scheduler already holds the row's
+    /// lock, this returns `Ok(None)` immediately (no waiting) and the caller
+    /// should skip the layer.
+    ///
+    /// The lock is scoped to the returned [`LayerLockGuard`]'s transaction and
+    /// is released when the guard is committed or dropped.
+    pub async fn try_lock_layer(
+        &self,
+        layer_id: Uuid,
+    ) -> Result<Option<LayerLockGuard>, sqlx::Error> {
+        let mut tx = self.connection_pool.begin().await?;
+        let locked: Option<String> = sqlx::query_scalar(
+            "SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED",
+        )
+        .bind(layer_id.to_string())
+        .fetch_optional(&mut *tx)
+        .await?;
+
+        if locked.is_some() {
+            Ok(Some(LayerLockGuard { transaction: tx }))
+        } else {
+            // No row returned means another transaction already locked it.
+            let _ = tx.rollback().await;
+            Ok(None)
+        }
+    }
+
     /// Groups flat query results into layers with their associated frames.
     ///
     /// Transforms the denormalized query results into a structured hierarchy
diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs
index 40d098850..e96d2eb38 100644
--- a/rust/crates/scheduler/src/host_cache/actor.rs
+++ b/rust/crates/scheduler/src/host_cache/actor.rs
@@ -10,8 +10,6 @@
 // or implied. See the License for the specific language governing permissions and limitations under
 // the License.
-use actix::{Actor, ActorFutureExt, AsyncContext, Handler, ResponseActFuture, WrapFuture};
-
 use bytesize::ByteSize;
 use itertools::Itertools;
 use miette::IntoDiagnostic;
@@ -19,16 +17,16 @@ use scc::{hash_map::OccupiedEntry, HashMap, HashSet};
 use std::{
     cmp::Ordering,
     sync::{
-        atomic::{self, AtomicU64},
-        Arc,
+        atomic::{self, AtomicU32, AtomicU64},
+        Arc, Mutex,
     },
-    time::{Duration, SystemTime},
+    time::{Duration, Instant, SystemTime},
 };
 
 use futures::{stream, StreamExt};
 use miette::Result;
 use tokio::sync::Semaphore;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
 
 use crate::{
     cluster_key::{ClusterKey, Tag, TagType},
@@ -46,10 +44,90 @@ pub struct HostCacheService {
     cache_hit: Arc<AtomicU64>,
     cache_miss: Arc<AtomicU64>,
     concurrency_semaphore: Arc<Semaphore>,
+    db_circuit: Arc<DbCircuitBreaker>,
+}
+
+/// Circuit breaker around host-cache DB queries.
+///
+/// A transient query failure (network blip, statement timeout) trips the
+/// breaker for an exponentially growing window so concurrent callers fail
+/// fast instead of all hitting the DB. On success the breaker resets.
+/// After `failure_threshold` consecutive failures the scheduler exits with
+/// status 1 — the orchestration layer (k8s, systemd) handles the restart.
+pub(super) struct DbCircuitBreaker {
+    consecutive_failures: AtomicU32,
+    open_until: Mutex<Option<Instant>>,
+}
+
+impl DbCircuitBreaker {
+    fn new() -> Self {
+        Self {
+            consecutive_failures: AtomicU32::new(0),
+            open_until: Mutex::new(None),
+        }
+    }
+
+    /// Returns `Some(remaining)` if the breaker is currently open; the caller
+    /// should short-circuit. Returns `None` if it's safe to proceed.
+    fn check_open(&self) -> Option<Duration> {
+        let guard = self.open_until.lock().unwrap_or_else(|p| p.into_inner());
+        match *guard {
+            Some(until) => {
+                let now = Instant::now();
+                if until > now {
+                    Some(until - now)
+                } else {
+                    None
+                }
+            }
+            None => None,
+        }
+    }
+
+    fn record_success(&self) {
+        self.consecutive_failures
+            .store(0, atomic::Ordering::Relaxed);
+        *self
+            .open_until
+            .lock()
+            .unwrap_or_else(|p| p.into_inner()) = None;
+    }
+
+    /// Records a failure and opens the breaker for an exponential backoff.
+    /// Returns `true` when the consecutive-failure threshold has been reached
+    /// and the caller should escalate to a clean process exit.
+    fn record_failure(&self) -> bool {
+        let cfg = &CONFIG.host_cache.db_circuit_breaker;
+        let count = self
+            .consecutive_failures
+            .fetch_add(1, atomic::Ordering::Relaxed)
+            + 1;
+        if count >= cfg.failure_threshold {
+            return true;
+        }
+        // Exponential backoff (capped). Clamp the shift so the multiplier
+        // can't overflow u64 even on huge failure counts.
+        let shift = count.min(20).saturating_sub(1);
+        let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
+        let base_ms = cfg.base_backoff.as_millis() as u64;
+        let backoff_ms = base_ms.saturating_mul(multiplier);
+        let backoff = Duration::from_millis(backoff_ms).min(cfg.max_backoff);
+        *self
+            .open_until
+            .lock()
+            .unwrap_or_else(|p| p.into_inner()) = Some(Instant::now() + backoff);
+        false
+    }
 }
 
-/// Use a reservation system to prevent race conditions when trying to book a host
-/// that belongs to multiple groups.
+/// Reservation guard preventing a host from being booked twice while it's
+/// checked out. The primary lifecycle is explicit: `check_out` reserves and
+/// `check_in` / `Invalidate` releases. [`expired`](Self::expired) is only a
+/// safety net for leaked reservations (e.g. a task panics between check-out
+/// and check-in).
+/// The TTL is configurable via
+/// [`HostCacheConfig::host_reservation_safety_ttl`] and should be larger than
+/// the longest plausible dispatch so a slow RQD call cannot accidentally
+/// release the reservation mid-flight.
 struct HostReservation {
     reserved_time: SystemTime,
 }
@@ -62,97 +140,61 @@ impl HostReservation {
     }
 
     pub fn expired(&self) -> bool {
-        self.reserved_time.elapsed().unwrap_or_default() > Duration::from_secs(10)
+        self.reserved_time.elapsed().unwrap_or_default()
+            > CONFIG.host_cache.host_reservation_safety_ttl
     }
 }
 
-impl Actor for HostCacheService {
-    type Context = actix::Context<Self>;
-
-    fn started(&mut self, ctx: &mut Self::Context) {
-        let service_for_monitor = self.clone();
-        let service_for_clean_up = self.clone();
-
-        ctx.run_interval(CONFIG.host_cache.monitoring_interval, move |_act, ctx| {
-            let service = service_for_monitor.clone();
-            let actor_clone = service.clone();
-            ctx.spawn(async move { service.refresh_cache().await }.into_actor(&actor_clone));
+impl HostCacheService {
+    /// Spawns the background tasks that periodically refresh and prune the
+    /// cache. Returns immediately; the spawned tasks live for the lifetime
+    /// of the process.
+    pub fn spawn_background_tasks(&self) {
+        let monitor = self.clone();
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(CONFIG.host_cache.monitoring_interval);
+            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+            // First tick fires immediately; skip it to mirror Actix
+            // `run_interval` semantics, which fire after the first interval.
+            interval.tick().await;
+            loop {
+                interval.tick().await;
+                monitor.refresh_cache().await;
+            }
         });
 
-        ctx.run_interval(CONFIG.host_cache.clean_up_interval, move |_act, _ctx| {
-            let service = service_for_clean_up.clone();
-
-            // Clean up stale hosts from the host store
-            service.cleanup_stale_hosts();
+        let cleaner = self.clone();
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(CONFIG.host_cache.clean_up_interval);
+            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+            interval.tick().await;
+            loop {
+                interval.tick().await;
+                cleaner.cleanup_stale_hosts();
+            }
         });
 
-        info!("HostCacheService actor started");
+        info!("HostCacheService background tasks spawned");
     }
 
-    fn stopped(&mut self, _ctx: &mut Self::Context) {
-        info!("HostCacheService actor stopped");
-    }
-}
-
-impl<F> Handler<CheckOut<F>> for HostCacheService
-where
-    F: Fn(&Host) -> bool + 'static,
-{
-    type Result = ResponseActFuture<Self, Result<CheckedOutHost, HostCacheError>>;
-
-    fn handle(&mut self, msg: CheckOut<F>, _ctx: &mut Self::Context) -> Self::Result {
-        let CheckOut {
-            facility_id,
-            show_id,
-            tags,
-            cores,
-            memory,
-            validation,
-        } = msg;
-
-        let service = self.clone();
-
-        Box::pin(
-            async move {
-                let out = service
-                    .check_out(facility_id, show_id, tags, cores, memory, validation)
-                    .await;
-                if let Ok(host) = &out {
-                    debug!("Checked out {}", host.1);
-                }
-                out
-            }
-            .into_actor(self)
-            .map(|result, _, _| result),
-        )
-    }
-}
-
-impl Handler<CheckIn> for HostCacheService {
-    type Result = ();
-
-    fn handle(&mut self, msg: CheckIn, _ctx: &mut Self::Context) -> Self::Result {
-        let CheckIn(cluster_key, payload) = msg;
+    /// Public wrapper around the internal `check_in` that adds debug logging
+    /// and handles the Invalidate path uniformly.
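+    ///
+    /// A sketch of both paths (bindings are illustrative):
+    ///
+    /// ```ignore
+    /// // Return a host with refreshed idle counts after a dispatch cycle:
+    /// service.check_in_payload(key.clone(), CheckInPayload::Host(host));
+    /// // Or drop a vanished host so it cannot be booked again:
+    /// service.check_in_payload(key, CheckInPayload::Invalidate(host_id));
+    /// ```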
+    pub fn check_in_payload(&self, cluster_key: ClusterKey, payload: CheckInPayload) {
         match payload {
             CheckInPayload::Host(host) => {
                 let host_str = format!("{}", host);
                 self.check_in(cluster_key, host);
                 debug!("Checked in {}", &host_str);
             }
             CheckInPayload::Invalidate(host_id) => {
                 let _ = self.reserved_hosts.remove_sync(&host_id);
                 debug!("Checked in {} (invalid)", &host_id);
             }
         }
     }
-}
-
-impl Handler<CacheRatio> for HostCacheService {
-    type Result = CacheRatioResponse;
 
-    fn handle(&mut self, _msg: CacheRatio, _ctx: &mut Self::Context) -> Self::Result {
+    /// Returns the current cache hit/miss snapshot.
+    pub fn cache_ratio(&self) -> CacheRatioResponse {
         CacheRatioResponse {
             hit: self.cache_hit.load(atomic::Ordering::Relaxed),
             miss: self.cache_miss.load(atomic::Ordering::Relaxed),
@@ -181,6 +223,7 @@ impl HostCacheService {
                 CONFIG.host_cache.concurrent_fetch_permit,
             )),
             reserved_hosts: Arc::new(HashMap::new()),
+            db_circuit: Arc::new(DbCircuitBreaker::new()),
         })
     }
 
@@ -203,7 +246,7 @@ impl HostCacheService {
     ///
     /// * `Ok(CheckedOutHost)` - Host with cluster key
     /// * `Err(HostCacheError)` - No suitable host found or database error
-    async fn check_out(
+    pub async fn check_out(
        &self,
        facility_id: Uuid,
        show_id: Uuid,
@@ -463,6 +506,20 @@
         &self,
         key: &ClusterKey,
     ) -> Result<OccupiedEntry<'_, ClusterKey, HostCache>> {
+        // If a recent failure tripped the breaker, fail fast without hitting
+        // the DB. Callers (matcher) treat this as "no candidates available"
+        // and skip the current layer cleanly.
+        if let Some(remaining) = self.db_circuit.check_open() {
+            debug!(
+                "Host cache circuit open for {:?} more — short-circuiting fetch",
+                remaining
+            );
+            return Err(miette::miette!(
+                "host cache DB circuit open (retry in {:?})",
+                remaining
+            ));
+        }
+
         let _permit = self
             .concurrency_semaphore
             .acquire()
@@ -470,11 +527,34 @@
             .await
             .into_diagnostic()?;
 
         let tag = key.tag.to_string();
-        let hosts = self
+        let hosts = match self
             .host_dao
             .fetch_hosts_by_show_facility_tag(key.show_id, key.facility_id, &tag)
             .await
-            .into_diagnostic()?;
+        {
+            Ok(hosts) => {
+                self.db_circuit.record_success();
+                hosts
+            }
+            Err(err) => {
+                let escalate = self.db_circuit.record_failure();
+                if escalate {
+                    error!(
+                        "Host cache DB query failed past circuit-breaker threshold. \
+                         Exiting so the orchestrator can restart the scheduler. Last error: {}",
+                        err
+                    );
+                    // Clean exit (no panic backtrace). The orchestration layer
+                    // (k8s, systemd, etc.) handles the restart.
+                    std::process::exit(1);
+                }
+                warn!(
+                    "Host cache DB query failed (transient, circuit will back off): {}",
+                    err
+                );
+                return Err(err).into_diagnostic();
+            }
+        };
 
         let cache = self.cluster_index.entry_sync(key.clone()).or_default();
 
@@ -493,3 +573,43 @@ impl HostCacheService {
         Ok(cache)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn breaker_closed_by_default() {
+        let cb = DbCircuitBreaker::new();
+        assert!(cb.check_open().is_none());
+    }
+
+    #[test]
+    fn breaker_opens_after_failure_and_closes_after_success() {
+        let cb = DbCircuitBreaker::new();
+
+        let escalate = cb.record_failure();
+        assert!(!escalate, "single failure shouldn't escalate");
+        assert!(
+            cb.check_open().is_some(),
+            "breaker should open after a failure"
+        );
+
+        cb.record_success();
+        assert!(cb.check_open().is_none(), "success should reset breaker");
+    }
+
+    #[test]
+    fn breaker_escalates_after_threshold() {
+        let cb = DbCircuitBreaker::new();
+        let threshold = CONFIG.host_cache.db_circuit_breaker.failure_threshold;
+        // Trip up to (threshold - 1) without escalating, then the next call escalates.
+        for _ in 0..threshold - 1 {
+            assert!(!cb.record_failure());
+        }
+        assert!(
+            cb.record_failure(),
+            "Nth consecutive failure should signal escalation"
+        );
+    }
+}
diff --git a/rust/crates/scheduler/src/host_cache/cache.rs b/rust/crates/scheduler/src/host_cache/cache.rs
index d41252d34..8cf2030cc 100644
--- a/rust/crates/scheduler/src/host_cache/cache.rs
+++ b/rust/crates/scheduler/src/host_cache/cache.rs
@@ -35,7 +35,7 @@
 use std::{
     cell::RefCell,
     collections::{BTreeMap, HashSet},
-    sync::RwLock,
+    sync::{Arc, Mutex, RwLock},
     time::{Duration, SystemTime},
 };
 
@@ -55,9 +55,19 @@ type MemoryKey = u64;
 /// A B-Tree of Hosts ordered by memory
 pub type MemoryBTree = BTreeMap<MemoryKey, HashSet<HostId>>;
 
+/// Per-CoreKey bucket. Wrapped in an `Arc` so concurrent check-outs and
+/// check-ins on disjoint core sizes don't contend with each other — the outer
+/// `RwLock` only protects the (rarely changing) bucket directory.
+type MemoryBucket = Arc<Mutex<MemoryBTree>>;
+
 pub struct HostCache {
-    /// B-Tree of host groups ordered by their number of available cores
-    hosts_index: RwLock<BTreeMap<CoreKey, MemoryBTree>>,
+    /// Bucket directory keyed by available cores. The outer `RwLock` is read
+    /// on every check-out / check-in to look up a bucket and almost never
+    /// upgraded to write (only when a brand-new `CoreKey` shows up). Once a
+    /// bucket Arc is obtained, the outer lock is released; the bucket's own
+    /// `Mutex` guards the inner `MemoryBTree`, so different buckets are fully
+    /// independent.
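+    /// The resulting locking shape is `RwLock<directory>` →
+    /// `Arc<bucket>` → `Mutex<MemoryBTree>` → `HashSet<HostId>`.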
+    hosts_index: RwLock<BTreeMap<CoreKey, MemoryBucket>>,
     /// If a cache stops being queried for a certain amount of time, stop keeping it up to date
     last_queried: RwLock<SystemTime>,
     /// Marks if the data on this cache have expired
@@ -206,74 +216,83 @@ impl HostCache {
         let mut attempts = 5;
 
         loop {
-            // Step 1: Find a candidate host in the index
-            let candidate_info = {
-                let host_index_lock = self.hosts_index.read().unwrap_or_else(|p| p.into_inner());
-                let mut iter: Box<dyn Iterator<Item = (&CoreKey, &MemoryBTree)>> =
-                    if !self.strategy.core_saturation {
-                        // Reverse order to find hosts with max amount of cores available
-                        Box::new(host_index_lock.range(core_key..).rev())
-                    } else {
-                        Box::new(host_index_lock.range(core_key..))
-                    };
-
-                iter.find_map(|(by_core_key, hosts_by_memory)| {
-                    let find_fn = |(by_memory_key, hosts): (&u64, &HashSet<HostId>)| {
-                        hosts.iter().find_map(|host_id| {
-                            HOST_STORE.get(host_id).and_then(|host| {
-                                // Check validation and memory capacity
-                                if host_validation(&host) {
-                                    Some((
-                                        *by_core_key,
-                                        *by_memory_key,
-                                        *host_id,
-                                        host.last_updated,
-                                    ))
-                                } else {
-                                    None
-                                }
-                            })
-                        })
-                    };
-
-                    if self.strategy.memory_saturation {
-                        // Search for hosts with at least the same amount of memory requested
-                        hosts_by_memory.range(memory_key..).find_map(find_fn)
-                    } else {
-                        // Search for hosts with the most amount of memory available
-                        hosts_by_memory.range(memory_key..).rev().find_map(find_fn)
-                    }
-                })
+            // Step 1: Snapshot the candidate bucket list under the outer read
+            // lock — this clones a handful of `Arc<Mutex<_>>` handles and drops
+            // the outer lock immediately, so check-ins on other buckets are
+            // not blocked while we scan.
+            let bucket_arcs: Vec<(CoreKey, MemoryBucket)> = {
+                let outer = self.hosts_index.read().unwrap_or_else(|p| p.into_inner());
+                if !self.strategy.core_saturation {
+                    // Reverse order: prefer hosts with the most cores
+                    outer
+                        .range(core_key..)
+                        .rev()
+                        .map(|(&k, v)| (k, v.clone()))
+                        .collect()
+                } else {
+                    outer
+                        .range(core_key..)
+                        .map(|(&k, v)| (k, v.clone()))
+                        .collect()
+                }
             };
 
-            // Step 2: Attempt atomic removal if we found a candidate
-            if let Some((by_core_key, by_memory_key, host_id, expected_last_updated)) =
+            // Step 2: Scan each bucket independently. Holding only one bucket
+            // mutex at a time keeps disjoint core groups concurrent.
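+            // The snapshot can go stale while we scan; step 3 re-validates
+            // with `atomic_remove_if_valid` before committing a removal.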
+            let candidate_info =
+                bucket_arcs
+                    .into_iter()
+                    .find_map(|(by_core_key, bucket)| {
+                        let bucket_lock = bucket.lock().unwrap_or_else(|p| p.into_inner());
+                        let find_fn =
+                            |(by_memory_key, hosts): (&u64, &HashSet<HostId>)| {
+                                hosts.iter().find_map(|host_id| {
+                                    HOST_STORE.get(host_id).and_then(|host| {
+                                        if host_validation(&host) {
+                                            Some((
+                                                *by_memory_key,
+                                                *host_id,
+                                                host.last_updated,
+                                            ))
+                                        } else {
+                                            None
+                                        }
+                                    })
+                                })
+                            };
+                        let inner = if self.strategy.memory_saturation {
+                            bucket_lock.range(memory_key..).find_map(find_fn)
+                        } else {
+                            bucket_lock.range(memory_key..).rev().find_map(find_fn)
+                        };
+                        drop(bucket_lock);
+                        inner.map(|(mk, hid, ts)| {
+                            (by_core_key, mk, hid, ts, bucket.clone())
+                        })
+                    });
+
+            // Step 3: Attempt atomic removal if we found a candidate
+            if let Some((_by_core_key, by_memory_key, host_id, expected_last_updated, bucket)) =
                 candidate_info
             {
-                // Atomic check-and-remove from HOST_STORE
-                // Ensure host is still valid when it's time to remove it
                 match HOST_STORE.atomic_remove_if_valid(
                     &host_id,
                     expected_last_updated,
                     host_validation,
                 ) {
                     Ok(Some(removed_host)) => {
-                        // Successfully removed from store, now remove from index
-                        let mut host_index_lock =
-                            self.hosts_index.write().unwrap_or_else(|p| p.into_inner());
-
-                        // Remove from hosts_by_core_and_memory index
-                        host_index_lock
-                            .get_mut(&by_core_key)
-                            .and_then(|hosts_by_memory| hosts_by_memory.get_mut(&by_memory_key))
-                            .map(|hosts| hosts.remove(&host_id));
-
+                        // Remove from this single bucket's index. Other buckets
+                        // are untouched.
+                        let mut bucket_lock =
+                            bucket.lock().unwrap_or_else(|p| p.into_inner());
+                        if let Some(hosts) = bucket_lock.get_mut(&by_memory_key) {
+                            hosts.remove(&host_id);
+                        }
                         return Some(removed_host);
                     }
                     Ok(None) | Err(()) => {
                         // Host was removed by another thread. Try another candidate
                         attempts -= 1;
-                        // Mark the failed candidate to avoid retrying it again.
                         failed_candidates.borrow_mut().push(host_id);
                         if attempts <= 0 {
                             break;
@@ -309,18 +328,30 @@ impl HostCache {
         let core_key = last_host_version.idle_cores.value() as CoreKey;
         let memory_key = Self::gen_memory_key(last_host_version.idle_memory);
 
-        let mut host_index = self
-            .hosts_index
-            .write()
-            .unwrap_or_else(|poisoned| poisoned.into_inner());
-
-        // Insert at the new location
-        host_index
-            .entry(core_key)
-            .or_default()
-            .entry(memory_key)
-            .or_default()
-            .insert(host_id);
+        // Double-checked locking: fast path takes only the outer read lock
+        // and grabs an existing bucket Arc. Slow path (rare) upgrades to the
+        // outer write lock only when a brand-new CoreKey appears.
+        let bucket: MemoryBucket = {
+            let outer = self
+                .hosts_index
+                .read()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            outer.get(&core_key).cloned()
+        }
+        .unwrap_or_else(|| {
+            let mut outer = self
+                .hosts_index
+                .write()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            outer
+                .entry(core_key)
+                .or_insert_with(|| Arc::new(Mutex::new(BTreeMap::new())))
+                .clone()
+        });
+
+        // Per-bucket mutex insert. Disjoint core sizes proceed in parallel.
+        let mut bucket_lock = bucket.lock().unwrap_or_else(|p| p.into_inner());
+        bucket_lock.entry(memory_key).or_default().insert(host_id);
     }
 
     /// Generates a memory key for cache indexing by bucketing memory values.
@@ -474,10 +505,13 @@ mod tests {
         assert!(HOST_STORE.get(&host_id).is_none());
 
         let hosts_index = cache.hosts_index.read().unwrap();
-        let left_over_host = hosts_index
-            .get(&core_key)
-            .and_then(|hosts_by_memory| hosts_by_memory.get(&memory_key))
-            .and_then(|hosts| hosts.get(&checked_out_host.id));
+        let left_over_host = hosts_index.get(&core_key).and_then(|bucket| {
+            let bucket_lock = bucket.lock().unwrap();
+            bucket_lock
+                .get(&memory_key)
+                .and_then(|hosts| hosts.get(&checked_out_host.id))
+                .copied()
+        });
 
         assert!(left_over_host.is_none())
     }
@@ -653,4 +687,47 @@ mod tests {
         // The hosts should be different
         assert_ne!(result1.unwrap().id, result2.unwrap().id);
     }
+
+    /// Concurrent check-ins on disjoint core buckets must not interfere — this
+    /// is the whole point of the sharded design. All N inserts complete and
+    /// land in distinct buckets.
+    #[test]
+    fn concurrent_check_ins_on_disjoint_buckets() {
+        use std::sync::Arc as StdArc;
+
+        let cache = StdArc::new(HostCache::default());
+        let mut handles = Vec::new();
+        // Each thread checks in a host with a unique core count → distinct
+        // bucket → no per-bucket contention.
+        for cores in 1i32..=8 {
+            let cache = cache.clone();
+            handles.push(thread::spawn(move || {
+                let host_id = Uuid::new_v4();
+                let host = create_test_host(host_id, cores, ByteSize::gb(8));
+                cache.check_in(host, false);
+                host_id
+            }));
+        }
+
+        let host_ids: Vec<Uuid> = handles
+            .into_iter()
+            .map(|h| h.join().expect("worker panicked"))
+            .collect();
+
+        // Every bucket should be populated.
+        let outer = cache.hosts_index.read().unwrap();
+        assert_eq!(outer.len(), 8, "one bucket per core size");
+        for (core_key, bucket) in outer.iter() {
+            let bucket_lock = bucket.lock().unwrap();
+            let total_hosts: usize = bucket_lock.values().map(|set| set.len()).sum();
+            assert_eq!(
+                total_hosts, 1,
+                "bucket at core_key={core_key} should contain exactly one host"
+            );
+        }
+        // And every checked-in host id is reachable.
+        for id in host_ids {
+            assert!(HOST_STORE.get(&id).is_some(), "host {id} present in store");
+        }
+    }
 }
diff --git a/rust/crates/scheduler/src/host_cache/messages.rs b/rust/crates/scheduler/src/host_cache/messages.rs
index bb25ced14..c2bbafbcc 100644
--- a/rust/crates/scheduler/src/host_cache/messages.rs
+++ b/rust/crates/scheduler/src/host_cache/messages.rs
@@ -10,123 +10,25 @@
 // or implied. See the License for the specific language governing permissions and limitations under
 // the License.
 
-use actix::{Message, MessageResponse};
-
-use bytesize::ByteSize;
-use miette::Result;
 use uuid::Uuid;
 
-use crate::{
-    cluster_key::{ClusterKey, Tag},
-    host_cache::HostCacheError,
-    models::{CoreSize, Host},
-};
+use crate::{cluster_key::ClusterKey, models::Host};
 
-/// Response containing a checked-out host and its associated cluster key.
-///
-/// Returned when a host is successfully checked out from the cache. The cluster
-/// key is needed to return the host to the correct cache group after use.
-///
-/// # Fields
-///
-/// * `0` - ClusterKey identifying the cache group this host belongs to
-/// * `1` - Host with reserved resources
-#[derive(MessageResponse)]
+/// Result of a successful host check-out.
+///
+/// The cluster key is needed to return the host to the correct cache group
+/// after use.
 pub struct CheckedOutHost(pub ClusterKey, pub Host);
 
-/// Actor message to check out a host from the cache.
-///
-/// Requests a host that matches the specified resource requirements and passes
-/// the validation function.
-/// The cache will search through groups for each tag
-/// in priority order (MANUAL > HOSTNAME > ALLOC) until a suitable host is found.
-///
-/// If not found in cache, the service will fetch from the database. The host
-/// is removed from the cache and must be checked back in after use.
-///
-/// # Fields
-///
-/// * `facility_id` - Facility identifier for the cluster key
-/// * `show_id` - Show identifier for the cluster key
-/// * `tags` - List of tags to search (tried in priority order)
-/// * `cores` - Minimum number of cores required
-/// * `memory` - Minimum memory required
-/// * `validation` - Function to validate additional host requirements
-///
-/// # Returns
-///
-/// * `Ok(CheckedOutHost)` - Successfully found and reserved a matching host
-/// * `Err(HostCacheError::NoCandidateAvailable)` - No host meets requirements
-/// * `Err(HostCacheError::FailedToQueryHostCache)` - Database query failed
-#[derive(Message)]
-#[rtype(result = "Result<CheckedOutHost, HostCacheError>")]
-pub struct CheckOut<F>
-where
-    F: Fn(&Host) -> bool,
-{
-    pub facility_id: Uuid,
-    pub show_id: Uuid,
-    pub tags: Vec<Tag>,
-    pub cores: CoreSize,
-    pub memory: ByteSize,
-    pub validation: F,
-}
-
-/// Payload for checking in a host or invalidating a host in the cache.
-///
-/// Allows either returning a host with updated resources to the cache or
-/// invalidating a host by its id, removing it from the cache entirely.
-///
-/// # Variants
-///
-/// * `Host(Host)` - Return a host with updated idle resource counts
-/// * `Invalidate(Uuid)` - Invalidate and remove a host by id
+/// Argument to [`HostCacheService::check_in_payload`] — either return a host
+/// with updated resources, or invalidate a host by id (removing any
+/// outstanding reservation but not re-adding the host to the cache).
 pub enum CheckInPayload {
     Host(Host),
     Invalidate(Uuid),
 }
 
-/// Actor message to return a host to the cache or invalidate it.
-///
-/// Returns a host back to its cache group with updated resource state, or
-/// invalidates a host by id, removing it from the cache. When returning
-/// a host, it is removed from the reservation list and becomes available for
-/// checkout again. If the cache group has expired, the host is dropped.
-///
-/// # Fields
-///
-/// * `0` - ClusterKey identifying which cache group the host belongs to
-/// * `1` - CheckInPayload specifying whether to return a host or invalidate by id
-///
-/// # Returns
-///
-/// * `()` - Operation completed successfully (host returned/invalidated or cache group expired)
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct CheckIn(pub ClusterKey, pub CheckInPayload);
-
-/// Actor message to retrieve cache performance metrics.
-///
-/// Requests the current cache hit/miss statistics from the HostCacheService.
-/// Used for monitoring cache effectiveness.
-///
-/// # Returns
-///
-/// * `CacheRatioResponse` - Cache performance metrics
-#[derive(Message)]
-#[rtype(result = CacheRatioResponse)]
-pub struct CacheRatio;
-
-/// Response containing cache performance statistics.
-///
-/// Provides metrics about cache hit/miss rates for monitoring cache effectiveness.
-/// A high hit ratio indicates the cache is effectively reducing database queries.
-///
-/// # Fields
-///
-/// * `hit` - Total number of cache hits (hosts found in cache)
-/// * `miss` - Total number of cache misses (required database fetch)
-/// * `hit_ratio` - Percentage of cache hits (0-100)
-#[derive(MessageResponse)]
+/// Snapshot of host-cache hit/miss statistics.
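+///
+/// Returned by [`HostCacheService::cache_ratio`]; `hit_ratio` is the
+/// percentage of look-ups served from the cache (0-100).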
 pub struct CacheRatioResponse {
     #[allow(dead_code)]
     pub hit: u64,
diff --git a/rust/crates/scheduler/src/host_cache/mod.rs b/rust/crates/scheduler/src/host_cache/mod.rs
index e04e91a8d..04125bdb1 100644
--- a/rust/crates/scheduler/src/host_cache/mod.rs
+++ b/rust/crates/scheduler/src/host_cache/mod.rs
@@ -15,7 +15,6 @@ mod cache;
 pub mod messages;
 mod store;
 
-use actix::{Actor, Addr};
 pub use cache::HostCache;
 
 use miette::Diagnostic;
@@ -25,29 +24,21 @@ use uuid::Uuid;
 use miette::Result;
 use tokio::sync::OnceCell;
 
-use crate::host_cache::messages::CacheRatio;
-
 pub use actor::HostCacheService;
 
 pub type HostId = Uuid;
 
-static HOST_CACHE: OnceCell<Addr<HostCacheService>> = OnceCell::const_new();
+static HOST_CACHE: OnceCell<HostCacheService> = OnceCell::const_new();
 
-/// Gets or initializes the singleton host cache service actor.
-///
-/// Returns a shared reference to the HostCacheService actor, creating it
-/// if it doesn't exist. The service manages host availability caching and
-/// checkout/checkin operations.
-///
-/// # Returns
+/// Gets or initializes the singleton host cache service.
 ///
-/// * `Ok(Addr<HostCacheService>)` - Actor address for sending messages
-/// * `Err(miette::Error)` - Failed to initialize the service
-pub async fn host_cache_service() -> Result<Addr<HostCacheService>> {
+/// Returns a cloned handle to the service. `HostCacheService` is `Clone` and
+/// internally `Arc`-shared, so the returned value is cheap to pass around.
+pub async fn host_cache_service() -> Result<HostCacheService> {
     HOST_CACHE
         .get_or_try_init(|| async {
-            let service = HostCacheService::new().await?.start();
-
+            let service = HostCacheService::new().await?;
+            service.spawn_background_tasks();
             Ok(service)
         })
         .await
@@ -57,22 +48,11 @@ pub async fn host_cache_service() -> Result<Addr<HostCacheService>> {
 
 /// Retrieves the current cache hit ratio as a percentage.
 ///
 /// Returns the ratio of cache hits to total cache accesses (hits + misses)
-/// as a percentage value between 0 and 100.
-///
-/// # Returns
-///
-/// * `usize` - Cache hit ratio percentage (0-100), or 0 if service unavailable
+/// as a percentage value between 0 and 100, or 0 if the service is unavailable.
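+///
+/// A minimal monitoring sketch (hypothetical call site; assumes a Tokio
+/// context):
+///
+/// ```rust,ignore
+/// // Warn when fewer than half of the check-outs hit the cache.
+/// if crate::host_cache::hit_ratio().await < 50 {
+///     tracing::warn!("host cache hit ratio degraded");
+/// }
+/// ```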
 #[allow(dead_code)]
 pub async fn hit_ratio() -> usize {
-    let host_cache = host_cache_service().await;
-    match host_cache {
-        Ok(cache) => {
-            cache
-                .send(CacheRatio)
-                .await
-                .expect("Actor is offline")
-                .hit_ratio
-        }
+    match host_cache_service().await {
+        Ok(cache) => cache.cache_ratio().hit_ratio,
         Err(_) => 0,
     }
 }
diff --git a/rust/crates/scheduler/src/main.rs b/rust/crates/scheduler/src/main.rs
index 842853c63..8844b15f5 100644
--- a/rust/crates/scheduler/src/main.rs
+++ b/rust/crates/scheduler/src/main.rs
@@ -263,10 +263,7 @@ fn main() -> miette::Result<()> {
         .build()
         .into_diagnostic()?;
 
-    // Spawn the actor system in the background
-    let actor_system = actix::System::with_tokio_rt(|| runtime);
-
-    actor_system.block_on(async_main())
+    runtime.block_on(async_main())
 }
 
 async fn async_main() -> miette::Result<()> {
@@ -363,9 +360,5 @@ async fn async_main() -> miette::Result<()> {
     });
 
     let opts = JobQueueCli::from_args();
-    let result = opts.run().await;
-
-    actix::System::current().stop();
-
-    result
+    opts.run().await
 }
diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs
index f96eabbb1..45611999c 100644
--- a/rust/crates/scheduler/src/metrics/mod.rs
+++ b/rust/crates/scheduler/src/metrics/mod.rs
@@ -13,9 +13,10 @@
 use axum::{response::IntoResponse, routing::get, Router};
 use lazy_static::lazy_static;
 use prometheus::{
-    register_counter, register_counter_vec, register_histogram, Counter, CounterVec, Encoder,
-    Histogram, TextEncoder,
+    register_counter, register_counter_vec, register_gauge_vec, register_histogram, Counter,
+    CounterVec, Encoder, Gauge, GaugeVec, Histogram, TextEncoder,
 };
+use scc::HashMap as SccHashMap;
 use std::time::Duration;
 use tracing::{error, info};
 
@@ -70,6 +71,62 @@ lazy_static! {
         vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
     )
     .expect("Failed to register job_query_duration_seconds histogram");
+
+    // Cluster feed metrics from cluster.rs
+    pub static ref CLUSTER_POLLS_TOTAL: CounterVec = register_counter_vec!(
+        "scheduler_cluster_polls_total",
+        "Total number of times each cluster has been emitted by the priority-queue feed",
+        &["show_id", "facility_id"]
+    )
+    .expect("Failed to register cluster_polls_total counter");
+
+    /// Global counter mirroring the in-memory `CLUSTER_ROUNDS` atomic.
+    /// Equivalent to `sum(scheduler_cluster_polls_total)` but exposed for cheap
+    /// querying / alerting on overall feed throughput.
+    pub static ref CLUSTER_ROUNDS_TOTAL: Counter = register_counter!(
+        "scheduler_cluster_rounds_total",
+        "Total number of cluster pops across all clusters"
+    )
+    .expect("Failed to register cluster_rounds_total counter");
+
+    /// Most-recent job count for each cluster — useful for inspecting the
+    /// productivity bias's effect on dispatch ordering.
+    pub static ref CLUSTER_LAST_DISPATCHED_JOBS: GaugeVec = register_gauge_vec!(
+        "scheduler_cluster_last_dispatched_jobs",
+        "Jobs dispatched in the most recent processing cycle for each cluster",
+        &["show_id", "facility_id"]
+    )
+    .expect("Failed to register cluster_last_dispatched_jobs gauge");
+
+    /// Per-cluster `Counter` handles are cached so the per-call
+    /// `Uuid::to_string()` allocations for `with_label_values` happen at most
+    /// once per `(show_id, facility_id)` pair, instead of on every dispatch
+    /// loop pop. Subsequent increments just bump an `Arc`-backed counter.
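+    ///
+    /// Note: nothing here evicts entries, so the map grows to at most one
+    /// entry per distinct `(show_id, facility_id)` pair the feed emits.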
+ static ref CLUSTER_POLL_COUNTERS: + SccHashMap<(uuid::Uuid, uuid::Uuid), Counter> = SccHashMap::new(); + + /// Same caching as above for the per-cluster `last_dispatched_jobs` gauge. + static ref CLUSTER_LAST_DISPATCHED_GAUGES: + SccHashMap<(uuid::Uuid, uuid::Uuid), Gauge> = SccHashMap::new(); + + // Matcher metrics mirroring the in-process atomics in pipeline/matcher.rs. + pub static ref HOSTS_ATTEMPTED_TOTAL: Counter = register_counter!( + "scheduler_hosts_attempted_total", + "Total host-candidate selection attempts across all layers" + ) + .expect("Failed to register hosts_attempted_total counter"); + + pub static ref WASTED_ATTEMPTS_TOTAL: Counter = register_counter!( + "scheduler_wasted_attempts_total", + "Jobs that processed zero layers and where every layer was free (i.e. no host candidate found)" + ) + .expect("Failed to register wasted_attempts_total counter"); + + pub static ref LAYERS_SKIPPED_BY_LOCK_TOTAL: Counter = register_counter!( + "scheduler_layers_skipped_by_lock_total", + "Layers skipped because another scheduler held the row lock (expected in multi-replica deployments, not wasted work)" + ) + .expect("Failed to register layers_skipped_by_lock_total counter"); } /// Handler for the /metrics endpoint @@ -166,3 +223,61 @@ pub fn observe_time_to_book(duration: Duration) { pub fn observe_job_query_duration(duration: Duration) { JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64()); } + +/// Helper function to increment the cluster polls counter. +/// +/// Bumps both the per-cluster `scheduler_cluster_polls_total` and the global +/// `scheduler_cluster_rounds_total` so dashboards can use whichever is cheaper. +/// +/// The per-cluster `Counter` is cached in [`CLUSTER_POLL_COUNTERS`] so the +/// `Uuid::to_string()` allocations only run on the first sighting of each +/// `(show_id, facility_id)` pair. +#[inline] +pub fn increment_cluster_polls(show_id: &uuid::Uuid, facility_id: &uuid::Uuid) { + let key = (*show_id, *facility_id); + let entry = CLUSTER_POLL_COUNTERS.entry_sync(key).or_insert_with(|| { + CLUSTER_POLLS_TOTAL.with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + }); + entry.get().inc(); + CLUSTER_ROUNDS_TOTAL.inc(); +} + +/// Records the number of jobs dispatched in the most recent cycle for a cluster. +/// +/// Uses the [`CLUSTER_LAST_DISPATCHED_GAUGES`] cache to avoid per-call +/// `Uuid::to_string()` allocations on the control-loop hot path. +#[inline] +pub fn set_cluster_last_dispatched_jobs( + show_id: &uuid::Uuid, + facility_id: &uuid::Uuid, + count: usize, +) { + let key = (*show_id, *facility_id); + let entry = CLUSTER_LAST_DISPATCHED_GAUGES + .entry_sync(key) + .or_insert_with(|| { + CLUSTER_LAST_DISPATCHED_JOBS + .with_label_values(&[&show_id.to_string(), &facility_id.to_string()]) + }); + entry.get().set(count as f64); +} + +/// Records a host-candidate selection attempt. +#[inline] +pub fn increment_hosts_attempted() { + HOSTS_ATTEMPTED_TOTAL.inc(); +} + +/// Records a job that processed zero layers AND none were skipped due to a +/// peer-scheduler lock, i.e. a real "no host candidate" miss, not contention. +#[inline] +pub fn increment_wasted_attempts() { + WASTED_ATTEMPTS_TOTAL.inc(); +} + +/// Records a layer skipped because another scheduler currently holds its row lock. +/// Expected in multi-replica deployments — the work is happening elsewhere, not lost. 
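+///
+/// A sketch of the intended call site (mirroring the `try_lock_layer` miss
+/// branch in `pipeline/matcher.rs`):
+///
+/// ```rust,ignore
+/// Ok(None) => {
+///     // Another scheduler holds the row lock; the work happens elsewhere.
+///     metrics::increment_layers_skipped_by_lock();
+///     continue;
+/// }
+/// ```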
+#[inline]
+pub fn increment_layers_skipped_by_lock() {
+    LAYERS_SKIPPED_BY_LOCK_TOTAL.inc();
+}
diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs
index e020644d0..cd1bb2aec 100644
--- a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs
+++ b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs
@@ -10,7 +10,6 @@
 // or implied. See the License for the specific language governing permissions and limitations under
 // the License.
 
-use actix::{Actor, ActorFutureExt, Handler, ResponseActFuture, WrapFuture};
 use bytesize::{ByteSize, KIB, MIB};
 use chrono::Utc;
 use futures::FutureExt;
@@ -62,66 +61,28 @@ pub struct RqdDispatcherService {
     dry_run_mode: bool,
 }
 
-impl Actor for RqdDispatcherService {
-    type Context = actix::Context<Self>;
-
-    fn started(&mut self, _ctx: &mut Self::Context) {
-        info!("RqdDispatcherService actor started");
-    }
-
-    fn stopped(&mut self, _ctx: &mut Self::Context) {
-        info!("RqdDispatcherService actor stopped");
-    }
-}
-
-impl Handler<DispatchLayerMessage> for RqdDispatcherService {
-    type Result = ResponseActFuture<Self, Result<DispatchResult, DispatchError>>;
-
-    fn handle(&mut self, msg: DispatchLayerMessage, _ctx: &mut Self::Context) -> Self::Result {
+impl RqdDispatcherService {
+    /// Dispatches a layer's frames to the given host.
+    ///
+    /// The host advisory lock is acquired inside the per-proc transaction
+    /// (see [`dispatch_virtual_proc`]), not at the layer level, so it
+    /// auto-releases on commit/rollback and doesn't pin a connection during
+    /// the gRPC call.
+    pub async fn dispatch_layer(
+        &self,
+        msg: DispatchLayerMessage,
+    ) -> Result<DispatchResult, DispatchError> {
         let DispatchLayerMessage { layer, host } = msg;
-
-        let dispatcher = self.clone();
         debug!(
-            "Received dispatch message for layer {} on host {}",
+            "Received dispatch request for layer {} on host {}",
             layer.layer_name, host.name
         );
-
-        Box::pin(
-            async move {
-                // Note: In a real implementation, we would need to coordinate with a transaction manager
-                // or pass the transaction through the message. For now, we'll create a new transaction
-                // within the dispatcher's database operations.
-
-                // Create a database transaction scope
-                let mut transaction = begin_transaction()
-                    .await
-                    .map_err(DispatchError::DbFailure)?;
-
-                match dispatcher.dispatch(&layer, host, &mut transaction).await {
-                    Ok((updated_host, updated_layer)) => {
-                        // Commit the transaction
-                        transaction
-                            .commit()
-                            .await
-                            .map_err(DispatchError::DbFailure)?;
-
-                        Ok(DispatchResult {
-                            updated_host,
-                            updated_layer,
-                        })
-                    }
-                    Err(e) => {
-                        // Rollback the transaction on error
-                        if let Err(rollback_err) = transaction.rollback().await {
-                            error!("Failed to rollback transaction: {}", rollback_err);
-                        }
-                        Err(e)
-                    }
-                }
-            }
-            .into_actor(self)
-            .map(|result, _actor, _ctx| result),
-        )
+        self.dispatch(&layer, host)
+            .await
+            .map(|(updated_host, updated_layer)| DispatchResult {
+                updated_host,
+                updated_layer,
+            })
     }
 }
 
@@ -158,52 +119,34 @@ impl RqdDispatcherService {
         })
     }
 
-    /// Dispatches a layer to a specific host with proper locking and error handling.
+    /// Dispatches a layer to a specific host.
     ///
-    /// The dispatch process:
-    /// 1. Acquires an exclusive lock on the target host
-    /// 2. Performs the actual dispatch operation
-    /// 3. Ensures the host lock is always released, even on panic or failure
+    /// The host advisory lock is acquired per-proc as a transaction-scoped
+    /// lock inside [`dispatch_virtual_proc`], not at the layer level. This
+    /// keeps the lock hold time bounded to the per-frame DB transaction
+    /// (committed before the gRPC call) instead of spanning the entire
+    /// layer's dispatch.
     ///
     /// # Arguments
     /// * `layer` - The layer containing frames to dispatch
     /// * `host` - The target host for frame execution
     ///
     /// # Returns
-    /// * `Ok(())` on successful dispatch
+    /// * `Ok((updated_host, remaining_layer))` on successful dispatch
    /// * `Err(DispatchError)` on various failure conditions
     async fn dispatch(
         &self,
         layer: &DispatchLayer,
         host: Host,
-        transaction: &mut Transaction<'_, Postgres>,
     ) -> Result<(Host, DispatchLayer), DispatchError> {
         let host_id = host.id;
         let host_disp = format!("{}", &host);
         let layer_disp = format!("{}", &layer);
 
-        // Acquire lock first
-        if !self
-            .host_dao
-            .lock(transaction, &host.id)
-            .await
-            .map_err(DispatchError::Failure)?
-        {
-            return Err(DispatchError::HostLock(host.name.clone()));
-        }
-
-        // Ensure unlock is always called, regardless of panics or fails
         let result = std::panic::AssertUnwindSafe(self.dispatch_inner(layer, host))
            .catch_unwind()
            .await;
 
-        // Always attempt to unlock, regardless of outcome. Failing to unlock here can be ignored as
-        // endint the transaction will automatically unlock.
-        if let Err(unlock_err) = self.host_dao.unlock(transaction, &host_id).await {
-            trace!("Failed to unlock host {}: {}", host_disp, unlock_err);
-        }
-
-        // Handle the result from dispatch_inner
         match result {
             Ok(result) => {
                 if result.is_ok() {
@@ -481,6 +424,24 @@ impl RqdDispatcherService {
                 DispatchVirtualProcError::FailedToStartOnDb(DispatchError::DbFailure(e))
             })?;
 
+        // Acquire a transaction-scoped advisory lock on the host. Using
+        // pg_try_advisory_xact_lock means the lock is released automatically
+        // when this proc transaction commits or rolls back, and concurrent
+        // dispatchers fail fast on the same host instead of waiting.
+        let host_locked = self
+            .host_dao
+            .try_xact_lock(&mut proc_transaction, &host.id)
+            .await
+            .map_err(|e| {
+                DispatchVirtualProcError::FailedToStartOnDb(DispatchError::Failure(e))
+            })?;
+        if !host_locked {
+            let _ = proc_transaction.rollback().await;
+            return Err(DispatchVirtualProcError::FailedToStartOnDb(
+                DispatchError::HostLock(host.name.clone()),
+            ));
+        }
+
         // Confirm the layer still has limits before proceeding
         if !self
             .layer_dao
diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs b/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs
index 1c4415ece..fec8be56e 100644
--- a/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs
+++ b/rust/crates/scheduler/src/pipeline/dispatcher/messages.rs
@@ -10,13 +10,7 @@
 // or implied. See the License for the specific language governing permissions and limitations under
 // the License.
 
-use actix::{Message, MessageResponse};
-use miette::Result;
-
-use crate::{
-    models::{DispatchLayer, Host},
-    pipeline::dispatcher::error::DispatchError,
-};
+use crate::models::{DispatchLayer, Host};
 
 /// Actor message to dispatch a layer's frames to a specific host.
 ///
@@ -37,8 +31,6 @@ use crate::{
 ///
 /// * `Ok(DispatchResult)` - Successfully dispatched frames with updated state
 /// * `Err(DispatchError)` - Dispatch failed due to various errors
-#[derive(Message)]
-#[rtype(result = "Result<DispatchResult, DispatchError>")]
 pub struct DispatchLayerMessage {
     pub layer: DispatchLayer,
     pub host: Host,
@@ -55,7 +47,7 @@
 /// * `updated_host` - Host with updated idle resource counts after dispatch
 /// * `updated_layer` - Layer with dispatched frames removed from the frames list
 /// * `dispatched_frames` - List of frame names that were successfully dispatched
-#[derive(MessageResponse, Debug)]
+#[derive(Debug)]
 pub struct DispatchResult {
     pub updated_host: Host,
     pub updated_layer: DispatchLayer,
diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs b/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs
index d524f5b93..529f2fece 100644
--- a/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs
+++ b/rust/crates/scheduler/src/pipeline/dispatcher/mod.rs
@@ -18,46 +18,18 @@ pub mod messages;
 
 use miette::Result;
 use std::sync::Arc;
 
-// Actor and singleton support
-use actix::{Actor, Addr};
 pub use actor::RqdDispatcherService;
 use tokio::sync::OnceCell;
 
 use crate::dao::{LayerDao, ProcDao};
 
-static RQD_DISPATCHER: OnceCell<Addr<RqdDispatcherService>> = OnceCell::const_new();
+static RQD_DISPATCHER: OnceCell<RqdDispatcherService> = OnceCell::const_new();
 
-/// Singleton getter for the RQD dispatcher service
+/// Singleton getter for the RQD dispatcher service.
 ///
-/// Creates and returns a singleton instance of the RqdDispatcherService actor.
-/// The service is initialized with configuration from CONFIG on first access.
-///
-/// # Usage Example
-/// ```rust,ignore
-/// use crate::pipeline::dispatcher::{rqd_dispatcher_service, messages::DispatchLayer};
-/// use crate::models::{DispatchLayer as ModelDispatchLayer, Host};
-///
-/// async fn dispatch_example() -> miette::Result<()> {
-///     let dispatcher = rqd_dispatcher_service().await?;
-///
-///     let message = DispatchLayer {
-///         layer: my_layer,
-///         host: my_host,
-///         transaction_id: "tx-123".to_string(),
-///     };
-///
-///     match dispatcher.send(message).await {
-///         Ok(Ok(result)) => {
-///             println!("Dispatched {} frames", result.dispatched_frames.len());
-///         }
-///         Ok(Err(e)) => println!("Dispatch error: {}", e),
-///         Err(e) => println!("Actor mailbox error: {}", e),
-///     }
-///
-///     Ok(())
-/// }
-/// ```
-pub async fn rqd_dispatcher_service() -> Result<Addr<RqdDispatcherService>, miette::Error> {
+/// Returns a cloned handle to the service. `RqdDispatcherService` is `Clone`
+/// and internally `Arc`-shared, so the returned value is cheap to pass around.
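+///
+/// A minimal usage sketch, replacing the old actor-based example (`layer` and
+/// `host` construction elided; see `pipeline/matcher.rs` for the real call
+/// site):
+///
+/// ```rust,ignore
+/// let dispatcher = rqd_dispatcher_service().await?;
+/// let result = dispatcher
+///     .dispatch_layer(DispatchLayerMessage { layer, host })
+///     .await?;
+/// println!("{} frames still pending", result.updated_layer.frames.len());
+/// ```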
+pub async fn rqd_dispatcher_service() -> Result<RqdDispatcherService> {
     RQD_DISPATCHER
         .get_or_try_init(|| async {
             use crate::{
@@ -70,16 +42,14 @@ pub async fn rqd_dispatcher_service() -> Result<Addr<RqdDispatcherService>, miet
             let host_dao = Arc::new(HostDao::new().await?);
             let proc_dao = Arc::new(ProcDao::new().await?);
 
-            let service = RqdDispatcherService::new(
+            RqdDispatcherService::new(
                 frame_dao,
                 layer_dao,
                 host_dao,
                 proc_dao,
                 CONFIG.rqd.dry_run_mode,
             )
-            .await?;
-
-            Ok(service.start())
+            .await
         })
         .await
         .cloned()
diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs
index ba7ef55e4..1b527486f 100644
--- a/rust/crates/scheduler/src/pipeline/entrypoint.rs
+++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs
@@ -12,7 +12,6 @@
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
 
 use futures::{stream, StreamExt};
 use tokio::sync::mpsc;
@@ -69,9 +68,8 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> {
             )
             .await;
 
-            match jobs {
+            let (processed_count, should_stop) = match jobs {
                 Ok(jobs) => {
-                    // Track number of jobs queried
                     metrics::increment_jobs_queried(jobs.len());
 
                     let processed_jobs = AtomicUsize::new(0);
@@ -87,33 +85,69 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> {
                         },
                     )
                     .await;
-                    // If no jobs got processed, sleep to prevent hammering the database with
-                    // queries with no outcome
-                    if processed_jobs.load(Ordering::Relaxed) == 0 {
-                        let _ = feed_sender
-                            .send(FeedMessage::Sleep(cluster, Duration::from_secs(3)))
-                            .await;
-                    }
-
-                    // If empty_jobs_cycles_before_quiting is set, quit if nothing got processed
-                    if let Some(limit) = CONFIG.queue.empty_job_cycles_before_quiting {
-                        // Count cycles that couldn't find any job
-                        if processed_jobs.load(Ordering::Relaxed) == 0 {
-                            cycles_without_jobs.fetch_add(1, Ordering::Relaxed);
-                        } else {
-                            cycles_without_jobs.store(0, Ordering::Relaxed);
-                        }
-                        // Cancel stream processing after empty cycles
-                        if cycles_without_jobs.load(Ordering::Relaxed) >= limit {
-                            let _ = feed_sender.send(FeedMessage::Stop()).await;
+                    let processed = processed_jobs.load(Ordering::Relaxed);
+
+                    let stop = match CONFIG.queue.empty_job_cycles_before_quiting {
+                        Some(limit) => {
+                            // Combine the update and read into a single atomic
+                            // operation so concurrent clusters can't race between
+                            // a fetch_add/store and a separate load. SeqCst gives
+                            // us a strict total order across all clusters.
+                            let new_count = if processed == 0 {
+                                cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1
+                            } else {
+                                cycles_without_jobs.swap(0, Ordering::SeqCst);
+                                0
+                            };
+                            new_count >= limit
                         }
-                    }
+                        None => false,
+                    };
+
+                    (processed, stop)
                 }
                 Err(err) => {
-                    let _ = feed_sender.send(FeedMessage::Stop()).await;
-                    error!("Failed to fetch job: {}", err);
+                    error!(
+                        "Failed to fetch jobs for cluster {}: {}",
+                        cluster, err
+                    );
+                    // Don't halt the scheduler for a single cluster's fetch
+                    // error (network blip, transient DB load). Treat it as
+                    // an empty cycle so the cluster gets the configured
+                    // back-off and other clusters keep running, but feed
+                    // the same atomic counter the success path uses so the
+                    // `empty_job_cycles_before_quiting` safety net actually
+                    // trips if fetches keep failing across the feed.
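+                    // (With the test config's `Some(20)`, for instance, twenty
+                    // consecutive empty or failing cycles across the whole
+                    // feed would stop the stream.)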
+ let stop = match CONFIG.queue.empty_job_cycles_before_quiting { + Some(limit) => { + let new_count = + cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1; + new_count >= limit + } + None => false, + }; + (0, stop) } + }; + + // Re-insert the cluster into the priority queue. An empty cycle gets a + // back-off so neighbors aren't starved while this cluster waits for work. + let sleep = if processed_count == 0 { + Some(CONFIG.queue.cluster_empty_back_off) + } else { + None + }; + let _ = feed_sender + .send(FeedMessage::Done { + cluster, + processed_jobs: processed_count, + sleep, + }) + .await; + + if should_stop { + let _ = feed_sender.send(FeedMessage::Stop).await; } } }) diff --git a/rust/crates/scheduler/src/pipeline/layer_permit.rs b/rust/crates/scheduler/src/pipeline/layer_permit.rs deleted file mode 100644 index 72db23f75..000000000 --- a/rust/crates/scheduler/src/pipeline/layer_permit.rs +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright Contributors to the OpenCue Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except -// in compliance with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under -// the License. - -use actix::{Actor, AsyncContext, Handler, Message, WrapFuture}; -use scc::HashMap; -use std::{ - sync::Arc, - time::{Duration, SystemTime}, -}; -use tokio::sync::OnceCell; -use tracing::{debug, info}; -use uuid::Uuid; - -use miette::Result; - -/// Actor message to request a permit for a layer. -/// -/// Requests a permit to process a specific layer. If the layer is already -/// being processed by another task (permit hasn't expired), returns false. -/// Otherwise, grants the permit and returns true. -/// -/// # Fields -/// -/// * `id` - Unique identifier for the layer -/// * `duration` - How long the permit should be valid -/// -/// # Returns -/// -/// * `bool` - true if permit was granted, false if layer is already locked -#[derive(Message)] -#[rtype(result = "bool")] -pub struct Request { - pub id: Uuid, - pub duration: Duration, -} - -/// Actor message to release a permit for a layer. -/// -/// # Fields -/// -/// * `id` - Unique identifier for the layer -/// -/// # Returns -/// -/// * `bool` - true if permit was release, false if there wasn't a valid permit -#[derive(Message)] -#[rtype(result = "bool")] -pub struct Release { - pub id: Uuid, -} - -/// Internal representation of a layer permit. -/// -/// Tracks when a permit was issued and how long it's valid for. -struct LayerPermit { - granted_at: SystemTime, - duration: Duration, -} - -impl LayerPermit { - /// Creates a new permit with the specified duration. - fn new(duration: Duration) -> Self { - LayerPermit { - granted_at: SystemTime::now(), - duration, - } - } - - /// Checks if the permit has expired. - fn expired(&self) -> bool { - self.granted_at.elapsed().unwrap_or_default() > self.duration - } -} - -/// Service for managing layer processing permits using the Actor model. -/// -/// Prevents multiple tasks from processing the same layer concurrently by -/// issuing time-limited permits. Each layer ID can only have one active -/// permit at a time. 
-#[derive(Clone)]
-pub struct LayerPermitService {
-    permits: Arc<HashMap<Uuid, LayerPermit>>,
-}
-
-impl Actor for LayerPermitService {
-    type Context = actix::Context<Self>;
-
-    fn started(&mut self, ctx: &mut Self::Context) {
-        let service = self.clone();
-
-        // Run cleanup every 5 minutes
-        ctx.run_interval(Duration::from_secs(5 * 60), move |_act, ctx| {
-            let service = service.clone();
-            let actor_clone = service.clone();
-            ctx.spawn(
-                async move { service.cleanup_expired_permits().await }.into_actor(&actor_clone),
-            );
-        });
-
-        info!("LayerPermitService actor started");
-    }
-
-    fn stopped(&mut self, _ctx: &mut Self::Context) {
-        info!("LayerPermitService actor stopped");
-    }
-}
-
-impl Handler<Request> for LayerPermitService {
-    type Result = bool;
-
-    fn handle(&mut self, msg: Request, _ctx: &mut Self::Context) -> Self::Result {
-        let Request { id, duration } = msg;
-
-        // Check if there's an existing permit
-        let existing = self.permits.read_sync(&id, |_, permit| {
-            if permit.expired() {
-                // Permit exists but has expired
-                None
-            } else {
-                // Permit exists and is still valid
-                Some(())
-            }
-        });
-
-        match existing {
-            Some(Some(())) => {
-                // Valid permit already exists - deny request
-                debug!("Layer {} already has an active permit", id);
-                false
-            }
-            _ => {
-                // No valid permit exists - grant new permit
-                let new_permit = LayerPermit::new(duration);
-                let _ = self.permits.insert_sync(id, new_permit);
-                debug!("Granted permit for layer {} (duration: {:?})", id, duration);
-                true
-            }
-        }
-    }
-}
-
-impl Handler<Release> for LayerPermitService {
-    type Result = bool;
-
-    fn handle(&mut self, msg: Release, _ctx: &mut Self::Context) -> Self::Result {
-        let Release { id } = msg;
-
-        // Check if there's an existing permit
-        let existing = self.permits.remove_sync(&id);
-
-        match existing {
-            Some((_, permit)) if !permit.expired() => {
-                // Valid permit removed
-                true
-            }
-            _ => {
-                // No valid permit found
-                false
-            }
-        }
-    }
-}
-
-impl LayerPermitService {
-    /// Creates a new LayerPermitService with an empty permit map.
-    pub fn new() -> Self {
-        LayerPermitService {
-            permits: Arc::new(HashMap::new()),
-        }
-    }
-
-    /// Removes expired permits from the map.
-    ///
-    /// Runs periodically to prevent unbounded growth of the permit map.
-    async fn cleanup_expired_permits(&self) {
-        let mut expired_keys = Vec::new();
-
-        // Collect expired permit IDs
-        self.permits.iter_sync(|id, permit| {
-            if permit.expired() {
-                expired_keys.push(*id);
-            }
-            true
-        });
-
-        // Remove expired permits
-        for id in &expired_keys {
-            let _ = self.permits.remove_sync(id);
-        }
-
-        if !expired_keys.is_empty() {
-            debug!("Cleaned up {} expired layer permits", expired_keys.len());
-        }
-    }
-}
-
-static LAYER_PERMIT_SERVICE: OnceCell<Addr<LayerPermitService>> = OnceCell::const_new();
-
-/// Gets or initializes the singleton layer permit service actor.
-///
-/// Returns a shared reference to the LayerPermitService actor, creating it
-/// if it doesn't exist. The service manages layer processing permits to
-/// prevent concurrent processing of the same layer.
-///
-/// # Returns
-///
-/// * `Ok(Addr<LayerPermitService>)` - Actor address for sending messages
-/// * `Err(miette::Error)` - Failed to initialize the service
-pub async fn layer_permit_service() -> Result<Addr<LayerPermitService>> {
-    LAYER_PERMIT_SERVICE
-        .get_or_try_init(|| async {
-            let service = LayerPermitService::new().start();
-            Ok(service)
-        })
-        .await
-        .cloned()
-}
diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs
index 3c2761e75..bf9e7cba1 100644
--- a/rust/crates/scheduler/src/pipeline/matcher.rs
+++ b/rust/crates/scheduler/src/pipeline/matcher.rs
@@ -10,12 +10,9 @@
 // or implied. See the License for the specific language governing permissions and limitations under
 // the License.
 
-use std::{
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-    time::Duration,
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
 };
 
 use uuid::Uuid;
@@ -28,20 +25,16 @@ use crate::{
     host_cache::{host_cache_service, messages::*, HostCacheService},
     metrics,
     models::{CoreSize, DispatchJob, DispatchLayer, Host},
-    pipeline::{
-        dispatcher::{
-            error::DispatchError,
-            messages::{DispatchLayerMessage, DispatchResult},
-            rqd_dispatcher_service, RqdDispatcherService,
-        },
-        layer_permit::{layer_permit_service, LayerPermitService, Release, Request},
+    pipeline::dispatcher::{
+        error::DispatchError,
+        messages::{DispatchLayerMessage, DispatchResult},
+        rqd_dispatcher_service, RqdDispatcherService,
     },
     resource_accounting::{resource_accounting_service, ResourceAccountingService},
 };
-use actix::Addr;
 use miette::{Context, Result};
 use tokio::sync::Semaphore;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info, trace, warn};
 
 pub static HOSTS_ATTEMPTED: AtomicUsize = AtomicUsize::new(0);
 pub static WASTED_ATTEMPTS: AtomicUsize = AtomicUsize::new(0);
@@ -54,10 +47,9 @@ pub static WASTED_ATTEMPTS: AtomicUsize = AtomicUsize::new(0);
 /// - Matching layers to available host candidates
 /// - Dispatching frames to selected hosts via the RQD dispatcher
 pub struct MatchingService {
-    host_service: Addr<HostCacheService>,
-    layer_permit_service: Addr<LayerPermitService>,
+    host_service: HostCacheService,
     layer_dao: LayerDao,
-    dispatcher_service: Addr<RqdDispatcherService>,
+    dispatcher_service: RqdDispatcherService,
     concurrency_semaphore: Arc<Semaphore>,
     resource_accounting_service: Arc<ResourceAccountingService>,
 }
@@ -78,11 +70,13 @@ impl MatchingService {
     pub async fn new() -> Result<Self> {
         let layer_dao = LayerDao::new().await?;
         let host_service = host_cache_service().await?;
-        let layer_permit_service = layer_permit_service().await?;
 
-        // Limiting the concurrency here is necessary to avoid consuming the entire
-        // database connection pool
-        let max_concurrent_transactions = (CONFIG.database.pool_size as usize).saturating_sub(1);
+        // Each concurrent layer now holds two connections: one for the
+        // SKIP LOCKED layer lock (held for the whole dispatch) and one for
+        // its per-proc transaction. Halve the semaphore relative to the
+        // pool so we don't risk exhausting it.
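+        // For example (illustrative numbers, not from config): pool_size = 10
+        // gives 10 / 2 - 1 = 4 concurrent layers, while very small pools
+        // (pool_size <= 3) still get the floor of 1 via `.max(1)` below.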
+ let pool_size = CONFIG.database.pool_size as usize; + let max_concurrent_transactions = (pool_size / 2).saturating_sub(1).max(1); let dispatcher_service = rqd_dispatcher_service().await?; let resource_accounting_service = resource_accounting_service() @@ -91,7 +85,6 @@ impl MatchingService { Ok(MatchingService { host_service, - layer_permit_service, layer_dao, dispatcher_service, concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent_transactions)), @@ -124,12 +117,16 @@ impl MatchingService { match layers { Ok(layers) => { let processed_layers = AtomicUsize::new(0); + // Track peer-lock skips separately from real waste — see the + // wasted-attempts guard below. + let layers_skipped_by_lock = AtomicUsize::new(0); // Stream elegible layers from this job and dispatch one by one for layer in layers { let layer_disp = format!("{}", layer); - // Limiting the concurrency here is necessary to avoid consuming the entire - // database connection pool + // Limit concurrent layer processing to stay within the DB + // connection budget — each layer holds a SKIP-LOCKED lock + // transaction plus its per-proc transactions. let _permit = self .concurrency_semaphore .acquire() @@ -138,44 +135,49 @@ impl MatchingService { let cluster = cluster.clone(); - // Holding a permit for a layer is intended to eliminate a race condition - // between concurrent cluster_rounds attempting to process the same layer. - // The race condition is mitigated, but not complitely avoided, as the permit - // is acquired after the layers and frames have been queried. Acquiring the - // permit before querying would require breaking 'query_layers' into separate - // queries, one per layer, which greatly impacts performance. The rare cases - // that race each other are controlled by the frame.int_version lock on - // frame_dao.lock_for_update - let layer_permit = self - .layer_permit_service - .send(Request { - id: layer.id, - duration: Duration::from_secs(2 * layer.frames.len() as u64), - }) - .await - .expect("Layer permit service is not available"); - - if layer_permit { - let layer_id = layer.id; - self.process_layer(layer, cluster).await; - debug!("{}: Processed layer", layer_disp); - - self.layer_permit_service - .send(Release { id: layer_id }) - .await - .expect("Layer permit service is not available"); - - processed_layers.fetch_add(1, Ordering::Relaxed); - } else { - debug!( - "Layer skipped. {} already being processed by another task.", - layer - ); + // Try to acquire a row-level SELECT … FOR UPDATE SKIP LOCKED + // on the layer. This both deduplicates dispatch across + // concurrent schedulers (single-process or multi-replica) + // and replaces the in-memory LayerPermitService. + let lock_guard = match self.layer_dao.try_lock_layer(layer.id).await { + Ok(Some(guard)) => guard, + Ok(None) => { + debug!( + "Layer skipped. {} already being processed by another scheduler.", + layer + ); + layers_skipped_by_lock.fetch_add(1, Ordering::Relaxed); + metrics::increment_layers_skipped_by_lock(); + continue; + } + Err(err) => { + error!("Failed to lock layer {}: {:?}", layer_disp, err); + continue; + } + }; + + self.process_layer(layer, cluster).await; + debug!("{}: Processed layer", layer_disp); + + if let Err(err) = lock_guard.release().await { + // Releasing only fails if the connection is dead, in which + // case sqlx has already rolled back implicitly so the lock + // is gone — log and move on. 
+ warn!("Failed to release layer lock for {}: {:?}", layer_disp, err); } + + processed_layers.fetch_add(1, Ordering::Relaxed); } - if processed_layers.load(Ordering::Relaxed) == 0 { + // Only flag a job as "wasted" when we actually attempted + // dispatch but found nothing to do. Peer-lock skips are not + // waste — the work is being done on another scheduler — so + // they don't count against this metric. + if processed_layers.load(Ordering::Relaxed) == 0 + && layers_skipped_by_lock.load(Ordering::Relaxed) == 0 + { WASTED_ATTEMPTS.fetch_add(1, Ordering::Relaxed); + metrics::increment_wasted_attempts(); debug!("Job {} didn't process any layer", job_disp); } } @@ -250,6 +252,7 @@ impl MatchingService { while try_again && attempts > 0 { attempts -= 1; HOSTS_ATTEMPTED.fetch_add(1, Ordering::Relaxed); + metrics::increment_hosts_attempted(); // Take ownership of the layer for this iteration let layer = current_layer_version @@ -285,13 +288,13 @@ impl MatchingService { let host_candidate = self .host_service - .send(CheckOut { - facility_id: layer.facility_id, - show_id: layer.show_id, + .check_out( + layer.facility_id, + layer.show_id, tags, - cores: cores_requested, - memory: layer.mem_min, - validation: move |host| { + cores_requested, + layer.mem_min, + move |host| { Self::validate_match( host, &layer_id, @@ -301,9 +304,8 @@ impl MatchingService { os.as_deref(), ) }, - }) - .await - .expect("Host Cache actor is unresponsive"); + ) + .await; match host_candidate { Ok(CheckedOutHost(cluster_key, host)) => { @@ -314,21 +316,20 @@ impl MatchingService { match self .dispatcher_service - .send(DispatchLayerMessage { + .dispatch_layer(DispatchLayerMessage { layer, // Move ownership here host, }) .await - .expect("Dispatcher actor is unresponsive") { Ok(DispatchResult { updated_host, updated_layer, }) => { - self.host_service - .send(CheckIn(cluster_key, CheckInPayload::Host(updated_host))) - .await - .expect("Host Cache actor is unresponsive"); + self.host_service.check_in_payload( + cluster_key, + CheckInPayload::Host(updated_host), + ); if updated_layer.frames.is_empty() { // Stop on the first successful attempt @@ -357,13 +358,10 @@ impl MatchingService { &layer_job_id, &host_before_dispatch, ); - self.host_service - .send(CheckIn( - cluster_key, - CheckInPayload::Invalidate(host_before_dispatch.id), - )) - .await - .expect("Host Cache actor is unresponsive"); + self.host_service.check_in_payload( + cluster_key, + CheckInPayload::Invalidate(host_before_dispatch.id), + ); try_again = false; // Can't continue without the layer } }; @@ -383,27 +381,21 @@ impl MatchingService { try_again = false; } crate::host_cache::HostCacheError::FailedToQueryHostCache(err) => { - // CRITICAL: Database connection failure in host cache query - // - // When the host cache cannot query the database, the matching service - // cannot reliably find hosts for job dispatch. This is a systemic - // failure that affects all job processing. - // - // We panic here rather than propagating an error because: - // 1. The entire service is compromised - no jobs can be matched - // 2. Graceful degradation is not possible without host candidates - // 3. The orchestration layer (e.g., Kubernetes) should restart the - // service to re-establish database connectivity - // 4. 
Bubbling the error up would add unnecessary complexity for a - // condition that always requires service restart - // - // This allows the orchestration layer to handle the failure through - // its standard restart policies rather than attempting partial recovery. - panic!( - "Host cache failed to query database - service is non-functional \ - and requires restart. Error: {}", + // Transient DB query failure. The host_cache circuit breaker + // applies exponential backoff and will short-circuit further + // queries until it recovers. After + // CONFIG.host_cache.db_circuit_breaker.failure_threshold + // consecutive failures the host cache exits the process cleanly + // so the orchestrator (k8s, systemd) restarts us. From the + // matcher's perspective we just skip this layer for now — + // other clusters keep running, and the next cycle may find + // candidates once the breaker closes again. + warn!( + "Host cache query failed for layer {} — skipping for now: {}", + current_layer_version.as_ref().unwrap(), err - ) + ); + try_again = false; } } } diff --git a/rust/crates/scheduler/src/pipeline/mod.rs b/rust/crates/scheduler/src/pipeline/mod.rs index 9a34c0db8..53c2ecd1d 100644 --- a/rust/crates/scheduler/src/pipeline/mod.rs +++ b/rust/crates/scheduler/src/pipeline/mod.rs @@ -12,7 +12,6 @@ mod dispatcher; pub mod entrypoint; -mod layer_permit; mod matcher; pub use entrypoint::run; diff --git a/rust/crates/scheduler/tests/stress_tests.rs b/rust/crates/scheduler/tests/stress_tests.rs index ad47917ea..ffd0f32e9 100644 --- a/rust/crates/scheduler/tests/stress_tests.rs +++ b/rust/crates/scheduler/tests/stress_tests.rs @@ -68,7 +68,7 @@ mod stress_test { clean_up_test_data(test_prefix).await } - #[actix::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] // #[traced_test] async fn test_stress_small() { let desc = TestDescription { diff --git a/rust/crates/scheduler/tests/util.rs b/rust/crates/scheduler/tests/util.rs index a7bb6c29c..28475282b 100644 --- a/rust/crates/scheduler/tests/util.rs +++ b/rust/crates/scheduler/tests/util.rs @@ -97,6 +97,8 @@ pub fn create_test_config() -> Config { hostname_tags_chunk_size: 20, host_candidate_attempts_per_layer: 5, empty_job_cycles_before_quiting: Some(20), + cluster_empty_back_off: Duration::from_secs(3), + cluster_productivity_bias: true, mem_reserved_min: bytesize::ByteSize::mb(250), subscription_recalculation_interval: Duration::from_secs(3), resource_recalculation_interval: Duration::from_secs(10),