From 42fbb32bf63c8995489c5e5c522d93fb95f2c7bd Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Thu, 14 May 2026 10:42:56 +0300 Subject: [PATCH 1/3] fix: atomic JSON writes for pipeline outputs --- libs/openant-core/core/analyzer.py | 3 +- libs/openant-core/core/enhancer.py | 3 +- libs/openant-core/core/verifier.py | 3 +- libs/openant-core/tests/test_atomic_io.py | 162 ++++++++++++++++++++++ libs/openant-core/utilities/atomic_io.py | 70 ++++++++++ 5 files changed, 238 insertions(+), 3 deletions(-) create mode 100644 libs/openant-core/tests/test_atomic_io.py create mode 100644 libs/openant-core/utilities/atomic_io.py diff --git a/libs/openant-core/core/analyzer.py b/libs/openant-core/core/analyzer.py index f8255f1..85d9561 100644 --- a/libs/openant-core/core/analyzer.py +++ b/libs/openant-core/core/analyzer.py @@ -27,6 +27,7 @@ # Import existing analysis machinery from utilities.llm_client import AnthropicClient, get_global_tracker +from utilities.atomic_io import atomic_write_json from utilities.file_io import read_json, write_json from utilities.json_corrector import JSONCorrector from utilities.rate_limiter import get_rate_limiter, is_rate_limit_error, is_retryable_error @@ -512,7 +513,7 @@ def _summary_callback(finding, usage=None): "code_by_route": code_by_route, } - write_json(results_path, experiment_result) + atomic_write_json(results_path, experiment_result) print(f"\n[Analyze] Results written to {results_path}", file=sys.stderr) # Checkpoints are preserved as a permanent artifact alongside results. 
diff --git a/libs/openant-core/core/enhancer.py b/libs/openant-core/core/enhancer.py index 70879b8..41a73ce 100644 --- a/libs/openant-core/core/enhancer.py +++ b/libs/openant-core/core/enhancer.py @@ -17,6 +17,7 @@ from core import tracking from core.progress import ProgressReporter from utilities.rate_limiter import configure_rate_limiter +from utilities.atomic_io import atomic_write_json from utilities.file_io import read_json, write_json @@ -137,7 +138,7 @@ def _on_restored(count: int): # Write enhanced dataset os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - write_json(output_path, enhanced) + atomic_write_json(output_path, enhanced) print(f"[Enhance] Enhanced dataset: {output_path}", file=sys.stderr) print(f"[Enhance] Classifications: {classifications}", file=sys.stderr) if error_count: diff --git a/libs/openant-core/core/verifier.py b/libs/openant-core/core/verifier.py index 705ca4a..c166a25 100644 --- a/libs/openant-core/core/verifier.py +++ b/libs/openant-core/core/verifier.py @@ -20,6 +20,7 @@ from core.progress import ProgressReporter from utilities.llm_client import TokenTracker, get_global_tracker +from utilities.atomic_io import atomic_write_json from utilities.file_io import read_json, write_json from utilities.finding_verifier import FindingVerifier from utilities.agentic_enhancer.repository_index import load_index_from_file @@ -267,7 +268,7 @@ def _write_verified_results( output["metrics"] = {"total": len(merged_results), **counts} - write_json(path, output, ensure_ascii=False) + atomic_write_json(path, output, ensure_ascii=False) def _build_code_by_route(results: list) -> dict: """Build code_by_route from result entries (fallback).""" code_by_route = {} diff --git a/libs/openant-core/tests/test_atomic_io.py b/libs/openant-core/tests/test_atomic_io.py new file mode 100644 index 0000000..a36029b --- /dev/null +++ b/libs/openant-core/tests/test_atomic_io.py @@ -0,0 +1,162 @@ +"""Tests for 
``utilities.atomic_io.atomic_write_json``. + +These exercise the three properties that justify the helper's existence: + +1. Round-trip: a written dict reads back identically. +2. Atomicity: a mid-write failure leaves the *previous* file untouched — + no truncated/empty target on disk. +3. Same-directory temp file: required for ``os.replace`` to succeed at all — + a cross-volume rename raises ``OSError`` rather than falling back to + copy+delete, so the temp file must share the target's filesystem. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from utilities.atomic_io import atomic_write_json + + +def test_roundtrip_writes_and_reads(tmp_path: Path): + """Writing a dict and reading it back yields the same content.""" + target = tmp_path / "results.json" + data = { + "dataset": "geospatial_vuln12", + "metrics": {"total": 3, "vulnerable": 1}, + "results": [{"id": "u1", "verdict": "VULNERABLE"}], + } + + atomic_write_json(str(target), data) + + assert target.exists() + with open(target, encoding="utf-8") as f: + assert json.load(f) == data + + +def test_unicode_roundtrip_with_ensure_ascii_false(tmp_path: Path): + """ensure_ascii=False (used by verifier) preserves non-ASCII chars.""" + target = tmp_path / "results_verified.json" + data = {"note": "résumé — naïve café ☃"} + + atomic_write_json(str(target), data, ensure_ascii=False) + + text = target.read_text(encoding="utf-8") + # Raw UTF-8, not \u-escaped: + assert "résumé" in text + assert json.loads(text) == data + + +def test_failure_mid_write_preserves_existing_file(tmp_path: Path, monkeypatch): + """If json.dump raises mid-write, the previous target file is intact.""" + target = tmp_path / "results.json" + original = {"version": 1, "stable": True} + atomic_write_json(str(target), original) + + # Sanity check: original content is on disk before the failed write. + assert json.loads(target.read_text(encoding="utf-8")) == original + + # Force json.dump to blow up partway through. 
Because atomic_write_json + # writes to a temp file in the same directory and only os.replaces on + # success, the existing target must be untouched. + real_dump = json.dump + + def exploding_dump(*args, **kwargs): + # Write a few bytes first to prove that *temp* file partial writes + # don't reach the target — then raise. + f = args[1] + f.write('{"corru') + raise RuntimeError("simulated mid-write crash") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError, match="simulated mid-write crash"): + atomic_write_json(str(target), {"version": 2, "broken": True}) + + # Restore so subsequent assertions/teardown aren't affected. + monkeypatch.setattr("utilities.atomic_io.json.dump", real_dump) + + # Target is unchanged: still the original content, still valid JSON. + assert target.exists() + assert json.loads(target.read_text(encoding="utf-8")) == original + + +def test_failure_cleans_up_temp_file(tmp_path: Path, monkeypatch): + """Failed writes must not leave stray ``.tmp-`` files behind.""" + target = tmp_path / "results.json" + + def exploding_dump(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError): + atomic_write_json(str(target), {"x": 1}) + + leftovers = [p.name for p in tmp_path.iterdir()] + assert leftovers == [], f"expected empty dir, found: {leftovers}" + + +def test_failure_with_no_existing_target_does_not_create_target( + tmp_path: Path, monkeypatch, +): + """If no target exists and the write fails, target must not appear.""" + target = tmp_path / "results.json" + assert not target.exists() + + def exploding_dump(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr("utilities.atomic_io.json.dump", exploding_dump) + + with pytest.raises(RuntimeError): + atomic_write_json(str(target), {"x": 1}) + + assert not target.exists() + + +def 
test_temp_file_is_in_same_directory_as_target(tmp_path: Path, monkeypatch): + """Temp file must live in the same directory as the target. + + ``os.replace`` does not fall back to copy+delete across volumes (it raises + ``OSError``), so this is load-bearing for the write succeeding atomically. + """ + target_dir = tmp_path / "outputs" + target_dir.mkdir() + target = target_dir / "results.json" + + captured: dict[str, str] = {} + real_mkstemp = __import__("tempfile").mkstemp + + def spying_mkstemp(*args, **kwargs): + fd, path = real_mkstemp(*args, **kwargs) + captured["path"] = path + captured["dir_kwarg"] = kwargs.get("dir", "") + return fd, path + + monkeypatch.setattr("utilities.atomic_io.tempfile.mkstemp", spying_mkstemp) + + atomic_write_json(str(target), {"ok": True}) + + assert "path" in captured, "tempfile.mkstemp was not called" + tmp_dir = os.path.dirname(captured["path"]) + expected_dir = os.path.abspath(str(target_dir)) + assert os.path.abspath(tmp_dir) == expected_dir, ( + f"temp file in {tmp_dir!r}, expected {expected_dir!r}" + ) + # And the helper passed the same dir explicitly to mkstemp. + assert os.path.abspath(captured["dir_kwarg"]) == expected_dir + + +def test_overwrites_existing_file(tmp_path: Path): + """Successful second write replaces the target's content.""" + target = tmp_path / "results.json" + atomic_write_json(str(target), {"version": 1}) + atomic_write_json(str(target), {"version": 2, "extra": [1, 2, 3]}) + + assert json.loads(target.read_text(encoding="utf-8")) == { + "version": 2, "extra": [1, 2, 3], + } diff --git a/libs/openant-core/utilities/atomic_io.py b/libs/openant-core/utilities/atomic_io.py new file mode 100644 index 0000000..d691637 --- /dev/null +++ b/libs/openant-core/utilities/atomic_io.py @@ -0,0 +1,70 @@ +"""Atomic JSON file writes. + +Pipeline output files (``results.json``, ``enhanced_dataset.json``, +``results_verified.json``) represent the final artefact of an expensive +multi-stage LLM pipeline. 
A crash, power loss, or interrupted process during +``json.dump`` can leave the target file truncated or corrupt, destroying the +output of a long-running job that may have cost real money. + +``atomic_write_json`` writes the JSON to a temporary file in the **same +directory** as the target, ``fsync``s it, then ``os.replace``s it onto the +target path. ``os.replace`` is atomic on POSIX and on Windows (when both paths +sit on the same volume), so concurrent readers either see the previous file +intact or the fully-written new file — never a partial write. + +The same-directory requirement matters: ``os.replace`` does not fall back to +copy+delete across devices/volumes; it raises ``OSError`` instead. Keeping the +temp file next to the target guarantees the rename stays on one filesystem. +""" + +from __future__ import annotations + +import json +import os +import tempfile +from typing import Any + + +def atomic_write_json(path: str, data: Any, *, indent: int | None = 2, + ensure_ascii: bool = True) -> None: + """Atomically write ``data`` as JSON to ``path``. + + Writes to a temporary file in the same directory as ``path``, fsyncs, + then ``os.replace``s it onto the target. If any step fails the temp + file is removed and ``path`` is left untouched. + + Args: + path: Destination path. Parent directory must already exist. + data: JSON-serialisable object. + indent: Indentation passed through to ``json.dump`` (default 2). + ensure_ascii: Passed through to ``json.dump`` (default True). + """ + directory = os.path.dirname(os.path.abspath(path)) or "." + + # mkstemp never auto-deletes the file, so we own its lifetime: clean up + # manually on error. Same-dir is required for atomic os.replace. 
+ fd, tmp_path = tempfile.mkstemp( + prefix=".tmp-" + os.path.basename(path) + "-", + suffix=".json", + dir=directory, + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii) + f.flush() + try: + os.fsync(f.fileno()) + except OSError: + # fsync can fail on some filesystems (e.g. certain network + # mounts). The replace below is still atomic at the VFS + # layer; durability across power loss is best-effort. + pass + os.replace(tmp_path, path) + except BaseException: + # Clean up the temp file on any failure (including KeyboardInterrupt + # mid-write) so we don't leave stray .tmp- files in the output dir. + try: + os.unlink(tmp_path) + except OSError: + pass + raise From eefafbdc8cf0c94a9d270284fb12532973e5d1a8 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Thu, 14 May 2026 10:43:11 +0300 Subject: [PATCH 2/3] fix: preserve umask-derived permissions in atomic_write_json tempfile.mkstemp creates files with mode 0600 (owner-only). After os.replace the target inherits those tightened bits, silently regressing the permissions a plain open(path, "w") would have produced under umask. Restore umask-derived 0666 & ~umask on POSIX (no-op on Windows). Adds a POSIX-gated test pinning the behaviour. 
--- libs/openant-core/tests/test_atomic_io.py | 22 ++++++++++++++++++++++ libs/openant-core/utilities/atomic_io.py | 14 ++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/libs/openant-core/tests/test_atomic_io.py b/libs/openant-core/tests/test_atomic_io.py index a36029b..ed83e28 100644 --- a/libs/openant-core/tests/test_atomic_io.py +++ b/libs/openant-core/tests/test_atomic_io.py @@ -160,3 +160,25 @@ def test_overwrites_existing_file(tmp_path: Path): assert json.loads(target.read_text(encoding="utf-8")) == { "version": 2, "extra": [1, 2, 3], } + + +@pytest.mark.skipif(os.name != "posix", reason="POSIX permission semantics") +def test_posix_permissions_match_umask(tmp_path: Path): + """The written file honours the process umask, not mkstemp's 0600 default. + + A regression here is silent: pipeline outputs that other users / tools + were previously able to read (under a typical 0022 umask -> 0644 file) + would suddenly become owner-only. We don't want atomic writes to tighten + permissions as a side effect. + """ + target = tmp_path / "results.json" + + old_umask = os.umask(0o022) + try: + atomic_write_json(str(target), {"ok": True}) + finally: + os.umask(old_umask) + + mode = os.stat(target).st_mode & 0o777 + # 0666 & ~0022 == 0644 + assert mode == 0o644, f"expected 0644, got {oct(mode)}" diff --git a/libs/openant-core/utilities/atomic_io.py b/libs/openant-core/utilities/atomic_io.py index d691637..5b4a946 100644 --- a/libs/openant-core/utilities/atomic_io.py +++ b/libs/openant-core/utilities/atomic_io.py @@ -49,6 +49,20 @@ def atomic_write_json(path: str, data: Any, *, indent: int | None = 2, dir=directory, ) try: + # tempfile.mkstemp creates files with mode 0600 (owner-only). A plain + # open(path, "w") honours the process umask (typically yielding 0644). + # Restore that behaviour so atomic writes don't silently tighten + # permissions on pipeline outputs that downstream tools/users read. 
+ # On Windows os.chmod only affects the read-only bit, so the os.name + # guard below skips this step there entirely. + if os.name == "posix": + try: + umask = os.umask(0) + os.umask(umask) + os.chmod(tmp_path, 0o666 & ~umask) + except OSError: + pass + with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii) f.flush() From f482e09bad4c94a0a8a72713480f833d4b196eaa Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Thu, 14 May 2026 11:31:27 +0300 Subject: [PATCH 3/3] fix: extend atomic writes to parser, reporter, and experiment outputs Applies atomic_write_json to the remaining final pipeline outputs that were still using plain write_json: - core/reporter.py: pipeline_output.json (final report, ensure_ascii=False) - core/parser_adapter.py: dataset.json and analyzer_output.json (parse outputs consumed by subsequent expensive LLM stages) - experiment.py: experiment results (legacy direct-run path) Skipped: checkpoint files (designed for incremental writes), scanner intermediate state, and the diff_filter sidecar report (cheap to regenerate, not consumed downstream). 
--- libs/openant-core/core/parser_adapter.py | 7 ++++--- libs/openant-core/core/reporter.py | 3 ++- libs/openant-core/experiment.py | 3 ++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/libs/openant-core/core/parser_adapter.py b/libs/openant-core/core/parser_adapter.py index 605450a..94fbf39 100644 --- a/libs/openant-core/core/parser_adapter.py +++ b/libs/openant-core/core/parser_adapter.py @@ -18,6 +18,7 @@ from pathlib import Path from core.schemas import ParseResult +from utilities.atomic_io import atomic_write_json from utilities.file_io import open_utf8, read_json, write_json # Root of openant-core (where parsers/ lives) @@ -173,7 +174,7 @@ def _maybe_apply_diff_filter( stats = apply_diff_filter(units, manifest) - write_json(result.dataset_path, dataset) + atomic_write_json(result.dataset_path, dataset) # Expose stats on the ParseResult via a side-channel file; the parse # step_context reads this when assembling parse.report.json. diff_report_path = os.path.join(output_dir, "diff_filter.report.json") @@ -363,8 +364,8 @@ def _parse_python(repo_path: str, output_dir: str, processing_level: str, skip_t dataset = _apply_reachability_filter(dataset, output_dir, processing_level) # Write outputs - write_json(dataset_path, dataset) - write_json(analyzer_output_path, analyzer_output) + atomic_write_json(dataset_path, dataset) + atomic_write_json(analyzer_output_path, analyzer_output) units_count = len(dataset.get("units", [])) print(f" Python parser complete: {units_count} units", file=sys.stderr) diff --git a/libs/openant-core/core/reporter.py b/libs/openant-core/core/reporter.py index 9536c4d..3fffcf7 100644 --- a/libs/openant-core/core/reporter.py +++ b/libs/openant-core/core/reporter.py @@ -19,6 +19,7 @@ from pathlib import Path from core.schemas import ReportResult +from utilities.atomic_io import atomic_write_json from utilities.file_io import open_utf8, read_json, write_json # Root of openant-core @@ -367,7 +368,7 @@ def build_pipeline_output( 
print(_banner, file=sys.stderr) os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) - write_json(output_path, pipeline_output, ensure_ascii=False) + atomic_write_json(output_path, pipeline_output, ensure_ascii=False) print(f" pipeline_output.json: {len(findings_data)} findings", file=sys.stderr) print(f" Written to {output_path}", file=sys.stderr) diff --git a/libs/openant-core/experiment.py b/libs/openant-core/experiment.py index 7eb8dda..c92aaa6 100644 --- a/libs/openant-core/experiment.py +++ b/libs/openant-core/experiment.py @@ -35,6 +35,7 @@ from pathlib import Path from utilities.llm_client import AnthropicClient, get_global_tracker +from utilities.atomic_io import atomic_write_json from utilities.file_io import read_json, write_json from prompts.prompt_selector import get_analysis_prompt from prompts.vulnerability_analysis import get_system_prompt as get_stage1_system_prompt @@ -1033,7 +1034,7 @@ def main(): suffix = "" if args.no_enhanced else "_enhanced" output_path = f"experiment_{args.dataset}_{args.model}{suffix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - write_json(output_path, experiment) + atomic_write_json(output_path, experiment) print() print(f"Results saved to: {output_path}")