Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions agent_assembly/core/runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@

# Native decisions that authoritatively permit the tool to proceed. ``deny`` and
# ``pending`` are handled explicitly; only these are treated as an allow. Anything
# else β€” an unknown string, an empty value, or a missing ``decision`` key β€” is not
# an authoritative allow and must fail closed under ``enforce`` (AAASM-4014); the
# runtime remains the authority on redaction, so ``redact`` proceeds here.
_ALLOW_DECISIONS = frozenset({"allow", "redact", "unspecified"})
# else β€” the proto3 zero value ``unspecified`` ("no decision rendered"), an unknown
# string, an empty value, or a missing ``decision`` key β€” is not an authoritative
# allow and must fail closed under ``enforce`` (AAASM-4014, AAASM-4166), matching the
# Node SDK; the runtime remains the authority on redaction, so ``redact`` proceeds.
_ALLOW_DECISIONS = frozenset({"allow", "redact"})


def _local_posture_is_enforce(enforcement_mode: str | None) -> bool:
Expand Down Expand Up @@ -247,11 +248,13 @@ def check_tool_start(
* ``"deny"`` β†’ ``{"status": "deny", "reason": ...}``.
* ``"pending"`` β†’ ``{"status": "pending", "reason": ...}`` so the
adapter's existing approval path runs.
* ``"allow"`` / ``"redact"`` / ``"unspecified"`` β†’ ``{"status": "allow"}``.
The runtime redacts authoritatively; this layer never redacts.
* A raising ``query_policy`` or an error-sentinel ``decision``
(``query_failed`` / ``channel_closed`` / ``shutdown``) β†’ ``deny`` under
``enforce`` (fail closed, AAASM-3106), else ``allow`` (fail open).
* ``"allow"`` / ``"redact"`` β†’ ``{"status": "allow"}``. The runtime
redacts authoritatively; this layer never redacts.
* A raising ``query_policy``, the proto3 zero value ``"unspecified"``
("no decision rendered"), an error-sentinel ``decision``
(``query_failed`` / ``channel_closed`` / ``shutdown``), or any
unrecognized decision β†’ ``deny`` under ``enforce`` (fail closed,
AAASM-3106 / AAASM-4166, matching Node), else ``allow`` (fail open).

Before any of the above, the live op-control kill switch (AAASM-3491) is
consulted when an ``op_id`` is supplied and a subscriber is wired: a
Expand Down
30 changes: 28 additions & 2 deletions test/unit/core/test_runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,17 +615,43 @@ def query_policy(self, *_args: Any, **_kwargs: Any) -> dict[str, str]:
assert result["status"] == "deny"


@pytest.mark.parametrize("decision", ["allow", "redact", "unspecified"])
@pytest.mark.parametrize("decision", ["allow", "redact"])
def test_known_good_decisions_allow_under_enforce(decision: str) -> None:
"""Authoritative allow verdicts still proceed under enforce; the runtime
remains the authority on redaction, so ``redact`` proceeds here."""
remains the authority on redaction, so ``redact`` proceeds here. ``unspecified``
is no longer among them β€” it fails closed (AAASM-4166)."""
interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _FakeRuntimeClient(decision), "agent-001", enforce=True)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result == {"status": "allow"}


def test_unspecified_decision_denies_under_enforce() -> None:
"""AAASM-4166: the proto3 zero value ``unspecified`` ("no decision rendered")
is not an authoritative allow β€” it must fail closed under enforce (deny),
matching the Node SDK, rather than being folded onto allow as before."""
interceptor = RuntimeQueryInterceptor(
_FakeGatewayClient(), _FakeRuntimeClient("unspecified"), "agent-001", enforce=True
)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result["status"] == "deny"


def test_unspecified_decision_allows_under_observe() -> None:
"""Under observe the ``unspecified`` verdict still proceeds (fail open), like
any other non-authoritative decision."""
interceptor = RuntimeQueryInterceptor(
_FakeGatewayClient(), _FakeRuntimeClient("unspecified"), "agent-001", enforce=False
)

result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i")

assert result == {"status": "allow"}


def test_unknown_decision_allows_under_observe() -> None:
"""Under observe the unknown-decision path still proceeds (fail open)."""
interceptor = RuntimeQueryInterceptor(
Expand Down