Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 83 additions & 35 deletions src/commit_gate_core/gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
"signature",
)

# Fields copied into the audit receipt snapshot before mutation_callback runs.
# This is the authorised record shape: post-callback mutation cannot alter it.
_SNAPSHOT_FIELDS: tuple[str, ...] = (
"actor_id",
"action",
"object_id",
"environment",
"commit_hash",
"policy_version",
)


@dataclass(frozen=True)
class GateResult:
Expand Down Expand Up @@ -99,8 +110,8 @@ class CommitGate:
4. verify exact scope
5. verify nonce unused
6. consume nonce
7. call mutation callback
8. append audit on every exit
7. call mutation callback (record snapshot taken before this step)
8. append audit on every exit (audit failure returns controlled GateResult)
"""

def __init__(
Expand Down Expand Up @@ -138,6 +149,10 @@ def execute(
decision_id: Optional[str] = None
nonce: Optional[str] = None
nonce_consumed = False
# record_snapshot is set after all validation passes, before mutation.
# _finish() uses this snapshot so post-callback mutation cannot alter
# what the audit receipt says was authorised. (Issue #11)
record_snapshot: Optional[Mapping[str, Any]] = None

try:
now = self._clock.now()
Expand All @@ -157,7 +172,7 @@ def execute(
code=f"DENY:{structural_error}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

decision_id = str(record["decision_id"])
Expand All @@ -168,15 +183,15 @@ def execute(
code=f"DENY:VERDICT_NOT_ALLOW:{record['verdict']}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

if record["policy_version"] not in self._accepted_policy_versions:
return self._deny(
code=f"DENY:POLICY_VERSION_REJECTED:{record['policy_version']}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

# 2. verify signature
Expand All @@ -185,7 +200,7 @@ def execute(
code="DENY:INVALID_SIGNATURE",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

# 3. verify timestamps
Expand All @@ -195,14 +210,14 @@ def execute(
code="DENY:ISSUED_AT_IN_FUTURE",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)
if now > expires_at:
return self._deny(
code="DENY:DECISION_EXPIRED",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

# 4. verify exact scope
Expand All @@ -219,7 +234,7 @@ def execute(
code=f"DENY:{scope_error}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

# 5. verify nonce unused
Expand All @@ -228,9 +243,13 @@ def execute(
code="DENY:NONCE_REPLAYED",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=self._snapshot(record),
)

# Issue #11: snapshot authorised record fields NOW — after all
# validation passes, before mutation_callback can alter the record.
record_snapshot = self._snapshot(record)

# 6 + 7. consume nonce then mutate. Roll back nonce if mutation fails.
self._nonce_ledger.consume(nonce, decision_id)
nonce_consumed = True
Expand All @@ -245,7 +264,7 @@ def execute(
code=f"ROLLBACK:MUTATION_FAILED:{type(exc).__name__}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)
except Exception as rollback_exc:
return self._error(
Expand All @@ -255,21 +274,21 @@ def execute(
),
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)

return self._allow(
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)

except ValueError as exc:
return self._deny(
code=f"DENY:{str(exc)}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)
except Exception as exc:
# Last-resort fail-closed path. Try to roll back a consumed nonce.
Expand All @@ -280,7 +299,7 @@ def execute(
code=f"ROLLBACK:UNEXPECTED:{type(exc).__name__}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)
except Exception as rollback_exc:
return self._error(
Expand All @@ -290,15 +309,25 @@ def execute(
),
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)
return self._error(
code=f"ERROR:UNEXPECTED:{type(exc).__name__}",
decision_id=decision_id,
attempted=self._attempt(actor_id, action, object_id, environment, commit_hash),
record=record,
record_snapshot=record_snapshot,
)

def _snapshot(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
"""Return an immutable copy of authorised record fields.

Called after all validation passes, before mutation_callback runs.
_finish() uses this snapshot for record_scope so the audit receipt
reflects the authorised record, not whatever the callback may have
mutated. (Issue #11)
"""
return {field: record.get(field) for field in _SNAPSHOT_FIELDS}

def _structural_error(self, record: Mapping[str, Any]) -> Optional[str]:
missing = [field for field in REQUIRED_FIELDS if field not in record]
if missing:
Expand Down Expand Up @@ -352,14 +381,14 @@ def _allow(
*,
decision_id: str,
attempted: Mapping[str, Any],
record: Mapping[str, Any],
record_snapshot: Optional[Mapping[str, Any]],
) -> GateResult:
return self._finish(
allowed=True,
code="ALLOW",
decision_id=decision_id,
attempted=attempted,
record=record,
record_snapshot=record_snapshot,
)

def _deny(
Expand All @@ -368,14 +397,14 @@ def _deny(
code: str,
decision_id: Optional[str],
attempted: Mapping[str, Any],
record: Optional[Mapping[str, Any]] = None,
record_snapshot: Optional[Mapping[str, Any]] = None,
) -> GateResult:
return self._finish(
allowed=False,
code=code,
decision_id=decision_id,
attempted=attempted,
record=record,
record_snapshot=record_snapshot,
)

def _error(
Expand All @@ -384,14 +413,14 @@ def _error(
code: str,
decision_id: Optional[str],
attempted: Mapping[str, Any],
record: Optional[Mapping[str, Any]] = None,
record_snapshot: Optional[Mapping[str, Any]] = None,
) -> GateResult:
return self._finish(
allowed=False,
code=code,
decision_id=decision_id,
attempted=attempted,
record=record,
record_snapshot=record_snapshot,
)

def _finish(
Expand All @@ -401,33 +430,52 @@ def _finish(
code: str,
decision_id: Optional[str],
attempted: Mapping[str, Any],
record: Optional[Mapping[str, Any]],
record_snapshot: Optional[Mapping[str, Any]],
) -> GateResult:
"""Build GateResult and attempt to append audit event.

Issue #10: audit.append is wrapped so that a failing audit sink
never escapes _finish() as an unhandled exception. If append raises,
_finish() returns a controlled GateResult with
ERROR:AUDIT_APPEND_FAILED:<ExceptionType>. The original result is
discarded so the caller is never misled about receipt status.
"""
timestamp = self._clock.now().astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
result = GateResult(
allowed=allowed,
code=code,
decision_id=decision_id,
timestamp=timestamp,
)
audit_event = {
audit_event: dict[str, Any] = {
"event_type": "GATE_EVALUATION",
"allowed": result.allowed,
"code": result.code,
"decision_id": result.decision_id,
"timestamp": result.timestamp,
"attempted": dict(attempted),
}
if record is not None:
audit_event["record_scope"] = {
"actor_id": record.get("actor_id"),
"action": record.get("action"),
"object_id": record.get("object_id"),
"environment": record.get("environment"),
"commit_hash": record.get("commit_hash"),
"policy_version": record.get("policy_version"),
}
self._audit.append(audit_event)
# Issue #11: record_scope is built from the pre-mutation snapshot,
# not the live record. The callback cannot alter what is logged here.
if record_snapshot is not None:
audit_event["record_scope"] = dict(record_snapshot)

try:
self._audit.append(audit_event)
except Exception as audit_exc:
# Issue #10: controlled audit failure — never raise out of _finish().
# Return a new GateResult that honestly reports the audit failure.
# allowed is always False: we cannot confirm the receipt was written.
audit_fail_timestamp = (
self._clock.now().astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
)
return GateResult(
allowed=False,
code=f"ERROR:AUDIT_APPEND_FAILED:{type(audit_exc).__name__}",
decision_id=decision_id,
timestamp=audit_fail_timestamp,
)

return result

def _attempt(
Expand Down
Loading
Loading