Skip to content

Commit 5a5aefa

Browse files
committed
fix: harden job cleanup and stuck job recovery
1 parent d7927a9 commit 5a5aefa

4 files changed

Lines changed: 37 additions & 26 deletions

File tree

app/backends/local.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ async def cleanup(
320320
try:
321321
pid = int(pid_path.read_text().strip())
322322
os.kill(pid, signal.SIGTERM)
323+
await asyncio.sleep(5)
324+
os.kill(pid, signal.SIGKILL)
323325
except ProcessLookupError as exc:
324326
logger.debug("Process already gone for job %s: %s", job_id, exc)
325327
except (ValueError, OSError) as exc:

app/backends/slurm_ssh.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from app.config import SlurmSSHConfig
2020
from app.models import JobStatus, WorkflowFileInfo
2121
from app.utils import (
22-
build_snkmt_setup_commands,
2322
build_wrapper_script,
2423
enforce_error_limit,
2524
rename_with_cleanup,
@@ -527,9 +526,17 @@ async def cleanup(
527526
work_dir: str,
528527
) -> None:
529528
# 1. Kill the wrapper process if a PID file exists
529+
pid_file = f"{shlex.quote(work_dir)}/.pid"
530530
await self._run_ssh(
531-
f"test -f {shlex.quote(work_dir)}/.pid && "
532-
f"kill $(cat {shlex.quote(work_dir)}/.pid) 2>/dev/null || true",
531+
f"test -f {pid_file} && "
532+
f"kill $(cat {pid_file}) 2>/dev/null || true",
533+
check=False,
534+
)
535+
# Wait before sending SIGKILL in case SIGTERM wasn't enough
536+
await asyncio.sleep(5)
537+
await self._run_ssh(
538+
f"test -f {pid_file} && "
539+
f"kill -9 $(cat {pid_file}) 2>/dev/null || true",
533540
check=False,
534541
)
535542
# 2. Cancel only SLURM jobs launched from this work_dir

app/tasks.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,9 @@ def log_callback(line: str) -> None:
179179
)
180180

181181
except asyncio.CancelledError:
182-
logger.info("Job %s was cancelled", job_id)
183-
store._mark_cancelled(job_id)
182+
if record.status not in TERMINAL_STATUSES:
183+
logger.info("Job %s was cancelled", job_id)
184+
store._mark_cancelled(job_id)
184185
raise
185186

186187
except Exception as exc:
@@ -224,27 +225,27 @@ async def sync_job_data_loop(
224225
if not record.work_dir:
225226
continue
226227

227-
# Recover orphaned RUNNING jobs (no active monitor task)
228-
if record.task is None or record.task.done():
229-
try:
230-
exit_code = await backend.check_job_status(
231-
job_id, record.work_dir
232-
)
233-
if exit_code is not None:
234-
logger.warning(
235-
"Recovering orphaned job %s (exit code %d)",
236-
job_id,
237-
exit_code,
238-
)
239-
store.mark_finished(job_id, exit_code)
240-
store.persist(record)
241-
continue
242-
except Exception:
228+
try:
229+
exit_code = await backend.check_job_status(
230+
job_id, record.work_dir
231+
)
232+
if exit_code is not None:
243233
logger.warning(
244-
"Failed to check orphaned job %s",
234+
"Recovering stuck job %s (exit code %d)",
245235
job_id,
246-
exc_info=True,
236+
exit_code,
247237
)
238+
store.mark_finished(job_id, exit_code)
239+
store.persist(record)
240+
if record.task and not record.task.done():
241+
record.task.cancel()
242+
continue
243+
except Exception:
244+
logger.warning(
245+
"Failed to check job %s",
246+
job_id,
247+
exc_info=True,
248+
)
248249

249250
_flush_logs(store, job_id)
250251
snkmt_db_path = store.get_snkmt_db_path(job_id)

tests/test_slurm_ssh.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ async def mock_connect(*args, **kwargs):
527527

528528

529529
class TestCleanupSlurm:
530-
async def test_cleanup_issues_three_separate_ssh_commands(self, backend):
530+
async def test_cleanup_issues_four_separate_ssh_commands(self, backend):
531531
ssh_calls: list[str] = []
532532

533533
async def run_ssh(cmd, **kwargs):
@@ -540,7 +540,8 @@ async def run_ssh(cmd, **kwargs):
540540
with patch.object(backend, "_run_ssh", side_effect=run_ssh):
541541
await backend.cleanup("job-1", "/scratch/jobs/job-1")
542542

543-
assert len(ssh_calls) == 3
544-
assert any("kill" in cmd for cmd in ssh_calls)
543+
assert len(ssh_calls) == 4
544+
assert any("kill" in cmd and "kill -9" not in cmd for cmd in ssh_calls)
545+
assert any("kill -9" in cmd for cmd in ssh_calls)
545546
assert any("scancel" in cmd for cmd in ssh_calls)
546547
assert any("rm -rf" in cmd for cmd in ssh_calls)

0 commit comments

Comments
 (0)