diff --git a/src/osekit/utils/job.py b/src/osekit/utils/job.py index 9a22c68e..0d7962e9 100644 --- a/src/osekit/utils/job.py +++ b/src/osekit/utils/job.py @@ -4,6 +4,7 @@ jobs, with writting/submitting of PBS files. """ +from __future__ import annotations import subprocess from dataclasses import dataclass @@ -307,15 +308,35 @@ def write_pbs(self, path: Path) -> None: self.path = path self.progress() - def submit_pbs(self) -> None: - """Submit the PBS file of the job to a PBS queueing system.""" + def submit_pbs(self, dependency: Job | list[Job] | str | list[str] | None = None) -> None: + """Submit the PBS file of the job to a PBS queueing system. + + Parameters + ---------- + dependency: Job | list[Job] | str | None + Job dependency. Can be: + - A Job instance: will wait for that job to complete successfully + - A list of Job instances: will wait for all jobs to complete successfully + - A string: job ID (e.g., "12345.datarmor") or dependency specification + - None: no dependency + + """ if self.update_status() is not JobStatus.PREPARED: msg = "Job should be written before being submitted." raise ValueError(msg) + cmd = ["qsub"] + + if dependency is not None: + dependency_str = self._build_dependency_string(dependency) + if dependency_str: + cmd.extend(["-W", f"depend={dependency_str}"]) + + cmd.append(str(self.path)) + try: request = subprocess.run( - ["qsub", self.path], + cmd, capture_output=True, text=True, check=False, @@ -327,6 +348,72 @@ def submit_pbs(self) -> None: self.job_id = request.stdout.split(".", maxsplit=1)[0].strip() self.progress() + _VALID_DEPENDENCY_TYPES = {"afterok", "afterany", "afternotok", "after"} + + @staticmethod + def _validate_dependency_type(dependency_type: str) -> None: + if dependency_type not in Job._VALID_DEPENDENCY_TYPES: + raise ValueError( + f"Unsupported dependency type '{dependency_type}'. " + f"Expected one of {sorted(Job._VALID_DEPENDENCY_TYPES)}." + ) + + @staticmethod + def _validate_dependency(dependency: list[str] | list[Job]) -> list[str]: + job_ids = [dep.job_id if isinstance(dep, Job) else dep for dep in dependency] + for job_id in job_ids: + if not job_id.isdigit() or len(job_id)!=7: + raise ValueError( + f"Invalid job ID '{job_id}'. Job IDs must be 7 digits long." + ) + return job_ids + + @staticmethod + def _build_dependency_string( + dependency: str | Job | list[str] | list[Job], + dependency_type: str = "afterok", + ) -> str: + """Build a PBS dependency string. + + Parameters + ---------- + dependency: Job | str + Job or job ID to depend on. + dependency_type: str + Type of dependency (afterok, afterany, afternotok, after). + + Returns + ------- + str + PBS dependency string. + + Examples + -------- + >>> Job._build_dependency_string("1234567") + 'afterok:1234567' + >>> Job._build_dependency_string(["1234567", "4567891"]) + 'afterok:1234567:4567891' + >>> Job._build_dependency_string("7894561", dependency_type="afterany") + 'afterany:7894651' + + """ + dependency = dependency if isinstance(dependency, list) else [dependency] + id_str = Job._validate_dependency(dependency) + Job._validate_dependency_type(dependency_type) + + if unsubmitted_job := next( + ( + j + for j in dependency + if isinstance(j, Job) and j.status.value < JobStatus.QUEUED.value + ), + None, + ): + msg = f"Job '{unsubmitted_job.name}' has not been submitted yet." + raise ValueError(msg) + + return f"{dependency_type}:{':'.join(id_str)}" + def update_info(self) -> None: """Request info about the job and update it.""" if self.job_id is None: @@ -443,9 +530,25 @@ def create_job( job.write_pbs(output_folder / f"{name}.pbs") self.jobs.append(job) - def submit_pbs(self) -> None: - """Submit all repared jobs to the PBS queueing system.""" + def submit_pbs( + self, dependencies: dict[str, "Job | list[Job]"] | None = None + ) -> None: + """Submit all prepared jobs to the PBS queueing system. + + Parameters + ---------- + dependencies: dict[str, Job | list[Job]] | None + Optional dictionary mapping job names to their dependencies. + Example: {"job2": job1, "job3": [job1, job2]} + + """ for job in self.jobs: if job.update_status() is not JobStatus.PREPARED: continue - job.submit_pbs() + + # Check if this job has dependencies + depend_on = None + if dependencies and job.name in dependencies: + depend_on = dependencies[job.name] + + job.submit_pbs(dependency=depend_on) diff --git a/tests/test_job.py b/tests/test_job.py index f98305b3..34dd0cc3 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,4 +1,7 @@ +from __future__ import annotations + import subprocess +from contextlib import nullcontext from pathlib import Path import pytest @@ -414,7 +417,7 @@ def __init__(self, name: str, status: JobStatus) -> None: self.name = name self.status = status - def submit_pbs(self) -> None: + def submit_pbs(self, dependency=None) -> None: submitted_jobs.append(self.name) def update_status(self) -> JobStatus: @@ -436,3 +439,169 @@ def update_status(self) -> JobStatus: job_builder.submit_pbs() assert submitted_jobs == ["prepared"] + + +@pytest.mark.parametrize( + ("dependency", "ids", "status", "expected"), + [ + pytest.param( + ["1234567"], + [None], + [None], + nullcontext("afterok:1234567"), + id="single_job_id", + ), + pytest.param( + ["1234567", "4567891", "7891234"], + [None] * 3, + [None] * 3, + nullcontext("afterok:1234567:4567891:7891234"), + id="multiple_job_ids", + ), + pytest.param( + ["123"], + [None], + [None], + pytest.raises( + ValueError, + match=r"Invalid job ID '123'\. Job IDs must be 7 digits long\.", + ), + id="invalid_job_id_too_short", + ), + pytest.param( + [Job(script_path=Path("test.py"), name="job_1")], + ["12345678"], + [JobStatus.QUEUED], + pytest.raises( + ValueError, + match=r"Invalid job ID '12345678'\. Job IDs must be 7 digits long\.", + ), + id="invalid_job_id_too_long", + ), + pytest.param( + ["abcdefg"], + [None], + [None], + pytest.raises( + ValueError, + match=r"Invalid job ID 'abcdefg'\. Job IDs must be 7 digits long\.", + ), + id="invalid_job_id_non_numeric", + ), + pytest.param( + ["1234567", "not_a_job_id"], + [None] * 2, + [None] * 2, + pytest.raises( + ValueError, + match=r"Invalid job ID 'not_a_job_id'\. Job IDs must be 7 digits long\.", + ), + id="multiple_job_id_one_invalid", + ), + pytest.param( + [Job(script_path=Path("test.py"), name="job_1")], + ["1234567"], + [JobStatus.QUEUED], + nullcontext("afterok:1234567"), + id="single_job_instance", + ), + pytest.param( + [ + Job(script_path=Path("horse_with.py"), name="job_1"), + Job(script_path=Path("no_name.py"), name="job_2"), + ], + ["1234567", "4567891"], + [JobStatus.QUEUED, JobStatus.QUEUED], + nullcontext("afterok:1234567:4567891"), + id="multiple_job_instance", + ), + pytest.param( + [ + Job(script_path=Path("king_crimson.py"), name="job_1"), + Job(script_path=Path("crimson_king.py"), name="job_2"), + ], + ["1234567", "not_an_id"], + [JobStatus.QUEUED, JobStatus.QUEUED], + pytest.raises( + ValueError, + match=r"Invalid job ID 'not_an_id'\. Job IDs must be 7 digits long\.", + ), + id="multiple_job_instance_invalid_one", + ), + pytest.param( + [ + Job(script_path=Path("king_crimson.py"), name="job_1"), + "9876543", + ], + ["1234567", None], + [JobStatus.QUEUED, None], + nullcontext("afterok:1234567:9876543"), + id="job_and_string_input", + ), + pytest.param( + [Job(script_path=Path("test.py"), name="tornero")], + ["1234567"], + [JobStatus.UNPREPARED], + pytest.raises( + ValueError, + match="Job 'tornero' has not been submitted yet.", + ), + id="unprepared_job_instance", + ), + pytest.param( + [ + Job(script_path=Path("script.py"), name="dalida"), + Job(script_path=Path("script.py"), name="mourir_sur_scene"), + ], + ["1234567", "4567896"], + [JobStatus.QUEUED, JobStatus.PREPARED], + pytest.raises( + ValueError, + match="Job 'mourir_sur_scene' has not been submitted yet.", + ), + id="multiple_job_instance_one_not_submitted", + ), + ], +) + +def test_build_dependency_string_with_string_input( + dependency: list[str] | list[Job], + ids: list[str] | None, + status: list[JobStatus], + expected: str | None, +) -> None: + """Test building dependency string from string and Job inputs.""" + for dep, id, st in zip(dependency, ids, status, strict=True): + if isinstance(dep, Job): + dep.status = st + dep.job_id = id + + with expected as e: + assert Job._build_dependency_string(dependency) == e + + +@pytest.mark.parametrize( + ("dependency_type", "expected"), + [ + pytest.param("afterok", nullcontext("afterok:1234567"), id="afterok"), + pytest.param("afterany", nullcontext("afterany:1234567"), id="afterany"), + pytest.param("afternotok", nullcontext("afternotok:1234567"), id="afternotok"), + pytest.param("after", nullcontext("after:1234567"), id="after"), + pytest.param( + "not_a_supported_type", + pytest.raises( + ValueError, + match=r"Unsupported dependency type 'not_a_supported_type'\. Expected one of \['after', 'afterany', 'afternotok', 'afterok'\]\.", + ), + id="invalid_dependency_type", + ), + ], +) + +def test_build_dependency_string_with_different_types( + dependency_type: str, + expected: type[Exception], +) -> None: + """Test building dependency strings with different dependency types.""" + with expected as e: + assert Job._build_dependency_string("1234567", dependency_type) == e \ No newline at end of file