From 678df228735b47e0382c765ddb229ec36b784248 Mon Sep 17 00:00:00 2001 From: ProfRandom92 Date: Fri, 22 May 2026 14:32:30 +0200 Subject: [PATCH 1/4] feat: add safe PR gate --- docs/codex_skills/git_pr_workflow.md | 1 + scripts/safe_pr_gate.py | 159 +++++++++++++++++++++++++++ tests/test_safe_pr_gate.py | 108 ++++++++++++++++++ 3 files changed, 268 insertions(+) create mode 100644 scripts/safe_pr_gate.py create mode 100644 tests/test_safe_pr_gate.py diff --git a/docs/codex_skills/git_pr_workflow.md b/docs/codex_skills/git_pr_workflow.md index 538cc63..f41749c 100644 --- a/docs/codex_skills/git_pr_workflow.md +++ b/docs/codex_skills/git_pr_workflow.md @@ -32,6 +32,7 @@ Use for branch setup, syncing from `main`, local commits, staged-file review, pu - Before commit: `git status --short`, `git diff --stat`, and `git diff --cached --name-only`. - Before push: clean `git status --short` and current branch confirmation. - For PRs: include validation commands actually run; do not claim CI status unless checked. +- For agent-assisted pre-push or pre-PR checks, run `python scripts/safe_pr_gate.py` on the current branch before pushing. ## Stop conditions diff --git a/scripts/safe_pr_gate.py b/scripts/safe_pr_gate.py new file mode 100644 index 0000000..cd5e0a1 --- /dev/null +++ b/scripts/safe_pr_gate.py @@ -0,0 +1,159 @@ +"""Deterministic local safety gate for agent-assisted PR workflows.""" + +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + + +@dataclass(frozen=True, slots=True) +class GateState: + branch: str + status_short: tuple[str, ...] + changed_paths: tuple[str, ...] + + +@dataclass(frozen=True, slots=True) +class GateResult: + ok: bool + branch: str + allow_dirty: bool + allowed_prefixes: tuple[str, ...] + status_short: tuple[str, ...] + changed_paths: tuple[str, ...] + problems: tuple[str, ...] + + def to_dict(self) -> dict[str, Any]: + return { + "allowed_prefixes": list(self.allowed_prefixes), + "allow_dirty": self.allow_dirty, + "branch": self.branch, + "changed_paths": list(self.changed_paths), + "ok": self.ok, + "problems": list(self.problems), + "result": "PASS" if self.ok else "FAIL", + "status_short": list(self.status_short), + } + + +def _run_git(args: list[str]) -> str: + completed = subprocess.run( + ["git", *args], + cwd=REPO_ROOT, + check=True, + capture_output=True, + text=True, + ) + return completed.stdout + + +def _parse_porcelain_paths(output: str) -> tuple[str, ...]: + if not output: + return () + + entries = output.split("\0") + paths: list[str] = [] + index = 0 + while index < len(entries): + entry = entries[index] + if not entry: + index += 1 + continue + path = entry[3:] + status = entry[:2] + if status and status[0] in {"R", "C"}: + if index + 1 < len(entries) and entries[index + 1]: + paths.append(entries[index + 1]) + index += 2 + continue + paths.append(path) + index += 1 + return tuple(paths) + + +def collect_gate_state() -> GateState: + branch = _run_git(["branch", "--show-current"]).strip() + status_short_output = _run_git(["status", "--short", "--untracked-files=all"]) + porcelain_output = _run_git(["status", "--porcelain=v1", "-z"]) + status_short = tuple(line for line in status_short_output.splitlines() if line) + changed_paths = _parse_porcelain_paths(porcelain_output) + return GateState(branch=branch, status_short=status_short, changed_paths=changed_paths) + + +def _path_in_prefix(path: str, prefix: str) -> bool: + normalized = prefix.rstrip("/") + return path == normalized or path.startswith(normalized + "/") + + +def evaluate_gate( + state: GateState, + *, + allow_dirty: bool = False, + allowed_prefixes: tuple[str, ...] = (), +) -> GateResult: + problems: list[str] = [] + + if not state.branch: + problems.append("detached_head") + elif state.branch == "main": + problems.append("on_main_branch") + + if state.status_short and not allow_dirty: + problems.append("dirty_working_tree") + + if allowed_prefixes: + disallowed_paths = sorted( + path + for path in state.changed_paths + if not any(_path_in_prefix(path, prefix) for prefix in allowed_prefixes) + ) + if disallowed_paths: + problems.append("changed_files_outside_allowed_prefixes") + problems.extend(f"outside_prefix:{path}" for path in disallowed_paths) + + return GateResult( + ok=not problems, + branch=state.branch, + allow_dirty=allow_dirty, + allowed_prefixes=tuple(sorted(allowed_prefixes)), + status_short=state.status_short, + changed_paths=state.changed_paths, + problems=tuple(problems), + ) + + +def _parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run a deterministic local safety gate for PR-ready agent work.") + parser.add_argument("--allow-dirty", action="store_true", help="Permit a dirty working tree while still checking allowed prefixes.") + parser.add_argument( + "--allowed-prefix", + action="append", + default=[], + help="Require changed paths to stay within this repo-relative prefix. May be repeated.", + ) + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = _parse_args(sys.argv[1:] if argv is None else argv) + state = collect_gate_state() + result = evaluate_gate( + state, + allow_dirty=args.allow_dirty, + allowed_prefixes=tuple(args.allowed_prefix), + ) + sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n") + return 0 if result.ok else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_safe_pr_gate.py b/tests/test_safe_pr_gate.py new file mode 100644 index 0000000..3fb5cf6 --- /dev/null +++ b/tests/test_safe_pr_gate.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path + +from scripts.safe_pr_gate import GateState, evaluate_gate + + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT_PATH = Path("scripts/safe_pr_gate.py") + + +def _run_gate(*args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, str(SCRIPT_PATH), *args], + check=False, + capture_output=True, + text=True, + ) + + +def test_evaluate_gate_passes_on_clean_feature_branch_state() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(), changed_paths=()), + allowed_prefixes=("scripts/",), + ) + + assert result.ok is True + assert result.problems == () + assert result.to_dict() == { + "allowed_prefixes": ["scripts/"], + "allow_dirty": False, + "branch": "feat/safe-pr-gate", + "changed_paths": [], + "ok": True, + "problems": [], + "result": "PASS", + "status_short": [], + } + + +def test_evaluate_gate_fails_on_main_branch() -> None: + result = evaluate_gate(GateState(branch="main", status_short=(), changed_paths=())) + + assert result.ok is False + assert result.problems == ("on_main_branch",) + + +def test_evaluate_gate_fails_on_dirty_tree_without_allow_dirty() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(" M scripts/example.py",), changed_paths=("scripts/example.py",)) + ) + + assert result.ok is False + assert result.problems == ("dirty_working_tree",) + + +def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(" M docs/example.md",), changed_paths=("docs/example.md",)), + allow_dirty=True, + allowed_prefixes=("scripts/",), + ) + + assert result.ok is False + assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md") + + +def test_cli_pass_and_fail_outputs_are_deterministic() -> None: + passing = _run_gate( + "--allow-dirty", + "--allowed-prefix", + "docs/", + "--allowed-prefix", + "scripts/", + "--allowed-prefix", + "tests/", + ) + dirty_path = REPO_ROOT / "_safe_pr_gate_dirty_test.tmp" + dirty_path.write_text("dirty\n", encoding="utf-8") + try: + failing = _run_gate( + "--allow-dirty", + "--allowed-prefix", + "docs/", + "--allowed-prefix", + "scripts/", + "--allowed-prefix", + "tests/", + ) + finally: + dirty_path.unlink(missing_ok=True) + + assert passing.returncode == 0 + assert failing.returncode == 1 + + passing_payload = json.loads(passing.stdout) + failing_payload = json.loads(failing.stdout) + + assert passing_payload["result"] == "PASS" + assert passing_payload["allow_dirty"] is True + assert failing_payload["result"] == "FAIL" + assert failing_payload["problems"] == [ + "changed_files_outside_allowed_prefixes", + "outside_prefix:_safe_pr_gate_dirty_test.tmp", + ] From a9b931cefb82def6044b4eb2157e8d91f9b0961e Mon Sep 17 00:00:00 2001 From: ProfRandom92 Date: Fri, 22 May 2026 14:52:12 +0200 Subject: [PATCH 2/4] fix: allow safe PR gate in detached CI checkouts --- scripts/safe_pr_gate.py | 4 +--- tests/test_safe_pr_gate.py | 7 +++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/scripts/safe_pr_gate.py b/scripts/safe_pr_gate.py index cd5e0a1..af4ea21 100644 --- a/scripts/safe_pr_gate.py +++ b/scripts/safe_pr_gate.py @@ -102,9 +102,7 @@ def evaluate_gate( ) -> GateResult: problems: list[str] = [] - if not state.branch: - problems.append("detached_head") - elif state.branch == "main": + if state.branch == "main": problems.append("on_main_branch") if state.status_short and not allow_dirty: diff --git a/tests/test_safe_pr_gate.py b/tests/test_safe_pr_gate.py index 3fb5cf6..3a02587 100644 --- a/tests/test_safe_pr_gate.py +++ b/tests/test_safe_pr_gate.py @@ -48,6 +48,13 @@ def test_evaluate_gate_fails_on_main_branch() -> None: assert result.problems == ("on_main_branch",) +def test_evaluate_gate_allows_detached_head_state() -> None: + result = evaluate_gate(GateState(branch="", status_short=(), changed_paths=())) + + assert result.ok is True + assert result.problems == () + + def test_evaluate_gate_fails_on_dirty_tree_without_allow_dirty() -> None: result = evaluate_gate( GateState(branch="feat/safe-pr-gate", status_short=(" M scripts/example.py",), changed_paths=("scripts/example.py",)) From 19f5ce8bd4aa882a70fa980b31d6eef9906fea7d Mon Sep 17 00:00:00 2001 From: ProfRandom92 Date: Fri, 22 May 2026 15:04:31 +0200 Subject: [PATCH 3/4] test: isolate safe PR gate CLI checks --- tests/test_safe_pr_gate.py | 54 +++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/tests/test_safe_pr_gate.py b/tests/test_safe_pr_gate.py index 3a02587..a81a383 100644 --- a/tests/test_safe_pr_gate.py +++ b/tests/test_safe_pr_gate.py @@ -1,26 +1,41 @@ from __future__ import annotations import json +import shutil import subprocess import sys +import tempfile +from contextlib import contextmanager from pathlib import Path from scripts.safe_pr_gate import GateState, evaluate_gate REPO_ROOT = Path(__file__).resolve().parents[1] -SCRIPT_PATH = Path("scripts/safe_pr_gate.py") -def _run_gate(*args: str) -> subprocess.CompletedProcess[str]: +def _run_gate(*args: str, cwd: Path | None = None) -> subprocess.CompletedProcess[str]: return subprocess.run( - [sys.executable, str(SCRIPT_PATH), *args], + [sys.executable, "scripts/safe_pr_gate.py", *args], check=False, capture_output=True, text=True, + cwd=cwd, ) +@contextmanager +def _temporary_git_repo(branch: str): + with tempfile.TemporaryDirectory() as temp_dir: + repo_root = Path(temp_dir) + scripts_dir = repo_root / "scripts" + scripts_dir.mkdir() + shutil.copy2(REPO_ROOT / "scripts" / "safe_pr_gate.py", scripts_dir / "safe_pr_gate.py") + subprocess.run(["git", "init"], cwd=repo_root, check=True, capture_output=True, text=True) + subprocess.run(["git", "checkout", "-b", branch], cwd=repo_root, check=True, capture_output=True, text=True) + yield repo_root + + def test_evaluate_gate_passes_on_clean_feature_branch_state() -> None: result = evaluate_gate( GateState(branch="feat/safe-pr-gate", status_short=(), changed_paths=()), @@ -76,19 +91,8 @@ def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None: def test_cli_pass_and_fail_outputs_are_deterministic() -> None: - passing = _run_gate( - "--allow-dirty", - "--allowed-prefix", - "docs/", - "--allowed-prefix", - "scripts/", - "--allowed-prefix", - "tests/", - ) - dirty_path = REPO_ROOT / "_safe_pr_gate_dirty_test.tmp" - dirty_path.write_text("dirty\n", encoding="utf-8") - try: - failing = _run_gate( + with _temporary_git_repo("feat/safe-pr-gate") as repo_root: + passing = _run_gate( "--allow-dirty", "--allowed-prefix", "docs/", @@ -96,9 +100,23 @@ def test_cli_pass_and_fail_outputs_are_deterministic() -> None: "scripts/", "--allowed-prefix", "tests/", + cwd=repo_root, ) - finally: - dirty_path.unlink(missing_ok=True) + dirty_path = repo_root / "_safe_pr_gate_dirty_test.tmp" + dirty_path.write_text("dirty\n", encoding="utf-8") + try: + failing = _run_gate( + "--allow-dirty", + "--allowed-prefix", + "docs/", + "--allowed-prefix", + "scripts/", + "--allowed-prefix", + "tests/", + cwd=repo_root, + ) + finally: + dirty_path.unlink(missing_ok=True) assert passing.returncode == 0 assert failing.returncode == 1 From f3e8d7592acf95928a153fd380b87f79dd2cd871 Mon Sep 17 00:00:00 2001 From: ProfRandom92 Date: Fri, 22 May 2026 15:12:40 +0200 Subject: [PATCH 4/4] fix: harden safe PR gate git parsing --- scripts/safe_pr_gate.py | 54 +++++++++++++++++++++++++++----------- tests/test_safe_pr_gate.py | 43 +++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/scripts/safe_pr_gate.py b/scripts/safe_pr_gate.py index af4ea21..ca23858 100644 --- a/scripts/safe_pr_gate.py +++ b/scripts/safe_pr_gate.py @@ -46,13 +46,20 @@ def to_dict(self) -> dict[str, Any]: def _run_git(args: list[str]) -> str: - completed = subprocess.run( - ["git", *args], - cwd=REPO_ROOT, - check=True, - capture_output=True, - text=True, - ) + try: + completed = subprocess.run( + ["git", *args], + cwd=REPO_ROOT, + check=True, + capture_output=True, + text=True, + ) + except FileNotFoundError as exc: + raise RuntimeError(f"git executable not found while running: git {' '.join(args)}") from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + f"git command failed with exit code {exc.returncode}: git {' '.join(args)}" + ) from exc return completed.stdout @@ -70,7 +77,7 @@ def _parse_porcelain_paths(output: str) -> tuple[str, ...]: continue path = entry[3:] status = entry[:2] - if status and status[0] in {"R", "C"}: + if status and any(char in {"R", "C"} for char in status): if index + 1 < len(entries) and entries[index + 1]: paths.append(entries[index + 1]) index += 2 @@ -141,16 +148,31 @@ def _parse_args(argv: list[str]) -> argparse.Namespace: return parser.parse_args(argv) +def _error_response(exc: RuntimeError) -> dict[str, Any]: + return { + "error": { + "message": str(exc), + "type": exc.__class__.__name__, + }, + "ok": False, + "result": "ERROR", + } + + def main(argv: list[str] | None = None) -> int: args = _parse_args(sys.argv[1:] if argv is None else argv) - state = collect_gate_state() - result = evaluate_gate( - state, - allow_dirty=args.allow_dirty, - allowed_prefixes=tuple(args.allowed_prefix), - ) - sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n") - return 0 if result.ok else 1 + try: + state = collect_gate_state() + result = evaluate_gate( + state, + allow_dirty=args.allow_dirty, + allowed_prefixes=tuple(args.allowed_prefix), + ) + sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n") + return 0 if result.ok else 1 + except RuntimeError as exc: + sys.stdout.write(json.dumps(_error_response(exc), indent=2, sort_keys=True) + "\n") + return 1 if __name__ == "__main__": diff --git a/tests/test_safe_pr_gate.py b/tests/test_safe_pr_gate.py index a81a383..b22225c 100644 --- a/tests/test_safe_pr_gate.py +++ b/tests/test_safe_pr_gate.py @@ -8,7 +8,10 @@ from contextlib import contextmanager from pathlib import Path -from scripts.safe_pr_gate import GateState, evaluate_gate +import pytest + +import scripts.safe_pr_gate as safe_pr_gate +from scripts.safe_pr_gate import GateState, _parse_porcelain_paths, evaluate_gate REPO_ROOT = Path(__file__).resolve().parents[1] @@ -90,6 +93,44 @@ def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None: assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md") +def test_parse_porcelain_paths_handles_rename_status_in_second_position() -> None: + assert _parse_porcelain_paths(" R old-name.txt\0new-name.txt\0") == ("new-name.txt",) + + +@pytest.mark.parametrize( + ("raised", "expected_message"), + [ + (subprocess.CalledProcessError(1, ["git", "status", "--short"]), "git command failed with exit code 1: git status --short"), + (FileNotFoundError(), "git executable not found while running: git status --short"), + ], +) +def test_run_git_wraps_git_subprocess_failures(monkeypatch: pytest.MonkeyPatch, raised: BaseException, expected_message: str) -> None: + def fake_run(*args: object, **kwargs: object) -> subprocess.CompletedProcess[str]: + raise raised + + monkeypatch.setattr(safe_pr_gate.subprocess, "run", fake_run) + + with pytest.raises(RuntimeError, match=expected_message): + safe_pr_gate._run_git(["status", "--short"]) + + +def test_main_reports_deterministic_error_json_on_git_failure(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: + monkeypatch.setattr(safe_pr_gate, "collect_gate_state", lambda: (_ for _ in ()).throw(RuntimeError("git command failed with exit code 1: git status --short"))) + + exit_code = safe_pr_gate.main([]) + output = json.loads(capsys.readouterr().out) + + assert exit_code == 1 + assert output == { + "error": { + "message": "git command failed with exit code 1: git status --short", + "type": "RuntimeError", + }, + "ok": False, + "result": "ERROR", + } + + def test_cli_pass_and_fail_outputs_are_deterministic() -> None: with _temporary_git_repo("feat/safe-pr-gate") as repo_root: passing = _run_gate(