From b5478b9c582ed29e102f75bf284afe6a2ffad50e Mon Sep 17 00:00:00 2001 From: Nathan Keilbart Date: Tue, 7 Apr 2026 22:04:59 -0700 Subject: [PATCH 1/2] Move Slurm sacct collection off the completion path Record Slurm job completion before any sacct lookup so short jobs do not inherit accounting latency. Queue completed steps by allocation and persist slurm_stats from a background worker after resources are freed. --- src/client/async_cli_command.rs | 287 +++++++++++++++++--------------- src/client/job_runner.rs | 193 ++++++++++++++++----- 2 files changed, 301 insertions(+), 179 deletions(-) diff --git a/src/client/async_cli_command.rs b/src/client/async_cli_command.rs index b26dbd97..f0ba4dab 100644 --- a/src/client/async_cli_command.rs +++ b/src/client/async_cli_command.rs @@ -29,9 +29,10 @@ use crate::client::resource_monitor::ResourceMonitor; use crate::client::slurm_utils::{parse_slurm_cpu_time, parse_slurm_memory}; use crate::client::workflow_spec::{ExecutionMode, StdioMode}; use crate::memory_utils::{memory_string_to_bytes, memory_string_to_mb}; -use crate::models::{JobModel, JobStatus, ResourceRequirementsModel, ResultModel, SlurmStatsModel}; +use crate::models::{JobModel, JobStatus, ResourceRequirementsModel, ResultModel}; use chrono::{DateTime, Utc}; use log::{self, debug, error, info, warn}; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::path::Path; use std::process::{Child, Command, Stdio}; @@ -148,10 +149,10 @@ pub struct AsyncCliCommand { workflow_id: Option, run_id: Option, attempt_id: Option, + /// Slurm allocation ID when running inside an allocation. + slurm_job_id: Option, /// Slurm step name set when running inside an allocation (for sacct lookup). step_name: Option, - /// Slurm accounting stats collected via sacct after step completion. - slurm_stats: Option, handle: Option, pid: Option, pub is_running: bool, @@ -177,8 +178,8 @@ impl AsyncCliCommand { workflow_id: None, run_id: None, attempt_id: None, + slurm_job_id: None, step_name: None, - slurm_stats: None, handle: None, pid: None, is_running: false, @@ -199,6 +200,11 @@ impl AsyncCliCommand { self.step_name.as_deref() } + /// Returns the Slurm allocation ID, if running inside an allocation. + pub fn slurm_job_id(&self) -> Option<&str> { + self.slurm_job_id.as_deref() + } + #[allow(clippy::too_many_arguments)] pub fn start( &mut self, @@ -355,6 +361,7 @@ impl AsyncCliCommand { self.workflow_id = Some(workflow_id); self.run_id = Some(run_id); self.attempt_id = Some(attempt_id); + self.slurm_job_id = slurm_job_id; self.is_running = true; self.start_time = Utc::now(); self.status = JobStatus::Running; @@ -506,12 +513,6 @@ impl AsyncCliCommand { result } - /// Returns the Slurm accounting stats collected for this job step, if any. - /// Only populated when the job ran inside a Slurm allocation and sacct succeeded. - pub fn take_slurm_stats(&mut self) -> Option { - self.slurm_stats.take() - } - /// Immediately kills the job process using SIGKILL. /// /// This method sends SIGKILL to the process, which cannot be caught or ignored. @@ -739,71 +740,6 @@ impl AsyncCliCommand { self.return_code = Some(return_code); self.handle = None; - // Collect Slurm accounting stats via sacct when running inside an allocation. - // Note: collect_sacct_stats is synchronous and may delay this polling cycle: it sleeps - // 5 seconds between retry attempts (up to 6 attempts, worst-case ~25 seconds) when the - // Slurm accounting daemon hasn't written the step record yet. - if let (Ok(slurm_job_id), Some(step_name)) = - (std::env::var("SLURM_JOB_ID"), self.step_name.as_deref()) - { - info!( - "Collecting sacct stats for workflow_id={} job_id={} step={}", - self.workflow_id.unwrap_or(0), - self.job_id, - step_name - ); - if let Some(stats) = collect_sacct_stats(&slurm_job_id, step_name) - && let (Some(workflow_id), Some(run_id), Some(attempt_id)) = - (self.workflow_id, self.run_id, self.attempt_id) - { - // Override the return code based on sacct State. - // When Slurm's cgroup OOM-kills a step, srun exits with code 1 - // and sacct ExitCode is 0:125 — neither produces the conventional - // 137 (128+SIGKILL) that recovery heuristics check. The sacct State - // field reliably reports OUT_OF_MEMORY / TIMEOUT. - // - // TIMEOUT is only overridden when the process did not exit cleanly - // (return_code != 0). When the process handled SIGTERM (from - // --signal) and exited 0, we keep the successful result even though - // sacct may report State=TIMEOUT for the step. - // - // TIMEOUT maps to Terminated (system-initiated kill due to walltime) - // rather than Failed (job error), matching the old behaviour where - // the runner would send SIGTERM before the allocation expired. - if let Some(ref state) = stats.state { - let override_rc = match state.as_str() { - "OUT_OF_MEMORY" => Some((137i64, JobStatus::Failed)), - "TIMEOUT" if return_code != 0 => Some((152i64, JobStatus::Terminated)), - _ => None, - }; - if let Some((sacct_rc, sacct_status)) = override_rc { - info!( - "Overriding srun return_code {} with {} (sacct State={}) for \ - workflow_id={} job_id={} step={}", - return_code, sacct_rc, state, workflow_id, self.job_id, step_name - ); - self.return_code = Some(sacct_rc); - self.status = sacct_status; - } - } - - let mut slurm_stats = - SlurmStatsModel::new(workflow_id, self.job_id, run_id, attempt_id); - slurm_stats.slurm_job_id = Some(slurm_job_id); - slurm_stats.max_rss_bytes = stats.max_rss_bytes; - slurm_stats.max_vm_size_bytes = stats.max_vm_size_bytes; - slurm_stats.max_disk_read_bytes = stats.max_disk_read_bytes; - slurm_stats.max_disk_write_bytes = stats.max_disk_write_bytes; - slurm_stats.ave_cpu_seconds = stats.ave_cpu_seconds; - slurm_stats.node_list = stats.node_list; - info!( - "Sacct stats collected workflow_id={} job_id={} step={}", - workflow_id, self.job_id, step_name - ); - self.slurm_stats = Some(slurm_stats); - } - } - let final_rc = self.return_code.unwrap_or(return_code); let final_status = format!("{:?}", self.status).to_lowercase(); info!( @@ -894,18 +830,29 @@ impl AsyncCliCommand { } /// Slurm accounting stats collected from `sacct` after step completion. -struct SacctStats { - max_rss_bytes: Option, - max_vm_size_bytes: Option, - max_disk_read_bytes: Option, - max_disk_write_bytes: Option, - ave_cpu_seconds: Option, - node_list: Option, +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct SacctStats { + pub(crate) max_rss_bytes: Option, + pub(crate) max_vm_size_bytes: Option, + pub(crate) max_disk_read_bytes: Option, + pub(crate) max_disk_write_bytes: Option, + pub(crate) ave_cpu_seconds: Option, + pub(crate) node_list: Option, /// Slurm step state (e.g. "COMPLETED", "OUT_OF_MEMORY", "TIMEOUT", "FAILED"). /// When Slurm's cgroup OOM-kills a step, the ExitCode is often `0:0` and `srun` /// exits with code 1, losing the OOM signal. The State field is the reliable way /// to detect OOM kills and timeouts. - state: Option, + pub(crate) state: Option, +} + +fn sacct_stats_have_useful_data(stats: &SacctStats) -> bool { + stats.max_rss_bytes.is_some() + || stats.max_vm_size_bytes.is_some() + || stats.max_disk_read_bytes.is_some() + || stats.max_disk_write_bytes.is_some() + || stats.ave_cpu_seconds.is_some() + || stats.node_list.is_some() + || stats.state.is_some() } /// Convert a `std::process::ExitStatus` to a return code. @@ -941,9 +888,17 @@ fn exit_status_to_return_code(status: &std::process::ExitStatus) -> i64 { /// Returns `None` if sacct is unavailable, returns no data for the step after all retries, or /// the output cannot be parsed. This is a best-effort call — failures are logged at debug level /// and do not affect job result reporting. -fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option { +pub(crate) fn collect_sacct_stats_for_steps( + slurm_job_id: &str, + expected_step_names: &HashSet, +) -> HashMap { const MAX_SACCT_ATTEMPTS: u32 = 6; const SACCT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(5); + let mut collected = HashMap::new(); + + if expected_step_names.is_empty() { + return collected; + } // Allow tests to substitute a fake sacct binary via TORC_FAKE_SACCT. let sacct_binary = std::env::var("TORC_FAKE_SACCT").unwrap_or_else(|_| "sacct".to_string()); @@ -969,8 +924,8 @@ fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option Option o, Err(e) => { debug!( - "sacct not available or failed for step {}: {}", - step_name, e + "sacct not available or failed for allocation {}: {}", + slurm_job_id, e ); - return None; + return collected; } }; @@ -989,8 +944,8 @@ fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option Option = l.split('|').collect(); - fields.len() >= 2 && fields[0].trim() == step_name - }); - - match line { - Some(line) => { - let stats = parse_sacct_line(line, step_name); - // The step row can appear with node_list populated but MaxRSS/AveCPU still - // empty while slurmdbd is committing the accounting data asynchronously. - // Retry if we have no useful data yet, rather than returning empty stats. - let has_data = stats - .as_ref() - .is_some_and(|s| s.max_rss_bytes.is_some() || s.ave_cpu_seconds.is_some()); - if has_data || attempt == MAX_SACCT_ATTEMPTS { - return stats; - } - debug!( - "sacct row for step {} found but data fields are empty (attempt {}/{}), retrying", - step_name, attempt, MAX_SACCT_ATTEMPTS - ); + + let mut found_this_attempt = HashMap::new(); + for line in stdout.lines() { + let fields: Vec<&str> = line.split('|').collect(); + if fields.len() < 2 { + continue; } - None => { - if attempt < MAX_SACCT_ATTEMPTS { - debug!( - "sacct has no record for step {} yet (attempt {}/{}), retrying", - step_name, attempt, MAX_SACCT_ATTEMPTS - ); - } else { - warn!( - "sacct has no record for step {} after {} attempts; \ - raw sacct output: {:?}", - step_name, - MAX_SACCT_ATTEMPTS, - stdout.as_ref() - ); - } + let step_name = fields[0].trim(); + if !expected_step_names.contains(step_name) { + continue; + } + if let Some(stats) = parse_sacct_line(line, step_name) + && (sacct_stats_have_useful_data(&stats) || attempt == MAX_SACCT_ATTEMPTS) + { + found_this_attempt.insert(step_name.to_string(), stats); + } + } + + collected.extend(found_this_attempt); + + let missing_steps: Vec<&String> = expected_step_names + .iter() + .filter(|step_name| !collected.contains_key(*step_name)) + .collect(); + if missing_steps.is_empty() { + return collected; + } + + if attempt < MAX_SACCT_ATTEMPTS { + debug!( + "sacct missing {} step(s) for allocation {} (attempt {}/{}): {:?}", + missing_steps.len(), + slurm_job_id, + attempt, + MAX_SACCT_ATTEMPTS, + missing_steps + ); + } else { + warn!( + "sacct missing {} step(s) for allocation {} after {} attempts: {:?}", + missing_steps.len(), + slurm_job_id, + MAX_SACCT_ATTEMPTS, + missing_steps + ); + if !collected.is_empty() { + return collected; } } } - None + collected } /// Parse a single pipe-separated `sacct` output line into a [`SacctStats`]. @@ -1131,6 +1092,15 @@ impl Drop for AsyncCliCommand { #[cfg(test)] mod tests { use super::*; + use serial_test::serial; + use std::path::PathBuf; + + fn fake_sacct_path() -> String { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/scripts/fake_sacct.sh") + .to_string_lossy() + .to_string() + } #[test] fn test_parse_sacct_line_with_state() { @@ -1193,4 +1163,47 @@ mod tests { let stats = parse_sacct_line(line, "step1"); assert!(stats.is_none()); } + + #[test] + #[serial] + fn test_collect_sacct_stats_for_steps_batches_multiple_matches() { + unsafe { + std::env::set_var("TORC_FAKE_SACCT", fake_sacct_path()); + } + let expected = HashSet::from([ + "wf1_j1_r1_a1".to_string(), + "wf1_j2_r1_a1".to_string(), + "missing_step".to_string(), + ]); + + let stats = collect_sacct_stats_for_steps("12345", &expected); + + assert_eq!(stats.len(), 2); + assert!(stats.contains_key("wf1_j1_r1_a1")); + assert!(stats.contains_key("wf1_j2_r1_a1")); + assert!(!stats.contains_key("missing_step")); + + unsafe { + std::env::remove_var("TORC_FAKE_SACCT"); + } + } + + #[test] + #[serial] + fn test_collect_sacct_stats_for_steps_returns_empty_on_failure() { + unsafe { + std::env::set_var("TORC_FAKE_SACCT", fake_sacct_path()); + std::env::set_var("TORC_FAKE_SACCT_FAIL", "1"); + } + let expected = HashSet::from(["wf1_j1_r1_a1".to_string()]); + + let stats = collect_sacct_stats_for_steps("12345", &expected); + + assert!(stats.is_empty()); + + unsafe { + std::env::remove_var("TORC_FAKE_SACCT_FAIL"); + std::env::remove_var("TORC_FAKE_SACCT"); + } + } } diff --git a/src/client/job_runner.rs b/src/client/job_runner.rs index c3ef8668..53a8a834 100644 --- a/src/client/job_runner.rs +++ b/src/client/job_runner.rs @@ -27,17 +27,19 @@ use chrono::{DateTime, Utc}; use log::{self, debug, error, info, warn}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Sender}; use std::thread; +use std::thread::JoinHandle; use std::time::{Duration, Instant}; use crate::client::apis; use crate::client::apis::configuration::Configuration; -use crate::client::async_cli_command::AsyncCliCommand; +use crate::client::async_cli_command::{AsyncCliCommand, collect_sacct_stats_for_steps}; use crate::client::resource_correction::format_duration_iso8601; use crate::client::resource_monitor::{ResourceMonitor, ResourceMonitorConfig}; use crate::client::utils; @@ -189,6 +191,22 @@ pub struct WorkerResult { pub had_terminations: bool, } +#[derive(Debug, Clone)] +struct PendingSlurmStats { + workflow_id: i64, + job_id: i64, + run_id: i64, + attempt_id: i64, + slurm_job_id: String, + step_name: String, +} + +#[derive(Debug)] +struct SacctBatchRequest { + slurm_job_id: String, + steps: Vec, +} + /// Outcome of attempting to recover a failed job via failure handler. #[derive(Debug, Clone, PartialEq)] pub enum RecoveryOutcome { @@ -274,6 +292,8 @@ pub struct JobRunner { execution_config: ExecutionConfig, rules: ComputeNodeRules, resource_monitor: Option, + sacct_batch_tx: Option>, + sacct_worker_handle: Option>, /// Flag set when SIGTERM is received. Shared with signal handler. termination_requested: Arc, /// Monotonic timestamp of when a job was last claimed. Used for idle timeout. @@ -511,6 +531,14 @@ impl JobRunner { None }; + let (sacct_batch_tx, sacct_worker_handle) = + if execution_config.effective_mode() == ExecutionMode::Slurm { + let (tx, handle) = Self::spawn_sacct_worker(config.clone()); + (Some(tx), Some(handle)) + } else { + (None, None) + }; + JobRunner { config, torc_config, @@ -540,6 +568,8 @@ impl JobRunner { execution_config, rules, resource_monitor, + sacct_batch_tx, + sacct_worker_handle, termination_requested: Arc::new(AtomicBool::new(false)), last_job_claimed_time: None, had_failures: false, @@ -577,6 +607,61 @@ impl JobRunner { }) } + fn spawn_sacct_worker(config: Configuration) -> (Sender, JoinHandle<()>) { + let (tx, rx) = mpsc::channel::(); + let handle = thread::spawn(move || { + while let Ok(request) = rx.recv() { + let expected_step_names: HashSet = request + .steps + .iter() + .map(|step| step.step_name.clone()) + .collect(); + let stats_by_step = + collect_sacct_stats_for_steps(&request.slurm_job_id, &expected_step_names); + + for step in request.steps { + let Some(stats) = stats_by_step.get(&step.step_name) else { + debug!( + "No sacct stats available yet for workflow_id={} job_id={} step={}", + step.workflow_id, step.job_id, step.step_name + ); + continue; + }; + + let mut slurm_stats = SlurmStatsModel::new( + step.workflow_id, + step.job_id, + step.run_id, + step.attempt_id, + ); + slurm_stats.slurm_job_id = Some(step.slurm_job_id.clone()); + slurm_stats.max_rss_bytes = stats.max_rss_bytes; + slurm_stats.max_vm_size_bytes = stats.max_vm_size_bytes; + slurm_stats.max_disk_read_bytes = stats.max_disk_read_bytes; + slurm_stats.max_disk_write_bytes = stats.max_disk_write_bytes; + slurm_stats.ave_cpu_seconds = stats.ave_cpu_seconds; + slurm_stats.node_list = stats.node_list.clone(); + + match apis::slurm_stats_api::create_slurm_stats(&config, slurm_stats) { + Ok(_) => { + info!( + "Stored slurm_stats workflow_id={} job_id={} step={}", + step.workflow_id, step.job_id, step.step_name + ); + } + Err(e) => { + warn!( + "Failed to store slurm_stats workflow_id={} job_id={} step={} error={}", + step.workflow_id, step.job_id, step.step_name, e + ); + } + } + } + } + }); + (tx, handle) + } + /// Atomically claim a workflow action for execution. /// /// This is a convenience method that wraps [`utils::claim_action`] with @@ -836,6 +921,14 @@ impl JobRunner { monitor.shutdown(); } + // Stop the background sacct worker after all queued accounting work is flushed. + self.sacct_batch_tx.take(); + if let Some(handle) = self.sacct_worker_handle.take() + && let Err(e) = handle.join() + { + error!("Failed to join sacct worker thread: {:?}", e); + } + // Deactivate compute node and set duration self.deactivate_compute_node(); @@ -1106,7 +1199,6 @@ impl JobRunner { /// Check the status of running jobs and remove completed ones. fn check_job_status(&mut self) { - let mut completed_jobs = Vec::new(); let mut job_results = Vec::new(); // First pass: check status and collect completed jobs @@ -1114,8 +1206,6 @@ impl JobRunner { match async_job.check_status() { Ok(()) => { if async_job.is_complete { - completed_jobs.push(*job_id); - let attempt_id = async_job.job.attempt_id.unwrap_or(1); let result = async_job.get_result( self.run_id, @@ -1126,8 +1216,19 @@ impl JobRunner { // Extract output_file_ids for validation let output_file_ids = async_job.job.output_file_ids.clone(); + let pending_slurm_stats = async_job + .slurm_job_id() + .zip(async_job.step_name()) + .map(|(slurm_job_id, step_name)| PendingSlurmStats { + workflow_id: self.workflow_id, + job_id: *job_id, + run_id: self.run_id, + attempt_id, + slurm_job_id: slurm_job_id.to_string(), + step_name: step_name.to_string(), + }); - job_results.push((*job_id, result, output_file_ids)); + job_results.push((*job_id, result, output_file_ids, pending_slurm_stats)); } } Err(e) => { @@ -1137,7 +1238,8 @@ impl JobRunner { } // Second pass: validate output files and complete jobs - for (job_id, mut result, output_file_ids) in job_results { + let mut pending_sacct_stats = Vec::new(); + for (job_id, mut result, output_file_ids, pending_slurm_stats) in job_results { // Validate output files if job completed successfully if result.return_code == 0 && let Err(e) = self.validate_and_update_output_files(job_id, &output_file_ids) @@ -1147,7 +1249,44 @@ impl JobRunner { result.status = JobStatus::Failed; } - self.handle_job_completion(job_id, result); + if self.handle_job_completion(job_id, result) + && let Some(step) = pending_slurm_stats + { + pending_sacct_stats.push(step); + } + } + + if !pending_sacct_stats.is_empty() { + self.enqueue_sacct_stats(pending_sacct_stats); + } + } + + fn enqueue_sacct_stats(&self, steps: Vec) { + let Some(tx) = &self.sacct_batch_tx else { + return; + }; + + let mut by_allocation: HashMap> = HashMap::new(); + for step in steps { + by_allocation + .entry(step.slurm_job_id.clone()) + .or_default() + .push(step); + } + + for (slurm_job_id, steps) in by_allocation { + let step_count = steps.len(); + if let Err(e) = tx.send(SacctBatchRequest { + slurm_job_id, + steps, + }) { + warn!("Failed to enqueue sacct batch request: {}", e); + } else { + debug!( + "Enqueued async sacct batch for {} completed step(s)", + step_count + ); + } } } @@ -1447,20 +1586,7 @@ impl JobRunner { } } - fn handle_job_completion(&mut self, job_id: i64, result: ResultModel) { - // Take sacct stats now, before the result is sent to the server, so we can backfill - // resource fields. For srun-wrapped jobs the sysinfo monitor only sees the srun process - // (negligible overhead), so sacct provides the authoritative peak memory and CPU data. - let slurm_stats = self - .running_jobs - .get_mut(&job_id) - .and_then(|j| j.take_slurm_stats()); - - let mut final_result = result; - if let Some(ref stats) = slurm_stats { - backfill_sacct_into_result(&mut final_result, stats); - } - + fn handle_job_completion(&mut self, job_id: i64, mut final_result: ResultModel) -> bool { // Get job info before removing from running_jobs let job_info = self.running_jobs.get(&job_id).map(|cmd| { ( @@ -1500,7 +1626,7 @@ impl JobRunner { self.last_job_claimed_time = Some(Instant::now()); self.running_jobs.remove(&job_id); self.job_resources.remove(&job_id); - return; + return false; } RecoveryOutcome::NoHandler | RecoveryOutcome::NoMatchingRule => { // Check if workflow has use_pending_failed enabled @@ -1553,25 +1679,6 @@ impl JobRunner { "Job completed workflow_id={} job_id={} run_id={} status={}", self.workflow_id, job_id, final_result.run_id, status_str ); - // Store Slurm accounting stats if collected (best-effort, non-blocking). - // slurm_stats was taken at the top of handle_job_completion so we could backfill - // resource fields into the result before reporting to the server. - if let Some(stats) = slurm_stats { - match apis::slurm_stats_api::create_slurm_stats(&self.config, stats) { - Ok(_) => { - info!( - "Stored slurm_stats workflow_id={} job_id={}", - self.workflow_id, job_id - ); - } - Err(e) => { - warn!( - "Failed to store slurm_stats workflow_id={} job_id={}: {}", - self.workflow_id, job_id, e - ); - } - } - } if let Some(job_rr) = self.job_resources.get(&job_id).cloned() { self.increment_node_resources(job_id, &job_rr); self.increment_resources(&job_rr); @@ -1601,6 +1708,7 @@ impl JobRunner { self.running_jobs.remove(&job_id); self.job_resources.remove(&job_id); self.release_gpu_devices(job_id); + true } /// Delete stdio files for a completed job. @@ -2755,6 +2863,7 @@ pub fn cleanup_job_stdio_files(stdout_path: Option<&str>, stderr_path: Option<&s /// from the sstat time-series if TimeSeries monitoring was configured. /// This ensures that even when sstat time-series monitoring missed a spike, the sacct /// post-mortem data fills in accurate resource usage. +#[cfg(test)] fn backfill_sacct_into_result(result: &mut ResultModel, stats: &SlurmStatsModel) { if let Some(max_rss) = stats.max_rss_bytes { // sacct MaxRSS is the job-lifetime peak memory. Take the max against any From 358a1b5b3918c57550bb1fce7716bf447897a693 Mon Sep 17 00:00:00 2001 From: Nathan Keilbart Date: Wed, 8 Apr 2026 16:52:16 -0700 Subject: [PATCH 2/2] Document async Slurm sacct collection --- docs/src/core/reference/resource-monitoring.md | 10 ++++++---- docs/src/specialized/design/srun-monitoring.md | 9 +++++---- docs/src/specialized/hpc/slurm.md | 9 ++++++--- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/docs/src/core/reference/resource-monitoring.md b/docs/src/core/reference/resource-monitoring.md index ea2db192..ec28ea13 100644 --- a/docs/src/core/reference/resource-monitoring.md +++ b/docs/src/core/reference/resource-monitoring.md @@ -227,9 +227,10 @@ metrics. ## Slurm Accounting Stats -When running inside a Slurm allocation, Torc calls `sacct` after each job step completes and stores -the results in the `slurm_stats` table. These complement the sysinfo-based metrics above with -Slurm-native cgroup measurements. +When running inside a Slurm allocation, Torc queues completed job steps for asynchronous `sacct` +collection and stores the results in the `slurm_stats` table. Lookups are batched by allocation so +job completion does not wait on Slurm accounting latency. These complement the sysinfo-based metrics +above with Slurm-native cgroup measurements. ### Fields @@ -245,7 +246,8 @@ Slurm-native cgroup measurements. Additional identifying fields stored per record: `workflow_id`, `job_id`, `run_id`, `attempt_id`, `slurm_job_id`. -Fields are `null` when: +Because `sacct` lookup is asynchronous, these rows may appear shortly after job completion instead +of immediately. Fields are `null` when: - The job ran locally (no `SLURM_JOB_ID` in the environment) - `sacct` is not available on the node diff --git a/docs/src/specialized/design/srun-monitoring.md b/docs/src/specialized/design/srun-monitoring.md index 4454fa06..18fd201d 100644 --- a/docs/src/specialized/design/srun-monitoring.md +++ b/docs/src/specialized/design/srun-monitoring.md @@ -234,15 +234,16 @@ sstat calls for completed steps return non-zero exit codes. These are logged at ## sacct Collection -**Module:** `src/client/async_cli_command.rs` +**Modules:** `src/client/job_runner.rs`, `src/client/async_cli_command.rs` -After a job step exits, `collect_sacct_stats()` retrieves the final Slurm accounting record. This is -a blocking call that runs on the job runner thread. +After a job step exits, the job runner records completion first and enqueues the step for a +background `sacct` worker. That worker batches lookups by Slurm allocation and calls +`collect_sacct_stats_for_steps()` off the completion path. ### Retry Logic The Slurm accounting daemon (`slurmdbd`) often has a delay between step completion and record -availability. The collector retries up to 6 times with 5-second delays: +availability. The background collector retries up to 6 times with 5-second delays: ```mermaid flowchart TD diff --git a/docs/src/specialized/hpc/slurm.md b/docs/src/specialized/hpc/slurm.md index 1aeabb35..901aa411 100644 --- a/docs/src/specialized/hpc/slurm.md +++ b/docs/src/specialized/hpc/slurm.md @@ -397,8 +397,9 @@ if enabled. ### Slurm Accounting Stats -After each job step exits, Torc calls `sacct` once to collect the following Slurm-native accounting -fields and stores them in the `slurm_stats` table: +After each job step exits, Torc records completion immediately and queues Slurm accounting +collection on a background worker. That worker batches `sacct` lookups by allocation and stores the +following Slurm-native accounting fields in the `slurm_stats` table: | Field | sacct source | Description | | ---------------------- | -------------- | ------------------------------------- | @@ -412,7 +413,9 @@ fields and stores them in the `slurm_stats` table: These fields complement the existing sysinfo-based metrics (`peak_memory_bytes`, `peak_cpu_percent`, etc.) and are available via `torc slurm stats `. -`sacct` data is collected on a best-effort basis. Fields are `null` when: +`sacct` data is collected on a best-effort basis. Because lookup runs asynchronously, stats may +appear shortly after job completion rather than inline with the completion path. Fields are `null` +when: - The job ran locally (no `SLURM_JOB_ID`) - `sacct` is not available on the node