From a84ce1ee8e57d3dee294c2520fef8703d6f0e3c1 Mon Sep 17 00:00:00 2001 From: Leopold Talirz Date: Mon, 1 Jun 2026 14:27:23 +0200 Subject: [PATCH] feat(api): re-run only failed steps force_rerun = True would simply rerun all steps. Moving to `rerun=all` (old behavior) and `rerun=failed`, supporting rerun of only the failed steps. --- amorphouspy_api/src/amorphouspy_api/models.py | 10 ++ .../src/amorphouspy_api/routers/jobs.py | 93 +++++++++++++------ .../src/tests/test_meltquench_integration.py | 2 +- 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/amorphouspy_api/src/amorphouspy_api/models.py b/amorphouspy_api/src/amorphouspy_api/models.py index c14f10f8..b74b696e 100644 --- a/amorphouspy_api/src/amorphouspy_api/models.py +++ b/amorphouspy_api/src/amorphouspy_api/models.py @@ -176,6 +176,16 @@ class JobStatus(StrEnum): CANCELLED = "cancelled" +class RerunMode(StrEnum): + """Controls how executorlib cache is handled on submission.""" + + FAILED = "failed" + """Delete only failed step outputs and re-run them; preserve successful results.""" + + ALL = "all" + """Delete *all* cached outputs and re-run every step from scratch.""" + + # --------------------------------------------------------------------------- # Analysis configurations (discriminated union) # --------------------------------------------------------------------------- diff --git a/amorphouspy_api/src/amorphouspy_api/routers/jobs.py b/amorphouspy_api/src/amorphouspy_api/routers/jobs.py index 5a907e8c..74ba0892 100644 --- a/amorphouspy_api/src/amorphouspy_api/routers/jobs.py +++ b/amorphouspy_api/src/amorphouspy_api/routers/jobs.py @@ -45,6 +45,7 @@ JobStatus, JobStatusResponse, JobSubmission, + RerunMode, TagsResponse, TagsUpdate, _job_urls, @@ -73,17 +74,44 @@ # --------------------------------------------------------------------------- -def _clear_executor_cache(request_hash: str) -> None: - """Delete executorlib HDF5 cache files for *request_hash* (all steps + merge).""" +def _clear_executor_cache(request_hash: str, *, failed_only: bool = False) -> None: + """Delete executorlib HDF5 cache files for *request_hash*. + + Args: + request_hash: The cache key prefix. + failed_only: When ``True``, only delete ``_o.h5`` files whose stored + result is an error (plus the corresponding ``_i.h5``). Successful + outputs are preserved so executorlib can reuse them. + """ from amorphouspy_api.config import MELTQUENCH_PROJECT_DIR cache_dir = Path(MELTQUENCH_PROJECT_DIR) if not cache_dir.is_dir(): return - # Matches: {hash}_i.h5, {hash}_o.h5, {hash}_{step}_i.h5, {hash}_{step}_o.h5 - for f in cache_dir.glob(f"{request_hash}*.h5"): - logger.info("Removing cached file %s (force re-run)", f.name) + + if not failed_only: + for f in cache_dir.glob(f"{request_hash}*.h5"): + logger.info("Removing cached file %s (force re-run)", f.name) + f.unlink(missing_ok=True) + return + + # Selective cleanup: only remove output files that contain errors. + import h5py + + for f in sorted(cache_dir.glob(f"{request_hash}*_o.h5")): + try: + with h5py.File(f, "r") as hdf: + if "output" in hdf: + continue # successful — keep it + except Exception: + logger.debug("Could not read %s, treating as failed", f.name) + logger.info("Removing failed cache file %s (retry)", f.name) f.unlink(missing_ok=True) + # Also remove the matching _i.h5 so executorlib re-submits this step + i_file = f.with_name(f.name.replace("_o.h5", "_i.h5")) + if i_file.exists(): + logger.info("Removing input file %s (retry)", i_file.name) + i_file.unlink(missing_ok=True) def _composition_elements(composition: dict[str, float]) -> set[str]: @@ -140,6 +168,25 @@ def _validate_or_select_potential( ) +def _return_cached_job(cached: Job, submission: JobSubmission, store) -> JobCreatedResponse: + """Return an already-completed job, merging any new tags.""" + logger.info("Returning cached job %s", cached.job_id) + if submission.tags: + existing = set(cached.tags or []) + merged = sorted(existing | set(submission.tags)) + if merged != sorted(existing): + store.update_job(cached.job_id, tags=merged) + return JobCreatedResponse( + id=cached.job_id, + status=JobStatus(cached.status), + composition=Composition.from_canonical(cached.composition), + potential=cached.potential, + tags=sorted(set(cached.tags or []) | set(submission.tags)), + created_at=(cached.created_at.isoformat() if cached.created_at else _iso_now()), + urls=_job_urls(cached.job_id), + ) + + # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @@ -148,13 +195,18 @@ def _validate_or_select_potential( @router.post("", response_model=JobCreatedResponse, dependencies=[Depends(verify_token)]) def submit_job( submission: JobSubmission, - force: Annotated[bool, Query(description="Skip cache and force a fresh run")] = False, + rerun: Annotated[ + RerunMode | None, + Query( + description=("Skip and delete cache for this job ('all' - all steps, 'failed' - failed steps only). "), + ), + ] = None, ) -> JobCreatedResponse: """Submit a new simulation job. The server resolves the dependency DAG internally. If an identical job already completed, the cached result is returned - unless ``force=true`` is set. + by default. """ # Validate and normalise composition so downstream code always sees # fractions that sum to exactly 1.0 (the pipeline uses the default @@ -192,30 +244,17 @@ def submit_job( norm_comp = submission.composition.canonical req_hash = _job_hash(submission, norm_comp) - # Check for cached result (skipped when force=True) - if not force: + # Check for cached result (skipped when rerun is set) + if rerun is None: cached = store.find_completed_by_hash(req_hash) if cached: - logger.info("Returning cached job %s", cached.job_id) - # Merge any new tags into the cached job - if submission.tags: - existing = set(cached.tags or []) - merged = sorted(existing | set(submission.tags)) - if merged != sorted(existing): - store.update_job(cached.job_id, tags=merged) - return JobCreatedResponse( - id=cached.job_id, - status=JobStatus(cached.status), - composition=Composition.from_canonical(cached.composition), - potential=cached.potential, - tags=sorted(set(cached.tags or []) | set(submission.tags)), - created_at=(cached.created_at.isoformat() if cached.created_at else _iso_now()), - urls=_job_urls(cached.job_id), - ) + return _return_cached_job(cached, submission, store) - # When forcing, remove stale executor cache files so executorlib runs fresh - if force: + # Clear executor cache according to the requested rerun mode + if rerun is RerunMode.ALL: _clear_executor_cache(req_hash) + elif rerun is RerunMode.FAILED: + _clear_executor_cache(req_hash, failed_only=True) # Create new job record job_id = str(uuid4()) diff --git a/amorphouspy_api/src/tests/test_meltquench_integration.py b/amorphouspy_api/src/tests/test_meltquench_integration.py index 4431d1da..1be222bf 100644 --- a/amorphouspy_api/src/tests/test_meltquench_integration.py +++ b/amorphouspy_api/src/tests/test_meltquench_integration.py @@ -61,7 +61,7 @@ def test_jobs_api_integration() -> None: "analyses": [{"type": "structure_characterization"}], } logger.info("Submitting job with fast rates: quench_rate=%s", payload["simulation"]["quench_rate"]) - r = requests.post(f"{api_url}/jobs?force=true", json=payload, timeout=200) + r = requests.post(f"{api_url}/jobs?rerun=all", json=payload, timeout=200) r.raise_for_status() data = r.json() assert "id" in data