diff --git a/VERSION.in b/VERSION.in
index 5fb5a6b4f..d2ab029d3 100644
--- a/VERSION.in
+++ b/VERSION.in
@@ -1 +1 @@
-1.20
+1.21
diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V41__scheduler_pending_query_indexes.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V41__scheduler_pending_query_indexes.sql
new file mode 100644
index 000000000..1262ee7bd
--- /dev/null
+++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V41__scheduler_pending_query_indexes.sql
@@ -0,0 +1,41 @@
+-- Indexes supporting the rewritten QUERY_PENDING_BY_SHOW_FACILITY_TAG used
+-- by the Rust scheduler's per-cluster pending-job query. Together they
+-- eliminate the cardinality blowup from the previous DISTINCT pk_job join
+-- and bring per-call cost from multi-second on big shows down to <500ms.
+--
+-- OPERATIONAL NOTE
+-- Flyway 5.2.0 (used by cuebot's test setup) wraps each migration in a
+-- single transaction, and PostgreSQL rejects CREATE INDEX CONCURRENTLY
+-- inside a transaction. The plain form below is safe for the embedded
+-- test DB and for any environment where index-build downtime is acceptable.
+--
+-- For production deployments against populated tables (where the SHARE
+-- lock taken during the index build blocks writes until it completes),
+-- apply this migration manually via `psql` with CONCURRENTLY *before*
+-- running Flyway, then mark the row in flyway_schema_history yourself, e.g.:
+--
+-- CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_layer_tags_array ...
+-- CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_job_pending_lookup ...
+-- CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_layer_stat_waiting ...
+-- INSERT INTO flyway_schema_history(...) VALUES (41, ...);
+
+-- GIN index on layer.str_tags for the && array-overlap test in the rewritten
+-- QUERY_PENDING_BY_SHOW_FACILITY_TAG. Without this, tag matching is a sequential
+-- scan over every layer row.
+CREATE INDEX IF NOT EXISTS idx_layer_tags_array
+    ON layer USING gin (string_to_array(REPLACE(str_tags, ' ', ''), '|'));
+
+-- Composite filter index on job for the bookable-job WHERE clause.
+-- Plain (non-functional) pk_facility column is intentional: the scheduler
+-- now compares pk_facility as a case-sensitive string (Cuebot writes
+-- canonical casing on insert), so the previous LOWER() expression index
+-- is no longer needed.
+CREATE INDEX IF NOT EXISTS idx_job_pending_lookup
+    ON job (pk_show, pk_facility, str_state, b_paused)
+    WHERE str_state = 'PENDING' AND b_paused = false;
+
+-- Layer_stat lookup for the EXISTS subquery in the rewritten query. Restricts
+-- the index to rows that actually have waiting frames, keeping it small.
+CREATE INDEX IF NOT EXISTS idx_layer_stat_waiting
+    ON layer_stat (pk_layer)
+    WHERE int_waiting_count > 0;
diff --git a/rust/config/scheduler.yaml b/rust/config/scheduler.yaml
index 95b87a05a..57772fb77 100644
--- a/rust/config/scheduler.yaml
+++ b/rust/config/scheduler.yaml
@@ -97,18 +97,31 @@ queue:
   # Default: 300s (5 minutes)
   # job_back_off_duration: 300s

+  # Duration a cluster sleeps after a pass returned no dispatchable jobs.
+  # Larger values reduce empty-pass query load on the database.
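+  # Trade-off: a sleeping cluster does not notice newly submitted jobs until
+  # the sleep expires, so larger values also delay pickup of new work on an
+  # idle cluster.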
+ # Default: 30s + # cluster_empty_sleep: 30s + # Chunk size for processing manual tags - # Default: 100 - # manual_tags_chunk_size: 100 + # Default: 50 + # manual_tags_chunk_size: 50 # Chunk size for processing hostname tags - # Default: 300 - # hostname_tags_chunk_size: 300 + # Default: 50 + # hostname_tags_chunk_size: 50 # Maximum number of host candidate attempts per layer # Default: 10 # host_candidate_attemps_per_layer: 10 + # Maximum number of jobs returned per cluster pass. Strict ORDER BY priority + # DESC means low-priority jobs are deferred to subsequent passes. + # Default: 20 + # max_jobs_per_cluster_pass: 20 + # Number of empty job cycles before scheduler quits (None = run forever) # Default: None # empty_job_cycles_before_quiting: 10 diff --git a/rust/crates/scheduler/src/cluster.rs b/rust/crates/scheduler/src/cluster.rs index 71a93d7c5..e98d15094 100644 --- a/rust/crates/scheduler/src/cluster.rs +++ b/rust/crates/scheduler/src/cluster.rs @@ -12,14 +12,16 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, + ops::ControlFlow, + panic::AssertUnwindSafe, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, RwLock, }, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use itertools::Itertools; use miette::{IntoDiagnostic, Result}; use serde::{Deserialize, Serialize}; @@ -31,13 +33,14 @@ use crate::{ cluster_key::{Tag, TagType}, config::CONFIG, dao::{helpers::parse_uuid, ClusterDao}, + metrics::observe_cluster_round_trip, }; pub static CLUSTER_ROUNDS: AtomicUsize = AtomicUsize::new(0); #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct Cluster { - pub facility_id: Uuid, + pub facility_id: String, pub show_id: Uuid, pub tags: BTreeSet, } @@ -55,7 +58,7 @@ impl std::fmt::Display for Cluster { } impl Cluster { - pub fn single_tag(facility_id: Uuid, show_id: Uuid, tag: Tag) -> Self { + pub fn single_tag(facility_id: String, show_id: Uuid, tag: Tag) -> Self { Cluster { facility_id, show_id, @@ -63,7 +66,7 @@ impl Cluster { } } - pub fn multiple_tag(facility_id: Uuid, show_id: Uuid, tags: Vec) -> Self { + pub fn multiple_tag(facility_id: String, show_id: Uuid, tags: Vec) -> Self { Cluster { facility_id, show_id, @@ -112,7 +115,7 @@ pub enum FeedMessage { /// .await?; /// ``` pub struct ClusterFeedBuilder { - facility_id: Option, + facility_id: Option, ignore_tags: Vec, clusters: Vec, entire_shows: Vec, @@ -170,7 +173,7 @@ impl ClusterFeedBuilder { impl ClusterFeed { /// Returns a builder for a feed scoped to the given facility. 
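+    /// `facility_id` is the facility primary key exactly as stored in the
+    /// DB (canonical casing), not a parsed UUID.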
- pub fn facility(facility_id: Uuid) -> ClusterFeedBuilder { + pub fn facility(facility_id: String) -> ClusterFeedBuilder { ClusterFeedBuilder { facility_id: Some(facility_id), ignore_tags: Vec::new(), @@ -205,7 +208,7 @@ impl ClusterFeed { /// * `Ok(ClusterFeed)` - Successfully loaded cluster feed /// * `Err(miette::Error)` - Failed to load clusters from database pub async fn load_clusters( - facility_id: Option, + facility_id: Option, ignore_tags: &[String], shows_filter: Option>, ) -> Result> { @@ -213,12 +216,12 @@ impl ClusterFeed { // Fetch clusters for alloc and non_alloc tags let mut clusters_stream = cluster_dao - .fetch_alloc_clusters(facility_id, shows_filter.clone()) + .fetch_alloc_clusters(facility_id.clone(), shows_filter.clone()) .chain(cluster_dao.fetch_non_alloc_clusters(facility_id, shows_filter)); let mut clusters = Vec::new(); - let mut manual_tags: HashMap<(Uuid, Uuid), HashSet> = HashMap::new(); - let mut hardware_tags: HashMap<(Uuid, Uuid), HashSet> = HashMap::new(); - let mut hostname_tags: HashMap<(Uuid, Uuid), HashSet> = HashMap::new(); + let mut manual_tags: HashMap<(Uuid, String), HashSet> = HashMap::new(); + let mut hardware_tags: HashMap<(Uuid, String), HashSet> = HashMap::new(); + let mut hostname_tags: HashMap<(Uuid, String), HashSet> = HashMap::new(); // Collect all tags while let Some(record) = clusters_stream.next().await { @@ -229,7 +232,7 @@ impl ClusterFeed { continue; } - let facility_id = parse_uuid(&cluster.facility_id); + let facility_id = cluster.facility_id; let show_id = parse_uuid(&cluster.show_id); match cluster.ttype.as_str() { // Each alloc tag becomes its own cluster @@ -281,7 +284,11 @@ impl ClusterFeed { // Chunk Manual tags for ((show_id, facility_id), tags) in manual_tags.into_iter() { for chunk in &tags.into_iter().chunks(CONFIG.queue.manual_tags_chunk_size) { - clusters.push(Cluster::multiple_tag(facility_id, show_id, chunk.collect())) + clusters.push(Cluster::multiple_tag( + facility_id.clone(), + show_id, + chunk.collect(), + )) } } @@ -291,7 +298,11 @@ impl ClusterFeed { .into_iter() .chunks(CONFIG.queue.hostname_tags_chunk_size) { - clusters.push(Cluster::multiple_tag(facility_id, show_id, chunk.collect())) + clusters.push(Cluster::multiple_tag( + facility_id.clone(), + show_id, + chunk.collect(), + )) } } @@ -302,7 +313,11 @@ impl ClusterFeed { // Hardware share the same size as manual to simplify configuration .chunks(CONFIG.queue.manual_tags_chunk_size) { - clusters.push(Cluster::multiple_tag(facility_id, show_id, chunk.collect())) + clusters.push(Cluster::multiple_tag( + facility_id.clone(), + show_id, + chunk.collect(), + )) } } @@ -360,124 +375,182 @@ impl ClusterFeed { pub async fn stream(self, sender: mpsc::Sender) -> mpsc::Sender { // Use a small channel to ensure the producer waits for items to be consumed before // generating more - let (cancel_sender, mut feed_receiver) = mpsc::channel(8); + let (feed_sender, mut feed_receiver) = mpsc::channel(8); let stop_flag = self.stop_flag.clone(); let sleep_map = self.sleep_map.clone(); + // Tracks the last emission time per active cluster, for round-trip histogram. + // Entries are dropped when a cluster is put to sleep so wake-up doesn't produce + // a spurious sample covering the sleep duration. 
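+        // The map is bounded by the number of clusters in the feed, so no
+        // eviction is needed beyond the removal on sleep.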
+ let last_sent_map: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + let last_sent_map_producer = last_sent_map.clone(); + // Stream clusters on the caller channel + let feed = self.clusters.clone(); + let current_index_atomic = self.current_index.clone(); tokio::spawn(async move { - let mut all_sleeping_rounds = 0; - let feed = self.clusters.clone(); - let current_index_atomic = self.current_index.clone(); + let mut all_sleeping_rounds: usize = 0; loop { - // Check stop flag - if stop_flag.load(Ordering::Relaxed) { - warn!("Cluster received a stop message. Stopping feed."); - break; - } - - let (item, cluster_size, completed_round) = { - let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner()); - if clusters.is_empty() { - break; + let iteration = async { + // Check stop flag + if stop_flag.load(Ordering::Relaxed) { + warn!("Cluster received a stop message. Stopping feed."); + return ControlFlow::Break(()); } - let current_index = current_index_atomic.load(Ordering::Relaxed); - let item = clusters[current_index].clone(); - let next_index = (current_index + 1) % clusters.len(); - let completed_round = next_index == 0; // Detect wrap-around - current_index_atomic.store(next_index, Ordering::Relaxed); + let (item, cluster_size, completed_round) = { + let clusters = feed.read().unwrap_or_else(|poisoned| poisoned.into_inner()); + if clusters.is_empty() { + return ControlFlow::Break(()); + } + + let current_index = current_index_atomic.load(Ordering::Relaxed); + let item = clusters[current_index].clone(); + let next_index = (current_index + 1) % clusters.len(); + let completed_round = next_index == 0; // Detect wrap-around + current_index_atomic.store(next_index, Ordering::Relaxed); - (item, clusters.len(), completed_round) - }; + (item, clusters.len(), completed_round) + }; - // Skip cluster if it is marked as sleeping - let is_sleeping = { - let mut sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - if let Some(wake_up_time) = sleep_map_lock.get(&item) { - if *wake_up_time > SystemTime::now() { - // Still sleeping, skip it - true + // Skip cluster if it is marked as sleeping + let is_sleeping = { + let mut sleep_map_lock = + sleep_map.lock().unwrap_or_else(|p| p.into_inner()); + if let Some(wake_up_time) = sleep_map_lock.get(&item) { + if *wake_up_time > SystemTime::now() { + // Still sleeping, skip it + true + } else { + // Remove expired entries + sleep_map_lock.remove(&item); + false + } } else { - // Remove expired entries - sleep_map_lock.remove(&item); false } - } else { - false - } - }; - - if !is_sleeping && sender.send(item).await.is_err() { - warn!("Cluster receiver dropped. Stopping feed."); - break; - } + }; - // At end of round, add backoff sleep - if completed_round { - CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + if !is_sleeping { + if sender.send(item.clone()).await.is_err() { + warn!("Cluster receiver dropped. Stopping feed."); + return ControlFlow::Break(()); + } + let now = Instant::now(); + let mut last_sent_lock = last_sent_map_producer + .lock() + .unwrap_or_else(|p| p.into_inner()); + if let Some(prev) = last_sent_lock.insert(item, now) { + observe_cluster_round_trip(now.duration_since(prev)); + } + } else if !completed_round { + // Skipped a sleeping cluster mid-round; yield so we don't starve the runtime. 
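+                        // Without an await point here, a long run of sleeping
+                        // clusters would spin this loop without yielding to
+                        // the runtime.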
+ tokio::task::yield_now().await; + } - // Check if all/most clusters are sleeping - let sleeping_count = { - let sleep_map_lock = sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.len() - }; - if sleeping_count >= cluster_size { - // Ensure this doesn't loop forever when there's a limit configured - all_sleeping_rounds += 1; - if let Some(max_empty_cycles) = CONFIG.queue.empty_job_cycles_before_quiting - { - if all_sleeping_rounds > max_empty_cycles { - warn!("All clusters have been sleeping for too long"); - break; + // At end of round, add backoff sleep + if completed_round { + CLUSTER_ROUNDS.fetch_add(1, Ordering::Relaxed); + + // Check if all/most clusters are sleeping + let sleeping_count = { + let sleep_map_lock = + sleep_map.lock().unwrap_or_else(|p| p.into_inner()); + sleep_map_lock.len() + }; + if sleeping_count >= cluster_size { + // Ensure this doesn't loop forever when there's a limit configured + all_sleeping_rounds += 1; + if let Some(max_empty_cycles) = + CONFIG.queue.empty_job_cycles_before_quiting + { + if all_sleeping_rounds > max_empty_cycles { + warn!("All clusters have been sleeping for too long"); + return ControlFlow::Break(()); + } } + + // All clusters sleeping, sleep longer + tokio::time::sleep(Duration::from_secs(5)).await; + } else if sleeping_count > 0 { + // Some clusters sleeping, brief pause + all_sleeping_rounds = 0; + tokio::time::sleep(Duration::from_millis(100)).await; + } else { + // Active work, minimal pause + all_sleeping_rounds = 0; + tokio::time::sleep(Duration::from_millis(10)).await; } + } + ControlFlow::Continue(()) + }; - // All clusters sleeping, sleep longer - tokio::time::sleep(Duration::from_secs(5)).await; - } else if sleeping_count > 0 { - // Some clusters sleeping, brief pause - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // Active work, minimal pause - tokio::time::sleep(Duration::from_millis(10)).await; + match AssertUnwindSafe(iteration).catch_unwind().await { + Ok(ControlFlow::Break(())) => break, + Ok(ControlFlow::Continue(())) => {} + Err(e) => { + error!("Cluster feed producer iteration panicked: {:?}", e); } } } }); // Process messages on the receiving end + let stop_flag_recv = self.stop_flag.clone(); let sleep_map = self.sleep_map.clone(); + let last_sent_map_receiver = last_sent_map.clone(); tokio::spawn(async move { - while let Some(message) = feed_receiver.recv().await { - match message { - FeedMessage::Sleep(cluster, duration) => { - if let Some(wake_up_time) = SystemTime::now().checked_add(duration) { - debug!("{:?} put to sleep for {}s", cluster, duration.as_secs()); - { - let mut sleep_map_lock = - sleep_map.lock().unwrap_or_else(|p| p.into_inner()); - sleep_map_lock.insert(cluster, wake_up_time); + loop { + let iteration = async { + let Some(message) = feed_receiver.recv().await else { + return ControlFlow::Break(()); + }; + match message { + FeedMessage::Sleep(cluster, duration) => { + let requested_wake_up_time = SystemTime::now().checked_add(duration); + if let Some(wake_up_time) = requested_wake_up_time { + debug!("{:?} put to sleep for {}s", cluster, duration.as_secs()); + { + let mut last_sent_lock = last_sent_map_receiver + .lock() + .unwrap_or_else(|p| p.into_inner()); + last_sent_lock.remove(&cluster); + } + { + let mut sleep_map_lock = + sleep_map.lock().unwrap_or_else(|p| p.into_inner()); + sleep_map_lock.insert(cluster, wake_up_time); + } + } else { + warn!( + "Sleep request ignored for {:?}. 
Invalid duration={}s", + cluster, + duration.as_secs() + ); } - } else { - warn!( - "Sleep request ignored for {:?}. Invalid duration={}s", - cluster, - duration.as_secs() - ); + ControlFlow::Continue(()) + } + FeedMessage::Stop() => { + stop_flag_recv.store(true, Ordering::Relaxed); + ControlFlow::Break(()) } } - FeedMessage::Stop() => { - self.stop_flag.store(true, Ordering::Relaxed); - break; + }; + + match AssertUnwindSafe(iteration).catch_unwind().await { + Ok(ControlFlow::Break(())) => break, + Ok(ControlFlow::Continue(())) => {} + Err(e) => { + error!("Cluster feed receiver iteration panicked: {:?}", e); } } } }); - cancel_sender + feed_sender } } @@ -489,9 +562,9 @@ impl ClusterFeed { /// /// # Returns /// -/// * `Ok(Uuid)` - The facility ID +/// * `Ok(String)` - The facility ID (verbatim from the DB, canonical casing) /// * `Err(miette::Error)` - If facility not found or database error -pub async fn get_facility_id(facility_name: &str) -> Result { +pub async fn get_facility_id(facility_name: &str) -> Result { let cluster_dao = ClusterDao::new().await?; cluster_dao .get_facility_id(facility_name) diff --git a/rust/crates/scheduler/src/cluster_key.rs b/rust/crates/scheduler/src/cluster_key.rs index 0aaafeb42..8902b0022 100644 --- a/rust/crates/scheduler/src/cluster_key.rs +++ b/rust/crates/scheduler/src/cluster_key.rs @@ -55,7 +55,7 @@ impl std::borrow::Borrow for Tag { #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ClusterKey { - pub facility_id: Uuid, + pub facility_id: String, pub show_id: Uuid, pub tag: Tag, } diff --git a/rust/crates/scheduler/src/config/mod.rs b/rust/crates/scheduler/src/config/mod.rs index 3d46e1419..04811abda 100644 --- a/rust/crates/scheduler/src/config/mod.rs +++ b/rust/crates/scheduler/src/config/mod.rs @@ -77,7 +77,17 @@ pub struct QueueConfig { pub memory_stranded_threshold: ByteSize, #[serde(with = "humantime_serde")] pub job_back_off_duration: Duration, + /// Duration a cluster sleeps after a pass returned no dispatchable jobs. + /// Larger values reduce empty-pass query load on the database. + #[serde(with = "humantime_serde")] + pub cluster_empty_sleep: Duration, pub stream: StreamConfig, + /// Maximum number of jobs returned per cluster pass. Caps the per-pass + /// dispatch cost so a big-show cluster doesn't iterate thousands of jobs + /// in a single round. Strict `ORDER BY priority DESC` means low-priority + /// jobs are deferred to subsequent passes when the high-priority backlog + /// drains. 
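+    /// The value is bound verbatim as the query's `LIMIT` ($5), so it must
+    /// be positive (PostgreSQL rejects a negative LIMIT).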
+ pub max_jobs_per_cluster_pass: i64, pub manual_tags_chunk_size: usize, pub hostname_tags_chunk_size: usize, pub host_candidate_attempts_per_layer: usize, @@ -103,9 +113,11 @@ impl Default for QueueConfig { core_multiplier: 100, memory_stranded_threshold: ByteSize::gib(2), job_back_off_duration: Duration::from_secs(300), + cluster_empty_sleep: Duration::from_secs(30), stream: StreamConfig::default(), - manual_tags_chunk_size: 100, - hostname_tags_chunk_size: 300, + max_jobs_per_cluster_pass: 20, + manual_tags_chunk_size: 50, + hostname_tags_chunk_size: 50, host_candidate_attempts_per_layer: 10, empty_job_cycles_before_quiting: None, mem_reserved_min: ByteSize::mib(250), @@ -252,8 +264,7 @@ pub struct SchedulerConfig { impl SchedulerConfig { pub fn show_names(&self) -> Option> { - let mut show_names: HashSet = - HashSet::from_iter(self.entire_shows.iter().cloned()); + let mut show_names: HashSet = HashSet::from_iter(self.entire_shows.iter().cloned()); for tag in &self.alloc_tags { show_names.insert(tag.show.clone()); } diff --git a/rust/crates/scheduler/src/dao/cluster_dao.rs b/rust/crates/scheduler/src/dao/cluster_dao.rs index 3aeb8fa41..24af9ed5c 100644 --- a/rust/crates/scheduler/src/dao/cluster_dao.rs +++ b/rust/crates/scheduler/src/dao/cluster_dao.rs @@ -69,7 +69,7 @@ FROM host_tag JOIN show sh ON sub.pk_show = sh.pk_show WHERE str_tag_type = 'ALLOC' AND sh.b_active = true - AND LOWER(a.pk_facility) = LOWER($1) + AND a.pk_facility = $1 "#; static QUERY_ALLOC_CLUSTERS_WITH_SHOW_NAMES: &str = r#" @@ -99,7 +99,7 @@ FROM host_tag JOIN show sh ON sub.pk_show = sh.pk_show WHERE str_tag_type = 'ALLOC' AND sh.b_active = true - AND LOWER(a.pk_facility) = LOWER($1) + AND a.pk_facility = $1 AND sh.str_name = ANY($2) "#; @@ -130,7 +130,7 @@ JOIN host h on h.pk_host = host_tag.pk_host JOIN alloc a ON a.pk_alloc = h.pk_alloc JOIN subscription s ON a.pk_alloc = s.pk_alloc WHERE str_tag_type <> 'ALLOC' - AND LOWER(a.pk_facility) = LOWER($1) + AND a.pk_facility = $1 "#; static QUERY_NON_ALLOC_CLUSTERS_WITH_SHOW_NAMES: &str = r#" @@ -160,7 +160,7 @@ JOIN alloc a ON a.pk_alloc = h.pk_alloc JOIN subscription s ON a.pk_alloc = s.pk_alloc JOIN show sh ON sh.pk_show = s.pk_show WHERE str_tag_type <> 'ALLOC' - AND LOWER(a.pk_facility) = LOWER($1) + AND a.pk_facility = $1 AND sh.str_name = ANY($2) "#; @@ -207,7 +207,7 @@ impl ClusterDao { /// * `Stream>` - Stream of allocation clusters pub fn fetch_alloc_clusters( &self, - facility_id: Option, + facility_id: Option, shows_filter: Option>, ) -> std::pin::Pin> + '_>> { match (facility_id, shows_filter) { @@ -215,13 +215,13 @@ impl ClusterDao { sqlx::query_as::<_, ClusterModel>( QUERY_ALLOC_CLUSTERS_WITH_FACILITY_AND_SHOW_NAMES, ) - .bind(fid.to_string()) + .bind(fid) .bind(show_names) .fetch(&*self.connection_pool), ), (Some(fid), None) => Box::pin( sqlx::query_as::<_, ClusterModel>(QUERY_ALLOC_CLUSTERS_WITH_FACILITY) - .bind(fid.to_string()) + .bind(fid) .fetch(&*self.connection_pool), ), (None, Some(show_names)) => Box::pin( @@ -251,7 +251,7 @@ impl ClusterDao { /// * `Stream>` - Stream of non-allocation clusters pub fn fetch_non_alloc_clusters( &self, - facility_id: Option, + facility_id: Option, shows_filter: Option>, ) -> std::pin::Pin> + '_>> { match (facility_id, shows_filter) { @@ -259,13 +259,13 @@ impl ClusterDao { sqlx::query_as::<_, ClusterModel>( QUERY_NON_ALLOC_CLUSTERS_WITH_FACILITY_AND_SHOW_NAMES, ) - .bind(fid.to_string()) + .bind(fid) .bind(show_names) .fetch(&*self.connection_pool), ), (Some(fid), None) => Box::pin( sqlx::query_as::<_, 
ClusterModel>(QUERY_NON_ALLOC_CLUSTERS_WITH_FACILITY) - .bind(fid.to_string()) + .bind(fid) .fetch(&*self.connection_pool), ), (None, Some(show_names)) => Box::pin( @@ -288,14 +288,14 @@ impl ClusterDao { /// /// # Returns /// - /// * `Ok(Uuid)` - The facility ID + /// * `Ok(String)` - The facility ID (verbatim from the DB, canonical casing) /// * `Err(sqlx::Error)` - If facility not found or database error - pub async fn get_facility_id(&self, facility_name: &str) -> Result { + pub async fn get_facility_id(&self, facility_name: &str) -> Result { let row: (String,) = sqlx::query_as(QUERY_FACILITY_ID) .bind(facility_name) .fetch_one(&*self.connection_pool) .await?; - Ok(parse_uuid(&row.0)) + Ok(row.0) } /// Looks up a show ID by show name. diff --git a/rust/crates/scheduler/src/dao/frame_dao.rs b/rust/crates/scheduler/src/dao/frame_dao.rs index 152b6af70..b6c44d187 100644 --- a/rust/crates/scheduler/src/dao/frame_dao.rs +++ b/rust/crates/scheduler/src/dao/frame_dao.rs @@ -113,7 +113,7 @@ impl From for DispatchFrame { id: parse_uuid(&val.pk_frame), frame_name: val.str_frame_name, show_id: parse_uuid(&val.pk_show), - facility_id: parse_uuid(&val.pk_facility), + facility_id: val.pk_facility, job_id: parse_uuid(&val.pk_job), layer_id: parse_uuid(&val.pk_layer), command: val.str_cmd, diff --git a/rust/crates/scheduler/src/dao/host_dao.rs b/rust/crates/scheduler/src/dao/host_dao.rs index b517a340e..23c975d4e 100644 --- a/rust/crates/scheduler/src/dao/host_dao.rs +++ b/rust/crates/scheduler/src/dao/host_dao.rs @@ -179,7 +179,7 @@ FROM host h INNER JOIN alloc a ON h.pk_alloc = a.pk_alloc INNER JOIN subscription s ON s.pk_alloc = a.pk_alloc AND s.pk_show = $1 INNER JOIN host_tag ht ON h.pk_host = ht.pk_host -WHERE LOWER(a.pk_facility) = LOWER($2) +WHERE a.pk_facility = $2 AND h.str_lock_state = 'OPEN' AND hs.str_state = 'UP' AND ht.str_tag = $3 @@ -265,12 +265,12 @@ impl HostDao { pub async fn fetch_hosts_by_show_facility_tag<'a>( &'a self, show_id: Uuid, - facility_id: Uuid, + facility_id: &'a str, tag: &'a str, ) -> Result, sqlx::Error> { let out = sqlx::query_as::<_, HostModel>(QUERY_HOST_BY_SHOW_FACILITY_AND_TAG) .bind(show_id.to_string()) - .bind(facility_id.to_string()) + .bind(facility_id) .bind(tag) .fetch_all(&*self.connection_pool) .await; diff --git a/rust/crates/scheduler/src/dao/job_dao.rs b/rust/crates/scheduler/src/dao/job_dao.rs index 0dfcaf83c..5ce077622 100644 --- a/rust/crates/scheduler/src/dao/job_dao.rs +++ b/rust/crates/scheduler/src/dao/job_dao.rs @@ -65,49 +65,50 @@ impl DispatchJob { } static QUERY_PENDING_BY_SHOW_FACILITY_TAG: &str = r#" ---bookable_shows: Shows that have room in at least one of its subscriptions +-- bookable_shows: shows that still have room in at least one subscription. WITH bookable_shows AS ( - SELECT - distinct w.pk_show, - sh.str_name as show_name + SELECT DISTINCT w.pk_show, sh.str_name AS show_name FROM subscription s INNER JOIN vs_waiting w ON s.pk_show = w.pk_show INNER JOIN show sh ON sh.pk_show = w.pk_show WHERE s.pk_show = $1 - -- Burst == 0 is used to freeze a subscription + -- Burst == 0 is used to freeze a subscription. AND s.int_burst > 0 - -- At least one core unit available + -- At least one core unit available. 
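+    -- ($2 is queue.core_multiplier; core counts are stored in multiplied
+    -- units (cf. CoreSize::from_multiplied), so this requires one whole
+    -- core of headroom.)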
AND s.int_burst - s.int_cores >= $2 AND s.int_cores < s.int_burst -), -filtered_jobs AS ( - SELECT - j.pk_job, - jr.int_priority, - bookable_shows.show_name - FROM job j - INNER JOIN bookable_shows on j.pk_show = bookable_shows.pk_show - INNER JOIN job_resource jr ON j.pk_job = jr.pk_job - INNER JOIN folder f ON j.pk_folder = f.pk_folder - INNER JOIN folder_resource fr ON f.pk_folder = fr.pk_folder - INNER JOIN layer l ON l.pk_job = j.pk_job - WHERE j.str_state = 'PENDING' - AND j.b_paused = false - -- Check for room on folder resources - AND (fr.int_max_cores = -1 OR fr.int_cores + l.int_cores_min < fr.int_max_cores) - AND (fr.int_max_gpus = -1 OR fr.int_gpus + l.int_gpus_min < fr.int_max_gpus) - -- Match tags: jobs with at least one layer that contains the queried tag - AND string_to_array(REPLACE($3, ' ', ''), '|') && string_to_array(REPLACE(l.str_tags, ' ', ''), '|') - AND LOWER(j.pk_facility) = LOWER($4) ) -SELECT DISTINCT - fj.pk_job, - fj.int_priority, - fj.show_name -FROM filtered_jobs fj -INNER JOIN layer_stat ls ON fj.pk_job = ls.pk_job -WHERE ls.int_waiting_count > 0 -ORDER BY int_priority DESC +SELECT + j.pk_job, + jr.int_priority, + bs.show_name +FROM job j +INNER JOIN bookable_shows bs ON j.pk_show = bs.pk_show +INNER JOIN job_resource jr ON j.pk_job = jr.pk_job +INNER JOIN folder f ON j.pk_folder = f.pk_folder +INNER JOIN folder_resource fr ON f.pk_folder = fr.pk_folder +WHERE j.str_state = 'PENDING' + AND j.b_paused = false + AND j.pk_facility = $4 + -- Folder must have any room at all; per-layer fit is checked below. + AND (fr.int_max_cores = -1 OR fr.int_cores < fr.int_max_cores) + AND (fr.int_max_gpus = -1 OR fr.int_gpus < fr.int_max_gpus) + -- The job must have at least one layer that matches the tag set, has waiting + -- frames, and fits within the folder cap. EXISTS short-circuits per job and + -- avoids the cardinality blowup of joining layer + layer_stat at the outer level. 
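+  -- The && overlap test below is the probe that the V41 idx_layer_tags_array
+  -- GIN index is built for.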
+ AND EXISTS ( + SELECT 1 + FROM layer l + INNER JOIN layer_stat ls ON ls.pk_layer = l.pk_layer + WHERE l.pk_job = j.pk_job + AND ls.int_waiting_count > 0 + AND string_to_array(REPLACE($3, ' ', ''), '|') + && string_to_array(REPLACE(l.str_tags, ' ', ''), '|') + AND (fr.int_max_cores = -1 OR fr.int_cores + l.int_cores_min < fr.int_max_cores) + AND (fr.int_max_gpus = -1 OR fr.int_gpus + l.int_gpus_min < fr.int_max_gpus) + ) +ORDER BY jr.int_priority DESC +LIMIT $5 "#; impl JobDao { @@ -160,7 +161,7 @@ impl JobDao { pub async fn query_pending_jobs_by_show_facility_and_tags( &self, show_id: Uuid, - facility_id: Uuid, + facility_id: &str, tags: impl Iterator, ) -> Result, sqlx::Error> { trace!( @@ -177,7 +178,8 @@ impl JobDao { .bind(show_id.to_string()) .bind(CONFIG.queue.core_multiplier as i32) .bind(tags_collected.join(" | ").to_string()) - .bind(facility_id.to_string()) + .bind(facility_id) + .bind(CONFIG.queue.max_jobs_per_cluster_pass) .fetch_all(&*self.connection_pool) .await; observe_job_query_duration(start.elapsed()); diff --git a/rust/crates/scheduler/src/dao/layer_dao.rs b/rust/crates/scheduler/src/dao/layer_dao.rs index 965a478a8..1b0fd17a0 100644 --- a/rust/crates/scheduler/src/dao/layer_dao.rs +++ b/rust/crates/scheduler/src/dao/layer_dao.rs @@ -121,7 +121,7 @@ impl DispatchLayer { DispatchLayer { id: parse_uuid(&layer.pk_layer), job_id: parse_uuid(&layer.pk_job), - facility_id: parse_uuid(&layer.pk_facility), + facility_id: layer.pk_facility, show_id: parse_uuid(&layer.pk_show), job_name: layer.str_job_name, layer_name: layer.str_name, diff --git a/rust/crates/scheduler/src/host_cache/actor.rs b/rust/crates/scheduler/src/host_cache/actor.rs index 40d098850..49905f817 100644 --- a/rust/crates/scheduler/src/host_cache/actor.rs +++ b/rust/crates/scheduler/src/host_cache/actor.rs @@ -19,7 +19,7 @@ use scc::{hash_map::OccupiedEntry, HashMap, HashSet}; use std::{ cmp::Ordering, sync::{ - atomic::{self, AtomicU64}, + atomic::{self, AtomicBool, AtomicU64}, Arc, }, time::{Duration, SystemTime}, @@ -46,6 +46,18 @@ pub struct HostCacheService { cache_hit: Arc, cache_miss: Arc, concurrency_semaphore: Arc, + /// Set while a `refresh_cache` task is in flight. Skips overlapping ticks + /// when the previous refresh hasn't completed (e.g. under DB pressure). + refresh_in_progress: Arc, +} + +/// RAII guard that resets `refresh_in_progress` on drop, including on panic. +struct RefreshGuard(Arc); + +impl Drop for RefreshGuard { + fn drop(&mut self) { + self.0.store(false, atomic::Ordering::Release); + } } /// Use a reservation system to prevent race conditions when trying to book a host @@ -75,8 +87,25 @@ impl Actor for HostCacheService { ctx.run_interval(CONFIG.host_cache.monitoring_interval, move |_act, ctx| { let service = service_for_monitor.clone(); + // Skip this tick if the previous refresh is still running. Under DB + // pressure, refreshes can take longer than the monitoring interval, + // and stacking spawns multiplies PG load. 
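+            // swap(true) returns the prior value, so exactly one tick can
+            // observe `false` and proceed; concurrent ticks bail out here.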
+ if service + .refresh_in_progress + .swap(true, atomic::Ordering::AcqRel) + { + debug!("host_cache refresh skipped: previous run still in flight"); + return; + } let actor_clone = service.clone(); - ctx.spawn(async move { service.refresh_cache().await }.into_actor(&actor_clone)); + let flag = service.refresh_in_progress.clone(); + ctx.spawn( + async move { + let _guard = RefreshGuard(flag); + service.refresh_cache().await; + } + .into_actor(&actor_clone), + ); }); ctx.run_interval(CONFIG.host_cache.clean_up_interval, move |_act, _ctx| { @@ -181,6 +210,7 @@ impl HostCacheService { CONFIG.host_cache.concurrent_fetch_permit, )), reserved_hosts: Arc::new(HashMap::new()), + refresh_in_progress: Arc::new(AtomicBool::new(false)), }) } @@ -205,7 +235,7 @@ impl HostCacheService { /// * `Err(HostCacheError)` - No suitable host found or database error async fn check_out( &self, - facility_id: Uuid, + facility_id: String, show_id: Uuid, tags: Vec, cores: CoreSize, @@ -365,13 +395,13 @@ impl HostCacheService { #[allow(clippy::map_entry)] fn gen_cache_keys( &self, - facility_id: Uuid, + facility_id: String, show_id: Uuid, tags: Vec, ) -> impl IntoIterator { tags.into_iter() - .map(|tag| ClusterKey { - facility_id, + .map(move |tag| ClusterKey { + facility_id: facility_id.clone(), show_id, tag, }) @@ -472,7 +502,7 @@ impl HostCacheService { let tag = key.tag.to_string(); let hosts = self .host_dao - .fetch_hosts_by_show_facility_tag(key.show_id, key.facility_id, &tag) + .fetch_hosts_by_show_facility_tag(key.show_id, &key.facility_id, &tag) .await .into_diagnostic()?; diff --git a/rust/crates/scheduler/src/host_cache/messages.rs b/rust/crates/scheduler/src/host_cache/messages.rs index bb25ced14..26b4c4ae9 100644 --- a/rust/crates/scheduler/src/host_cache/messages.rs +++ b/rust/crates/scheduler/src/host_cache/messages.rs @@ -63,7 +63,7 @@ pub struct CheckOut where F: Fn(&Host) -> bool, { - pub facility_id: Uuid, + pub facility_id: String, pub show_id: Uuid, pub tags: Vec, pub cores: CoreSize, diff --git a/rust/crates/scheduler/src/main.rs b/rust/crates/scheduler/src/main.rs index 842853c63..c10e586f2 100644 --- a/rust/crates/scheduler/src/main.rs +++ b/rust/crates/scheduler/src/main.rs @@ -190,7 +190,7 @@ impl JobQueueCli { .await .wrap_err(format!("Could not find show {}.", alloc_tag.show))?; clusters.push(Cluster::single_tag( - *facility_id, + facility_id.clone(), show_id, Tag { name: alloc_tag.tag.clone(), @@ -205,7 +205,7 @@ impl JobQueueCli { .await .wrap_err(format!("Could not find show {}.", manual_tag.show))?; clusters.push(Cluster::multiple_tag( - *facility_id, + facility_id.clone(), show_id, manual_tag .tags diff --git a/rust/crates/scheduler/src/metrics/mod.rs b/rust/crates/scheduler/src/metrics/mod.rs index f96eabbb1..89ec265d8 100644 --- a/rust/crates/scheduler/src/metrics/mod.rs +++ b/rust/crates/scheduler/src/metrics/mod.rs @@ -70,6 +70,14 @@ lazy_static! 
{ vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0] ) .expect("Failed to register job_query_duration_seconds histogram"); + + // Cluster feed metrics from cluster.rs + pub static ref CLUSTER_ROUND_TRIP_SECONDS: Histogram = register_histogram!( + "scheduler_cluster_round_trip_seconds", + "Time between successive emissions of the same active (non-sleeping) cluster", + vec![0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0] + ) + .expect("Failed to register cluster_round_trip_seconds histogram"); } /// Handler for the /metrics endpoint @@ -166,3 +174,9 @@ pub fn observe_time_to_book(duration: Duration) { pub fn observe_job_query_duration(duration: Duration) { JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64()); } + +/// Helper function to observe cluster round-trip duration +#[inline] +pub fn observe_cluster_round_trip(duration: Duration) { + CLUSTER_ROUND_TRIP_SECONDS.observe(duration.as_secs_f64()); +} diff --git a/rust/crates/scheduler/src/models/frame.rs b/rust/crates/scheduler/src/models/frame.rs index b83aaa2e0..e44935307 100644 --- a/rust/crates/scheduler/src/models/frame.rs +++ b/rust/crates/scheduler/src/models/frame.rs @@ -26,7 +26,7 @@ pub struct DispatchFrame { // LayerEntity fields pub show_id: Uuid, - pub facility_id: Uuid, + pub facility_id: String, pub job_id: Uuid, // FrameEntity fields diff --git a/rust/crates/scheduler/src/models/layer.rs b/rust/crates/scheduler/src/models/layer.rs index 10f795fa8..d42f73b1f 100644 --- a/rust/crates/scheduler/src/models/layer.rs +++ b/rust/crates/scheduler/src/models/layer.rs @@ -23,7 +23,7 @@ use crate::models::{core_size::CoreSize, fmt_uuid, DispatchFrame}; pub struct DispatchLayer { pub id: Uuid, pub job_id: Uuid, - pub facility_id: Uuid, + pub facility_id: String, pub show_id: Uuid, pub job_name: String, pub layer_name: String, diff --git a/rust/crates/scheduler/src/models/subscription.rs b/rust/crates/scheduler/src/models/subscription.rs index a9462bfab..e62e462ca 100644 --- a/rust/crates/scheduler/src/models/subscription.rs +++ b/rust/crates/scheduler/src/models/subscription.rs @@ -78,7 +78,7 @@ pub struct Allocation { pub billable: bool, /// Facility ID that owns this allocation - pub facility_id: Uuid, + pub facility_id: String, /// Whether this allocation is enabled pub enabled: bool, diff --git a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs index e020644d0..2836b0fee 100644 --- a/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs +++ b/rust/crates/scheduler/src/pipeline/dispatcher/actor.rs @@ -1291,7 +1291,7 @@ mod tests { id: Uuid::new_v4(), frame_name: "0001-test_frame".to_string(), show_id: Uuid::new_v4(), - facility_id: Uuid::new_v4(), + facility_id: Uuid::new_v4().to_string(), job_id: Uuid::new_v4(), layer_id: Uuid::new_v4(), command: "echo 'test command'".to_string(), diff --git a/rust/crates/scheduler/src/pipeline/entrypoint.rs b/rust/crates/scheduler/src/pipeline/entrypoint.rs index ba7ef55e4..a50626cac 100644 --- a/rust/crates/scheduler/src/pipeline/entrypoint.rs +++ b/rust/crates/scheduler/src/pipeline/entrypoint.rs @@ -12,7 +12,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use futures::{stream, StreamExt}; use tokio::sync::mpsc; @@ -64,7 +63,7 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { let jobs = job_fetcher .query_pending_jobs_by_show_facility_and_tags( cluster.show_id, - cluster.facility_id, + &cluster.facility_id, cluster.tags.iter().map(|tag| 
tag.name.clone()), ) .await; @@ -91,7 +90,10 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> { // queries with no outcome if processed_jobs.load(Ordering::Relaxed) == 0 { let _ = feed_sender - .send(FeedMessage::Sleep(cluster, Duration::from_secs(3))) + .send(FeedMessage::Sleep( + cluster, + CONFIG.queue.cluster_empty_sleep, + )) .await; } diff --git a/rust/crates/scheduler/src/pipeline/layer_permit.rs b/rust/crates/scheduler/src/pipeline/layer_permit.rs index 72db23f75..720a1e79c 100644 --- a/rust/crates/scheduler/src/pipeline/layer_permit.rs +++ b/rust/crates/scheduler/src/pipeline/layer_permit.rs @@ -140,7 +140,7 @@ impl Handler for LayerPermitService { _ => { // No valid permit exists - grant new permit let new_permit = LayerPermit::new(duration); - let _ = self.permits.insert_sync(id, new_permit); + let _ = self.permits.upsert_sync(id, new_permit); debug!("Granted permit for layer {} (duration: {:?})", id, duration); true } diff --git a/rust/crates/scheduler/src/pipeline/matcher.rs b/rust/crates/scheduler/src/pipeline/matcher.rs index 3c2761e75..025ff82a2 100644 --- a/rust/crates/scheduler/src/pipeline/matcher.rs +++ b/rust/crates/scheduler/src/pipeline/matcher.rs @@ -286,7 +286,7 @@ impl MatchingService { let host_candidate = self .host_service .send(CheckOut { - facility_id: layer.facility_id, + facility_id: layer.facility_id.clone(), show_id: layer.show_id, tags, cores: cores_requested, diff --git a/rust/crates/scheduler/src/resource_accounting.rs b/rust/crates/scheduler/src/resource_accounting.rs index 963c627da..66b76cd5a 100644 --- a/rust/crates/scheduler/src/resource_accounting.rs +++ b/rust/crates/scheduler/src/resource_accounting.rs @@ -12,12 +12,14 @@ use std::{ collections::HashMap, + panic::AssertUnwindSafe, sync::{Arc, RwLock}, }; +use futures::FutureExt; use miette::Result; use tokio::{sync::OnceCell, time}; -use tracing::warn; +use tracing::{error, warn}; use uuid::Uuid; use crate::{ @@ -71,15 +73,17 @@ impl ResourceAccountingService { let Some(sub) = show_subs.get_mut(&alloc_name) else { continue; }; - sub.booked_cores = - CoreSize::from_multiplied(cores_booked.try_into().unwrap_or_else(|_| { + sub.booked_cores = match cores_booked.try_into() { + Ok(multiplied) => CoreSize::from_multiplied(multiplied), + Err(_) => { warn!( "Recomputed booked cores overflowed i32 for \ show={show_id} alloc={alloc_name}, \ using subscription table value" ); - sub.booked_cores.value() - })); + sub.booked_cores + } + }; sub.booked_gpus = gpus_booked.try_into().unwrap_or_else(|_| { warn!( "Recomputed booked GPUs overflowed u32 for \ @@ -120,8 +124,15 @@ impl ResourceAccountingService { loop { interval.tick().await; - if let Err(err) = dao.recompute_all_from_proc(&target_shows_opt).await { - warn!("Failed to recompute resource accounting tables from proc: {err}"); + let result = AssertUnwindSafe(async { + if let Err(err) = dao.recompute_all_from_proc(&target_shows_opt).await { + warn!("Failed to recompute resource accounting tables from proc: {err}"); + } + }) + .catch_unwind() + .await; + if let Err(e) = result { + error!("Resource recalculation iteration panicked: {:?}", e); } } }); @@ -139,7 +150,14 @@ impl ResourceAccountingService { loop { interval.tick().await; - recalculate_and_refresh(&cache, &dao, &target_shows).await; + let result = AssertUnwindSafe(async { + recalculate_and_refresh(&cache, &dao, &target_shows).await; + }) + .catch_unwind() + .await; + if let Err(e) = result { + error!("Subscription recalculation iteration panicked: {:?}", e); + } } }); } diff 
--git a/rust/crates/scheduler/tests/util.rs b/rust/crates/scheduler/tests/util.rs index a7bb6c29c..8d196b190 100644 --- a/rust/crates/scheduler/tests/util.rs +++ b/rust/crates/scheduler/tests/util.rs @@ -89,10 +89,12 @@ pub fn create_test_config() -> Config { core_multiplier: 100, memory_stranded_threshold: bytesize::ByteSize::mb(100), job_back_off_duration: Duration::from_secs(10), + cluster_empty_sleep: Duration::from_secs(30), stream: scheduler::config::StreamConfig { cluster_buffer_size: 4, job_buffer_size: 8, }, + max_jobs_per_cluster_pass: 20, manual_tags_chunk_size: 10, hostname_tags_chunk_size: 20, host_candidate_attempts_per_layer: 5, @@ -592,7 +594,7 @@ pub async fn create_test_data( // Clusters. Chunk manual tags in approximatelly 4 groups for chunk in tags.chunks(tags.len() / 4) { let cluster = Cluster::multiple_tag( - facility_id, + facility_id.to_string(), show_id, chunk .iter() @@ -626,7 +628,7 @@ pub async fn create_test_data( for (alloc_id, alloc_name) in allocs.iter() { let cluster = Cluster::single_tag( - facility_id, + facility_id.to_string(), show_id, Tag { name: alloc_name.clone(),