Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions scripts/validate_agent_artifact_bundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""Validate deterministic agent artifact bundle payloads."""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_BUNDLE_PATH = REPO_ROOT / "artifacts" / "agent_artifact_bundle_example.json"

REQUIRED_BUNDLE_FIELDS = (
"ok",
"result",
"branch",
"changed_files",
"safe_pr_gate",
"validation_evidence",
)
REQUIRED_SAFE_GATE_FIELDS = (
"allow_dirty",
"allowed_prefixes",
"branch",
"changed_paths",
"ok",
"problems",
"result",
"status_short",
)
DISALLOWED_TIME_KEYS = {
"timestamp",
"generated_at",
"created_at",
"updated_at",
"completed_at",
"requested_at",
}
DISALLOWED_RANDOM_ID_KEYS = {
"generated_id",
"random_id",
"request_id",
"run_id",
"uuid",
}
UUID_PATTERN = re.compile(
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
)


def _relative(path: Path) -> str:
try:
return path.resolve().relative_to(REPO_ROOT).as_posix()
except ValueError:
return path.as_posix()


def _load_json_object(path: Path) -> dict[str, Any]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except FileNotFoundError as exc:
raise RuntimeError(f"missing bundle file: {_relative(path)}") from exc
except json.JSONDecodeError as exc:
raise RuntimeError(f"invalid JSON in bundle file: {_relative(path)}") from exc
if not isinstance(payload, dict):
raise RuntimeError(f"bundle file must contain a JSON object: {_relative(path)}")
return payload


def _is_string_list(value: object) -> bool:
return isinstance(value, list) and all(isinstance(item, str) for item in value)
Comment on lines +73 to +74
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to the repository rules, null or missing values for expected list fields should be treated as empty lists, while other non-list types should trigger a RuntimeError to maintain strictness. The current implementation of _is_string_list should be updated to handle None as True and raise a RuntimeError for invalid types.

Suggested change
def _is_string_list(value: object) -> bool:
return isinstance(value, list) and all(isinstance(item, str) for item in value)
def _is_string_list(value: object) -> bool:
if value is None:
return True
if not isinstance(value, list):
raise RuntimeError(f"Expected list, got {type(value).__name__}")
return all(isinstance(item, str) for item in value)
References
  1. When processing JSON data, treat null or missing values for expected list fields as empty lists, but raise a RuntimeError for other non-list types to maintain strictness.



def _expected_result(ok: bool) -> str:
return "PASS" if ok else "FAIL"


def _bundle_from_payload(payload: dict[str, Any]) -> tuple[dict[str, Any] | None, list[str]]:
bundle = payload.get("bundle", payload)
if isinstance(bundle, dict):
return bundle, []
return None, ["bundle must be a JSON object"]


def _scan_for_nondeterministic_fields(value: object, path: str = "$") -> list[str]:
issues: list[str] = []
if isinstance(value, dict):
for key, child in value.items():
key_path = f"{path}.{key}"
normalized = key.lower()
if normalized in DISALLOWED_TIME_KEYS:
issues.append(f"{key_path}: timestamp-like field is not allowed")
if normalized in DISALLOWED_RANDOM_ID_KEYS:
issues.append(f"{key_path}: random-looking generated id field is not allowed")
issues.extend(_scan_for_nondeterministic_fields(child, key_path))
elif isinstance(value, list):
for index, child in enumerate(value):
issues.extend(_scan_for_nondeterministic_fields(child, f"{path}[{index}]"))
elif isinstance(value, str) and UUID_PATTERN.fullmatch(value):
issues.append(f"{path}: UUID-like value is not allowed")
return issues


def validate_bundle_payload(payload: dict[str, Any]) -> dict[str, Any]:
issues: list[str] = []
issues.extend(_scan_for_nondeterministic_fields(payload))

bundle, bundle_issues = _bundle_from_payload(payload)
issues.extend(bundle_issues)
if bundle is None:
return {"issues": sorted(issues), "ok": False, "result": "FAIL"}

for field in REQUIRED_BUNDLE_FIELDS:
if field not in bundle:
issues.append(f"bundle missing required field: {field}")

ok = bundle.get("ok")
result = bundle.get("result")
branch = bundle.get("branch")
changed_files = bundle.get("changed_files")
safe_pr_gate = bundle.get("safe_pr_gate")
validation_evidence = bundle.get("validation_evidence")

if not isinstance(ok, bool):
issues.append("bundle.ok must be a boolean")
if not isinstance(result, str):
issues.append("bundle.result must be a string")
if isinstance(ok, bool) and isinstance(result, str) and result != _expected_result(ok):
issues.append("bundle.result must match bundle.ok")
if not isinstance(branch, str):
issues.append("bundle.branch must be a string")
if not _is_string_list(changed_files):
issues.append("bundle.changed_files must be a list of strings")

if not isinstance(safe_pr_gate, dict):
issues.append("bundle.safe_pr_gate must be a JSON object")
else:
issues.extend(_validate_safe_pr_gate(safe_pr_gate, ok))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To ensure the integrity of the artifact bundle, the validator should verify that the top-level branch and changed_files fields are consistent with the corresponding fields inside the safe_pr_gate object. This prevents discrepancies between the bundle summary and the detailed gate state.

Suggested change
issues.extend(_validate_safe_pr_gate(safe_pr_gate, ok))
issues.extend(_validate_safe_pr_gate(safe_pr_gate, ok))
if isinstance(branch, str) and isinstance(safe_pr_gate.get("branch"), str) and branch != safe_pr_gate.get("branch"):
issues.append("bundle.branch must match bundle.safe_pr_gate.branch")
if _is_string_list(changed_files) and _is_string_list(safe_pr_gate.get("changed_paths")):
if (changed_files or []) != (safe_pr_gate.get("changed_paths") or []):
issues.append("bundle.changed_files must match bundle.safe_pr_gate.changed_paths")


if not isinstance(validation_evidence, list):
issues.append("bundle.validation_evidence must be a list")
else:
issues.extend(_validate_validation_evidence(validation_evidence))
Comment on lines +143 to +146
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Following the repository rule to treat null or missing list fields as empty lists and raise a RuntimeError for other non-list types, the check for validation_evidence should be updated. If it is None, it should be treated as an empty list; if it is not a list, a RuntimeError should be raised instead of appending to the issues list.

    if validation_evidence is not None and not isinstance(validation_evidence, list):
        raise RuntimeError("bundle.validation_evidence must be a list")
    issues.extend(_validate_validation_evidence(validation_evidence or []))
References
  1. When processing JSON data, treat null or missing values for expected list fields as empty lists, but raise a RuntimeError for other non-list types to maintain strictness.


return {"issues": sorted(issues), "ok": not issues, "result": "PASS" if not issues else "FAIL"}


def _validate_safe_pr_gate(safe_pr_gate: dict[str, Any], bundle_ok: object) -> list[str]:
issues: list[str] = []
for field in REQUIRED_SAFE_GATE_FIELDS:
if field not in safe_pr_gate:
issues.append(f"bundle.safe_pr_gate missing required field: {field}")

gate_ok = safe_pr_gate.get("ok")
gate_result = safe_pr_gate.get("result")
if not isinstance(gate_ok, bool):
issues.append("bundle.safe_pr_gate.ok must be a boolean")
if not isinstance(gate_result, str):
issues.append("bundle.safe_pr_gate.result must be a string")
if isinstance(gate_ok, bool) and isinstance(gate_result, str) and gate_result != _expected_result(gate_ok):
issues.append("bundle.safe_pr_gate.result must match bundle.safe_pr_gate.ok")
if isinstance(bundle_ok, bool) and isinstance(gate_ok, bool) and bundle_ok != gate_ok:
issues.append("bundle.ok must match bundle.safe_pr_gate.ok")
if not isinstance(safe_pr_gate.get("allow_dirty"), bool):
issues.append("bundle.safe_pr_gate.allow_dirty must be a boolean")
if not isinstance(safe_pr_gate.get("branch"), str):
issues.append("bundle.safe_pr_gate.branch must be a string")
for field in ("allowed_prefixes", "changed_paths", "problems", "status_short"):
if not _is_string_list(safe_pr_gate.get(field)):
issues.append(f"bundle.safe_pr_gate.{field} must be a list of strings")
return issues


def _validate_validation_evidence(validation_evidence: list[object]) -> list[str]:
issues: list[str] = []
for index, entry in enumerate(validation_evidence):
if not isinstance(entry, dict):
issues.append(f"bundle.validation_evidence[{index}] must be a JSON object")
continue
if not isinstance(entry.get("command"), str):
issues.append(f"bundle.validation_evidence[{index}].command must be a string")
if not isinstance(entry.get("result"), str):
issues.append(f"bundle.validation_evidence[{index}].result must be a string")
return issues


def validate_bundle_file(path: Path) -> dict[str, Any]:
payload = _load_json_object(path)
result = validate_bundle_payload(payload)
return {
"bundle": _relative(path),
"issues": result["issues"],
"ok": result["ok"],
"result": result["result"],
}


def _parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Validate a deterministic agent artifact bundle.")
parser.add_argument("--bundle", type=Path, default=DEFAULT_BUNDLE_PATH, help="Bundle JSON path.")
return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
args = _parse_args(sys.argv[1:] if argv is None else argv)
try:
result = validate_bundle_file(args.bundle)
except RuntimeError as exc:
result = {
"error": {
"message": str(exc),
"type": exc.__class__.__name__,
},
"ok": False,
"result": "ERROR",
}
sys.stdout.write(json.dumps(result, indent=2, sort_keys=True) + "\n")
return 0 if result["ok"] else 1


if __name__ == "__main__":
raise SystemExit(main())
130 changes: 130 additions & 0 deletions tests/test_validate_agent_artifact_bundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations

import json
from pathlib import Path

import scripts.validate_agent_artifact_bundle as validator

ARTIFACT_PATH = Path("artifacts/agent_artifact_bundle_example.json")


def _valid_bundle() -> dict[str, object]:
return {
"branch": "feat/example",
"changed_files": ["scripts/example.py"],
"ok": True,
"result": "PASS",
"safe_pr_gate": {
"allow_dirty": False,
"allowed_prefixes": [],
"branch": "feat/example",
"changed_paths": ["scripts/example.py"],
"ok": True,
"problems": [],
"result": "PASS",
"status_short": [],
},
"validation_evidence": [
{
"command": "python -m compileall -q scripts/example.py",
"result": "pass",
}
],
}


def test_committed_agent_artifact_bundle_example_is_valid() -> None:
result = validator.validate_bundle_file(ARTIFACT_PATH)

assert result == {
"bundle": "artifacts/agent_artifact_bundle_example.json",
"issues": [],
"ok": True,
"result": "PASS",
}


def test_validator_accepts_raw_bundle_payload() -> None:
result = validator.validate_bundle_payload(_valid_bundle())

assert result == {"issues": [], "ok": True, "result": "PASS"}


def test_validator_rejects_result_that_does_not_match_ok() -> None:
bundle = _valid_bundle()
bundle["result"] = "FAIL"

result = validator.validate_bundle_payload(bundle)

assert result["ok"] is False
assert result["result"] == "FAIL"
assert result["issues"] == ["bundle.result must match bundle.ok"]


def test_validator_rejects_safe_gate_status_mismatch() -> None:
bundle = _valid_bundle()
safe_pr_gate = bundle["safe_pr_gate"]
assert isinstance(safe_pr_gate, dict)
safe_pr_gate["ok"] = False
safe_pr_gate["result"] = "FAIL"

result = validator.validate_bundle_payload(bundle)

assert result["ok"] is False
assert result["issues"] == ["bundle.ok must match bundle.safe_pr_gate.ok"]


def test_validator_rejects_timestamp_and_random_id_fields() -> None:
bundle = _valid_bundle()
bundle["generated_at"] = "2026-01-01T00:00:00Z"
bundle["run_id"] = "gha:123"
bundle["uuid"] = "123e4567-e89b-12d3-a456-426614174000"

result = validator.validate_bundle_payload(bundle)

assert result["ok"] is False
assert result["issues"] == [
"$.generated_at: timestamp-like field is not allowed",
"$.run_id: random-looking generated id field is not allowed",
"$.uuid: UUID-like value is not allowed",
"$.uuid: random-looking generated id field is not allowed",
]


def test_validator_rejects_missing_safe_gate_deterministic_field() -> None:
bundle = _valid_bundle()
safe_pr_gate = bundle["safe_pr_gate"]
assert isinstance(safe_pr_gate, dict)
del safe_pr_gate["status_short"]

result = validator.validate_bundle_payload(bundle)

assert result["ok"] is False
assert result["issues"] == [
"bundle.safe_pr_gate missing required field: status_short",
"bundle.safe_pr_gate.status_short must be a list of strings",
]


def test_cli_outputs_deterministic_json_for_invalid_bundle(tmp_path: Path, capsys) -> None:
invalid_path = tmp_path / "invalid_bundle.json"
invalid_path.write_text(json.dumps({"bundle": {"ok": True, "result": "FAIL"}}, sort_keys=True), encoding="utf-8")

exit_code = validator.main(["--bundle", str(invalid_path)])
output = json.loads(capsys.readouterr().out)

assert exit_code == 1
assert output["bundle"] == invalid_path.as_posix()
assert output["ok"] is False
assert output["result"] == "FAIL"
assert output["issues"] == [
"bundle missing required field: branch",
"bundle missing required field: changed_files",
"bundle missing required field: safe_pr_gate",
"bundle missing required field: validation_evidence",
"bundle.branch must be a string",
"bundle.changed_files must be a list of strings",
"bundle.result must match bundle.ok",
"bundle.safe_pr_gate must be a JSON object",
"bundle.validation_evidence must be a list",
]
Loading