-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add safe PR gate #202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| """Deterministic local safety gate for agent-assisted PR workflows.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| import json | ||
| import subprocess | ||
| import sys | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| REPO_ROOT = Path(__file__).resolve().parents[1] | ||
| if str(REPO_ROOT) not in sys.path: | ||
| sys.path.insert(0, str(REPO_ROOT)) | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class GateState: | ||
| branch: str | ||
| status_short: tuple[str, ...] | ||
| changed_paths: tuple[str, ...] | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class GateResult: | ||
| ok: bool | ||
| branch: str | ||
| allow_dirty: bool | ||
| allowed_prefixes: tuple[str, ...] | ||
| status_short: tuple[str, ...] | ||
| changed_paths: tuple[str, ...] | ||
| problems: tuple[str, ...] | ||
|
|
||
| def to_dict(self) -> dict[str, Any]: | ||
| return { | ||
| "allowed_prefixes": list(self.allowed_prefixes), | ||
| "allow_dirty": self.allow_dirty, | ||
| "branch": self.branch, | ||
| "changed_paths": list(self.changed_paths), | ||
| "ok": self.ok, | ||
| "problems": list(self.problems), | ||
| "result": "PASS" if self.ok else "FAIL", | ||
| "status_short": list(self.status_short), | ||
| } | ||
|
|
||
|
|
||
| def _run_git(args: list[str]) -> str: | ||
| try: | ||
| completed = subprocess.run( | ||
| ["git", *args], | ||
| cwd=REPO_ROOT, | ||
| check=True, | ||
| capture_output=True, | ||
| text=True, | ||
| ) | ||
| except FileNotFoundError as exc: | ||
| raise RuntimeError(f"git executable not found while running: git {' '.join(args)}") from exc | ||
| except subprocess.CalledProcessError as exc: | ||
| raise RuntimeError( | ||
| f"git command failed with exit code {exc.returncode}: git {' '.join(args)}" | ||
| ) from exc | ||
| return completed.stdout | ||
|
|
||
|
|
||
| def _parse_porcelain_paths(output: str) -> tuple[str, ...]: | ||
| if not output: | ||
| return () | ||
|
|
||
| entries = output.split("\0") | ||
| paths: list[str] = [] | ||
| index = 0 | ||
| while index < len(entries): | ||
| entry = entries[index] | ||
| if not entry: | ||
| index += 1 | ||
| continue | ||
| path = entry[3:] | ||
| status = entry[:2] | ||
| if status and any(char in {"R", "C"} for char in status): | ||
| if index + 1 < len(entries) and entries[index + 1]: | ||
| paths.append(entries[index + 1]) | ||
| index += 2 | ||
| continue | ||
| paths.append(path) | ||
| index += 1 | ||
| return tuple(paths) | ||
|
|
||
|
|
||
| def collect_gate_state() -> GateState: | ||
| branch = _run_git(["branch", "--show-current"]).strip() | ||
| status_short_output = _run_git(["status", "--short", "--untracked-files=all"]) | ||
| porcelain_output = _run_git(["status", "--porcelain=v1", "-z"]) | ||
| status_short = tuple(line for line in status_short_output.splitlines() if line) | ||
| changed_paths = _parse_porcelain_paths(porcelain_output) | ||
| return GateState(branch=branch, status_short=status_short, changed_paths=changed_paths) | ||
|
|
||
|
|
||
| def _path_in_prefix(path: str, prefix: str) -> bool: | ||
| normalized = prefix.rstrip("/") | ||
| return path == normalized or path.startswith(normalized + "/") | ||
|
|
||
|
|
||
| def evaluate_gate( | ||
| state: GateState, | ||
| *, | ||
| allow_dirty: bool = False, | ||
| allowed_prefixes: tuple[str, ...] = (), | ||
| ) -> GateResult: | ||
| problems: list[str] = [] | ||
|
|
||
| if state.branch == "main": | ||
| problems.append("on_main_branch") | ||
|
|
||
| if state.status_short and not allow_dirty: | ||
| problems.append("dirty_working_tree") | ||
|
|
||
| if allowed_prefixes: | ||
| disallowed_paths = sorted( | ||
| path | ||
| for path in state.changed_paths | ||
| if not any(_path_in_prefix(path, prefix) for prefix in allowed_prefixes) | ||
| ) | ||
| if disallowed_paths: | ||
| problems.append("changed_files_outside_allowed_prefixes") | ||
| problems.extend(f"outside_prefix:{path}" for path in disallowed_paths) | ||
|
|
||
| return GateResult( | ||
| ok=not problems, | ||
| branch=state.branch, | ||
| allow_dirty=allow_dirty, | ||
| allowed_prefixes=tuple(sorted(allowed_prefixes)), | ||
| status_short=state.status_short, | ||
| changed_paths=state.changed_paths, | ||
| problems=tuple(problems), | ||
| ) | ||
|
|
||
|
|
||
| def _parse_args(argv: list[str]) -> argparse.Namespace: | ||
| parser = argparse.ArgumentParser(description="Run a deterministic local safety gate for PR-ready agent work.") | ||
| parser.add_argument("--allow-dirty", action="store_true", help="Permit a dirty working tree while still checking allowed prefixes.") | ||
| parser.add_argument( | ||
| "--allowed-prefix", | ||
| action="append", | ||
| default=[], | ||
| help="Require changed paths to stay within this repo-relative prefix. May be repeated.", | ||
| ) | ||
| return parser.parse_args(argv) | ||
|
|
||
|
|
||
| def _error_response(exc: RuntimeError) -> dict[str, Any]: | ||
| return { | ||
| "error": { | ||
| "message": str(exc), | ||
| "type": exc.__class__.__name__, | ||
| }, | ||
| "ok": False, | ||
| "result": "ERROR", | ||
| } | ||
|
|
||
|
|
||
| def main(argv: list[str] | None = None) -> int: | ||
| args = _parse_args(sys.argv[1:] if argv is None else argv) | ||
| try: | ||
| state = collect_gate_state() | ||
| result = evaluate_gate( | ||
| state, | ||
| allow_dirty=args.allow_dirty, | ||
| allowed_prefixes=tuple(args.allowed_prefix), | ||
| ) | ||
| sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n") | ||
| return 0 if result.ok else 1 | ||
| except RuntimeError as exc: | ||
| sys.stdout.write(json.dumps(_error_response(exc), indent=2, sort_keys=True) + "\n") | ||
| return 1 | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| raise SystemExit(main()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import shutil | ||
| import subprocess | ||
| import sys | ||
| import tempfile | ||
| from contextlib import contextmanager | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
|
|
||
| import scripts.safe_pr_gate as safe_pr_gate | ||
| from scripts.safe_pr_gate import GateState, _parse_porcelain_paths, evaluate_gate | ||
|
|
||
|
|
||
| REPO_ROOT = Path(__file__).resolve().parents[1] | ||
|
|
||
|
|
||
| def _run_gate(*args: str, cwd: Path | None = None) -> subprocess.CompletedProcess[str]: | ||
| return subprocess.run( | ||
| [sys.executable, "scripts/safe_pr_gate.py", *args], | ||
| check=False, | ||
| capture_output=True, | ||
| text=True, | ||
| cwd=cwd, | ||
| ) | ||
|
|
||
|
|
||
| @contextmanager | ||
| def _temporary_git_repo(branch: str): | ||
| with tempfile.TemporaryDirectory() as temp_dir: | ||
| repo_root = Path(temp_dir) | ||
| scripts_dir = repo_root / "scripts" | ||
| scripts_dir.mkdir() | ||
| shutil.copy2(REPO_ROOT / "scripts" / "safe_pr_gate.py", scripts_dir / "safe_pr_gate.py") | ||
| subprocess.run(["git", "init"], cwd=repo_root, check=True, capture_output=True, text=True) | ||
| subprocess.run(["git", "checkout", "-b", branch], cwd=repo_root, check=True, capture_output=True, text=True) | ||
| yield repo_root | ||
|
|
||
|
|
||
| def test_evaluate_gate_passes_on_clean_feature_branch_state() -> None: | ||
| result = evaluate_gate( | ||
| GateState(branch="feat/safe-pr-gate", status_short=(), changed_paths=()), | ||
| allowed_prefixes=("scripts/",), | ||
| ) | ||
|
|
||
| assert result.ok is True | ||
| assert result.problems == () | ||
| assert result.to_dict() == { | ||
| "allowed_prefixes": ["scripts/"], | ||
| "allow_dirty": False, | ||
| "branch": "feat/safe-pr-gate", | ||
| "changed_paths": [], | ||
| "ok": True, | ||
| "problems": [], | ||
| "result": "PASS", | ||
| "status_short": [], | ||
| } | ||
|
|
||
|
|
||
| def test_evaluate_gate_fails_on_main_branch() -> None: | ||
| result = evaluate_gate(GateState(branch="main", status_short=(), changed_paths=())) | ||
|
|
||
| assert result.ok is False | ||
| assert result.problems == ("on_main_branch",) | ||
|
|
||
|
|
||
| def test_evaluate_gate_allows_detached_head_state() -> None: | ||
| result = evaluate_gate(GateState(branch="", status_short=(), changed_paths=())) | ||
|
|
||
| assert result.ok is True | ||
| assert result.problems == () | ||
|
|
||
|
|
||
| def test_evaluate_gate_fails_on_dirty_tree_without_allow_dirty() -> None: | ||
| result = evaluate_gate( | ||
| GateState(branch="feat/safe-pr-gate", status_short=(" M scripts/example.py",), changed_paths=("scripts/example.py",)) | ||
| ) | ||
|
|
||
| assert result.ok is False | ||
| assert result.problems == ("dirty_working_tree",) | ||
|
|
||
|
|
||
| def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None: | ||
| result = evaluate_gate( | ||
| GateState(branch="feat/safe-pr-gate", status_short=(" M docs/example.md",), changed_paths=("docs/example.md",)), | ||
| allow_dirty=True, | ||
| allowed_prefixes=("scripts/",), | ||
| ) | ||
|
|
||
| assert result.ok is False | ||
| assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md") | ||
|
|
||
|
|
||
| def test_parse_porcelain_paths_handles_rename_status_in_second_position() -> None: | ||
| assert _parse_porcelain_paths(" R old-name.txt\0new-name.txt\0") == ("new-name.txt",) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| ("raised", "expected_message"), | ||
| [ | ||
| (subprocess.CalledProcessError(1, ["git", "status", "--short"]), "git command failed with exit code 1: git status --short"), | ||
| (FileNotFoundError(), "git executable not found while running: git status --short"), | ||
| ], | ||
| ) | ||
| def test_run_git_wraps_git_subprocess_failures(monkeypatch: pytest.MonkeyPatch, raised: BaseException, expected_message: str) -> None: | ||
| def fake_run(*args: object, **kwargs: object) -> subprocess.CompletedProcess[str]: | ||
| raise raised | ||
|
|
||
| monkeypatch.setattr(safe_pr_gate.subprocess, "run", fake_run) | ||
|
|
||
| with pytest.raises(RuntimeError, match=expected_message): | ||
| safe_pr_gate._run_git(["status", "--short"]) | ||
|
|
||
|
|
||
| def test_main_reports_deterministic_error_json_on_git_failure(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: | ||
| monkeypatch.setattr(safe_pr_gate, "collect_gate_state", lambda: (_ for _ in ()).throw(RuntimeError("git command failed with exit code 1: git status --short"))) | ||
|
|
||
| exit_code = safe_pr_gate.main([]) | ||
| output = json.loads(capsys.readouterr().out) | ||
|
|
||
| assert exit_code == 1 | ||
| assert output == { | ||
| "error": { | ||
| "message": "git command failed with exit code 1: git status --short", | ||
| "type": "RuntimeError", | ||
| }, | ||
| "ok": False, | ||
| "result": "ERROR", | ||
| } | ||
|
|
||
|
|
||
| def test_cli_pass_and_fail_outputs_are_deterministic() -> None: | ||
| with _temporary_git_repo("feat/safe-pr-gate") as repo_root: | ||
| passing = _run_gate( | ||
| "--allow-dirty", | ||
| "--allowed-prefix", | ||
| "docs/", | ||
| "--allowed-prefix", | ||
| "scripts/", | ||
| "--allowed-prefix", | ||
| "tests/", | ||
| cwd=repo_root, | ||
| ) | ||
| dirty_path = repo_root / "_safe_pr_gate_dirty_test.tmp" | ||
| dirty_path.write_text("dirty\n", encoding="utf-8") | ||
| try: | ||
| failing = _run_gate( | ||
| "--allow-dirty", | ||
| "--allowed-prefix", | ||
| "docs/", | ||
| "--allowed-prefix", | ||
| "scripts/", | ||
| "--allowed-prefix", | ||
| "tests/", | ||
| cwd=repo_root, | ||
| ) | ||
| finally: | ||
| dirty_path.unlink(missing_ok=True) | ||
|
|
||
| assert passing.returncode == 0 | ||
| assert failing.returncode == 1 | ||
|
|
||
| passing_payload = json.loads(passing.stdout) | ||
| failing_payload = json.loads(failing.stdout) | ||
|
|
||
| assert passing_payload["result"] == "PASS" | ||
| assert passing_payload["allow_dirty"] is True | ||
| assert failing_payload["result"] == "FAIL" | ||
| assert failing_payload["problems"] == [ | ||
| "changed_files_outside_allowed_prefixes", | ||
| "outside_prefix:_safe_pr_gate_dirty_test.tmp", | ||
| ] | ||
|
Comment on lines
+134
to
+174
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This integration test is environment-dependent and will fail if the local repository where tests are run has any untracked or modified files outside the hardcoded allowed prefixes ( To make this test deterministic, consider refactoring it to call |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
_run_gitfunction lacks robust error handling. Ifgitis missing from the environment or if the command is run outside of a git repository, it will raise aFileNotFoundErrororsubprocess.CalledProcessErrorwith a traceback. Following the repository's pattern of strict error reporting, it is better to catch these and raise aRuntimeErrorwith descriptive context.References