Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,16 @@ def init_assembly(
:param enforcement_mode: Per-agent governance posture applied to this
agent's actions (see :data:`EnforcementMode`). Defaults to ``None``,
which lets the gateway apply its server-side default (live ``enforce``).
Pass ``"observe"`` to register the agent in dry-run / sandbox mode:
every action
proceeds and the gateway records would-be violations as shadow audit
Because that default is ``enforce``, the SDK's *local* pre-execution
fast path also takes the fail-closed posture under ``None`` (AAASM-4130):
with the native runtime present an unreachable socket or an
unauthoritative ``query_policy`` **denies** rather than silently
proceeding, and on a pure-Python install (native extension absent) a loud
one-time warning is emitted because no in-process deny can run — the
gateway / proxy / eBPF layers remain authoritative, so init stays graceful
and never hard-fails on a missing runtime. Pass ``"observe"`` to register
the agent in dry-run / sandbox mode: every action proceeds (local checks
fail open) and the gateway records would-be violations as shadow audit
events.
"""
gateway_url = resolve_gateway_url(gateway_url)
Expand Down
80 changes: 64 additions & 16 deletions agent_assembly/core/runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

Failure posture is governed by ``enforcement_mode`` (AAASM-3106):

* Under ``enforce``, the SDK is a security control and **fails closed**. When the
native extension is missing the bare client is returned (no native authority
exists to consult — see :func:`build_governance_interceptor`), but once the
native extension is present every other failure denies: an unreachable runtime
socket yields a deny-all interceptor, a raising ``query_policy`` maps to
``deny``, and a native ``decision`` that is itself an error sentinel
(``query_failed`` / ``channel_closed``) maps to ``deny`` rather than allow.
* Under ``observe`` / ``disabled`` (or when no mode is supplied), the SDK is a
* Under an **enforce** posture — explicit ``enforce`` *or* the ``None`` default,
which is advertised as the gateway's live ``enforce`` (AAASM-4130) — the SDK is
a security control and **fails closed**. When the native extension is missing the
bare client is returned (no native authority exists to consult — see
:func:`build_governance_interceptor`) but a loud one-time warning is emitted so the
skipped in-process deny is not silent; once the native extension is present every
other failure denies: an unreachable runtime socket yields a deny-all interceptor,
a raising ``query_policy`` maps to ``deny``, and a native ``decision`` that is
itself an error sentinel (``query_failed`` / ``channel_closed``) maps to ``deny``
rather than allow.
* Under the explicit dry-run postures **observe** / **disabled**, the SDK is a
dry-run / hermetic-test layer and **fails open**: an unreachable runtime
returns the bare client unchanged, a raising or error ``query_policy``
proceeds, exactly as before.
Expand All @@ -35,6 +38,7 @@
import json
import os
import stat
import warnings
from importlib import metadata
from typing import Any

Expand All @@ -57,6 +61,40 @@
_ALLOW_DECISIONS = frozenset({"allow", "redact", "unspecified"})


def _local_posture_is_enforce(enforcement_mode: str | None) -> bool:
"""Whether the SDK's *local* failure posture must fail closed (AAASM-4130).

``enforcement_mode is None`` is the advertised default: it registers the agent
under the gateway's server-side default, which is live ``enforce``. So for the
SDK's own error posture ``None`` must behave exactly like ``enforce`` — an
unreachable runtime or an unauthoritative query denies rather than silently
proceeding. Only the explicit dry-run postures (``observe`` / ``disabled``) fail
open locally.
"""
return enforcement_mode is None or enforcement_mode == ENFORCE_MODE


def _warn_sdk_enforcement_unavailable() -> None:
"""Warn (loudly, once) that no *local* pre-execution deny can run (AAASM-4130).

Under an enforce posture (the ``None`` default or explicit ``enforce``) a policy
``deny`` is advertised to block the tool before it runs, but that in-process fast
path needs the native ``agent_assembly._core`` extension. On a pure-Python install
it is absent, so no local deny is possible. Rather than silently fail open (Python
diverging from go's fail-closed and node's ``ConfigurationError``), emit a warning
so the gap is visible; the gateway / proxy / eBPF layers remain authoritative, so
the SDK stays advisory and does not hard-fail on a missing runtime.
"""
warnings.warn(
"Agent Assembly enforcement is active but the native runtime extension "
"(agent_assembly._core) is not installed, so the SDK cannot apply a local "
"pre-execution deny; tool calls proceed in-process and rely on the gateway / "
"proxy / eBPF layers for enforcement. Install the native extension to enable "
"the in-process deny fast path.",
stacklevel=2,
)


def _resolve_runtime_socket_path(agent_id: str) -> str:
"""Resolve the runtime UDS path: ``AA_RUNTIME_SOCKET`` > default convention.

Expand Down Expand Up @@ -162,8 +200,9 @@ class RuntimeQueryInterceptor:
only *adds* ``check_tool_start``.

The failure posture of the added check is governed by ``enforce``: when
``True`` (``enforcement_mode == "enforce"``) any path that cannot obtain an
authoritative allow — a raising ``query_policy`` or an error-sentinel
``True`` (an enforce posture — explicit ``enforce`` or the ``None`` default,
resolved in :func:`build_governance_interceptor`) any path that cannot obtain
an authoritative allow — a raising ``query_policy`` or an error-sentinel
``decision`` — maps to ``deny`` (fail closed). When ``False`` those paths
proceed (fail open), preserving the observe / disabled behavior.
"""
Expand Down Expand Up @@ -461,14 +500,19 @@ def build_governance_interceptor(

When a native runtime is reachable, wrap ``client`` in a
:class:`RuntimeQueryInterceptor` so a runtime ``deny`` actually blocks the
tool. The failure posture depends on ``enforcement_mode`` (AAASM-3106):
tool. The failure posture depends on ``enforcement_mode`` (AAASM-3106). The
``None`` default is advertised as the gateway's live ``enforce``, so it takes
the same *local* fail-closed posture as an explicit ``enforce`` (AAASM-4130);
only ``observe`` / ``disabled`` fail open locally:

* The native extension is **missing**: return ``client`` unchanged in every
mode. There is no native authority to consult, so there is nothing to fail
closed *to* — the SDK fast path is simply not engaged.
mode — there is no native authority to fail closed *to*. Under an enforce
posture this would silently skip the in-process deny, so a loud one-time
warning is emitted first (AAASM-4130) rather than failing open silently.
* The native extension is **present** but the runtime socket is unreachable:
under ``enforce`` return a deny-all :class:`_FailClosedInterceptor`;
otherwise return ``client`` unchanged (fail open).
under an enforce posture (``None`` default or explicit ``enforce``) return a
deny-all :class:`_FailClosedInterceptor`; under ``observe`` / ``disabled``
return ``client`` unchanged (fail open).

:param runtime_client: A pre-connected native runtime client (e.g. the one
:func:`register_agent` registered the token on). When supplied it is
Expand All @@ -479,18 +523,22 @@ def build_governance_interceptor(
extension (fail open in every mode) from an unreachable runtime socket
(fail closed under enforce). When ``None`` it is detected here.
"""
enforce = enforcement_mode == ENFORCE_MODE
enforce = _local_posture_is_enforce(enforcement_mode)

if runtime_client is None and native_available is None:
# No caller-supplied client or hint: detect + connect ourselves.
if not _native_core_available():
if enforce:
_warn_sdk_enforcement_unavailable()
return client
runtime_client = connect_runtime_client(agent_id)
native_available = True

if runtime_client is None:
# Native missing → no authority to fail closed to, return bare client.
if not native_available:
if enforce:
_warn_sdk_enforcement_unavailable()
return client
# Native present but runtime unreachable: deny everything under enforce
# (fail closed); proceed under observe / disabled (fail open).
Expand Down
80 changes: 77 additions & 3 deletions test/unit/core/test_runtime_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from agent_assembly.core import runtime_interceptor
from agent_assembly.core.runtime_interceptor import (
RuntimeQueryInterceptor,
_FailClosedInterceptor,
build_governance_interceptor,
)
from agent_assembly.exceptions import OpTerminatedError, ToolExecutionBlockedError
Expand Down Expand Up @@ -170,8 +171,9 @@ def _no_core_import(name: str, *args: Any, **kwargs: Any) -> Any:
def test_build_interceptor_returns_bare_client_when_connect_fails(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Native extension present but no reachable runtime: connect() raises, so
the bare client is returned (fail-open) rather than a denying interceptor."""
"""Under ``observe`` a native extension present but no reachable runtime:
connect() raises, so the bare client is returned (fail-open) rather than a
denying interceptor. (The ``None`` default now fails closed here — AAASM-4130.)"""

class _UnreachableRuntimeClient:
@staticmethod
Expand All @@ -183,7 +185,7 @@ def connect(_socket_path: str) -> Any:
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

client = _FakeGatewayClient()
result = build_governance_interceptor(client, "agent-001")
result = build_governance_interceptor(client, "agent-001", "observe")

assert result is client

Expand Down Expand Up @@ -214,6 +216,78 @@ def connect(_socket_path: str) -> Any:
}


def test_default_mode_unreachable_runtime_fails_closed(monkeypatch: pytest.MonkeyPatch) -> None:
"""AAASM-4130: with no ``enforcement_mode`` (the advertised ``enforce`` default)
a native-present-but-unreachable runtime yields a deny-all interceptor, not the
silent fail-open bare client. The default posture must match its advertised
``enforce`` behavior locally."""

class _UnreachableRuntimeClient:
@staticmethod
def connect(_socket_path: str) -> Any:
raise OSError("no such socket")

fake_core = types.ModuleType("agent_assembly._core")
fake_core.RuntimeClient = _UnreachableRuntimeClient # type: ignore[attr-defined]
monkeypatch.setitem(sys.modules, "agent_assembly._core", fake_core)

client = _FakeGatewayClient()
result = build_governance_interceptor(client, "agent-001") # no mode -> default

assert isinstance(result, _FailClosedInterceptor)
assert result.check_tool_start(serialized={"name": "t"}, input_str="i")["status"] == "deny"


def test_default_mode_warns_when_native_core_missing(monkeypatch: pytest.MonkeyPatch) -> None:
"""AAASM-4130: pure-Python install (native extension absent) under the default
``enforce`` posture cannot run a local deny, so it must warn loudly rather than
fail open silently. The bare client is still returned (init stays graceful)."""
monkeypatch.delitem(sys.modules, "agent_assembly._core", raising=False)

import builtins

real_import = builtins.__import__

def _no_core_import(name: str, *args: Any, **kwargs: Any) -> Any:
if name == "agent_assembly._core":
raise ImportError("native extension unavailable")
return real_import(name, *args, **kwargs)

monkeypatch.setattr(builtins, "__import__", _no_core_import)

client = _FakeGatewayClient()
with pytest.warns(UserWarning, match="native runtime extension"):
result = build_governance_interceptor(client, "agent-001") # no mode -> default

assert result is client
assert not hasattr(result, "check_tool_start")


def test_observe_mode_does_not_warn_when_native_core_missing(monkeypatch: pytest.MonkeyPatch) -> None:
"""The warning is scoped to the enforce posture: an explicit ``observe`` dry-run
with no native extension legitimately fails open and must stay silent."""
monkeypatch.delitem(sys.modules, "agent_assembly._core", raising=False)

import builtins
import warnings

real_import = builtins.__import__

def _no_core_import(name: str, *args: Any, **kwargs: Any) -> Any:
if name == "agent_assembly._core":
raise ImportError("native extension unavailable")
return real_import(name, *args, **kwargs)

monkeypatch.setattr(builtins, "__import__", _no_core_import)

client = _FakeGatewayClient()
with warnings.catch_warnings():
warnings.simplefilter("error")
result = build_governance_interceptor(client, "agent-001", "observe")

assert result is client


def test_enforce_query_raising_fails_closed() -> None:
"""AAASM-3106: under enforce a raising query_policy denies, not allows."""

Expand Down