Skip to content

Commit 53451ea

Browse files
committed
feat: use bare repo + worktree instead of full clone and add slurm job logs resolution logic
1 parent 5a5aefa commit 53451ea

13 files changed

Lines changed: 571 additions & 159 deletions

File tree

app/backends/base.py

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
from pathlib import Path, PurePosixPath
77
from typing import TYPE_CHECKING
88

9+
from app.utils import bare_repo_dir, parse_default_branch
10+
911
if TYPE_CHECKING:
1012
from collections.abc import AsyncGenerator, Awaitable, Callable
1113

12-
from app.models import WorkflowFileInfo
14+
from app.models import SnkmtJobResponse, WorkflowFileInfo
1315

1416
logger = logging.getLogger(__name__)
1517

@@ -60,19 +62,103 @@ def work_dir(self, job_id: str) -> str:
6062
...
6163

6264
@abstractmethod
65+
async def _run_git_cmd(self, *args: str) -> str:
66+
"""Run a git command and return its stdout as a string.
67+
68+
Implementations must raise on non-zero exit codes.
69+
"""
70+
...
71+
72+
@abstractmethod
73+
async def _copy_local_workflow(self, src: str, dst: str) -> None:
74+
"""Copy a local workflow directory to the work directory."""
75+
...
76+
77+
@abstractmethod
78+
def _scratch_dir(self) -> str:
79+
"""Return the scratch directory path for this backend."""
80+
...
81+
6382
async def prepare(
6483
self,
6584
job_id: str,
6685
workflow: str,
6786
git_ref: str | None = None,
6887
) -> tuple[str, str | None, str | None]:
69-
"""
70-
Prepare workflow in a new working directory.
88+
"""Prepare workflow in a new working directory.
7189
72-
If workflow is a URL, clone the repo (checking out git_ref if provided).
73-
If it is a local path, upload the directory to the compute host.
90+
If workflow is a URL, set up a bare repo and create a worktree.
91+
If it is a local path, copy/upload the directory.
7492
"""
75-
...
93+
work_dir = self.work_dir(job_id)
94+
git_sha: str | None = None
95+
96+
if workflow.startswith(("http://", "https://")):
97+
repo_dir = bare_repo_dir(self._scratch_dir(), workflow)
98+
resolved_ref = git_ref
99+
100+
# 1. Ensure bare repo exists (idempotent)
101+
await self._run_git_cmd("git", "init", "--bare", repo_dir)
102+
103+
# 2. Resolve default branch if git_ref not provided
104+
if resolved_ref is None:
105+
out = await self._run_git_cmd(
106+
"git",
107+
"ls-remote",
108+
"--symref",
109+
workflow,
110+
"HEAD",
111+
)
112+
resolved_ref = parse_default_branch(out)
113+
114+
# 3. Fetch to a job-scoped ref
115+
job_ref = f"refs/snakedispatch/{job_id}"
116+
await self._run_git_cmd(
117+
"git",
118+
"-C",
119+
repo_dir,
120+
"fetch",
121+
workflow,
122+
f"{resolved_ref}:{job_ref}",
123+
)
124+
125+
# 4. Create worktree (detached HEAD)
126+
await self._run_git_cmd(
127+
"git",
128+
"-C",
129+
repo_dir,
130+
"worktree",
131+
"add",
132+
"--detach",
133+
work_dir,
134+
job_ref,
135+
)
136+
137+
# 5. Extract SHA
138+
out = await self._run_git_cmd(
139+
"git",
140+
"-C",
141+
work_dir,
142+
"rev-parse",
143+
"HEAD",
144+
)
145+
git_sha = out.strip()
146+
147+
# 6. Clean up temp ref
148+
await self._run_git_cmd(
149+
"git",
150+
"-C",
151+
repo_dir,
152+
"update-ref",
153+
"-d",
154+
job_ref,
155+
)
156+
157+
git_ref = resolved_ref
158+
else:
159+
await self._copy_local_workflow(workflow, work_dir)
160+
161+
return work_dir, git_ref, git_sha
76162

77163
@abstractmethod
78164
async def setup(
@@ -184,6 +270,14 @@ async def check_connectivity(self) -> bool:
184270
"""Check whether the backend is reachable. Returns True if healthy."""
185271
...
186272

273+
def resolve_job_logs(
274+
self,
275+
jobs: list[SnkmtJobResponse],
276+
workflow_files: list[WorkflowFileInfo] | None,
277+
) -> None:
278+
"""Set job.log for each job based on backend-specific log paths."""
279+
return
280+
187281
@abstractmethod
188282
async def cleanup(
189283
self,

app/backends/local.py

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,51 +35,39 @@ def __init__(self, config: LocalConfig) -> None:
3535
def work_dir(self, job_id: str) -> str:
3636
return f"{self._config.scratch_dir}/jobs/{job_id}"
3737

38+
def _scratch_dir(self) -> str:
39+
return self._config.scratch_dir
40+
41+
async def _run_git_cmd(self, *args: str) -> str:
42+
"""Run a git subprocess locally, returning stdout."""
43+
if "init" in args:
44+
Path(args[-1]).parent.mkdir(parents=True, exist_ok=True)
45+
proc = await asyncio.create_subprocess_exec(
46+
*args,
47+
stdout=asyncio.subprocess.PIPE,
48+
stderr=asyncio.subprocess.PIPE,
49+
)
50+
stdout, stderr = await proc.communicate()
51+
if proc.returncode != 0:
52+
cmd_str = " ".join(args)
53+
msg = (
54+
f"git command failed (exit {proc.returncode}): {cmd_str}\n"
55+
f"stderr: {(stderr or b'').decode()}"
56+
)
57+
raise RuntimeError(msg)
58+
return (stdout or b"").decode()
59+
60+
async def _copy_local_workflow(self, src: str, dst: str) -> None:
61+
await asyncio.to_thread(shutil.copytree, src, dst, dirs_exist_ok=True)
62+
3863
async def prepare(
3964
self,
4065
job_id: str,
4166
workflow: str,
4267
git_ref: str | None = None,
4368
) -> tuple[str, str | None, str | None]:
4469
Path(self._config.scratch_dir).mkdir(parents=True, exist_ok=True)
45-
work_dir = self.work_dir(job_id)
46-
git_sha: str | None = None
47-
48-
if workflow.startswith(("http://", "https://")):
49-
cmd = ["git", "clone", "--depth=1"]
50-
if git_ref:
51-
cmd += ["--branch", git_ref]
52-
cmd += [workflow, work_dir]
53-
proc = await asyncio.create_subprocess_exec(
54-
*cmd,
55-
stdout=asyncio.subprocess.PIPE,
56-
stderr=asyncio.subprocess.PIPE,
57-
)
58-
_, stderr = await proc.communicate()
59-
if proc.returncode != 0:
60-
msg = f"git clone failed (exit {proc.returncode}): {stderr.decode()}"
61-
raise RuntimeError(msg)
62-
63-
proc = await asyncio.create_subprocess_exec(
64-
"git",
65-
"-C",
66-
work_dir,
67-
"rev-parse",
68-
"HEAD",
69-
"--abbrev-ref",
70-
"HEAD",
71-
stdout=asyncio.subprocess.PIPE,
72-
)
73-
out, _ = await proc.communicate()
74-
lines = out.decode().strip().splitlines()
75-
git_sha = lines[0] if lines else None
76-
git_ref = lines[1] if len(lines) > 1 else None
77-
else:
78-
src = Path(workflow)
79-
dst = Path(work_dir)
80-
await asyncio.to_thread(shutil.copytree, src, dst, dirs_exist_ok=True)
81-
82-
return work_dir, git_ref, git_sha
70+
return await super().prepare(job_id, workflow, git_ref)
8371

8472
async def setup(
8573
self,
@@ -94,7 +82,6 @@ async def setup(
9482
await asyncio.to_thread(full_path.write_text, content)
9583
logger.debug("Wrote setup file: %s", full_path)
9684

97-
9885
async def launch(
9986
self,
10087
job_id: str,
@@ -202,11 +189,12 @@ async def check_job_status(self, job_id: str, work_dir: str) -> int | None:
202189
try:
203190
pid = int(pid_path.read_text(encoding="utf-8").strip())
204191
os.kill(pid, 0)
205-
return None # still running
206192
except (ProcessLookupError, PermissionError):
207193
return -1 # dead without exitcode
208194
except (ValueError, OSError):
209195
return None
196+
else:
197+
return None # still running
210198
return None
211199

212200
async def list_workflow_files(

app/backends/slurm_ssh.py

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from app.backends.base import CHUNK_SIZE, SNKMT_DB_FILENAME, ComputeBackend
1919
from app.config import SlurmSSHConfig
20-
from app.models import JobStatus, WorkflowFileInfo
20+
from app.models import JobStatus, SnkmtJobResponse, WorkflowFileInfo
2121
from app.utils import (
2222
build_wrapper_script,
2323
enforce_error_limit,
@@ -218,34 +218,21 @@ async def _upload_dir(self, local_dir: str, remote_dir: str) -> None:
218218
msg = f"local tar failed with exit {local_proc.returncode}"
219219
raise RuntimeError(msg)
220220

221-
async def prepare(
222-
self,
223-
job_id: str,
224-
workflow: str,
225-
git_ref: str | None = None,
226-
) -> tuple[str, str | None, str | None]:
227-
"""Clone/upload workflow and return (work_dir, git_ref, git_sha)."""
228-
work_dir = self.work_dir(job_id)
229-
git_sha: str | None = None
230-
231-
if workflow.startswith(("http://", "https://")):
232-
cmd = "git clone --depth=1 "
233-
if git_ref:
234-
cmd += f"--branch {shlex.quote(git_ref)} "
235-
cmd += f"{shlex.quote(workflow)} {shlex.quote(work_dir)}"
236-
await self._run_ssh(cmd)
221+
def _scratch_dir(self) -> str:
222+
return self._config.scratch_dir
237223

238-
result = await self._run_ssh(
239-
f"git -C {shlex.quote(work_dir)} rev-parse HEAD && "
240-
f"git -C {shlex.quote(work_dir)} rev-parse --abbrev-ref HEAD"
241-
)
242-
lines = result.stdout.strip().splitlines()
243-
git_sha = lines[0] if lines else None
244-
git_ref = lines[1] if len(lines) > 1 else None
245-
else:
246-
await self._upload_dir(workflow, work_dir)
224+
async def _run_git_cmd(self, *args: str) -> str:
225+
"""Run a git command over SSH, returning stdout."""
226+
quoted = [shlex.quote(a) for a in args]
227+
# For init, ensure parent dirs exist remotely
228+
if "init" in args:
229+
repo_path = shlex.quote(args[-1])
230+
await self._run_ssh(f"mkdir -p $(dirname {repo_path})")
231+
result = await self._run_ssh(" ".join(quoted))
232+
return result.stdout or ""
247233

248-
return work_dir, git_ref, git_sha
234+
async def _copy_local_workflow(self, src: str, dst: str) -> None:
235+
await self._upload_dir(src, dst)
249236

250237
async def setup(
251238
self,
@@ -268,7 +255,6 @@ async def setup(
268255
await f.write(content)
269256
logger.debug("Wrote setup file: %s", full_path)
270257

271-
272258
async def launch(
273259
self,
274260
job_id: str,
@@ -520,6 +506,45 @@ async def sync_snkmt_db(self, job_id: str, work_dir: str, local_path: Path) -> N
520506
finally:
521507
await self._run_ssh(f"rm -f {shlex.quote(remote_bak)}", check=False)
522508

509+
def resolve_job_logs(
510+
self,
511+
jobs: list[SnkmtJobResponse],
512+
workflow_files: list[WorkflowFileInfo] | None,
513+
) -> None:
514+
"""Match slurm log files to jobs via structured path convention.
515+
516+
Expects logs at: logs/slurm/{rule}/{wildcards_string}/{slurm_id}.out
517+
If multiple logs exist (retries), picks the latest (highest slurm ID).
518+
"""
519+
# Index slurm log files by directory prefix
520+
slurm_logs: dict[str, list[str]] = {}
521+
if workflow_files:
522+
for wf in workflow_files:
523+
p = wf["path"]
524+
if p.startswith("logs/slurm/") and p.endswith(".out"):
525+
dir_prefix = p.rsplit("/", 1)[0]
526+
slurm_logs.setdefault(dir_prefix, []).append(p)
527+
528+
def _slurm_id(path: str) -> int:
529+
fname = path.rsplit("/", 1)[-1].removesuffix(".out")
530+
try:
531+
return int(fname)
532+
except ValueError:
533+
return -1
534+
535+
for job in jobs:
536+
if job.wildcards:
537+
wildcards_str = ",".join(
538+
f"{k}={v}" for k, v in sorted(job.wildcards.items())
539+
)
540+
slurm_dir = f"logs/slurm/{job.rule}/{wildcards_str}"
541+
else:
542+
slurm_dir = f"logs/slurm/{job.rule}"
543+
544+
candidates = slurm_logs.get(slurm_dir, [])
545+
if candidates:
546+
job.log = max(candidates, key=_slurm_id)
547+
523548
async def cleanup(
524549
self,
525550
job_id: str,
@@ -528,15 +553,13 @@ async def cleanup(
528553
# 1. Kill the wrapper process if a PID file exists
529554
pid_file = f"{shlex.quote(work_dir)}/.pid"
530555
await self._run_ssh(
531-
f"test -f {pid_file} && "
532-
f"kill $(cat {pid_file}) 2>/dev/null || true",
556+
f"test -f {pid_file} && kill $(cat {pid_file}) 2>/dev/null || true",
533557
check=False,
534558
)
535559
# Wait before sending SIGKILL in case SIGTERM wasn't enough
536560
await asyncio.sleep(5)
537561
await self._run_ssh(
538-
f"test -f {pid_file} && "
539-
f"kill -9 $(cat {pid_file}) 2>/dev/null || true",
562+
f"test -f {pid_file} && kill -9 $(cat {pid_file}) 2>/dev/null || true",
540563
check=False,
541564
)
542565
# 2. Cancel only SLURM jobs launched from this work_dir

app/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ class SnkmtJobResponse(BaseModel):
177177
started_at: datetime | None = None
178178
completed_at: datetime | None = None
179179
files: list[SnkmtFileResponse] = []
180+
log: str | None = None
180181

181182
@field_validator("started_at", "completed_at", mode="before")
182183
@classmethod

0 commit comments

Comments
 (0)