diff --git a/docs/src/core/reference/resource-monitoring.md b/docs/src/core/reference/resource-monitoring.md index ea2db192..ec28ea13 100644 --- a/docs/src/core/reference/resource-monitoring.md +++ b/docs/src/core/reference/resource-monitoring.md @@ -227,9 +227,10 @@ metrics. ## Slurm Accounting Stats -When running inside a Slurm allocation, Torc calls `sacct` after each job step completes and stores -the results in the `slurm_stats` table. These complement the sysinfo-based metrics above with -Slurm-native cgroup measurements. +When running inside a Slurm allocation, Torc queues completed job steps for asynchronous `sacct` +collection and stores the results in the `slurm_stats` table. Lookups are batched by allocation so +job completion does not wait on Slurm accounting latency. These complement the sysinfo-based metrics +above with Slurm-native cgroup measurements. ### Fields @@ -245,7 +246,8 @@ Slurm-native cgroup measurements. Additional identifying fields stored per record: `workflow_id`, `job_id`, `run_id`, `attempt_id`, `slurm_job_id`. -Fields are `null` when: +Because `sacct` lookup is asynchronous, these rows may appear shortly after job completion instead +of immediately. Fields are `null` when: - The job ran locally (no `SLURM_JOB_ID` in the environment) - `sacct` is not available on the node diff --git a/docs/src/specialized/design/srun-monitoring.md b/docs/src/specialized/design/srun-monitoring.md index 4454fa06..18fd201d 100644 --- a/docs/src/specialized/design/srun-monitoring.md +++ b/docs/src/specialized/design/srun-monitoring.md @@ -234,15 +234,16 @@ sstat calls for completed steps return non-zero exit codes. These are logged at ## sacct Collection -**Module:** `src/client/async_cli_command.rs` +**Modules:** `src/client/job_runner.rs`, `src/client/async_cli_command.rs` -After a job step exits, `collect_sacct_stats()` retrieves the final Slurm accounting record. This is -a blocking call that runs on the job runner thread. 
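The asynchronous hand-off this PR introduces — record completion first, enqueue the step, let a background worker run `sacct` off the completion path, and shut the worker down by closing the channel — can be sketched as a minimal channel-plus-worker pattern. `Batch` and `spawn_worker` below are illustrative stand-ins under assumed names, not Torc's actual types:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Illustrative stand-in for one enqueued batch of completed steps
// belonging to a single Slurm allocation.
struct Batch {
    allocation_id: String,
    step_names: Vec<String>,
}

// Spawn a worker that drains batches off the completion path. Dropping the
// returned Sender closes the channel; recv() then errors and the loop exits,
// so shutdown is "drop the sender, then join the handle".
fn spawn_worker() -> (Sender<Batch>, JoinHandle<HashMap<String, usize>>) {
    let (tx, rx) = mpsc::channel::<Batch>();
    let handle = thread::spawn(move || {
        let mut processed: HashMap<String, usize> = HashMap::new();
        while let Ok(batch) = rx.recv() {
            // A real worker would run one (retried) sacct lookup per batch here.
            *processed.entry(batch.allocation_id).or_insert(0) += batch.step_names.len();
        }
        processed
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_worker();
    // Completion path: enqueue and return immediately, never waiting on sacct.
    tx.send(Batch {
        allocation_id: "12345".into(),
        step_names: vec!["s0".into(), "s1".into()],
    })
    .unwrap();
    drop(tx); // closing the channel is the shutdown signal
    let processed = handle.join().unwrap();
    assert_eq!(processed["12345"], 2);
    println!("ok");
}
```

The point of the pattern is that the sender side never blocks on accounting latency; all retry delays are absorbed by the worker thread.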
+After a job step exits, the job runner records completion first and enqueues the step for a +background `sacct` worker. That worker batches lookups by Slurm allocation and calls +`collect_sacct_stats_for_steps()` off the completion path. ### Retry Logic The Slurm accounting daemon (`slurmdbd`) often has a delay between step completion and record -availability. The collector retries up to 6 times with 5-second delays: +availability. The background collector retries up to 6 times with 5-second delays: ```mermaid flowchart TD diff --git a/docs/src/specialized/hpc/slurm.md b/docs/src/specialized/hpc/slurm.md index 1aeabb35..901aa411 100644 --- a/docs/src/specialized/hpc/slurm.md +++ b/docs/src/specialized/hpc/slurm.md @@ -397,8 +397,9 @@ if enabled. ### Slurm Accounting Stats -After each job step exits, Torc calls `sacct` once to collect the following Slurm-native accounting -fields and stores them in the `slurm_stats` table: +After each job step exits, Torc records completion immediately and queues Slurm accounting +collection on a background worker. That worker batches `sacct` lookups by allocation and stores the +following Slurm-native accounting fields in the `slurm_stats` table: | Field | sacct source | Description | | ---------------------- | -------------- | ------------------------------------- | @@ -412,7 +413,9 @@ fields and stores them in the `slurm_stats` table: These fields complement the existing sysinfo-based metrics (`peak_memory_bytes`, `peak_cpu_percent`, etc.) and are available via `torc slurm stats `. -`sacct` data is collected on a best-effort basis. Fields are `null` when: +`sacct` data is collected on a best-effort basis. Because lookup runs asynchronously, stats may +appear shortly after job completion rather than inline with the completion path. 
Fields are `null` +when: - The job ran locally (no `SLURM_JOB_ID`) - `sacct` is not available on the node diff --git a/src/client/async_cli_command.rs b/src/client/async_cli_command.rs index b26dbd97..f0ba4dab 100644 --- a/src/client/async_cli_command.rs +++ b/src/client/async_cli_command.rs @@ -29,9 +29,10 @@ use crate::client::resource_monitor::ResourceMonitor; use crate::client::slurm_utils::{parse_slurm_cpu_time, parse_slurm_memory}; use crate::client::workflow_spec::{ExecutionMode, StdioMode}; use crate::memory_utils::{memory_string_to_bytes, memory_string_to_mb}; -use crate::models::{JobModel, JobStatus, ResourceRequirementsModel, ResultModel, SlurmStatsModel}; +use crate::models::{JobModel, JobStatus, ResourceRequirementsModel, ResultModel}; use chrono::{DateTime, Utc}; use log::{self, debug, error, info, warn}; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::path::Path; use std::process::{Child, Command, Stdio}; @@ -148,10 +149,10 @@ pub struct AsyncCliCommand { workflow_id: Option<i64>, run_id: Option<i64>, attempt_id: Option<i64>, + /// Slurm allocation ID when running inside an allocation. + slurm_job_id: Option<String>, /// Slurm step name set when running inside an allocation (for sacct lookup). step_name: Option<String>, - /// Slurm accounting stats collected via sacct after step completion. - slurm_stats: Option<SlurmStatsModel>, handle: Option<Child>, pid: Option<u32>, pub is_running: bool, @@ -177,8 +178,8 @@ impl AsyncCliCommand { workflow_id: None, run_id: None, attempt_id: None, + slurm_job_id: None, step_name: None, - slurm_stats: None, handle: None, pid: None, is_running: false, @@ -199,6 +200,11 @@ impl AsyncCliCommand { self.step_name.as_deref() } + /// Returns the Slurm allocation ID, if running inside an allocation.
+ pub fn slurm_job_id(&self) -> Option<&str> { + self.slurm_job_id.as_deref() + } + #[allow(clippy::too_many_arguments)] pub fn start( &mut self, @@ -355,6 +361,7 @@ impl AsyncCliCommand { self.workflow_id = Some(workflow_id); self.run_id = Some(run_id); self.attempt_id = Some(attempt_id); + self.slurm_job_id = slurm_job_id; self.is_running = true; self.start_time = Utc::now(); self.status = JobStatus::Running; @@ -506,12 +513,6 @@ impl AsyncCliCommand { result } - /// Returns the Slurm accounting stats collected for this job step, if any. - /// Only populated when the job ran inside a Slurm allocation and sacct succeeded. - pub fn take_slurm_stats(&mut self) -> Option { - self.slurm_stats.take() - } - /// Immediately kills the job process using SIGKILL. /// /// This method sends SIGKILL to the process, which cannot be caught or ignored. @@ -739,71 +740,6 @@ impl AsyncCliCommand { self.return_code = Some(return_code); self.handle = None; - // Collect Slurm accounting stats via sacct when running inside an allocation. - // Note: collect_sacct_stats is synchronous and may delay this polling cycle: it sleeps - // 5 seconds between retry attempts (up to 6 attempts, worst-case ~25 seconds) when the - // Slurm accounting daemon hasn't written the step record yet. - if let (Ok(slurm_job_id), Some(step_name)) = - (std::env::var("SLURM_JOB_ID"), self.step_name.as_deref()) - { - info!( - "Collecting sacct stats for workflow_id={} job_id={} step={}", - self.workflow_id.unwrap_or(0), - self.job_id, - step_name - ); - if let Some(stats) = collect_sacct_stats(&slurm_job_id, step_name) - && let (Some(workflow_id), Some(run_id), Some(attempt_id)) = - (self.workflow_id, self.run_id, self.attempt_id) - { - // Override the return code based on sacct State. - // When Slurm's cgroup OOM-kills a step, srun exits with code 1 - // and sacct ExitCode is 0:125 — neither produces the conventional - // 137 (128+SIGKILL) that recovery heuristics check. 
The sacct State - // field reliably reports OUT_OF_MEMORY / TIMEOUT. - // - // TIMEOUT is only overridden when the process did not exit cleanly - // (return_code != 0). When the process handled SIGTERM (from - // --signal) and exited 0, we keep the successful result even though - // sacct may report State=TIMEOUT for the step. - // - // TIMEOUT maps to Terminated (system-initiated kill due to walltime) - // rather than Failed (job error), matching the old behaviour where - // the runner would send SIGTERM before the allocation expired. - if let Some(ref state) = stats.state { - let override_rc = match state.as_str() { - "OUT_OF_MEMORY" => Some((137i64, JobStatus::Failed)), - "TIMEOUT" if return_code != 0 => Some((152i64, JobStatus::Terminated)), - _ => None, - }; - if let Some((sacct_rc, sacct_status)) = override_rc { - info!( - "Overriding srun return_code {} with {} (sacct State={}) for \ - workflow_id={} job_id={} step={}", - return_code, sacct_rc, state, workflow_id, self.job_id, step_name - ); - self.return_code = Some(sacct_rc); - self.status = sacct_status; - } - } - - let mut slurm_stats = - SlurmStatsModel::new(workflow_id, self.job_id, run_id, attempt_id); - slurm_stats.slurm_job_id = Some(slurm_job_id); - slurm_stats.max_rss_bytes = stats.max_rss_bytes; - slurm_stats.max_vm_size_bytes = stats.max_vm_size_bytes; - slurm_stats.max_disk_read_bytes = stats.max_disk_read_bytes; - slurm_stats.max_disk_write_bytes = stats.max_disk_write_bytes; - slurm_stats.ave_cpu_seconds = stats.ave_cpu_seconds; - slurm_stats.node_list = stats.node_list; - info!( - "Sacct stats collected workflow_id={} job_id={} step={}", - workflow_id, self.job_id, step_name - ); - self.slurm_stats = Some(slurm_stats); - } - } - let final_rc = self.return_code.unwrap_or(return_code); let final_status = format!("{:?}", self.status).to_lowercase(); info!( @@ -894,18 +830,29 @@ impl AsyncCliCommand { } /// Slurm accounting stats collected from `sacct` after step completion. 
-struct SacctStats { - max_rss_bytes: Option<i64>, - max_vm_size_bytes: Option<i64>, - max_disk_read_bytes: Option<i64>, - max_disk_write_bytes: Option<i64>, - ave_cpu_seconds: Option<f64>, - node_list: Option<String>, +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct SacctStats { + pub(crate) max_rss_bytes: Option<i64>, + pub(crate) max_vm_size_bytes: Option<i64>, + pub(crate) max_disk_read_bytes: Option<i64>, + pub(crate) max_disk_write_bytes: Option<i64>, + pub(crate) ave_cpu_seconds: Option<f64>, + pub(crate) node_list: Option<String>, /// Slurm step state (e.g. "COMPLETED", "OUT_OF_MEMORY", "TIMEOUT", "FAILED"). /// When Slurm's cgroup OOM-kills a step, the ExitCode is often `0:0` and `srun` /// exits with code 1, losing the OOM signal. The State field is the reliable way /// to detect OOM kills and timeouts. - state: Option<String>, + pub(crate) state: Option<String>, +} + +fn sacct_stats_have_useful_data(stats: &SacctStats) -> bool { + stats.max_rss_bytes.is_some() + || stats.max_vm_size_bytes.is_some() + || stats.max_disk_read_bytes.is_some() + || stats.max_disk_write_bytes.is_some() + || stats.ave_cpu_seconds.is_some() + || stats.node_list.is_some() + || stats.state.is_some() } /// Convert a `std::process::ExitStatus` to a return code. @@ -941,9 +888,17 @@ fn exit_status_to_return_code(status: &std::process::ExitStatus) -> i64 { -/// Returns `None` if sacct is unavailable, returns no data for the step after all retries, or -/// the output cannot be parsed. This is a best-effort call — failures are logged at debug level -/// and do not affect job result reporting. +/// Returns a map from step name to stats. Steps whose records are unavailable or cannot be +/// parsed after all retries are omitted; the map is empty when sacct is unavailable. These are +/// best-effort calls — failures are logged at debug level and do not affect job result reporting. -fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option<SacctStats> { +pub(crate) fn collect_sacct_stats_for_steps( + slurm_job_id: &str, + expected_step_names: &HashSet<String>, +) -> HashMap<String, SacctStats> { const MAX_SACCT_ATTEMPTS: u32 = 6; const SACCT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(5); + let mut collected = HashMap::new(); + + if expected_step_names.is_empty() { + return collected; + } // Allow tests to substitute a fake sacct binary via TORC_FAKE_SACCT.
let sacct_binary = std::env::var("TORC_FAKE_SACCT").unwrap_or_else(|_| "sacct".to_string()); @@ -969,8 +924,8 @@ fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option Option o, Err(e) => { debug!( - "sacct not available or failed for step {}: {}", - step_name, e + "sacct not available or failed for allocation {}: {}", + slurm_job_id, e ); - return None; + return collected; } }; @@ -989,8 +944,8 @@ fn collect_sacct_stats(slurm_job_id: &str, step_name: &str) -> Option Option = l.split('|').collect(); - fields.len() >= 2 && fields[0].trim() == step_name - }); - - match line { - Some(line) => { - let stats = parse_sacct_line(line, step_name); - // The step row can appear with node_list populated but MaxRSS/AveCPU still - // empty while slurmdbd is committing the accounting data asynchronously. - // Retry if we have no useful data yet, rather than returning empty stats. - let has_data = stats - .as_ref() - .is_some_and(|s| s.max_rss_bytes.is_some() || s.ave_cpu_seconds.is_some()); - if has_data || attempt == MAX_SACCT_ATTEMPTS { - return stats; - } - debug!( - "sacct row for step {} found but data fields are empty (attempt {}/{}), retrying", - step_name, attempt, MAX_SACCT_ATTEMPTS - ); + + let mut found_this_attempt = HashMap::new(); + for line in stdout.lines() { + let fields: Vec<&str> = line.split('|').collect(); + if fields.len() < 2 { + continue; } - None => { - if attempt < MAX_SACCT_ATTEMPTS { - debug!( - "sacct has no record for step {} yet (attempt {}/{}), retrying", - step_name, attempt, MAX_SACCT_ATTEMPTS - ); - } else { - warn!( - "sacct has no record for step {} after {} attempts; \ - raw sacct output: {:?}", - step_name, - MAX_SACCT_ATTEMPTS, - stdout.as_ref() - ); - } + let step_name = fields[0].trim(); + if !expected_step_names.contains(step_name) { + continue; + } + if let Some(stats) = parse_sacct_line(line, step_name) + && (sacct_stats_have_useful_data(&stats) || attempt == MAX_SACCT_ATTEMPTS) + { + 
found_this_attempt.insert(step_name.to_string(), stats); + } + } + + collected.extend(found_this_attempt); + + let missing_steps: Vec<&String> = expected_step_names + .iter() + .filter(|step_name| !collected.contains_key(*step_name)) + .collect(); + if missing_steps.is_empty() { + return collected; + } + + if attempt < MAX_SACCT_ATTEMPTS { + debug!( + "sacct missing {} step(s) for allocation {} (attempt {}/{}): {:?}", + missing_steps.len(), + slurm_job_id, + attempt, + MAX_SACCT_ATTEMPTS, + missing_steps + ); + } else { + warn!( + "sacct missing {} step(s) for allocation {} after {} attempts: {:?}", + missing_steps.len(), + slurm_job_id, + MAX_SACCT_ATTEMPTS, + missing_steps + ); + if !collected.is_empty() { + return collected; } } } - None + collected } /// Parse a single pipe-separated `sacct` output line into a [`SacctStats`]. @@ -1131,6 +1092,15 @@ impl Drop for AsyncCliCommand { #[cfg(test)] mod tests { use super::*; + use serial_test::serial; + use std::path::PathBuf; + + fn fake_sacct_path() -> String { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/scripts/fake_sacct.sh") + .to_string_lossy() + .to_string() + } #[test] fn test_parse_sacct_line_with_state() { @@ -1193,4 +1163,47 @@ mod tests { let stats = parse_sacct_line(line, "step1"); assert!(stats.is_none()); } + + #[test] + #[serial] + fn test_collect_sacct_stats_for_steps_batches_multiple_matches() { + unsafe { + std::env::set_var("TORC_FAKE_SACCT", fake_sacct_path()); + } + let expected = HashSet::from([ + "wf1_j1_r1_a1".to_string(), + "wf1_j2_r1_a1".to_string(), + "missing_step".to_string(), + ]); + + let stats = collect_sacct_stats_for_steps("12345", &expected); + + assert_eq!(stats.len(), 2); + assert!(stats.contains_key("wf1_j1_r1_a1")); + assert!(stats.contains_key("wf1_j2_r1_a1")); + assert!(!stats.contains_key("missing_step")); + + unsafe { + std::env::remove_var("TORC_FAKE_SACCT"); + } + } + + #[test] + #[serial] + fn test_collect_sacct_stats_for_steps_returns_empty_on_failure() { 
+ unsafe { + std::env::set_var("TORC_FAKE_SACCT", fake_sacct_path()); + std::env::set_var("TORC_FAKE_SACCT_FAIL", "1"); + } + let expected = HashSet::from(["wf1_j1_r1_a1".to_string()]); + + let stats = collect_sacct_stats_for_steps("12345", &expected); + + assert!(stats.is_empty()); + + unsafe { + std::env::remove_var("TORC_FAKE_SACCT_FAIL"); + std::env::remove_var("TORC_FAKE_SACCT"); + } + } } diff --git a/src/client/job_runner.rs b/src/client/job_runner.rs index c3ef8668..53a8a834 100644 --- a/src/client/job_runner.rs +++ b/src/client/job_runner.rs @@ -27,17 +27,19 @@ use chrono::{DateTime, Utc}; use log::{self, debug, error, info, warn}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Sender}; use std::thread; +use std::thread::JoinHandle; use std::time::{Duration, Instant}; use crate::client::apis; use crate::client::apis::configuration::Configuration; -use crate::client::async_cli_command::AsyncCliCommand; +use crate::client::async_cli_command::{AsyncCliCommand, collect_sacct_stats_for_steps}; use crate::client::resource_correction::format_duration_iso8601; use crate::client::resource_monitor::{ResourceMonitor, ResourceMonitorConfig}; use crate::client::utils; @@ -189,6 +191,22 @@ pub struct WorkerResult { pub had_terminations: bool, } +#[derive(Debug, Clone)] +struct PendingSlurmStats { + workflow_id: i64, + job_id: i64, + run_id: i64, + attempt_id: i64, + slurm_job_id: String, + step_name: String, +} + +#[derive(Debug)] +struct SacctBatchRequest { + slurm_job_id: String, + steps: Vec<PendingSlurmStats>, +} + /// Outcome of attempting to recover a failed job via failure handler.
#[derive(Debug, Clone, PartialEq)] pub enum RecoveryOutcome { @@ -274,6 +292,8 @@ pub struct JobRunner { execution_config: ExecutionConfig, rules: ComputeNodeRules, resource_monitor: Option<ResourceMonitor>, + sacct_batch_tx: Option<Sender<SacctBatchRequest>>, + sacct_worker_handle: Option<JoinHandle<()>>, /// Flag set when SIGTERM is received. Shared with signal handler. termination_requested: Arc<AtomicBool>, /// Monotonic timestamp of when a job was last claimed. Used for idle timeout. last_job_claimed_time: Option<Instant>, @@ -511,6 +531,14 @@ impl JobRunner { None }; + let (sacct_batch_tx, sacct_worker_handle) = + if execution_config.effective_mode() == ExecutionMode::Slurm { + let (tx, handle) = Self::spawn_sacct_worker(config.clone()); + (Some(tx), Some(handle)) + } else { + (None, None) + }; + JobRunner { config, torc_config, @@ -540,6 +568,8 @@ execution_config, rules, resource_monitor, + sacct_batch_tx, + sacct_worker_handle, termination_requested: Arc::new(AtomicBool::new(false)), last_job_claimed_time: None, had_failures: false, @@ -577,6 +607,61 @@ }) } + fn spawn_sacct_worker(config: Configuration) -> (Sender<SacctBatchRequest>, JoinHandle<()>) { + let (tx, rx) = mpsc::channel::<SacctBatchRequest>(); + let handle = thread::spawn(move || { + while let Ok(request) = rx.recv() { + let expected_step_names: HashSet<String> = request + .steps + .iter() + .map(|step| step.step_name.clone()) + .collect(); + let stats_by_step = + collect_sacct_stats_for_steps(&request.slurm_job_id, &expected_step_names); + + for step in request.steps { + let Some(stats) = stats_by_step.get(&step.step_name) else { + debug!( + "No sacct stats available yet for workflow_id={} job_id={} step={}", + step.workflow_id, step.job_id, step.step_name + ); + continue; + }; + + let mut slurm_stats = SlurmStatsModel::new( + step.workflow_id, + step.job_id, + step.run_id, + step.attempt_id, + ); + slurm_stats.slurm_job_id = Some(step.slurm_job_id.clone()); + slurm_stats.max_rss_bytes = stats.max_rss_bytes; + slurm_stats.max_vm_size_bytes = stats.max_vm_size_bytes; + slurm_stats.max_disk_read_bytes = 
stats.max_disk_read_bytes; + slurm_stats.max_disk_write_bytes = stats.max_disk_write_bytes; + slurm_stats.ave_cpu_seconds = stats.ave_cpu_seconds; + slurm_stats.node_list = stats.node_list.clone(); + + match apis::slurm_stats_api::create_slurm_stats(&config, slurm_stats) { + Ok(_) => { + info!( + "Stored slurm_stats workflow_id={} job_id={} step={}", + step.workflow_id, step.job_id, step.step_name + ); + } + Err(e) => { + warn!( + "Failed to store slurm_stats workflow_id={} job_id={} step={} error={}", + step.workflow_id, step.job_id, step.step_name, e + ); + } + } + } + } + }); + (tx, handle) + } + /// Atomically claim a workflow action for execution. /// /// This is a convenience method that wraps [`utils::claim_action`] with @@ -836,6 +921,14 @@ impl JobRunner { monitor.shutdown(); } + // Stop the background sacct worker after all queued accounting work is flushed. + self.sacct_batch_tx.take(); + if let Some(handle) = self.sacct_worker_handle.take() + && let Err(e) = handle.join() + { + error!("Failed to join sacct worker thread: {:?}", e); + } + // Deactivate compute node and set duration self.deactivate_compute_node(); @@ -1106,7 +1199,6 @@ impl JobRunner { /// Check the status of running jobs and remove completed ones. 
fn check_job_status(&mut self) { - let mut completed_jobs = Vec::new(); let mut job_results = Vec::new(); // First pass: check status and collect completed jobs @@ -1114,8 +1206,6 @@ match async_job.check_status() { Ok(()) => { if async_job.is_complete { - completed_jobs.push(*job_id); - let attempt_id = async_job.job.attempt_id.unwrap_or(1); let result = async_job.get_result( self.run_id, @@ -1126,8 +1216,19 @@ // Extract output_file_ids for validation let output_file_ids = async_job.job.output_file_ids.clone(); + let pending_slurm_stats = async_job + .slurm_job_id() + .zip(async_job.step_name()) + .map(|(slurm_job_id, step_name)| PendingSlurmStats { + workflow_id: self.workflow_id, + job_id: *job_id, + run_id: self.run_id, + attempt_id, + slurm_job_id: slurm_job_id.to_string(), + step_name: step_name.to_string(), + }); - job_results.push((*job_id, result, output_file_ids)); + job_results.push((*job_id, result, output_file_ids, pending_slurm_stats)); } } Err(e) => { @@ -1137,7 +1238,8 @@ } // Second pass: validate output files and complete jobs - for (job_id, mut result, output_file_ids) in job_results { + let mut pending_sacct_stats = Vec::new(); + for (job_id, mut result, output_file_ids, pending_slurm_stats) in job_results { // Validate output files if job completed successfully if result.return_code == 0 && let Err(e) = self.validate_and_update_output_files(job_id, &output_file_ids) @@ -1147,7 +1249,44 @@ result.status = JobStatus::Failed; } - self.handle_job_completion(job_id, result); + if self.handle_job_completion(job_id, result) + && let Some(step) = pending_slurm_stats + { + pending_sacct_stats.push(step); + } + } + + if !pending_sacct_stats.is_empty() { + self.enqueue_sacct_stats(pending_sacct_stats); + } + } + + fn enqueue_sacct_stats(&self, steps: Vec<PendingSlurmStats>) { + let Some(tx) = &self.sacct_batch_tx else { + return; + }; + + let mut by_allocation: HashMap<String, Vec<PendingSlurmStats>> = HashMap::new(); + for step in 
steps { + by_allocation + .entry(step.slurm_job_id.clone()) + .or_default() + .push(step); + } + + for (slurm_job_id, steps) in by_allocation { + let step_count = steps.len(); + if let Err(e) = tx.send(SacctBatchRequest { + slurm_job_id, + steps, + }) { + warn!("Failed to enqueue sacct batch request: {}", e); + } else { + debug!( + "Enqueued async sacct batch for {} completed step(s)", + step_count + ); + } } } @@ -1447,20 +1586,7 @@ impl JobRunner { } } - fn handle_job_completion(&mut self, job_id: i64, result: ResultModel) { - // Take sacct stats now, before the result is sent to the server, so we can backfill - // resource fields. For srun-wrapped jobs the sysinfo monitor only sees the srun process - // (negligible overhead), so sacct provides the authoritative peak memory and CPU data. - let slurm_stats = self - .running_jobs - .get_mut(&job_id) - .and_then(|j| j.take_slurm_stats()); - - let mut final_result = result; - if let Some(ref stats) = slurm_stats { - backfill_sacct_into_result(&mut final_result, stats); - } - + fn handle_job_completion(&mut self, job_id: i64, mut final_result: ResultModel) -> bool { // Get job info before removing from running_jobs let job_info = self.running_jobs.get(&job_id).map(|cmd| { ( @@ -1500,7 +1626,7 @@ impl JobRunner { self.last_job_claimed_time = Some(Instant::now()); self.running_jobs.remove(&job_id); self.job_resources.remove(&job_id); - return; + return false; } RecoveryOutcome::NoHandler | RecoveryOutcome::NoMatchingRule => { // Check if workflow has use_pending_failed enabled @@ -1553,25 +1679,6 @@ impl JobRunner { "Job completed workflow_id={} job_id={} run_id={} status={}", self.workflow_id, job_id, final_result.run_id, status_str ); - // Store Slurm accounting stats if collected (best-effort, non-blocking). - // slurm_stats was taken at the top of handle_job_completion so we could backfill - // resource fields into the result before reporting to the server. 
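The enqueue path above groups pending steps by their allocation ID so the worker issues one `sacct` lookup per allocation rather than one per step. A standalone sketch of that grouping, using a hypothetical `Step` type rather than Torc's `PendingSlurmStats`, looks like:

```rust
use std::collections::HashMap;

// Illustrative stand-in for one completed step awaiting accounting lookup.
#[derive(Clone, Debug, PartialEq)]
struct Step {
    slurm_job_id: String,
    step_name: String,
}

// Group steps by allocation: each map entry becomes one batched request,
// so N completed steps in the same allocation cost a single sacct call.
fn group_by_allocation(steps: Vec<Step>) -> HashMap<String, Vec<Step>> {
    let mut by_allocation: HashMap<String, Vec<Step>> = HashMap::new();
    for step in steps {
        by_allocation
            .entry(step.slurm_job_id.clone())
            .or_default()
            .push(step);
    }
    by_allocation
}

fn main() {
    let steps = vec![
        Step { slurm_job_id: "1".into(), step_name: "a".into() },
        Step { slurm_job_id: "2".into(), step_name: "b".into() },
        Step { slurm_job_id: "1".into(), step_name: "c".into() },
    ];
    let grouped = group_by_allocation(steps);
    assert_eq!(grouped["1"].len(), 2);
    assert_eq!(grouped["2"].len(), 1);
    println!("grouped into {} allocation(s)", grouped.len());
}
```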
- if let Some(stats) = slurm_stats { - match apis::slurm_stats_api::create_slurm_stats(&self.config, stats) { - Ok(_) => { - info!( - "Stored slurm_stats workflow_id={} job_id={}", - self.workflow_id, job_id - ); - } - Err(e) => { - warn!( - "Failed to store slurm_stats workflow_id={} job_id={}: {}", - self.workflow_id, job_id, e - ); - } - } - } if let Some(job_rr) = self.job_resources.get(&job_id).cloned() { self.increment_node_resources(job_id, &job_rr); self.increment_resources(&job_rr); @@ -1601,6 +1708,7 @@ impl JobRunner { self.running_jobs.remove(&job_id); self.job_resources.remove(&job_id); self.release_gpu_devices(job_id); + true } /// Delete stdio files for a completed job. @@ -2755,6 +2863,7 @@ pub fn cleanup_job_stdio_files(stdout_path: Option<&str>, stderr_path: Option<&s /// from the sstat time-series if TimeSeries monitoring was configured. /// This ensures that even when sstat time-series monitoring missed a spike, the sacct /// post-mortem data fills in accurate resource usage. +#[cfg(test)] fn backfill_sacct_into_result(result: &mut ResultModel, stats: &SlurmStatsModel) { if let Some(max_rss) = stats.max_rss_bytes { // sacct MaxRSS is the job-lifetime peak memory. Take the max against any
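The retry loop inside `collect_sacct_stats_for_steps` — poll, keep partial results across attempts, stop early once every expected step has a record, give up after a fixed attempt cap — can be sketched independently of `sacct` itself. `collect_with_retry` and its `lookup` closure are illustrative stand-ins, not Torc code; a real collector would sleep roughly five seconds between attempts:

```rust
use std::collections::{HashMap, HashSet};

// Retry a lookup that may return records for only a subset of the expected
// steps (slurmdbd commits accounting data asynchronously). Partial results
// accumulate across attempts; on success the loop exits early, otherwise it
// returns whatever was collected (possibly empty) after `max_attempts`.
fn collect_with_retry<F>(
    expected: &HashSet<String>,
    max_attempts: u32,
    mut lookup: F,
) -> HashMap<String, String>
where
    F: FnMut(u32) -> HashMap<String, String>,
{
    let mut collected = HashMap::new();
    for attempt in 1..=max_attempts {
        collected.extend(lookup(attempt));
        if expected.iter().all(|s| collected.contains_key(s)) {
            break; // every step accounted for; no need to keep polling
        }
        // A real collector would sleep ~5s here before the next attempt.
    }
    collected
}

fn main() {
    let expected: HashSet<String> = ["s0", "s1"].iter().map(|s| s.to_string()).collect();
    // Records trickle in: s0 appears on attempt 1, s1 on attempt 2.
    let result = collect_with_retry(&expected, 6, |attempt| {
        let mut m = HashMap::new();
        if attempt >= 1 { m.insert("s0".into(), "COMPLETED".into()); }
        if attempt >= 2 { m.insert("s1".into(), "COMPLETED".into()); }
        m
    });
    assert_eq!(result.len(), 2);
    println!("collected {} record(s)", result.len());
}
```

Returning the partial map instead of an error mirrors the best-effort contract in the docs above: missing steps simply yield no `slurm_stats` row.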