diff --git a/src/commit_gate_core/gate.py b/src/commit_gate_core/gate.py index 042dc8f..afc612d 100644 --- a/src/commit_gate_core/gate.py +++ b/src/commit_gate_core/gate.py @@ -30,6 +30,17 @@ "signature", ) +# Fields copied into the audit receipt snapshot before mutation_callback runs. +# This is the authorised record shape: post-callback mutation cannot alter it. +_SNAPSHOT_FIELDS: tuple[str, ...] = ( + "actor_id", + "action", + "object_id", + "environment", + "commit_hash", + "policy_version", +) + @dataclass(frozen=True) class GateResult: @@ -99,8 +110,8 @@ class CommitGate: 4. verify exact scope 5. verify nonce unused 6. consume nonce - 7. call mutation callback - 8. append audit on every exit + 7. call mutation callback (record snapshot taken before this step) + 8. append audit on every exit (audit failure returns controlled GateResult) """ def __init__( @@ -138,6 +149,10 @@ def execute( decision_id: Optional[str] = None nonce: Optional[str] = None nonce_consumed = False + # record_snapshot is set after all validation passes, before mutation. + # _finish() uses this snapshot so post-callback mutation cannot alter + # what the audit receipt says was authorised. (Issue #11) + record_snapshot: Optional[Mapping[str, Any]] = None try: now = self._clock.now() @@ -157,7 +172,7 @@ def execute( code=f"DENY:{structural_error}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) decision_id = str(record["decision_id"]) @@ -168,7 +183,7 @@ def execute( code=f"DENY:VERDICT_NOT_ALLOW:{record['verdict']}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) if record["policy_version"] not in self._accepted_policy_versions: @@ -176,7 +191,7 @@ def execute( code=f"DENY:POLICY_VERSION_REJECTED:{record['policy_version']}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) # 2. verify signature @@ -185,7 +200,7 @@ def execute( code="DENY:INVALID_SIGNATURE", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) # 3. verify timestamps @@ -195,14 +210,14 @@ def execute( code="DENY:ISSUED_AT_IN_FUTURE", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) if now > expires_at: return self._deny( code="DENY:DECISION_EXPIRED", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) # 4. verify exact scope @@ -219,7 +234,7 @@ def execute( code=f"DENY:{scope_error}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) # 5. verify nonce unused @@ -228,9 +243,13 @@ def execute( code="DENY:NONCE_REPLAYED", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=self._snapshot(record), ) + # Issue #11: snapshot authorised record fields NOW — after all + # validation passes, before mutation_callback can alter the record. + record_snapshot = self._snapshot(record) + # 6 + 7. consume nonce then mutate. Roll back nonce if mutation fails. self._nonce_ledger.consume(nonce, decision_id) nonce_consumed = True @@ -245,7 +264,7 @@ def execute( code=f"ROLLBACK:MUTATION_FAILED:{type(exc).__name__}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) except Exception as rollback_exc: return self._error( @@ -255,13 +274,13 @@ def execute( ), decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) return self._allow( decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) except ValueError as exc: @@ -269,7 +288,7 @@ def execute( code=f"DENY:{str(exc)}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) except Exception as exc: # Last-resort fail-closed path. Try to roll back a consumed nonce. @@ -280,7 +299,7 @@ def execute( code=f"ROLLBACK:UNEXPECTED:{type(exc).__name__}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) except Exception as rollback_exc: return self._error( @@ -290,15 +309,25 @@ def execute( ), decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) return self._error( code=f"ERROR:UNEXPECTED:{type(exc).__name__}", decision_id=decision_id, attempted=self._attempt(actor_id, action, object_id, environment, commit_hash), - record=record, + record_snapshot=record_snapshot, ) + def _snapshot(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + """Return an immutable copy of authorised record fields. + + Called after all validation passes, before mutation_callback runs. + _finish() uses this snapshot for record_scope so the audit receipt + reflects the authorised record, not whatever the callback may have + mutated. (Issue #11) + """ + return {field: record.get(field) for field in _SNAPSHOT_FIELDS} + def _structural_error(self, record: Mapping[str, Any]) -> Optional[str]: missing = [field for field in REQUIRED_FIELDS if field not in record] if missing: @@ -352,14 +381,14 @@ def _allow( *, decision_id: str, attempted: Mapping[str, Any], - record: Mapping[str, Any], + record_snapshot: Optional[Mapping[str, Any]], ) -> GateResult: return self._finish( allowed=True, code="ALLOW", decision_id=decision_id, attempted=attempted, - record=record, + record_snapshot=record_snapshot, ) def _deny( @@ -368,14 +397,14 @@ def _deny( code: str, decision_id: Optional[str], attempted: Mapping[str, Any], - record: Optional[Mapping[str, Any]] = None, + record_snapshot: Optional[Mapping[str, Any]] = None, ) -> GateResult: return self._finish( allowed=False, code=code, decision_id=decision_id, attempted=attempted, - record=record, + record_snapshot=record_snapshot, ) def _error( @@ -384,14 +413,14 @@ def _error( code: str, decision_id: Optional[str], attempted: Mapping[str, Any], - record: Optional[Mapping[str, Any]] = None, + record_snapshot: Optional[Mapping[str, Any]] = None, ) -> GateResult: return self._finish( allowed=False, code=code, decision_id=decision_id, attempted=attempted, - record=record, + record_snapshot=record_snapshot, ) def _finish( @@ -401,8 +430,16 @@ def _finish( code: str, decision_id: Optional[str], attempted: Mapping[str, Any], - record: Optional[Mapping[str, Any]], + record_snapshot: Optional[Mapping[str, Any]], ) -> GateResult: + """Build GateResult and attempt to append audit event. + + Issue #10: audit.append is wrapped so that a failing audit sink + never escapes _finish() as an unhandled exception. If append raises, + _finish() returns a controlled GateResult with + ERROR:AUDIT_APPEND_FAILED:. The original result is + discarded so the caller is never misled about receipt status. + """ timestamp = self._clock.now().astimezone(timezone.utc).isoformat().replace("+00:00", "Z") result = GateResult( allowed=allowed, @@ -410,7 +447,7 @@ def _finish( decision_id=decision_id, timestamp=timestamp, ) - audit_event = { + audit_event: dict[str, Any] = { "event_type": "GATE_EVALUATION", "allowed": result.allowed, "code": result.code, @@ -418,16 +455,27 @@ def _finish( "timestamp": result.timestamp, "attempted": dict(attempted), } - if record is not None: - audit_event["record_scope"] = { - "actor_id": record.get("actor_id"), - "action": record.get("action"), - "object_id": record.get("object_id"), - "environment": record.get("environment"), - "commit_hash": record.get("commit_hash"), - "policy_version": record.get("policy_version"), - } - self._audit.append(audit_event) + # Issue #11: record_scope is built from the pre-mutation snapshot, + # not the live record. The callback cannot alter what is logged here. + if record_snapshot is not None: + audit_event["record_scope"] = dict(record_snapshot) + + try: + self._audit.append(audit_event) + except Exception as audit_exc: + # Issue #10: controlled audit failure — never raise out of _finish(). + # Return a new GateResult that honestly reports the audit failure. + # allowed is always False: we cannot confirm the receipt was written. + audit_fail_timestamp = ( + self._clock.now().astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + ) + return GateResult( + allowed=False, + code=f"ERROR:AUDIT_APPEND_FAILED:{type(audit_exc).__name__}", + decision_id=decision_id, + timestamp=audit_fail_timestamp, + ) + return result def _attempt( diff --git a/tests/test_beau_failure_classes.py b/tests/test_beau_failure_classes.py index 95cbc4a..988c6ab 100644 --- a/tests/test_beau_failure_classes.py +++ b/tests/test_beau_failure_classes.py @@ -98,7 +98,12 @@ def execute_valid(gate, record=None): ) +# --------------------------------------------------------------------------- +# Existing regression proofs (issues #7, #8, #9 — still open gaps) +# --------------------------------------------------------------------------- + def test_proof_consequence_ordering_exposes_mutation_before_durable_audit(): + """Issue #7 regression: mutation can occur before durable audit. Still open.""" effects = [] def mutate(record): @@ -110,10 +115,12 @@ def mutate(record): assert effects == [("sent", "user@example.com")] assert result.allowed is False - assert result.code == "ROLLBACK:UNEXPECTED:RuntimeError" + # Now returns controlled audit failure code instead of ROLLBACK:UNEXPECTED + assert result.code == "ERROR:AUDIT_APPEND_FAILED:RuntimeError" def test_proof_payload_binding_gap_allows_callback_to_create_unbound_effect(): + """Issue #8 regression: proof not bound to mutation payload. Still open.""" audit = MemoryAudit() effects = [] @@ -138,10 +145,13 @@ def mutate(record): "body": "unbound payload", } ] + # Issue #11 fixed: audit receipt uses the pre-mutation snapshot. + # The snapshot was taken before the callback ran, so object_id is correct. assert audit.events[0]["record_scope"]["object_id"] == "user@example.com" def test_atomic_commit_boundary_gap_when_audit_fails_after_nonce_and_mutation(): + """Issue #9 regression: no atomic commit boundary. Still open.""" nonce_ledger = MemoryNonceLedger() effects = [] @@ -157,38 +167,86 @@ def mutate(record): result = execute_valid(gate) assert effects == ["mutation-bound"] - assert nonce_ledger.consumed == set() - assert nonce_ledger.rolled_back == [("nonce-1", "decision-1")] + # Issue #10 fixed: audit failure returns controlled result, not exception. + # Nonce rollback no longer occurs on the audit-failure path because + # _finish() catches the audit exception directly. + # The nonce was consumed; mutation happened; audit failed. + # This is the documented open gap for #9 (atomic commit boundary). assert result.allowed is False - assert result.code == "ROLLBACK:UNEXPECTED:RuntimeError" + assert result.code == "ERROR:AUDIT_APPEND_FAILED:RuntimeError" + +# --------------------------------------------------------------------------- +# Issue #10 hardening proofs: controlled audit failure on every exit path +# --------------------------------------------------------------------------- -def test_audit_failure_on_deny_path_can_escape_without_gate_result(): +def test_audit_failure_on_deny_path_returns_controlled_gate_result(): + """Issue #10 fix: audit failure on deny path returns GateResult, never raises.""" gate = make_gate(audit=FailingAudit(), mutation_callback=lambda record: None) - try: - gate.execute( - record=None, - actor_id="actor-1", - action="email.send", - object_id="user@example.com", - environment="demo", - commit_hash="a" * 40, - ) - except RuntimeError as exc: - assert str(exc) == "audit unavailable" - else: - raise AssertionError("audit failure on deny path should currently escape") + result = gate.execute( + record=None, + actor_id="actor-1", + action="email.send", + object_id="user@example.com", + environment="demo", + commit_hash="a" * 40, + ) + + assert isinstance(result.allowed, bool) + assert result.allowed is False + assert result.code == "ERROR:AUDIT_APPEND_FAILED:RuntimeError" + assert result.decision_id is None + assert result.timestamp is not None + + +def test_audit_failure_on_scope_deny_returns_controlled_gate_result(): + """Issue #10 fix: audit failure on scope mismatch deny returns GateResult.""" + gate = make_gate(audit=FailingAudit(), mutation_callback=lambda record: None) + + result = gate.execute( + record=valid_record(), + actor_id="wrong-actor", + action="email.send", + object_id="user@example.com", + environment="demo", + commit_hash="a" * 40, + ) + + assert result.allowed is False + assert result.code == "ERROR:AUDIT_APPEND_FAILED:RuntimeError" -def test_mutable_record_can_be_changed_after_validation_before_audit_receipt(): +def test_audit_failure_on_allow_path_returns_controlled_gate_result(): + """Issue #10 fix: audit failure on allow path returns GateResult, not True. + + allowed must be False: the receipt was not written. + """ + gate = make_gate(audit=FailingAudit(), mutation_callback=lambda record: None) + + result = execute_valid(gate) + + assert result.allowed is False + assert result.code == "ERROR:AUDIT_APPEND_FAILED:RuntimeError" + + +# --------------------------------------------------------------------------- +# Issue #11 hardening proofs: pre-mutation snapshot protects audit receipt +# --------------------------------------------------------------------------- + +def test_pre_mutation_snapshot_protects_audit_receipt(): + """Issue #11 fix: callback mutation of record does not alter audit receipt.""" audit = MemoryAudit() record = valid_record() effects = [] - def mutate(record): - effects.append(("sent", record["object_id"])) - record["object_id"] = "attacker@example.com" + def mutate(rec): + effects.append(("sent", rec["object_id"])) + # Attempt to mutate the record passed to the callback. + # Before fix: this would alter what _finish() logged. + # After fix: _finish() uses the pre-mutation snapshot, so this has + # no effect on the audit receipt. + rec["object_id"] = "attacker@example.com" gate = make_gate(audit=audit, mutation_callback=mutate) @@ -196,5 +254,26 @@ def mutate(record): assert result.allowed is True assert effects == [("sent", "user@example.com")] - assert audit.events[0]["attempted"]["object_id"] == "user@example.com" - assert audit.events[0]["record_scope"]["object_id"] == "attacker@example.com" + # Audit receipt must reflect the authorised record, not the callback drift. + assert audit.events[0]["record_scope"]["object_id"] == "user@example.com" + + +def test_pre_mutation_snapshot_is_independent_of_original_dict(): + """Issue #11 fix: snapshot is a copy; original dict changes do not affect receipt.""" + audit = MemoryAudit() + record = dict(valid_record()) + + def mutate(rec): + rec["actor_id"] = "mutated-actor" + rec["action"] = "mutated-action" + rec["policy_version"] = "mutated-version" + + gate = make_gate(audit=audit, mutation_callback=mutate) + + result = execute_valid(gate, record=record) + + assert result.allowed is True + scope = audit.events[0]["record_scope"] + assert scope["actor_id"] == "actor-1" + assert scope["action"] == "email.send" + assert scope["policy_version"] == "v1"