diff --git a/docs/sage_integration.md b/docs/sage_integration.md new file mode 100644 index 000000000..ad81c9ee1 --- /dev/null +++ b/docs/sage_integration.md @@ -0,0 +1,92 @@ +# SAGE persistent memory integration + +[SAGE](https://github.com/l33tdawg/sage) (Sovereign Agent Governed +Experience) is a BFT-consensus memory layer for AI agents. This +integration wires ii-agent into a SAGE node via the official async Python +SDK so each agent turn recalls prior committed memories before the model +runs and stores an observation after the turn completes. + +## Installation + +The SAGE SDK is an optional extra — the framework runs fine without it: + +```bash +pip install "ii-agent[sage]" +``` + +## Environment variables + +All configuration is driven by environment variables. No secrets live in +the source tree. + +| Variable | Default | Description | +| ------------------------ | -------------------- | --------------------------------------------------------------------------------------------------------- | +| `SAGE_ENABLED` | `false` | Master switch. When false, `register_sage_hooks` is a no-op and the integration never contacts any node. | +| `SAGE_NODE_URL` | `http://localhost:8090` | Base URL of the SAGE REST API. | +| `SAGE_AGENT_ID` | unset | Optional display name for the agent on the SAGE network. | +| `SAGE_AGENT_KEY` | unset | Filesystem path to a 32-byte Ed25519 seed. Falls back to `AgentIdentity.default()`. | +| `SAGE_DEFAULT_DOMAIN` | `ii-agent` | Domain tag used for recall and propose when no per-call override is supplied. | +| `SAGE_RECALL_TOP_K` | `5` | Number of memories fetched per pre-hook recall. | +| `SAGE_PRE_HOOK_TIMEOUT_S`| `2.0` | Strict timeout (seconds) for the pre-hook recall. On timeout the agent turn proceeds with no context. | + +## Registering the integration + +```python +from ii_agent.agents.agent import IIAgent +from ii_agent.integrations.sage import register_sage_hooks + +agent = IIAgent( + user_id="user-123", + session_id="session-abc", + model=..., +) + +# Appends a pre-hook and a post-hook to the agent. No-op when SAGE_ENABLED is false. +register_sage_hooks(agent) +``` + +`register_sage_hooks` extends (not replaces) the agent's existing +`pre_hooks` / `post_hooks` lists, so it is safe to combine SAGE with +other observability or policy hooks. + +## Turn lifecycle + +1. **Pre-hook (blocking, bounded)** — before the model runs, the pre-hook + embeds the user's input and calls `AsyncSageClient.query()` for + semantically similar committed memories under `SAGE_DEFAULT_DOMAIN`. + Results are formatted into a context block and prepended to the + user's input so the model sees both. +2. **Post-hook (background)** — after the model responds, the post-hook + is scheduled as a FastAPI background task via + `@hook(run_in_background=True)`. It submits a concise observation of + the turn via `AsyncSageClient.propose()`. The agent response is + returned to the caller without waiting for BFT consensus. + +## Fallback behaviour + +The integration is defensive by design. **None** of the following +conditions block or fail an agent turn: + +- `SAGE_ENABLED` unset or `false` — `register_sage_hooks` is a no-op. +- `sage-agent-sdk` extra not installed — the integration logs a debug + message and falls back to a no-op. +- SAGE node unreachable — `is_available()` returns `False`; recall + returns an empty list; propose is skipped. +- Pre-hook recall times out (default 2 s) — the turn proceeds with no + injected context. +- Any SDK exception on recall or propose — caught, logged at debug level, + swallowed. + +If you need end-to-end correctness guarantees (e.g. an audit trail that +cannot skip turns), run the SAGE node in-process or behind a reverse +proxy with its own retry semantics — this integration deliberately +prioritises agent-turn latency over delivery guarantees. + +## Testing + +Unit tests live under `src/tests/unit/integrations/` and cover: + +- Hook registration is a no-op when `SAGE_ENABLED` is false. +- Hook registration appends two callables to the agent when enabled. +- Turn flow — pre-hook fetches recall, post-hook submits observation — + with the SDK mocked out end to end. diff --git a/pyproject.toml b/pyproject.toml index b8a513b8f..8c685f7ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,6 +102,9 @@ gaia = [ "termcolor>=3.0.1", "uvicorn[standard]>=0.29.0", ] +sage = [ + "sage-agent-sdk>=6.6.1", +] [build-system] requires = ["hatchling"] diff --git a/src/ii_agent/agents/agent.py b/src/ii_agent/agents/agent.py index c70aba4fe..9ceb78c7d 100644 --- a/src/ii_agent/agents/agent.py +++ b/src/ii_agent/agents/agent.py @@ -235,6 +235,15 @@ def __post_init__(self) -> None: for sub_agent in self.sub_agents: self._initialize_sub_agent(sub_agent) + # Opt-in SAGE persistent memory integration. Activated only when + # SAGE_ENABLED=true in the environment; otherwise a no-op. + try: + from ii_agent.integrations.sage import register_sage_hooks + + register_sage_hooks(self) + except Exception as exc: # noqa: BLE001 — integration must never block agent init + logger.debug(f"SAGE integration skipped: {exc}") + def _initialize_sub_agent(self, sub_agent: "IIAgent") -> None: """Initialize a sub-agent with shared context from parent.""" # Share session store if not set diff --git a/src/ii_agent/core/config/sage_config.py b/src/ii_agent/core/config/sage_config.py new file mode 100644 index 000000000..3ef5cf8cc --- /dev/null +++ b/src/ii_agent/core/config/sage_config.py @@ -0,0 +1,43 @@ +"""Configuration for the SAGE persistent memory integration. + +All settings are driven by environment variables so no secrets live in the +source tree. The integration is strictly opt-in via ``SAGE_ENABLED``. +""" + +from __future__ import annotations + +from pydantic_settings import BaseSettings + + +class SageConfig(BaseSettings): + """Settings for the SAGE memory integration. + + Attributes: + enabled: Master switch. When false (default) the integration is a + no-op and never contacts a SAGE node. + node_url: Base URL of the SAGE REST API (e.g. ``http://localhost:8090``). + agent_id: Optional agent identifier (display name) used on + ``register_agent``. If unset, registration is skipped. + agent_key: Optional filesystem path to a 32-byte Ed25519 seed. + Falls back to :meth:`AgentIdentity.default()` which respects + ``SAGE_IDENTITY_PATH``. + default_domain: Domain tag used for recall and propose when no + per-call override is supplied. + recall_top_k: Number of memories to fetch per pre-hook recall. + pre_hook_timeout_s: Strict timeout for the pre-hook recall. On + timeout the hook yields control immediately with no injected + context — the agent turn is never blocked. + """ + + enabled: bool = False + node_url: str = "http://localhost:8090" + agent_id: str | None = None + agent_key: str | None = None + default_domain: str = "ii-agent" + recall_top_k: int = 5 + pre_hook_timeout_s: float = 2.0 + + class Config: + env_prefix = "SAGE_" + env_file = ".env" + extra = "ignore" diff --git a/src/ii_agent/integrations/sage/__init__.py b/src/ii_agent/integrations/sage/__init__.py new file mode 100644 index 000000000..476b91c62 --- /dev/null +++ b/src/ii_agent/integrations/sage/__init__.py @@ -0,0 +1,31 @@ +"""SAGE persistent-memory integration for ii-agent. + +SAGE (Sovereign Agent Governed Experience) is a BFT-consensus memory layer +for AI agents. This integration wires an :class:`IIAgent` instance into a +SAGE node via the async SDK so each turn recalls prior memories before the +model runs and stores an observation after the turn completes. + +The integration is strictly opt-in: when ``SAGE_ENABLED`` is unset or +``false``, :func:`register_sage_hooks` is a no-op and the agent behaves as +if the integration did not exist. If the optional ``sage-agent-sdk`` +dependency is not installed the integration also falls back to a no-op +with a debug log — the framework continues to work end-to-end. + +Install extras:: + + pip install "ii-agent[sage]" + +Typical usage:: + + from ii_agent.agents.agent import IIAgent + from ii_agent.integrations.sage import register_sage_hooks + + agent = IIAgent(...) + register_sage_hooks(agent) +""" + +from __future__ import annotations + +from ii_agent.integrations.sage.registrar import register_sage_hooks + +__all__ = ["register_sage_hooks"] diff --git a/src/ii_agent/integrations/sage/client.py b/src/ii_agent/integrations/sage/client.py new file mode 100644 index 000000000..692cc9f2d --- /dev/null +++ b/src/ii_agent/integrations/sage/client.py @@ -0,0 +1,227 @@ +"""Async SAGE client wrapper with lazy initialisation and graceful degradation. + +The wrapper serves three purposes: + +1. Guard the optional ``sage-agent-sdk`` import so the framework keeps + working when the extra is not installed. +2. Provide a lightweight ``is_available()`` health probe that the hook + layer can call on every turn without paying a full SDK round-trip on + failure paths. +3. Cache the :class:`AgentIdentity` and the underlying + :class:`AsyncSageClient` so we don't regenerate keys or leak HTTP + connections across turns. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +from ii_agent.core.config.sage_config import SageConfig +from ii_agent.core.logger import logger + +# Lazy-resolved references to optional SDK symbols. Populated on first use. +_AsyncSageClientCls: Any | None = None +_AgentIdentityCls: Any | None = None +_MemoryTypeEnum: Any | None = None +_SDK_AVAILABLE: bool | None = None + + +def _ensure_sdk() -> bool: + """Resolve the optional ``sage-agent-sdk`` symbols on first use. + + Returns ``True`` iff the SDK is importable. Subsequent calls are O(1). + """ + global _AsyncSageClientCls, _AgentIdentityCls, _MemoryTypeEnum, _SDK_AVAILABLE + if _SDK_AVAILABLE is not None: + return _SDK_AVAILABLE + try: + from sage_sdk.async_client import AsyncSageClient as _AsyncSdk + from sage_sdk.auth import AgentIdentity as _Identity + from sage_sdk.models import MemoryType as _MemType + + _AsyncSageClientCls = _AsyncSdk + _AgentIdentityCls = _Identity + _MemoryTypeEnum = _MemType + _SDK_AVAILABLE = True + except ImportError: + logger.debug("sage-agent-sdk not installed — SAGE integration disabled") + _SDK_AVAILABLE = False + return _SDK_AVAILABLE + + +class SageClient: + """Async SAGE facade with a stable API for the hook layer. + + The underlying :class:`AsyncSageClient` is created lazily on the first + call that actually needs it. Health probes use an independent + short-lived client via ``httpx`` so a misconfigured node never + poisons the cached client. + """ + + def __init__(self, config: SageConfig | None = None) -> None: + self._config = config or SageConfig() + self._sdk_client: Any | None = None + self._identity: Any | None = None + self._available: bool | None = None + + # ------------------------------------------------------------------ + # Config / identity + # ------------------------------------------------------------------ + + @property + def config(self) -> SageConfig: + return self._config + + @property + def agent_identity(self) -> Any | None: + """Return the cached :class:`AgentIdentity`, creating it if needed. + + Returns ``None`` when the SDK is unavailable or the integration is + disabled — callers must guard accordingly. + """ + if not self._config.enabled or not _ensure_sdk(): + return None + if self._identity is None: + assert _AgentIdentityCls is not None + key_path = self._config.agent_key + if key_path and Path(key_path).exists(): + self._identity = _AgentIdentityCls.from_file(key_path) + else: + self._identity = _AgentIdentityCls.default() + return self._identity + + # ------------------------------------------------------------------ + # Availability + # ------------------------------------------------------------------ + + async def is_available(self) -> bool: + """Return True iff SAGE is enabled, SDK installed, and node healthy. + + Uses a short-lived ``httpx`` request so the probe never stashes a + broken cached client on the instance. + """ + if not self._config.enabled: + return False + if not _ensure_sdk(): + return False + try: + import httpx + + async with httpx.AsyncClient(timeout=self._config.pre_hook_timeout_s) as http: + resp = await http.get(f"{self._config.node_url.rstrip('/')}/health") + ok = resp.status_code == 200 + self._available = ok + return ok + except Exception as exc: # noqa: BLE001 — health probe must never throw + logger.debug(f"SAGE health check failed: {exc}") + self._available = False + return False + + # ------------------------------------------------------------------ + # Lazy SDK client + # ------------------------------------------------------------------ + + async def _sdk(self) -> Any | None: + """Return the cached :class:`AsyncSageClient`, building it on demand.""" + if not self._config.enabled or not _ensure_sdk(): + return None + if self._sdk_client is None: + identity = self.agent_identity + if identity is None: + return None + assert _AsyncSageClientCls is not None + self._sdk_client = _AsyncSageClientCls( + base_url=self._config.node_url, + identity=identity, + timeout=max(self._config.pre_hook_timeout_s * 2, 5.0), + ) + return self._sdk_client + + async def aclose(self) -> None: + """Close the underlying HTTP client, if any.""" + if self._sdk_client is not None: + try: + await self._sdk_client.close() + except Exception as exc: # noqa: BLE001 + logger.debug(f"SAGE client close failed: {exc}") + finally: + self._sdk_client = None + + # ------------------------------------------------------------------ + # Memory operations + # ------------------------------------------------------------------ + + async def recall( + self, + text: str, + *, + domain: str | None = None, + top_k: int | None = None, + ) -> list[dict[str, Any]]: + """Recall semantically similar committed memories. + + Returns a list of ``{content, confidence, domain}`` dicts. Returns + an empty list on any error so callers can treat it as a pure + fallback to the agent's existing context. + """ + client = await self._sdk() + if client is None: + return [] + try: + embedding = await client.embed(text) + response = await client.query( + embedding=embedding, + domain_tag=domain or self._config.default_domain, + top_k=top_k or self._config.recall_top_k, + ) + results = getattr(response, "results", []) or [] + return [ + { + "content": getattr(r, "content", ""), + "confidence": getattr(r, "confidence_score", 0.0), + "domain": getattr(r, "domain_tag", ""), + } + for r in results + ] + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 — recall must never raise + logger.debug(f"SAGE recall failed: {exc}") + return [] + + async def propose( + self, + content: str, + *, + memory_type: str = "observation", + domain: str | None = None, + confidence: float = 0.80, + ) -> bool: + """Propose a memory to the SAGE network. + + Returns ``True`` on a clean submission, ``False`` otherwise. This + intentionally swallows exceptions so the post-hook background task + does not surface failures back into the agent run. + """ + client = await self._sdk() + if client is None or not content: + return False + try: + embedding = await client.embed(content) + assert _MemoryTypeEnum is not None + mt = getattr(_MemoryTypeEnum, memory_type, _MemoryTypeEnum.observation) + await client.propose( + content=content, + memory_type=mt, + domain_tag=domain or self._config.default_domain, + confidence=confidence, + embedding=embedding, + ) + return True + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 — propose must never raise + logger.debug(f"SAGE propose failed: {exc}") + return False diff --git a/src/ii_agent/integrations/sage/hooks.py b/src/ii_agent/integrations/sage/hooks.py new file mode 100644 index 000000000..644351cc6 --- /dev/null +++ b/src/ii_agent/integrations/sage/hooks.py @@ -0,0 +1,183 @@ +"""Pre- and post-hook callables that wire SAGE into the agent turn cycle. + +The pre-hook performs a semantic recall against the SAGE node and injects a +synthetic context message before the model runs. The post-hook stores the +turn observation. The post-hook is decorated with +``@hook(run_in_background=True)`` so AgentOS schedules it as a FastAPI +background task — the agent response is returned to the caller without +waiting for consensus. + +Both hooks are defensive: any failure, timeout, or missing dependency is +logged at debug level and swallowed. The agent turn is never blocked by +SAGE being slow, down, or misconfigured. +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Any + +from ii_agent.agents.hooks.decorator import hook +from ii_agent.core.logger import logger +from ii_agent.integrations.sage.client import SageClient + +if TYPE_CHECKING: # pragma: no cover — type-checking only + from ii_agent.agents.runs import RunInput, RunOutput + + +# Marker stored on the RunInput so the post-hook can correlate what was +# injected into the prompt with what gets stored. Keeps the public API +# on RunInput untouched — we use attribute-setattr rather than a subclass. +_SAGE_CONTEXT_ATTR = "_sage_recalled_context" + + +def _format_recall_block(memories: list[dict[str, Any]]) -> str: + """Format a recall result set into a human-readable context block.""" + if not memories: + return "" + lines = ["[SAGE persistent memory — recalled context]"] + for m in memories: + confidence = float(m.get("confidence", 0.0) or 0.0) + content = str(m.get("content", "")).strip().replace("\n", " ") + if not content: + continue + lines.append(f"- [{confidence:.0%}] {content[:400]}") + if len(lines) == 1: # only the header survived + return "" + return "\n".join(lines) + + +def _extract_user_text(run_input: "RunInput | None") -> str: + """Extract a plain-text representation of the user's turn input.""" + if run_input is None: + return "" + try: + return run_input.input_content_string() or "" + except Exception as exc: # noqa: BLE001 + logger.debug(f"SAGE: failed to stringify run input: {exc}") + return "" + + +def _extract_response_text(run_output: "RunOutput | None") -> str: + """Extract a plain-text representation of the assistant's response.""" + if run_output is None or run_output.content is None: + return "" + content = run_output.content + if isinstance(content, str): + return content + try: + return str(content) + except Exception: # noqa: BLE001 + return "" + + +def make_sage_hooks( + client: SageClient, +) -> tuple[Any, Any]: + """Build the pre- and post-hook closures bound to ``client``. + + Returning a pair of closures (rather than module-level hooks) lets a + single process host multiple agents with different SAGE + configurations — each registration gets its own client. + """ + + @hook + async def sage_pre_hook(run_input: "RunInput", **_kwargs: Any) -> None: + """Recall SAGE memories and inject them into the run input. + + Uses a strict timeout (``SAGE_PRE_HOOK_TIMEOUT_S``, default 2s) so + a slow SAGE node never blocks the agent turn. Falls back to empty + recall on timeout or any exception. + """ + if not client.config.enabled: + return + + user_text = _extract_user_text(run_input) + if not user_text: + return + + try: + memories = await asyncio.wait_for( + client.recall(user_text), + timeout=client.config.pre_hook_timeout_s, + ) + except asyncio.TimeoutError: + logger.debug( + "SAGE pre-hook recall timed out after " + f"{client.config.pre_hook_timeout_s}s — proceeding without context" + ) + memories = [] + except Exception as exc: # noqa: BLE001 + logger.debug(f"SAGE pre-hook recall errored: {exc}") + memories = [] + + block = _format_recall_block(memories) + if not block: + return + + # Stash the recalled memories on the RunInput so the post-hook can + # reference them when storing the turn observation. + try: + setattr(run_input, _SAGE_CONTEXT_ATTR, memories) + except Exception: # noqa: BLE001 — dataclass with slots would block this + pass + + # Attach the context block to the turn input. We prepend it to the + # existing string rather than replacing it so the model still sees + # the user's original content verbatim. + # Inject the recalled context. The fast path handles plain strings + # (the dominant case for chat agents). For Message-shaped inputs + # we detect by duck-typing on `content` so we don't have to import + # the heavy framework module at hook-execution time. + try: + if isinstance(run_input.input_content, str): + run_input.input_content = f"{block}\n\n{run_input.input_content}" + elif hasattr(run_input.input_content, "content") and isinstance( + getattr(run_input.input_content, "content", None), (str, type(None)) + ): + existing = run_input.input_content.content or "" + run_input.input_content.content = f"{block}\n\n{existing}" + # For other shapes (list/dict/BaseModel), skip injection — + # they may not round-trip cleanly with a prepended block. + # TODO(follow-up): support list/dict input_content injection. + except Exception as exc: # noqa: BLE001 + logger.debug(f"SAGE pre-hook failed to inject context: {exc}") + + logger.debug(f"SAGE pre-hook injected {len(memories)} recalled memories") + + @hook(run_in_background=True) + async def sage_post_hook( + run_output: "RunOutput", + **_kwargs: Any, + ) -> None: + """Store a concise observation of the turn in SAGE. + + Runs in background via AgentOS so the agent response is returned + without waiting on consensus. Silently drops failures. + """ + if not client.config.enabled: + return + + user_text = _extract_user_text(run_output.input) + assistant_text = _extract_response_text(run_output) + if not user_text and not assistant_text: + return + + # Truncate aggressively — SAGE is for durable institutional memory, + # not transcripts. Keep the observation small and semantic. + observation = ( + f"ii-agent turn (agent={run_output.agent_name or 'unknown'}): " + f"user asked: {user_text[:400]}. " + f"assistant responded: {assistant_text[:400]}." + ) + + try: + await client.propose( + observation, + memory_type="observation", + confidence=0.80, + ) + except Exception as exc: # noqa: BLE001 — background task must not throw + logger.debug(f"SAGE post-hook propose failed: {exc}") + + return sage_pre_hook, sage_post_hook diff --git a/src/ii_agent/integrations/sage/registrar.py b/src/ii_agent/integrations/sage/registrar.py new file mode 100644 index 000000000..9a96bdb28 --- /dev/null +++ b/src/ii_agent/integrations/sage/registrar.py @@ -0,0 +1,63 @@ +"""Agent-side registration helper for the SAGE integration. + +The entrypoint is :func:`register_sage_hooks` which appends the pre- and +post-hooks onto a live :class:`IIAgent` instance. It honours existing +hooks on the agent (extending rather than replacing the lists) so host +applications can freely combine SAGE with their own observability hooks. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ii_agent.core.config.sage_config import SageConfig +from ii_agent.core.logger import logger +from ii_agent.integrations.sage.client import SageClient +from ii_agent.integrations.sage.hooks import make_sage_hooks + +if TYPE_CHECKING: # pragma: no cover — type-checking only + from ii_agent.agents.agent import IIAgent + + +def register_sage_hooks( + agent: "IIAgent", + *, + config: SageConfig | None = None, + client: SageClient | None = None, +) -> SageClient | None: + """Wire the SAGE pre- and post-hooks onto ``agent``. + + The integration is strictly opt-in. When ``SAGE_ENABLED`` is unset or + false, this is a no-op and returns ``None``. + + Args: + agent: A constructed :class:`IIAgent` instance. + config: Optional pre-built :class:`SageConfig`. Defaults to + environment-driven configuration. + client: Optional pre-built :class:`SageClient`. Useful for tests + that inject a stub. + + Returns: + The :class:`SageClient` bound to the hooks, or ``None`` when the + integration is disabled. + """ + cfg = config or SageConfig() + if not cfg.enabled: + logger.debug("SAGE integration disabled (SAGE_ENABLED is false or unset)") + return None + + sage_client = client or SageClient(cfg) + pre_hook, post_hook = make_sage_hooks(sage_client) + + # Extend existing hook lists rather than replacing them so callers can + # freely combine SAGE with their own hooks. + existing_pre = list(agent.pre_hooks or []) + existing_post = list(agent.post_hooks or []) + agent.pre_hooks = existing_pre + [pre_hook] + agent.post_hooks = existing_post + [post_hook] + + logger.info( + f"SAGE integration registered " + f"(node={cfg.node_url}, domain={cfg.default_domain})" + ) + return sage_client diff --git a/src/tests/unit/integrations/test_sage_integration.py b/src/tests/unit/integrations/test_sage_integration.py new file mode 100644 index 000000000..e474ae2c8 --- /dev/null +++ b/src/tests/unit/integrations/test_sage_integration.py @@ -0,0 +1,365 @@ +"""Unit tests for the SAGE persistent-memory integration. + +These tests exercise the public surface of +``ii_agent.integrations.sage`` without requiring the optional +``sage-agent-sdk`` to be installed: the SDK symbols are stubbed in +``sys.modules`` at import time and the hook layer is driven directly. +""" + +from __future__ import annotations + +import sys +import types +from dataclasses import dataclass, field +from typing import Any +from unittest.mock import AsyncMock + +import pytest + + +# --------------------------------------------------------------------------- +# Stub the optional sage_sdk modules so the wrapper can import cleanly +# regardless of whether the extra is installed on the test runner. +# --------------------------------------------------------------------------- + + +def _install_sage_sdk_stubs() -> dict[str, Any]: + """Install fake sage_sdk.* modules and return their namespaces.""" + + class FakeIdentity: + @classmethod + def default(cls) -> "FakeIdentity": + return cls() + + @classmethod + def from_file(cls, path: str) -> "FakeIdentity": + return cls() + + class FakeMemoryType: + observation = "observation" + fact = "fact" + inference = "inference" + + class FakeAsyncSageClient: + last_instance: "FakeAsyncSageClient | None" = None + + def __init__( + self, + base_url: str, + identity: Any, + timeout: float = 30.0, + ) -> None: + self.base_url = base_url + self.identity = identity + self.timeout = timeout + self.embed = AsyncMock(return_value=[0.1, 0.2, 0.3]) + self.query = AsyncMock() + self.propose = AsyncMock(return_value=None) + self.close = AsyncMock() + FakeAsyncSageClient.last_instance = self + + async_mod = types.ModuleType("sage_sdk.async_client") + async_mod.AsyncSageClient = FakeAsyncSageClient + + auth_mod = types.ModuleType("sage_sdk.auth") + auth_mod.AgentIdentity = FakeIdentity + + models_mod = types.ModuleType("sage_sdk.models") + models_mod.MemoryType = FakeMemoryType + + root = types.ModuleType("sage_sdk") + root.async_client = async_mod + root.auth = auth_mod + root.models = models_mod + + sys.modules["sage_sdk"] = root + sys.modules["sage_sdk.async_client"] = async_mod + sys.modules["sage_sdk.auth"] = auth_mod + sys.modules["sage_sdk.models"] = models_mod + + return { + "AsyncSageClient": FakeAsyncSageClient, + "AgentIdentity": FakeIdentity, + "MemoryType": FakeMemoryType, + } + + +_SDK_STUBS = _install_sage_sdk_stubs() + + +# Reset the cached SDK-resolution flag in the wrapper so the stubs actually +# get picked up on first import. +from ii_agent.integrations.sage import client as sage_client_module # noqa: E402 + +sage_client_module._SDK_AVAILABLE = None +sage_client_module._AsyncSageClientCls = None +sage_client_module._AgentIdentityCls = None +sage_client_module._MemoryTypeEnum = None + + +from ii_agent.core.config.sage_config import SageConfig # noqa: E402 +from ii_agent.integrations.sage import register_sage_hooks # noqa: E402 +from ii_agent.integrations.sage.client import SageClient # noqa: E402 +from ii_agent.integrations.sage.hooks import make_sage_hooks # noqa: E402 + + +# --------------------------------------------------------------------------- +# Lightweight fakes that avoid importing the full IIAgent class (which pulls +# in the entire framework). +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeAgent: + pre_hooks: list = field(default_factory=list) + post_hooks: list = field(default_factory=list) + + +@dataclass +class _FakeRunInput: + input_content: str + def input_content_string(self) -> str: # mirrors the real dataclass API + return self.input_content + + +@dataclass +class _FakeRunOutput: + agent_name: str + content: str + input: _FakeRunInput + + +class _FakeQueryResponse: + def __init__(self, results: list[dict[str, Any]]): + class _R: + def __init__(self, c: str, conf: float, dom: str) -> None: + self.content = c + self.confidence_score = conf + self.domain_tag = dom + + self.results = [_R(r["content"], r["confidence"], r["domain"]) for r in results] + + +# --------------------------------------------------------------------------- +# register_sage_hooks +# --------------------------------------------------------------------------- + + +def test_register_sage_hooks_noop_when_disabled(): + agent = _FakeAgent() + result = register_sage_hooks(agent, config=SageConfig(enabled=False)) + assert result is None + assert agent.pre_hooks == [] + assert agent.post_hooks == [] + + +def test_register_sage_hooks_appends_two_hooks_when_enabled(): + agent = _FakeAgent(pre_hooks=[lambda: None], post_hooks=[]) + client = SageClient(SageConfig(enabled=True)) + returned = register_sage_hooks(agent, config=client.config, client=client) + + assert returned is client + assert len(agent.pre_hooks) == 2 # pre-existing + SAGE + assert len(agent.post_hooks) == 1 + # Names come through @wraps + assert agent.pre_hooks[-1].__name__ == "sage_pre_hook" + assert agent.post_hooks[-1].__name__ == "sage_post_hook" + + +def test_post_hook_is_marked_run_in_background(): + from ii_agent.agents.hooks.decorator import should_run_in_background + + client = SageClient(SageConfig(enabled=True)) + _pre, post = make_sage_hooks(client) + assert should_run_in_background(post) is True + + +# --------------------------------------------------------------------------- +# End-to-end turn flow: recall → inject → propose +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_turn_flow_recalls_then_stores(monkeypatch): + """End-to-end turn: pre-hook recalls, injects, post-hook stores. + + The SDK layer is mocked out via the SageClient public API so this + test validates the hook wiring without coupling to internals of + AsyncSageClient. + """ + cfg = SageConfig(enabled=True, pre_hook_timeout_s=5.0) + client = SageClient(cfg) + pre_hook, post_hook = make_sage_hooks(client) + + seeded_results = [ + {"content": "Use ThreadSanitizer for C++ races", "confidence": 0.92, "domain": "ii-agent"}, + {"content": "Python races: prefer asyncio.Lock()", "confidence": 0.81, "domain": "ii-agent"}, + ] + + recall_calls: list[str] = [] + propose_calls: list[dict[str, Any]] = [] + + async def fake_recall(text: str, *, domain: str | None = None, top_k: int | None = None): + recall_calls.append(text) + return seeded_results + + async def fake_propose(content: str, **kwargs: Any) -> bool: + propose_calls.append({"content": content, **kwargs}) + return True + + monkeypatch.setattr(client, "recall", fake_recall) + monkeypatch.setattr(client, "propose", fake_propose) + + run_input = _FakeRunInput(input_content="how do I debug a race condition?") + await pre_hook(run_input=run_input) + + # Pre-hook should have prepended the recall block. + assert "[SAGE persistent memory" in run_input.input_content + assert "ThreadSanitizer" in run_input.input_content + assert "race condition" in run_input.input_content # original preserved + assert recall_calls == ["how do I debug a race condition?"] + + # Now simulate the post-hook storing the observation. + run_output = _FakeRunOutput( + agent_name="test-agent", + content="Start with ThreadSanitizer...", + input=run_input, + ) + await post_hook(run_output=run_output) + + assert len(propose_calls) == 1 + body = propose_calls[0] + assert body["memory_type"] == "observation" + assert "race condition" in body["content"] + assert "ThreadSanitizer" in body["content"] + + +@pytest.mark.asyncio +async def test_pre_hook_respects_timeout(monkeypatch): + cfg = SageConfig(enabled=True, pre_hook_timeout_s=0.01) + client = SageClient(cfg) + pre_hook, _post = make_sage_hooks(client) + + # Force recall to hang longer than the timeout. + async def slow_recall(*_a, **_k): + import asyncio + + await asyncio.sleep(0.5) + return [] + + monkeypatch.setattr(client, "recall", slow_recall) + + run_input = _FakeRunInput(input_content="hello") + await pre_hook(run_input=run_input) + + # Input was preserved verbatim; no recall block injected. + assert run_input.input_content == "hello" + + +@pytest.mark.asyncio +async def test_pre_hook_noop_when_disabled(): + cfg = SageConfig(enabled=False) + client = SageClient(cfg) + pre_hook, _post = make_sage_hooks(client) + + run_input = _FakeRunInput(input_content="hello") + await pre_hook(run_input=run_input) + assert run_input.input_content == "hello" + + +# --------------------------------------------------------------------------- +# Integration test (SDK mocked via respx) — exercises the real +# AsyncSageClient HTTP layer end-to-end, to prove the integration works +# against the actual wire protocol without needing a live SAGE node. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_turn_flow_against_mocked_sdk_http(monkeypatch): + """Drive the real sage_sdk AsyncSageClient through respx. + + This uses the real SDK (not stubs) when it is installed on the host; + the HTTP layer is mocked via respx so no live node is needed. + """ + respx = pytest.importorskip("respx") + httpx = pytest.importorskip("httpx") + + # The module-level stubs registered at import time would shadow the + # real SDK — remove them so we can import the real package. + for mod in ("sage_sdk", "sage_sdk.async_client", "sage_sdk.auth", "sage_sdk.models"): + sys.modules.pop(mod, None) + try: + from sage_sdk.async_client import AsyncSageClient as _RealAsync + from sage_sdk.auth import AgentIdentity as _RealIdentity + from sage_sdk.models import MemoryType as _RealMemType + except ImportError: + # Re-install the stubs the rest of the test module relies on and + # skip this one integration test. + _install_sage_sdk_stubs() + pytest.skip("sage-agent-sdk not installed — skipping HTTP-layer test") + + # Swap the wrapper's cached SDK references to the real classes. + # This test intentionally runs after the stub-driven tests (pytest + # runs in declaration order by default) so the stub reinstallation + # is unnecessary for the current file layout. + sage_client_module._AsyncSageClientCls = _RealAsync + sage_client_module._AgentIdentityCls = _RealIdentity + sage_client_module._MemoryTypeEnum = _RealMemType + sage_client_module._SDK_AVAILABLE = True + + cfg = SageConfig( + enabled=True, + node_url="http://sage.test", + default_domain="ii-agent", + pre_hook_timeout_s=5.0, + ) + client = SageClient(cfg) + pre_hook, post_hook = make_sage_hooks(client) + + query_response = { + "results": [ + { + "memory_id": "m-1", + "submitting_agent": "agent-hash", + "content": "Race conditions in asyncio come from unawaited tasks.", + "content_hash": "0xdeadbeef", + "memory_type": "observation", + "domain_tag": "ii-agent", + "confidence_score": 0.88, + "status": "committed", + "created_at": "2026-01-01T00:00:00Z", + } + ], + "next_cursor": None, + "total_count": 1, + } + + with respx.mock(base_url="http://sage.test", assert_all_called=False) as mock: + mock.post("/v1/embed").mock( + return_value=httpx.Response(200, json={"embedding": [0.0, 0.1, 0.2]}) + ) + mock.post("/v1/memory/query").mock( + return_value=httpx.Response(200, json=query_response) + ) + submit_route = mock.post("/v1/memory/submit").mock( + return_value=httpx.Response( + 200, + json={"memory_id": "m-2", "tx_hash": "0xabc", "status": "pending"}, + ) + ) + + run_input = _FakeRunInput(input_content="debug asyncio race") + await pre_hook(run_input=run_input) + + assert "[SAGE persistent memory" in run_input.input_content + assert "asyncio" in run_input.input_content + + run_output = _FakeRunOutput( + agent_name="integration-test", + content="Await all tasks you spawn.", + input=run_input, + ) + await post_hook(run_output=run_output) + + # Submit route was called exactly once (the post-hook's propose). + assert submit_route.called, "post-hook should have POSTed to /v1/memory/submit"