@@ -722,15 +722,23 @@ impl JobRunner {
                         "Failed to check workflow completion after retries: {}",
                         retry_err
                     );
+                    self.kill_running_jobs();
                     return Err(
                         format!("Unable to check workflow completion: {}", retry_err).into(),
                     );
                 }
             }
 
-            self.check_job_status();
-            if self.execution_config.limit_resources() && exec_mode == ExecutionMode::Direct {
-                self.handle_oom_violations();
+            if let Err(e) = self.check_job_status() {
+                self.kill_running_jobs();
+                return Err(e);
+            }
+            if self.execution_config.limit_resources()
+                && exec_mode == ExecutionMode::Direct
+                && let Err(e) = self.handle_oom_violations()
+            {
+                self.kill_running_jobs();
+                return Err(e);
             }
             self.check_and_execute_actions();
 
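The second guard chains boolean conditions with an `if let` binding in a single `if`, which needs let-chains (stable since Rust 1.88 on the 2024 edition). A minimal standalone sketch of the kill-then-propagate shape; every type and method here is a stub, not the crate's real API:

// Sketch only: stand-in types illustrating the bail-out pattern above.
// Requires edition 2024 for `&& let` chains.
type BoxError = Box<dyn std::error::Error>;

struct Runner {
    limit_resources: bool,
}

impl Runner {
    fn check_job_status(&mut self) -> Result<(), BoxError> {
        Err("server unreachable".into()) // simulate an API failure
    }

    fn handle_oom_violations(&mut self) -> Result<(), BoxError> {
        Ok(())
    }

    fn kill_running_jobs(&mut self) {
        // SIGKILL any children before exiting; see kill_running_jobs below.
    }

    fn tick(&mut self) -> Result<(), BoxError> {
        if let Err(e) = self.check_job_status() {
            self.kill_running_jobs();
            return Err(e);
        }
        // A boolean guard and a `let` binding chained in one `if`.
        if self.limit_resources
            && let Err(e) = self.handle_oom_violations()
        {
            self.kill_running_jobs();
            return Err(e);
        }
        Ok(())
    }
}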
@@ -853,6 +861,31 @@ impl JobRunner {
         })
     }
 
+    /// Kill all running child processes without making any API calls.
+    ///
+    /// Used when the server is unreachable and we need to exit immediately.
+    /// Jobs are left in their current server-side status (likely "running");
+    /// the server will detect them as stale when the compute node is no longer
+    /// reporting in.
+    fn kill_running_jobs(&mut self) {
+        if self.running_jobs.is_empty() {
+            return;
+        }
+        error!(
+            "Killing {} running job(s) due to unrecoverable API failure workflow_id={}",
+            self.running_jobs.len(),
+            self.workflow_id
+        );
+        for (job_id, async_job) in self.running_jobs.iter_mut() {
+            if let Err(e) = async_job.send_sigkill() {
+                warn!(
+                    "Failed to SIGKILL job workflow_id={} job_id={}: {}",
+                    self.workflow_id, job_id, e
+                );
+            }
+        }
+    }
+
     /// Deactivate the compute node and set its duration.
     fn deactivate_compute_node(&self) {
         let duration_seconds = self.start_instant.elapsed().as_secs_f64();
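`send_sigkill` itself is outside this diff. A plausible sketch, assuming each `AsyncJob` owns a `std::process::Child`; only the method name comes from the diff, the `child` field is an assumption:

use std::io;
use std::process::Child;

struct AsyncJob {
    child: Child, // assumed field; the real struct is not shown here
}

impl AsyncJob {
    /// Hypothetical implementation: on Unix, `Child::kill` delivers SIGKILL.
    /// It returns an error if the child has already exited, which is why the
    /// caller above logs the failure and keeps iterating instead of bailing.
    fn send_sigkill(&mut self) -> io::Result<()> {
        self.child.kill()
    }
}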
@@ -920,7 +953,12 @@ impl JobRunner {
             };
         }
         for (job_id, result) in results {
-            self.handle_job_completion(job_id, result);
+            if let Err(e) = self.handle_job_completion(job_id, result) {
+                error!(
+                    "Failed to record canceled job completion workflow_id={} job_id={}: {}",
+                    self.workflow_id, job_id, e
+                );
+            }
         }
     }
 
@@ -1100,12 +1138,17 @@ impl JobRunner {
 
         // Final pass: handle completions (notify server)
         for (job_id, result) in results {
-            self.handle_job_completion(job_id, result);
+            if let Err(e) = self.handle_job_completion(job_id, result) {
+                error!(
+                    "Failed to record terminated job completion workflow_id={} job_id={}: {}",
+                    self.workflow_id, job_id, e
+                );
+            }
         }
     }
 
     /// Check the status of running jobs and remove completed ones.
-    fn check_job_status(&mut self) {
+    fn check_job_status(&mut self) -> Result<(), Box<dyn std::error::Error>> {
         let mut completed_jobs = Vec::new();
         let mut job_results = Vec::new();
 
@@ -1147,8 +1190,9 @@ impl JobRunner {
                 result.status = JobStatus::Failed;
             }
 
-            self.handle_job_completion(job_id, result);
+            self.handle_job_completion(job_id, result)?;
         }
+        Ok(())
     }
 
     /// Handle OOM violations detected by the resource monitor.
@@ -1161,14 +1205,14 @@ impl JobRunner {
     /// 2. Immediately SIGKILLs the violating job (no grace period for OOM)
     /// 3. Waits for the job to exit and collects its result
     /// 4. Reports the job as failed with the configured `oom_exit_code`
-    fn handle_oom_violations(&mut self) {
+    fn handle_oom_violations(&mut self) -> Result<(), Box<dyn std::error::Error>> {
         let violations = match &self.resource_monitor {
             Some(monitor) => monitor.recv_oom_violations(),
-            None => return,
+            None => return Ok(()),
         };
 
         if violations.is_empty() {
-            return;
+            return Ok(());
         }
 
         let oom_exit_code = self.execution_config.oom_exit_code();
@@ -1243,8 +1287,9 @@ impl JobRunner {
 
         // Third pass: handle completions (notify server)
         for (job_id, result) in results {
-            self.handle_job_completion(job_id, result);
+            self.handle_job_completion(job_id, result)?;
         }
+        Ok(())
     }
 
     /// Validate that all expected output files exist and update their st_mtime
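The body of `handle_oom_violations` is largely untouched and therefore elided from the diff; only its early returns and this final pass change. Condensing its doc comment into code, the flow is roughly the sketch below, where every type is a stand-in and `wait_for_exit` is an assumed helper:

use std::collections::HashMap;

type BoxError = Box<dyn std::error::Error>;

struct OomViolation {
    job_id: i64,
}

enum JobStatus {
    Failed,
}

struct ResultModel {
    status: JobStatus,
    return_code: i32,
}

struct Job;

impl Job {
    fn send_sigkill(&mut self) -> std::io::Result<()> {
        Ok(())
    }
    fn wait_for_exit(&mut self) -> ResultModel {
        ResultModel { status: JobStatus::Failed, return_code: 0 }
    }
}

struct Runner {
    running_jobs: HashMap<i64, Job>,
    oom_exit_code: i32,
}

impl Runner {
    fn handle_job_completion(&mut self, _job_id: i64, _r: ResultModel) -> Result<(), BoxError> {
        Ok(()) // the real method notifies the server, so it can now fail
    }

    // Steps 2-4 of the doc comment: SIGKILL, reap, report as failed.
    fn handle_oom(&mut self, violations: Vec<OomViolation>) -> Result<(), BoxError> {
        let mut results = Vec::new();
        for v in violations {
            if let Some(job) = self.running_jobs.get_mut(&v.job_id) {
                let _ = job.send_sigkill(); // no grace period for OOM
                let mut result = job.wait_for_exit(); // reap the child
                result.status = JobStatus::Failed;
                result.return_code = self.oom_exit_code;
                results.push((v.job_id, result));
            }
        }
        // The final pass mirrors the real code: server notification
        // failures now propagate with `?` instead of being swallowed.
        for (job_id, result) in results {
            self.handle_job_completion(job_id, result)?;
        }
        Ok(())
    }
}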
@@ -1447,7 +1492,11 @@ impl JobRunner {
         }
     }
 
-    fn handle_job_completion(&mut self, job_id: i64, result: ResultModel) {
+    fn handle_job_completion(
+        &mut self,
+        job_id: i64,
+        result: ResultModel,
+    ) -> Result<(), Box<dyn std::error::Error>> {
         // Take sacct stats now, before the result is sent to the server, so we can backfill
         // resource fields. For srun-wrapped jobs the sysinfo monitor only sees the srun process
         // (negligible overhead), so sacct provides the authoritative peak memory and CPU data.
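As the comment above explains, for srun-wrapped jobs the sysinfo monitor only observes the srun wrapper, so peak usage has to come from Slurm accounting. A hedged illustration of what such a backfill query might look like; the helper name and the parsing are assumptions, while the sacct flags and the MaxRSS field are standard Slurm:

use std::process::Command;

/// Illustrative only: ask Slurm's accounting database for a job's peak RSS.
/// `--parsable2` removes padding and trailing delimiters; `--units=K` pins
/// the unit so the trailing 'K' can be stripped before parsing.
fn sacct_max_rss_kib(slurm_job_id: &str) -> Option<u64> {
    let out = Command::new("sacct")
        .args([
            "-j", slurm_job_id,
            "--format=MaxRSS",
            "--noheader",
            "--parsable2",
            "--units=K",
        ])
        .output()
        .ok()?;
    String::from_utf8(out.stdout)
        .ok()?
        .lines()
        // Job steps report values like "123456K"; the parent row may be
        // blank, so take the max across whatever parses.
        .filter_map(|l| l.trim().trim_end_matches('K').parse::<u64>().ok())
        .max()
}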
@@ -1500,7 +1549,7 @@ impl JobRunner {
                 self.last_job_claimed_time = Some(Instant::now());
                 self.running_jobs.remove(&job_id);
                 self.job_resources.remove(&job_id);
-                return;
+                return Ok(());
             }
             RecoveryOutcome::NoHandler | RecoveryOutcome::NoMatchingRule => {
                 // Check if workflow has use_pending_failed enabled
@@ -1593,14 +1642,22 @@ impl JobRunner {
             }
             Err(e) => {
                 error!(
-                    "Job complete failed workflow_id={} job_id={} error={}",
+                    "Job complete failed after retries workflow_id={} job_id={} error={}",
                     self.workflow_id, job_id, e
                 );
+                // Clean up local state before propagating the error
+                self.running_jobs.remove(&job_id);
+                self.job_resources.remove(&job_id);
+                self.release_gpu_devices(job_id);
+                return Err(
+                    format!("Unable to record job completion for job {}: {}", job_id, e).into(),
+                );
             }
         }
         self.running_jobs.remove(&job_id);
         self.job_resources.remove(&job_id);
         self.release_gpu_devices(job_id);
+        Ok(())
     }
 
     /// Delete stdio files for a completed job.
@@ -2026,10 +2083,12 @@ impl JobRunner {
             Ok(rr) => rr,
             Err(e) => {
                 error!(
-                    "Error getting resource requirements for job {}: {}",
-                    job_id, e
+                    "Failed to get resource requirements after retries \
+                     workflow_id={} job_id={} rr_id={}: {}",
+                    self.workflow_id, job_id, rr_id, e
                 );
-                panic!("Failed to get resource requirements");
+                self.revert_job_to_ready(job_id);
+                continue;
             }
         };
 
@@ -2045,10 +2104,13 @@ impl JobRunner {
                 debug!("Successfully marked job {} as started in database", job_id);
             }
             Err(e) => {
-                panic!(
-                    "Failed to mark job {} as started in database after retries: {}",
-                    job_id, e
+                error!(
+                    "Failed to mark job as started after retries \
+                     workflow_id={} job_id={}: {}",
+                    self.workflow_id, job_id, e
                 );
+                self.revert_job_to_ready(job_id);
+                continue;
             }
         }
 
@@ -2172,10 +2234,12 @@ impl JobRunner {
             Ok(rr) => rr,
             Err(e) => {
                 error!(
-                    "Error getting resource requirements for job {}: {}",
-                    job_id, e
+                    "Failed to get resource requirements after retries \
+                     workflow_id={} job_id={} rr_id={}: {}",
+                    self.workflow_id, job_id, rr_id, e
                 );
-                panic!("Failed to get resource requirements");
+                self.revert_job_to_ready(job_id);
+                continue;
             }
         };
 
@@ -2193,10 +2257,11 @@ impl JobRunner {
             }
             Err(e) => {
                 error!(
-                    "Failed to mark job {} as started in database after retries: {}",
-                    job_id, e
+                    "Failed to mark job as started after retries \
+                     workflow_id={} job_id={}: {}",
+                    self.workflow_id, job_id, e
                 );
-                // Skip this job if we can't mark it as started
+                self.revert_job_to_ready(job_id);
                 continue;
             }
         }
@@ -2251,10 +2316,9 @@ impl JobRunner {
             }
             Err(err) => {
                 error!(
-                    "Job preparation failed workflow_id={} error= {}",
+                    "Failed to claim jobs after retries workflow_id={}: {}",
                     self.workflow_id, err
                 );
-                panic!("Failed to prepare jobs for submission after retries");
             }
         }
     }
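`revert_job_to_ready`, called from each of the new error paths, is defined elsewhere in this change. From its call sites, it evidently returns a claimed job to the ready state so it can be scheduled again instead of panicking the runner. A hypothetical sketch under that assumption; every name inside is a stand-in, not the crate's verified API:

use std::collections::HashMap;

// Stand-in types; the real runner, client, and statuses live in the crate.
enum JobStatus {
    Ready,
}

struct Client;

impl Client {
    fn set_job_status(&self, _job_id: i64, _status: JobStatus) -> Result<(), String> {
        Ok(())
    }
}

struct Runner {
    workflow_id: i64,
    client: Client,
    job_resources: HashMap<i64, u64>,
}

impl Runner {
    /// Hypothetical: undo a claim so the job can be picked up again.
    fn revert_job_to_ready(&mut self, job_id: i64) {
        // Drop local bookkeeping created while claiming the job.
        self.job_resources.remove(&job_id);
        // Best effort: tell the server the job is ready again. If the API is
        // still down this fails too, and server-side staleness detection
        // eventually recovers the job.
        if let Err(e) = self.client.set_job_status(job_id, JobStatus::Ready) {
            eprintln!(
                "Failed to revert job to ready workflow_id={} job_id={}: {}",
                self.workflow_id, job_id, e
            );
        }
    }
}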