Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions scripts/safe_pr_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))

RISKY_PATH_NAMES = frozenset({".env", "id_ed25519", "id_rsa"})
RISKY_PATH_SUFFIXES = (".key", ".pem")
PRIVATE_MARKERS = (
"BEGIN PRIVATE KEY",
"GITHUB_TOKEN=",
"OPENAI_API_KEY=",
"GEMINI_API_KEY=",
)


@dataclass(frozen=True, slots=True)
class GateState:
Expand Down Expand Up @@ -101,6 +110,51 @@ def _path_in_prefix(path: str, prefix: str) -> bool:
return path == normalized or path.startswith(normalized + "/")


def _repo_relative_path(path: str) -> Path | None:
candidate = (REPO_ROOT / path).resolve()
try:
candidate.relative_to(REPO_ROOT)
except ValueError:
return None
return candidate


def _is_risky_path(path: str) -> bool:
name = Path(path).name
return name in RISKY_PATH_NAMES or name.endswith(RISKY_PATH_SUFFIXES)


def _read_changed_text(path: str) -> str | None:
candidate = _repo_relative_path(path)
if candidate is None or not candidate.is_file():
return None
try:
data = candidate.read_bytes()
except OSError:
return None
if b"\0" in data:
return None
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return None
Comment on lines +127 to +140
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Refactor _read_changed_text to accept a Path object directly. This avoids redundant path resolution and existence checks when called from _privacy_problems. The simplified error handling also makes the function more readable while maintaining the same logic for binary and encoding detection.

Suggested change
def _read_changed_text(path: str) -> str | None:
candidate = _repo_relative_path(path)
if candidate is None or not candidate.is_file():
return None
try:
data = candidate.read_bytes()
except OSError:
return None
if b"\0" in data:
return None
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return None
def _read_changed_text(candidate: Path) -> str | None:
try:
data = candidate.read_bytes()
if b"\0" in data:
return None
return data.decode("utf-8")
except (OSError, UnicodeDecodeError):
return None



def _privacy_problems(changed_paths: tuple[str, ...]) -> tuple[str, ...]:
problems: list[str] = []
for path in sorted(changed_paths):
if _is_risky_path(path):
problems.append(f"privacy_risky_path:{path}")

text = _read_changed_text(path)
if text is None:
continue
for marker in PRIVATE_MARKERS:
if marker in text:
problems.append(f"privacy_marker:{marker}:{path}")
return tuple(problems)
Comment on lines +143 to +155
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation flags risky paths even if the file has been deleted in the PR. This is a significant issue as it blocks users from committing the removal of secrets or sensitive files. By moving the existence check to the start of the loop, we avoid false positives for deleted files and can pass the resolved Path to _read_changed_text to eliminate redundant system calls.

def _privacy_problems(changed_paths: tuple[str, ...]) -> tuple[str, ...]:
    problems: list[str] = []
    for path in sorted(changed_paths):
        repo_path = _repo_relative_path(path)
        if repo_path is None or not repo_path.is_file():
            continue

        if _is_risky_path(path):
            problems.append(f"privacy_risky_path:{path}")

        text = _read_changed_text(repo_path)
        if text is None:
            continue
        for marker in PRIVATE_MARKERS:
            if marker in text:
                problems.append(f"privacy_marker:{marker}:{path}")
    return tuple(problems)



def evaluate_gate(
state: GateState,
*,
Expand All @@ -125,6 +179,8 @@ def evaluate_gate(
problems.append("changed_files_outside_allowed_prefixes")
problems.extend(f"outside_prefix:{path}" for path in disallowed_paths)

problems.extend(_privacy_problems(state.changed_paths))

return GateResult(
ok=not problems,
branch=state.branch,
Expand Down
49 changes: 49 additions & 0 deletions tests/test_safe_pr_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,55 @@ def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None:
assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md")


def test_evaluate_gate_flags_risky_privacy_paths_in_stable_order() -> None:
result = evaluate_gate(
GateState(
branch="feat/safe-pr-gate",
status_short=(),
changed_paths=(
"secrets/id_rsa",
"config/.env",
"keys/service.key",
"certs/client.pem",
),
)
)

assert result.ok is False
assert result.problems == (
"privacy_risky_path:certs/client.pem",
"privacy_risky_path:config/.env",
"privacy_risky_path:keys/service.key",
"privacy_risky_path:secrets/id_rsa",
)


def test_evaluate_gate_flags_private_markers_in_changed_text_files(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
text_path = tmp_path / "docs" / "example.md"
binary_path = tmp_path / "docs" / "binary.bin"
text_path.parent.mkdir()
text_path.write_text("GITHUB_TOKEN=example\nOPENAI_API_KEY=example\n", encoding="utf-8")
binary_path.write_bytes(b"\0OPENAI_API_KEY=example")
monkeypatch.setattr(safe_pr_gate, "REPO_ROOT", tmp_path)

result = safe_pr_gate.evaluate_gate(
GateState(
branch="feat/safe-pr-gate",
status_short=(),
changed_paths=("docs/example.md", "docs/binary.bin"),
)
)

assert result.ok is False
assert result.problems == (
"privacy_marker:GITHUB_TOKEN=:docs/example.md",
"privacy_marker:OPENAI_API_KEY=:docs/example.md",
)


def test_parse_porcelain_paths_handles_rename_status_in_second_position() -> None:
assert _parse_porcelain_paths(" R old-name.txt\0new-name.txt\0") == ("new-name.txt",)

Expand Down
Loading