Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/identity-prime/src/identity_prime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""identity-prime runtime helpers."""
81 changes: 81 additions & 0 deletions apps/identity-prime/src/identity_prime/proof_ingress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Literal
from uuid import uuid4

ProofSource = Literal[
"first_party_passkey",
"enterprise_oidc",
"enterprise_saml",
"workload_identity",
"recovery_flow",
]

ProofResult = Literal["accepted", "rejected", "inconclusive"]

_ALLOWED_PROOF_SOURCES = {
"first_party_passkey",
"enterprise_oidc",
"enterprise_saml",
"workload_identity",
"recovery_flow",
}

_ALLOWED_RESULTS = {"accepted", "rejected", "inconclusive"}


def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def emit_proof_ingress_record(
*,
proof_source: ProofSource,
tenant_id: str,
result: ProofResult,
subject_id: str | None = None,
issuer_ref: str | None = None,
upstream_subject: str | None = None,
assurance_context: dict[str, Any] | None = None,
evidence_refs: list[str] | None = None,
correlation_id: str | None = None,
proof_record_id: str | None = None,
received_at: str | None = None,
) -> dict[str, Any]:
"""Emit an IdentityProofIngressRecord v0.1 payload.

This helper only shapes a contract-conformant record. It does not verify an
upstream authenticator, issue a session, mutate gateway behavior, or persist
the record.
"""

if proof_source not in _ALLOWED_PROOF_SOURCES:
raise ValueError(f"unsupported proof_source: {proof_source}")
if result not in _ALLOWED_RESULTS:
raise ValueError(f"unsupported result: {result}")
if not tenant_id:
raise ValueError("tenant_id is required")

record: dict[str, Any] = {
"version": "0.1",
"proof_record_id": proof_record_id or f"proof_{uuid4()}",
"proof_source": proof_source,
"tenant_id": tenant_id,
"received_at": received_at or utc_now_iso(),
"result": result,
}

optional_fields: dict[str, Any | None] = {
"subject_id": subject_id,
"issuer_ref": issuer_ref,
"upstream_subject": upstream_subject,
"assurance_context": assurance_context,
"evidence_refs": evidence_refs,
"correlation_id": correlation_id,
}
for key, value in optional_fields.items():
if value is not None:
record[key] = value

return record
80 changes: 80 additions & 0 deletions tools/tests/test_identity_prime_proof_ingress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

import importlib.util
import json
from pathlib import Path
from types import ModuleType

from jsonschema import Draft202012Validator


def load_module(repo_root: Path) -> ModuleType:
module_path = repo_root / "apps" / "identity-prime" / "src" / "identity_prime" / "proof_ingress.py"
spec = importlib.util.spec_from_file_location("identity_prime_proof_ingress", module_path)
assert spec is not None
assert spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module


def load_json(path: Path) -> object:
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)


def test_emit_proof_ingress_record_validates_against_contract() -> None:
repo_root = Path(__file__).resolve().parents[2]
module = load_module(repo_root)
schema = load_json(repo_root / "contracts" / "identity" / "IdentityProofIngressRecord.v0.1.json")
record = module.emit_proof_ingress_record(
proof_source="enterprise_oidc",
tenant_id="tenant_acme",
result="accepted",
subject_id="subj_01HUMAN123",
issuer_ref="issuer_acme_oidc",
upstream_subject="00u-example-subject",
assurance_context={"level": "aal2_phishing_resistant_target"},
evidence_refs=["evidence_proof_accepted_0001"],
correlation_id="corr_identity_ingress_0001",
proof_record_id="proof_enterprise_oidc_0001",
received_at="2026-05-04T19:50:00Z",
)

Draft202012Validator.check_schema(schema)
Draft202012Validator(schema).validate(record)
assert record["proof_record_id"] == "proof_enterprise_oidc_0001"
assert record["proof_source"] == "enterprise_oidc"
assert record["result"] == "accepted"


def test_emit_proof_ingress_record_rejects_invalid_source() -> None:
repo_root = Path(__file__).resolve().parents[2]
module = load_module(repo_root)

try:
module.emit_proof_ingress_record(
proof_source="unknown_source",
tenant_id="tenant_acme",
result="accepted",
)
except ValueError as exc:
assert "unsupported proof_source" in str(exc)
else:
raise AssertionError("expected ValueError for unsupported proof_source")


def test_emit_proof_ingress_record_requires_tenant_id() -> None:
repo_root = Path(__file__).resolve().parents[2]
module = load_module(repo_root)

try:
module.emit_proof_ingress_record(
proof_source="enterprise_oidc",
tenant_id="",
result="accepted",
)
except ValueError as exc:
assert "tenant_id is required" in str(exc)
else:
raise AssertionError("expected ValueError for missing tenant_id")
Loading