diff --git a/agent_assembly/core/runtime_interceptor.py b/agent_assembly/core/runtime_interceptor.py index 4eb7ded..f3d3259 100644 --- a/agent_assembly/core/runtime_interceptor.py +++ b/agent_assembly/core/runtime_interceptor.py @@ -279,6 +279,13 @@ def check_tool_start( # proceed (fail open). return self._on_query_failure("runtime query failed") + # A non-dict result (e.g. ``None`` from a malformed/partial runtime + # response) is not an authoritative verdict; route it through the + # fail-closed path rather than raising ``AttributeError`` on ``.get`` + # (AAASM-4167). + if not isinstance(result, dict): + return self._on_query_failure("runtime returned non-dict result") + # No ``"allow"`` default: a missing / empty ``decision`` is not an # authoritative allow and must route through the fail-closed path below # under ``enforce`` (AAASM-4014). diff --git a/test/unit/core/test_runtime_interceptor.py b/test/unit/core/test_runtime_interceptor.py index d2c7603..9a1cb83 100644 --- a/test/unit/core/test_runtime_interceptor.py +++ b/test/unit/core/test_runtime_interceptor.py @@ -322,6 +322,40 @@ def test_observe_error_decision_still_fails_open(decision: str) -> None: assert result == {"status": "allow"} +class _NonDictRuntimeClient: + """A runtime whose query_policy returns a non-dict (e.g. ``None``).""" + + def __init__(self, result: Any) -> None: + self._result = result + + def query_policy(self, *_args: Any, **_kwargs: Any) -> Any: + return self._result + + +@pytest.mark.parametrize("bad_result", [None, "deny", 42, ["deny"]]) +def test_enforce_non_dict_return_fails_closed(bad_result: Any) -> None: + """AAASM-4167: a non-dict query_policy return denies under enforce rather + than raising AttributeError on ``result.get``.""" + interceptor = RuntimeQueryInterceptor( + _FakeGatewayClient(), _NonDictRuntimeClient(bad_result), "agent-001", enforce=True + ) + + result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i") + + assert result["status"] == "deny" + + +@pytest.mark.parametrize("bad_result", [None, "deny", 42, ["deny"]]) +def test_observe_non_dict_return_fails_open(bad_result: Any) -> None: + """AAASM-4167: without enforce a non-dict return degrades to a clean allow, + not an AttributeError.""" + interceptor = RuntimeQueryInterceptor(_FakeGatewayClient(), _NonDictRuntimeClient(bad_result), "agent-001") + + result = interceptor.check_tool_start(serialized={"name": "t"}, input_str="i") + + assert result == {"status": "allow"} + + def test_enforce_unreachable_runtime_denies_all(monkeypatch: pytest.MonkeyPatch) -> None: """AAASM-3106: native present but runtime unreachable yields a deny-all interceptor under enforce, not the fail-open bare client."""