Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions amorphouspy_api/src/amorphouspy_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ class JobStatus(StrEnum):
CANCELLED = "cancelled"


class RerunMode(StrEnum):
"""Controls how executorlib cache is handled on submission."""

FAILED = "failed"
"""Delete only failed step outputs and re-run them; preserve successful results."""

ALL = "all"
"""Delete *all* cached outputs and re-run every step from scratch."""


# ---------------------------------------------------------------------------
# Analysis configurations (discriminated union)
# ---------------------------------------------------------------------------
Expand Down
93 changes: 66 additions & 27 deletions amorphouspy_api/src/amorphouspy_api/routers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
JobStatus,
JobStatusResponse,
JobSubmission,
RerunMode,
TagsResponse,
TagsUpdate,
_job_urls,
Expand Down Expand Up @@ -73,17 +74,44 @@
# ---------------------------------------------------------------------------


def _clear_executor_cache(request_hash: str) -> None:
"""Delete executorlib HDF5 cache files for *request_hash* (all steps + merge)."""
def _clear_executor_cache(request_hash: str, *, failed_only: bool = False) -> None:
"""Delete executorlib HDF5 cache files for *request_hash*.

Args:
request_hash: The cache key prefix.
failed_only: When ``True``, only delete ``_o.h5`` files whose stored
result is an error (plus the corresponding ``_i.h5``). Successful
outputs are preserved so executorlib can reuse them.
"""
from amorphouspy_api.config import MELTQUENCH_PROJECT_DIR

cache_dir = Path(MELTQUENCH_PROJECT_DIR)
if not cache_dir.is_dir():
return
# Matches: {hash}_i.h5, {hash}_o.h5, {hash}_{step}_i.h5, {hash}_{step}_o.h5
for f in cache_dir.glob(f"{request_hash}*.h5"):
logger.info("Removing cached file %s (force re-run)", f.name)

if not failed_only:
for f in cache_dir.glob(f"{request_hash}*.h5"):
logger.info("Removing cached file %s (force re-run)", f.name)
f.unlink(missing_ok=True)
return

# Selective cleanup: only remove output files that contain errors.
import h5py

for f in sorted(cache_dir.glob(f"{request_hash}*_o.h5")):
try:
with h5py.File(f, "r") as hdf:
if "output" in hdf:
continue # successful — keep it
except Exception:
logger.debug("Could not read %s, treating as failed", f.name)
logger.info("Removing failed cache file %s (retry)", f.name)
f.unlink(missing_ok=True)
# Also remove the matching _i.h5 so executorlib re-submits this step
i_file = f.with_name(f.name.replace("_o.h5", "_i.h5"))
if i_file.exists():
logger.info("Removing input file %s (retry)", i_file.name)
i_file.unlink(missing_ok=True)


def _composition_elements(composition: dict[str, float]) -> set[str]:
Expand Down Expand Up @@ -140,6 +168,25 @@ def _validate_or_select_potential(
)


def _return_cached_job(cached: Job, submission: JobSubmission, store) -> JobCreatedResponse:
"""Return an already-completed job, merging any new tags."""
logger.info("Returning cached job %s", cached.job_id)
if submission.tags:
existing = set(cached.tags or [])
merged = sorted(existing | set(submission.tags))
if merged != sorted(existing):
store.update_job(cached.job_id, tags=merged)
return JobCreatedResponse(
id=cached.job_id,
status=JobStatus(cached.status),
composition=Composition.from_canonical(cached.composition),
potential=cached.potential,
tags=sorted(set(cached.tags or []) | set(submission.tags)),
created_at=(cached.created_at.isoformat() if cached.created_at else _iso_now()),
urls=_job_urls(cached.job_id),
)


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
Expand All @@ -148,13 +195,18 @@ def _validate_or_select_potential(
@router.post("", response_model=JobCreatedResponse, dependencies=[Depends(verify_token)])
def submit_job(
submission: JobSubmission,
force: Annotated[bool, Query(description="Skip cache and force a fresh run")] = False,
rerun: Annotated[
RerunMode | None,
Query(
description=("Skip and delete cache for this job ('all' - all steps, 'failed' - failed steps only). "),
),
] = None,
) -> JobCreatedResponse:
"""Submit a new simulation job.

The server resolves the dependency DAG internally.
If an identical job already completed, the cached result is returned
unless ``force=true`` is set.
by default.
"""
# Validate and normalise composition so downstream code always sees
# fractions that sum to exactly 1.0 (the pipeline uses the default
Expand Down Expand Up @@ -192,30 +244,17 @@ def submit_job(
norm_comp = submission.composition.canonical
req_hash = _job_hash(submission, norm_comp)

# Check for cached result (skipped when force=True)
if not force:
# Check for cached result (skipped when rerun is set)
if rerun is None:
cached = store.find_completed_by_hash(req_hash)
if cached:
logger.info("Returning cached job %s", cached.job_id)
# Merge any new tags into the cached job
if submission.tags:
existing = set(cached.tags or [])
merged = sorted(existing | set(submission.tags))
if merged != sorted(existing):
store.update_job(cached.job_id, tags=merged)
return JobCreatedResponse(
id=cached.job_id,
status=JobStatus(cached.status),
composition=Composition.from_canonical(cached.composition),
potential=cached.potential,
tags=sorted(set(cached.tags or []) | set(submission.tags)),
created_at=(cached.created_at.isoformat() if cached.created_at else _iso_now()),
urls=_job_urls(cached.job_id),
)
return _return_cached_job(cached, submission, store)

# When forcing, remove stale executor cache files so executorlib runs fresh
if force:
# Clear executor cache according to the requested rerun mode
if rerun is RerunMode.ALL:
_clear_executor_cache(req_hash)
elif rerun is RerunMode.FAILED:
_clear_executor_cache(req_hash, failed_only=True)

# Create new job record
job_id = str(uuid4())
Expand Down
2 changes: 1 addition & 1 deletion amorphouspy_api/src/tests/test_meltquench_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_jobs_api_integration() -> None:
"analyses": [{"type": "structure_characterization"}],
}
logger.info("Submitting job with fast rates: quench_rate=%s", payload["simulation"]["quench_rate"])
r = requests.post(f"{api_url}/jobs?force=true", json=payload, timeout=200)
r = requests.post(f"{api_url}/jobs?rerun=all", json=payload, timeout=200)
r.raise_for_status()
data = r.json()
assert "id" in data
Expand Down
Loading