diff --git a/docs/codex_skills/git_pr_workflow.md b/docs/codex_skills/git_pr_workflow.md index 538cc63..f41749c 100644 --- a/docs/codex_skills/git_pr_workflow.md +++ b/docs/codex_skills/git_pr_workflow.md @@ -32,6 +32,7 @@ Use for branch setup, syncing from `main`, local commits, staged-file review, pu - Before commit: `git status --short`, `git diff --stat`, and `git diff --cached --name-only`. - Before push: clean `git status --short` and current branch confirmation. - For PRs: include validation commands actually run; do not claim CI status unless checked. +- For agent-assisted pre-push or pre-PR checks, run `python scripts/safe_pr_gate.py` on the current branch before pushing. ## Stop conditions diff --git a/scripts/safe_pr_gate.py b/scripts/safe_pr_gate.py new file mode 100644 index 0000000..ca23858 --- /dev/null +++ b/scripts/safe_pr_gate.py @@ -0,0 +1,179 @@ +"""Deterministic local safety gate for agent-assisted PR workflows.""" + +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + + +@dataclass(frozen=True, slots=True) +class GateState: + branch: str + status_short: tuple[str, ...] + changed_paths: tuple[str, ...] + + +@dataclass(frozen=True, slots=True) +class GateResult: + ok: bool + branch: str + allow_dirty: bool + allowed_prefixes: tuple[str, ...] + status_short: tuple[str, ...] + changed_paths: tuple[str, ...] + problems: tuple[str, ...] + + def to_dict(self) -> dict[str, Any]: + return { + "allowed_prefixes": list(self.allowed_prefixes), + "allow_dirty": self.allow_dirty, + "branch": self.branch, + "changed_paths": list(self.changed_paths), + "ok": self.ok, + "problems": list(self.problems), + "result": "PASS" if self.ok else "FAIL", + "status_short": list(self.status_short), + } + + +def _run_git(args: list[str]) -> str: + try: + completed = subprocess.run( + ["git", *args], + cwd=REPO_ROOT, + check=True, + capture_output=True, + text=True, + ) + except FileNotFoundError as exc: + raise RuntimeError(f"git executable not found while running: git {' '.join(args)}") from exc + except subprocess.CalledProcessError as exc: + raise RuntimeError( + f"git command failed with exit code {exc.returncode}: git {' '.join(args)}" + ) from exc + return completed.stdout + + +def _parse_porcelain_paths(output: str) -> tuple[str, ...]: + if not output: + return () + + entries = output.split("\0") + paths: list[str] = [] + index = 0 + while index < len(entries): + entry = entries[index] + if not entry: + index += 1 + continue + path = entry[3:] + status = entry[:2] + if status and any(char in {"R", "C"} for char in status): + if index + 1 < len(entries) and entries[index + 1]: + paths.append(entries[index + 1]) + index += 2 + continue + paths.append(path) + index += 1 + return tuple(paths) + + +def collect_gate_state() -> GateState: + branch = _run_git(["branch", "--show-current"]).strip() + status_short_output = _run_git(["status", "--short", "--untracked-files=all"]) + porcelain_output = _run_git(["status", "--porcelain=v1", "-z"]) + status_short = tuple(line for line in status_short_output.splitlines() if line) + changed_paths = _parse_porcelain_paths(porcelain_output) + return GateState(branch=branch, status_short=status_short, changed_paths=changed_paths) + + +def _path_in_prefix(path: str, prefix: str) -> bool: + normalized = prefix.rstrip("/") + return path == normalized or path.startswith(normalized + "/") + + +def evaluate_gate( + state: GateState, + *, + allow_dirty: bool = False, + allowed_prefixes: tuple[str, ...] = (), +) -> GateResult: + problems: list[str] = [] + + if state.branch == "main": + problems.append("on_main_branch") + + if state.status_short and not allow_dirty: + problems.append("dirty_working_tree") + + if allowed_prefixes: + disallowed_paths = sorted( + path + for path in state.changed_paths + if not any(_path_in_prefix(path, prefix) for prefix in allowed_prefixes) + ) + if disallowed_paths: + problems.append("changed_files_outside_allowed_prefixes") + problems.extend(f"outside_prefix:{path}" for path in disallowed_paths) + + return GateResult( + ok=not problems, + branch=state.branch, + allow_dirty=allow_dirty, + allowed_prefixes=tuple(sorted(allowed_prefixes)), + status_short=state.status_short, + changed_paths=state.changed_paths, + problems=tuple(problems), + ) + + +def _parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run a deterministic local safety gate for PR-ready agent work.") + parser.add_argument("--allow-dirty", action="store_true", help="Permit a dirty working tree while still checking allowed prefixes.") + parser.add_argument( + "--allowed-prefix", + action="append", + default=[], + help="Require changed paths to stay within this repo-relative prefix. May be repeated.", + ) + return parser.parse_args(argv) + + +def _error_response(exc: RuntimeError) -> dict[str, Any]: + return { + "error": { + "message": str(exc), + "type": exc.__class__.__name__, + }, + "ok": False, + "result": "ERROR", + } + + +def main(argv: list[str] | None = None) -> int: + args = _parse_args(sys.argv[1:] if argv is None else argv) + try: + state = collect_gate_state() + result = evaluate_gate( + state, + allow_dirty=args.allow_dirty, + allowed_prefixes=tuple(args.allowed_prefix), + ) + sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n") + return 0 if result.ok else 1 + except RuntimeError as exc: + sys.stdout.write(json.dumps(_error_response(exc), indent=2, sort_keys=True) + "\n") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_safe_pr_gate.py b/tests/test_safe_pr_gate.py new file mode 100644 index 0000000..b22225c --- /dev/null +++ b/tests/test_safe_pr_gate.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import json +import shutil +import subprocess +import sys +import tempfile +from contextlib import contextmanager +from pathlib import Path + +import pytest + +import scripts.safe_pr_gate as safe_pr_gate +from scripts.safe_pr_gate import GateState, _parse_porcelain_paths, evaluate_gate + + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _run_gate(*args: str, cwd: Path | None = None) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, "scripts/safe_pr_gate.py", *args], + check=False, + capture_output=True, + text=True, + cwd=cwd, + ) + + +@contextmanager +def _temporary_git_repo(branch: str): + with tempfile.TemporaryDirectory() as temp_dir: + repo_root = Path(temp_dir) + scripts_dir = repo_root / "scripts" + scripts_dir.mkdir() + shutil.copy2(REPO_ROOT / "scripts" / "safe_pr_gate.py", scripts_dir / "safe_pr_gate.py") + subprocess.run(["git", "init"], cwd=repo_root, check=True, capture_output=True, text=True) + subprocess.run(["git", "checkout", "-b", branch], cwd=repo_root, check=True, capture_output=True, text=True) + yield repo_root + + +def test_evaluate_gate_passes_on_clean_feature_branch_state() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(), changed_paths=()), + allowed_prefixes=("scripts/",), + ) + + assert result.ok is True + assert result.problems == () + assert result.to_dict() == { + "allowed_prefixes": ["scripts/"], + "allow_dirty": False, + "branch": "feat/safe-pr-gate", + "changed_paths": [], + "ok": True, + "problems": [], + "result": "PASS", + "status_short": [], + } + + +def test_evaluate_gate_fails_on_main_branch() -> None: + result = evaluate_gate(GateState(branch="main", status_short=(), changed_paths=())) + + assert result.ok is False + assert result.problems == ("on_main_branch",) + + +def test_evaluate_gate_allows_detached_head_state() -> None: + result = evaluate_gate(GateState(branch="", status_short=(), changed_paths=())) + + assert result.ok is True + assert result.problems == () + + +def test_evaluate_gate_fails_on_dirty_tree_without_allow_dirty() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(" M scripts/example.py",), changed_paths=("scripts/example.py",)) + ) + + assert result.ok is False + assert result.problems == ("dirty_working_tree",) + + +def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None: + result = evaluate_gate( + GateState(branch="feat/safe-pr-gate", status_short=(" M docs/example.md",), changed_paths=("docs/example.md",)), + allow_dirty=True, + allowed_prefixes=("scripts/",), + ) + + assert result.ok is False + assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md") + + +def test_parse_porcelain_paths_handles_rename_status_in_second_position() -> None: + assert _parse_porcelain_paths(" R old-name.txt\0new-name.txt\0") == ("new-name.txt",) + + +@pytest.mark.parametrize( + ("raised", "expected_message"), + [ + (subprocess.CalledProcessError(1, ["git", "status", "--short"]), "git command failed with exit code 1: git status --short"), + (FileNotFoundError(), "git executable not found while running: git status --short"), + ], +) +def test_run_git_wraps_git_subprocess_failures(monkeypatch: pytest.MonkeyPatch, raised: BaseException, expected_message: str) -> None: + def fake_run(*args: object, **kwargs: object) -> subprocess.CompletedProcess[str]: + raise raised + + monkeypatch.setattr(safe_pr_gate.subprocess, "run", fake_run) + + with pytest.raises(RuntimeError, match=expected_message): + safe_pr_gate._run_git(["status", "--short"]) + + +def test_main_reports_deterministic_error_json_on_git_failure(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: + monkeypatch.setattr(safe_pr_gate, "collect_gate_state", lambda: (_ for _ in ()).throw(RuntimeError("git command failed with exit code 1: git status --short"))) + + exit_code = safe_pr_gate.main([]) + output = json.loads(capsys.readouterr().out) + + assert exit_code == 1 + assert output == { + "error": { + "message": "git command failed with exit code 1: git status --short", + "type": "RuntimeError", + }, + "ok": False, + "result": "ERROR", + } + + +def test_cli_pass_and_fail_outputs_are_deterministic() -> None: + with _temporary_git_repo("feat/safe-pr-gate") as repo_root: + passing = _run_gate( + "--allow-dirty", + "--allowed-prefix", + "docs/", + "--allowed-prefix", + "scripts/", + "--allowed-prefix", + "tests/", + cwd=repo_root, + ) + dirty_path = repo_root / "_safe_pr_gate_dirty_test.tmp" + dirty_path.write_text("dirty\n", encoding="utf-8") + try: + failing = _run_gate( + "--allow-dirty", + "--allowed-prefix", + "docs/", + "--allowed-prefix", + "scripts/", + "--allowed-prefix", + "tests/", + cwd=repo_root, + ) + finally: + dirty_path.unlink(missing_ok=True) + + assert passing.returncode == 0 + assert failing.returncode == 1 + + passing_payload = json.loads(passing.stdout) + failing_payload = json.loads(failing.stdout) + + assert passing_payload["result"] == "PASS" + assert passing_payload["allow_dirty"] is True + assert failing_payload["result"] == "FAIL" + assert failing_payload["problems"] == [ + "changed_files_outside_allowed_prefixes", + "outside_prefix:_safe_pr_gate_dirty_test.tmp", + ]