From 854969448c0a69754d45f7a2f4a6eae53c3e15c7 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Fri, 19 Jun 2026 18:27:53 +0200 Subject: [PATCH 1/2] feat(agent): agent workflow service and gateway tool-resolution API --- api/oss/src/apis/fastapi/tools/models.py | 44 +++- api/oss/src/apis/fastapi/tools/router.py | 89 +++++--- api/oss/src/core/tools/dtos.py | 44 +++- api/oss/src/core/tools/exceptions.py | 18 ++ api/oss/src/core/tools/service.py | 159 ++++++++++++++ api/oss/tests/pytest/unit/tools/__init__.py | 1 + .../unit/tools/test_agent_resolution.py | 79 +++++++ services/entrypoints/main.py | 2 + services/oss/src/agent/__init__.py | 13 ++ services/oss/src/agent/app.py | 161 ++++++++++++++ services/oss/src/agent/client.py | 63 ++++++ services/oss/src/agent/config.py | 72 +++++++ services/oss/src/agent/schemas.py | 82 +++++++ services/oss/src/agent/secrets.py | 72 +++++++ services/oss/src/agent/tools/__init__.py | 21 ++ services/oss/src/agent/tools/gateway.py | 195 +++++++++++++++++ services/oss/src/agent/tools/resolver.py | 85 ++++++++ services/oss/src/agent/tools/secrets.py | 65 ++++++ services/oss/src/agent/tracing.py | 85 ++++++++ .../oss/tests/pytest/integration/__init__.py | 1 + .../pytest/integration/agent/__init__.py | 1 + .../pytest/integration/agent/conftest.py | 76 +++++++ .../agent/test_resolve_secrets_http.py | 64 ++++++ .../integration/agent/tools/__init__.py | 1 + .../agent/tools/test_gateway_http.py | 170 +++++++++++++++ .../agent/tools/test_secrets_http.py | 39 ++++ services/oss/tests/pytest/unit/__init__.py | 1 + .../oss/tests/pytest/unit/agent/__init__.py | 1 + .../oss/tests/pytest/unit/agent/conftest.py | 112 ++++++++++ .../pytest/unit/agent/test_invoke_handler.py | 204 ++++++++++++++++++ .../pytest/unit/agent/test_secrets_mapping.py | 24 +++ .../pytest/unit/agent/test_select_backend.py | 61 ++++++ .../tests/pytest/unit/agent/tools/__init__.py | 1 + .../unit/agent/tools/test_gateway_mapping.py | 19 ++ .../unit/agent/tools/test_resolution.py | 86 ++++++++ 35 files changed, 2178 insertions(+), 33 deletions(-) create mode 100644 api/oss/tests/pytest/unit/tools/__init__.py create mode 100644 api/oss/tests/pytest/unit/tools/test_agent_resolution.py create mode 100644 services/oss/src/agent/__init__.py create mode 100644 services/oss/src/agent/app.py create mode 100644 services/oss/src/agent/client.py create mode 100644 services/oss/src/agent/config.py create mode 100644 services/oss/src/agent/schemas.py create mode 100644 services/oss/src/agent/secrets.py create mode 100644 services/oss/src/agent/tools/__init__.py create mode 100644 services/oss/src/agent/tools/gateway.py create mode 100644 services/oss/src/agent/tools/resolver.py create mode 100644 services/oss/src/agent/tools/secrets.py create mode 100644 services/oss/src/agent/tracing.py create mode 100644 services/oss/tests/pytest/integration/__init__.py create mode 100644 services/oss/tests/pytest/integration/agent/__init__.py create mode 100644 services/oss/tests/pytest/integration/agent/conftest.py create mode 100644 services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py create mode 100644 services/oss/tests/pytest/integration/agent/tools/__init__.py create mode 100644 services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py create mode 100644 services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py create mode 100644 services/oss/tests/pytest/unit/__init__.py create mode 100644 services/oss/tests/pytest/unit/agent/__init__.py create mode 100644 services/oss/tests/pytest/unit/agent/conftest.py create mode 100644 services/oss/tests/pytest/unit/agent/test_invoke_handler.py create mode 100644 services/oss/tests/pytest/unit/agent/test_secrets_mapping.py create mode 100644 services/oss/tests/pytest/unit/agent/test_select_backend.py create mode 100644 services/oss/tests/pytest/unit/agent/tools/__init__.py create mode 100644 services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py create mode 100644 services/oss/tests/pytest/unit/agent/tools/test_resolution.py diff --git a/api/oss/src/apis/fastapi/tools/models.py b/api/oss/src/apis/fastapi/tools/models.py index 891b276c22..86c62dcec6 100644 --- a/api/oss/src/apis/fastapi/tools/models.py +++ b/api/oss/src/apis/fastapi/tools/models.py @@ -1,6 +1,12 @@ -from typing import List, Optional, Union +from typing import Any, List, Optional, Union -from pydantic import BaseModel +from agenta.sdk.agents.tools import ( + BuiltinToolConfig, + GatewayToolConfig, + ToolConfigurationError, + coerce_tool_configs, +) +from pydantic import BaseModel, Field, field_validator from oss.src.core.tools.dtos import ( # Tool Catalog @@ -15,6 +21,9 @@ ToolConnectionCreate, # Tool Calls ToolResult, + # Agent tools + AgentToolReference, + ResolvedAgentTool, ) @@ -87,3 +96,34 @@ class ToolConnectionsResponse(BaseModel): class ToolCallResponse(BaseModel): call: ToolResult + + +# --------------------------------------------------------------------------- +# Agent tool resolution +# --------------------------------------------------------------------------- + + +class ToolResolveRequest(BaseModel): + tools: List[AgentToolReference] = Field(default_factory=list) + + @field_validator("tools", mode="before") + @classmethod + def _coerce_tools(cls, value: Any) -> List[AgentToolReference]: + try: + configs = coerce_tool_configs(value or []).tool_configs + except ToolConfigurationError as exc: + raise ValueError(str(exc)) from exc + unsupported = [ + config + for config in configs + if not isinstance(config, (BuiltinToolConfig, GatewayToolConfig)) + ] + if unsupported: + raise ValueError("/tools/resolve accepts only builtin and gateway tools") + return configs + + +class ToolResolveResponse(BaseModel): + count: int = 0 + builtins: List[str] = Field(default_factory=list) + custom: List[ResolvedAgentTool] = Field(default_factory=list) diff --git a/api/oss/src/apis/fastapi/tools/router.py b/api/oss/src/apis/fastapi/tools/router.py index 043d114fa7..3cc689a055 100644 --- a/api/oss/src/apis/fastapi/tools/router.py +++ b/api/oss/src/apis/fastapi/tools/router.py @@ -29,6 +29,9 @@ ToolConnectionsResponse, # ToolCallResponse, + # + ToolResolveRequest, + ToolResolveResponse, ) from oss.src.core.shared.dtos import Status @@ -42,10 +45,12 @@ ToolResultData, ) from oss.src.core.tools.exceptions import ( + ActionNotFoundError, AdapterError, ConnectionInactiveError, ConnectionInvalidError, ConnectionNotFoundError, + ToolSlugInvalidError, ) from oss.src.core.tools.service import ( ToolsService, @@ -208,6 +213,14 @@ def __init__( ) # --- Tool operations --- + self.router.add_api_route( + "/resolve", + self.resolve_tools, + methods=["POST"], + operation_id="resolve_agent_tools", + response_model=ToolResolveResponse, + response_model_exclude_none=True, + ) self.router.add_api_route( "/call", self.call_tool, @@ -886,6 +899,51 @@ async def callback_connection( # Tool Calls # ----------------------------------------------------------------------- + @intercept_exceptions() + @handle_adapter_exceptions() + async def resolve_tools( + self, + request: Request, + *, + body: ToolResolveRequest, + ) -> ToolResolveResponse: + """Resolve an agent's tool references into model-ready specs. + + Validates Composio connections up front and enriches each action from the + catalog, so a running agent (e.g. Pi) gets ``customTools`` whose ``execute`` + routes back through ``POST /tools/call`` — provider keys stay server-side. + """ + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TOOLS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + try: + resolution = await self.tools_service.resolve_agent_tools( + project_id=UUID(request.state.project_id), + tools=body.tools, + ) + except ConnectionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + except ConnectionInactiveError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ConnectionInvalidError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ToolSlugInvalidError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ActionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + + return ToolResolveResponse( + count=len(resolution.builtins) + len(resolution.custom), + builtins=resolution.builtins, + custom=resolution.custom, + ) + @intercept_exceptions() @handle_adapter_exceptions() async def call_tool( @@ -931,39 +989,12 @@ async def call_tool( connection_slug = slug_parts[4] try: - connections = await self.tools_service.query_connections( + connection = await self.tools_service.resolve_connection_by_slug( project_id=UUID(request.state.project_id), provider_key=provider_key, integration_key=integration_key, + connection_slug=connection_slug, ) - - connection = next( - (c for c in connections if c.slug == connection_slug), None - ) - - if not connection: - raise ConnectionNotFoundError( - connection_slug=connection_slug, - provider_key=provider_key, - integration_key=integration_key, - ) - - if not connection.is_active: - raise ConnectionInactiveError(connection_id=connection_slug) - - if not connection.is_valid: - raise ConnectionInvalidError( - connection_slug=connection_slug, - detail="Please refresh the connection.", - ) - - if not connection.provider_connection_id: - raise ConnectionNotFoundError( - connection_slug=connection_slug, - provider_key=provider_key, - integration_key=integration_key, - ) - except ConnectionNotFoundError as e: raise HTTPException(status_code=404, detail=e.message) from e except ConnectionInactiveError as e: diff --git a/api/oss/src/core/tools/dtos.py b/api/oss/src/core/tools/dtos.py index a588965f61..ad4d105bdd 100644 --- a/api/oss/src/core/tools/dtos.py +++ b/api/oss/src/core/tools/dtos.py @@ -1,8 +1,9 @@ from enum import Enum -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +from agenta.sdk.agents.tools import BuiltinToolConfig, GatewayToolConfig from agenta.sdk.models.workflows import JsonSchemas -from pydantic import BaseModel +from pydantic import BaseModel, Field from oss.src.core.shared.dtos import ( Header, @@ -238,3 +239,42 @@ class ToolExecutionResponse(BaseModel): data: Optional[Json] = None error: Optional[str] = None successful: bool = False + + +# --------------------------------------------------------------------------- +# Agent tools (config references + resolution) +# --------------------------------------------------------------------------- + +# A provider-agnostic list of tool references lives under an agent revision's +# ``parameters["tools"]``. Each entry is a discriminated union on ``type``: config +# holds references and display metadata only, never secrets. The backend resolves +# them into model-ready specs at invoke time (see ToolsService.resolve_agent_tools). + + +AgentBuiltinTool = BuiltinToolConfig +AgentComposioTool = GatewayToolConfig +AgentToolReference = Union[BuiltinToolConfig, GatewayToolConfig] + + +class ResolvedAgentTool(BaseModel): + """A runnable reference resolved into a model-ready tool spec. + + ``call_ref`` is the ``tools.{provider}.{integration}.{action}.{connection}`` slug + the execution bridge sends back to ``POST /tools/call``. + """ + + name: str + description: Optional[str] = None + input_schema: Optional[Dict[str, Any]] = None + call_ref: str + + +class AgentToolsResolution(BaseModel): + """Outcome of resolving an agent's ``tools`` list. + + ``builtins`` pass straight into Pi's ``tools: string[]``; ``custom`` become Pi + ``customTools`` whose ``execute`` routes through ``/tools/call``. + """ + + builtins: List[str] = Field(default_factory=list) + custom: List[ResolvedAgentTool] = Field(default_factory=list) diff --git a/api/oss/src/core/tools/exceptions.py b/api/oss/src/core/tools/exceptions.py index f46c08b6cd..e9dbd54f3f 100644 --- a/api/oss/src/core/tools/exceptions.py +++ b/api/oss/src/core/tools/exceptions.py @@ -40,6 +40,24 @@ def __init__( super().__init__(msg) +class ActionNotFoundError(ToolsError): + """Raised when a catalog action cannot be found for an integration.""" + + def __init__( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + ): + self.provider_key = provider_key + self.integration_key = integration_key + self.action_key = action_key + super().__init__( + f"Action not found: {provider_key}/{integration_key}/{action_key}" + ) + + class ConnectionSlugConflictError(ToolsError): """Raised when a connection slug already exists for the integration.""" diff --git a/api/oss/src/core/tools/service.py b/api/oss/src/core/tools/service.py index f603bc4d42..a9e1e4c779 100644 --- a/api/oss/src/core/tools/service.py +++ b/api/oss/src/core/tools/service.py @@ -1,3 +1,4 @@ +import re from typing import Any, Dict, List, Optional, Tuple from uuid import UUID @@ -6,6 +7,11 @@ from oss.src.core.tools.utils import make_oauth_state from oss.src.core.tools.dtos import ( + AgentBuiltinTool, + AgentComposioTool, + AgentToolReference, + AgentToolsResolution, + ResolvedAgentTool, ToolCatalogAction, ToolCatalogActionDetails, ToolCatalogIntegration, @@ -15,17 +21,27 @@ ToolConnectionRequest, ToolExecutionRequest, ToolExecutionResponse, + ToolProviderKind, ) from oss.src.core.tools.interfaces import ( ToolsDAOInterface, ) from oss.src.core.tools.registry import ToolsGatewayRegistry from oss.src.core.tools.exceptions import ( + ActionNotFoundError, ConnectionInactiveError, + ConnectionInvalidError, ConnectionNotFoundError, + ToolSlugInvalidError, ) +# A slug segment is safe for the ``tools.{provider}.{integration}.{action}.{connection}`` +# call-ref. ``__`` is forbidden because ``/tools/call`` round-trips ``__`` <-> ``.`` when +# parsing function names, so a ``__`` inside a segment would corrupt the split. +_SLUG_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9-]+(?:_[a-zA-Z0-9-]+)*$") + + log = get_module_logger(__name__) @@ -408,3 +424,146 @@ async def execute_tool( arguments=arguments, ), ) + + # ----------------------------------------------------------------------- + # Connection resolution (shared by the call endpoint and the agent resolver) + # ----------------------------------------------------------------------- + + async def resolve_connection_by_slug( + self, + *, + project_id: UUID, + provider_key: str, + integration_key: str, + connection_slug: str, + ) -> ToolConnection: + """Resolve a project-scoped connection slug to a usable connection row. + + Raises a domain exception when the connection is missing, inactive, invalid, + or never finished its provider handshake. Shared by ``call_tool`` (execution) + and ``resolve_agent_tools`` (up-front validation). + """ + # Query all (not active-only) so an inactive connection yields a precise + # "inactive" error instead of an indistinguishable "not found". + connections = await self.query_connections( + project_id=project_id, + provider_key=provider_key, + integration_key=integration_key, + is_active=None, + ) + + connection = next( + (c for c in connections if c.slug == connection_slug), + None, + ) + + if not connection: + raise ConnectionNotFoundError( + provider_key=provider_key, + integration_key=integration_key, + connection_slug=connection_slug, + ) + + if not connection.is_active: + raise ConnectionInactiveError(connection_id=connection_slug) + + if not connection.is_valid: + raise ConnectionInvalidError( + connection_slug=connection_slug, + detail="Please refresh the connection.", + ) + + if not connection.provider_connection_id: + raise ConnectionNotFoundError( + provider_key=provider_key, + integration_key=integration_key, + connection_slug=connection_slug, + ) + + return connection + + # ----------------------------------------------------------------------- + # Agent tool resolution + # ----------------------------------------------------------------------- + + async def resolve_agent_tools( + self, + *, + project_id: UUID, + tools: List[AgentToolReference], + ) -> AgentToolsResolution: + """Resolve an agent's tool references into model-ready specs. + + ``builtin`` references pass through as names. ``composio`` references are + validated against the project's connections up front and enriched from the + catalog (description + input schema), so the model never sees a stale schema + and the invoke fails fast on a missing/invalid connection rather than mid-loop. + """ + builtins: List[str] = [] + custom: List[ResolvedAgentTool] = [] + + for ref in tools: + if isinstance(ref, AgentBuiltinTool): + if ref.name: + builtins.append(ref.name) + continue + + if isinstance(ref, AgentComposioTool): + custom.append( + await self._resolve_composio_tool( + project_id=project_id, + ref=ref, + ) + ) + + return AgentToolsResolution(builtins=builtins, custom=custom) + + async def _resolve_composio_tool( + self, + *, + project_id: UUID, + ref: AgentComposioTool, + ) -> ResolvedAgentTool: + provider_key = ToolProviderKind.COMPOSIO.value + + for segment in (ref.integration, ref.action, ref.connection): + if not _SLUG_SEGMENT_RE.match(segment): + raise ToolSlugInvalidError( + slug=f"{provider_key}.{ref.integration}.{ref.action}.{ref.connection}", + detail=f"Invalid slug segment: {segment!r}", + ) + + # Fail fast if the connection is missing/inactive/invalid for this project. + await self.resolve_connection_by_slug( + project_id=project_id, + provider_key=provider_key, + integration_key=ref.integration, + connection_slug=ref.connection, + ) + + action = await self.get_action( + provider_key=provider_key, + integration_key=ref.integration, + action_key=ref.action, + ) + if not action: + raise ActionNotFoundError( + provider_key=provider_key, + integration_key=ref.integration, + action_key=ref.action, + ) + + input_schema = ( + action.schemas.inputs if action.schemas and action.schemas.inputs else None + ) + name = ref.name or f"{ref.integration}__{ref.action}" + call_ref = ( + f"tools.{provider_key}.{ref.integration}.{ref.action}.{ref.connection}" + ) + + return ResolvedAgentTool( + name=name, + description=action.description, + input_schema=input_schema, + call_ref=call_ref, + ) diff --git a/api/oss/tests/pytest/unit/tools/__init__.py b/api/oss/tests/pytest/unit/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/api/oss/tests/pytest/unit/tools/__init__.py @@ -0,0 +1 @@ + diff --git a/api/oss/tests/pytest/unit/tools/test_agent_resolution.py b/api/oss/tests/pytest/unit/tools/test_agent_resolution.py new file mode 100644 index 0000000000..12ad49266a --- /dev/null +++ b/api/oss/tests/pytest/unit/tools/test_agent_resolution.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +from pydantic import ValidationError + +from agenta.sdk.agents.tools import BuiltinToolConfig, GatewayToolConfig + +from oss.src.apis.fastapi.tools.models import ToolResolveRequest +from oss.src.core.tools.dtos import AgentBuiltinTool, AgentComposioTool +from oss.src.core.tools.service import ToolsService + + +def test_api_reuses_sdk_tool_config_classes(): + assert AgentBuiltinTool is BuiltinToolConfig + assert AgentComposioTool is GatewayToolConfig + + +def test_resolve_request_coerces_legacy_composio_shape(): + request = ToolResolveRequest( + tools=[ + "read", + { + "type": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + }, + ] + ) + assert isinstance(request.tools[0], BuiltinToolConfig) + assert isinstance(request.tools[1], GatewayToolConfig) + + +def test_resolve_request_rejects_non_gateway_runtime_tools(): + with pytest.raises(ValidationError, match="only builtin and gateway"): + ToolResolveRequest( + tools=[ + { + "type": "code", + "name": "calc", + "script": "...", + } + ] + ) + + +async def test_api_resolution_returns_stable_call_reference(monkeypatch): + service = object.__new__(ToolsService) + + async def _connection(**_kwargs): + return object() + + async def _action(**_kwargs): + return SimpleNamespace( + description="Get user", + schemas=SimpleNamespace( + inputs={"type": "object", "properties": {}}, + ), + ) + + monkeypatch.setattr(service, "resolve_connection_by_slug", _connection) + monkeypatch.setattr(service, "get_action", _action) + + result = await service.resolve_agent_tools( + project_id=uuid4(), + tools=[ + BuiltinToolConfig(name="read"), + GatewayToolConfig( + integration="github", + action="GET_USER", + connection="c1", + ), + ], + ) + assert result.builtins == ["read"] + assert result.custom[0].call_ref == "tools.composio.github.GET_USER.c1" diff --git a/services/entrypoints/main.py b/services/entrypoints/main.py index 72cc291dfb..f52ac69ed8 100644 --- a/services/entrypoints/main.py +++ b/services/entrypoints/main.py @@ -43,6 +43,7 @@ ) from oss.src.chat import chat_v0_app from oss.src.completion import completion_v0_app +from oss.src.agent import agent_v0_app from entrypoints.legacy import register_legacy_routes @@ -134,6 +135,7 @@ async def health(): app.mount("/chat/v0", chat_v0_app) app.mount("/completion/v0", completion_v0_app) +app.mount("/agent/v0", agent_v0_app) register_legacy_routes( app=app, diff --git a/services/oss/src/agent/__init__.py b/services/oss/src/agent/__init__.py new file mode 100644 index 0000000000..8a1b875183 --- /dev/null +++ b/services/oss/src/agent/__init__.py @@ -0,0 +1,13 @@ +"""The Agenta agent workflow app and its glue. + +The handler and backend wiring are in ``app``; tool resolution in ``tools``; provider +secrets in ``secrets``; trace/usage glue in ``tracing``; the ``/inspect`` schemas in +``schemas``; the file-backed defaults in ``config``. The engine-agnostic runtime (the +backend/environment/harness ports and their adapters) lives in the SDK at +``agenta.sdk.agents``; this package is the thin Agenta integration that feeds it resolved +tools, vault secrets, and a trace context. +""" + +from oss.src.agent.app import agent_v0_app, create_agent_app + +__all__ = ["agent_v0_app", "create_agent_app"] diff --git a/services/oss/src/agent/app.py b/services/oss/src/agent/app.py new file mode 100644 index 0000000000..ac1bdbcc1b --- /dev/null +++ b/services/oss/src/agent/app.py @@ -0,0 +1,161 @@ +"""Agent workflow app: the ``/invoke`` handler, wired onto the SDK agent runtime. + +Mirrors the chat/completion services: an Agenta app exposing ``/invoke`` and ``/inspect`` +through ``ag.create_app`` + ``ag.workflow`` + ``ag.route``. The handler parses the request +into a neutral ``AgentConfig`` + ``RunSelection`` (``agenta.sdk.agents``), resolves tools +(``tools``) and provider secrets (``secrets``) server-side, threads the trace context +(``tracing``), then runs one turn through a :class:`Harness` over a backend it picks from +the selection, and records the run's usage. + +The backend (rivet over ACP vs the in-process Pi path) and the transport (HTTP sidecar vs +subprocess) are deployment choices; the harness, sandbox, and permission policy are editable +playground config. +""" + +import os +from typing import Any, Dict, List, Optional + +import agenta as ag + +from agenta.sdk.agents import ( + AgentConfig, + Backend, + Environment, + InProcessPiBackend, + RivetBackend, + RunSelection, + SessionConfig, + make_harness, + to_messages, +) +from agenta.sdk.agents.adapters.vercel import agent_run_to_vercel_parts + +from oss.src.agent.config import load_config, wrapper_dir +from oss.src.agent.schemas import AGENT_SCHEMAS +from oss.src.agent.secrets import resolve_harness_secrets +from oss.src.agent.tools import resolve_agent_resources +from oss.src.agent.tracing import record_usage, trace_context + + +def _default_agent_config() -> AgentConfig: + """The service's file defaults (AGENTS.md, model, tools) as a neutral AgentConfig.""" + file_cfg = load_config() + return AgentConfig( + instructions=file_cfg.agents_md, + model=file_cfg.model, + tools=file_cfg.tools, + ) + + +def select_backend(selection: RunSelection) -> Backend: + """Pick the backend for a run. + + The in-process Pi backend runs Pi locally, and the Agenta harness is Pi with an opinion, + so both ``pi`` and ``agenta`` stay on it. Any other harness, a non-local sandbox, or + ``AGENTA_AGENT_RUNTIME=rivet`` selects the rivet backend instead of silently dropping the + choice (``agenta`` is not yet supported on the rivet path, so ``agenta`` + a non-local + sandbox raises ``UnsupportedHarnessError`` rather than running the wrong thing). The + transport to the TypeScript runner is a deployment detail each backend takes: + ``AGENTA_AGENT_PI_URL`` set (docker) -> HTTP to the sidecar; unset (local checkout) -> + spawn the runner CLI from the wrapper dir. + """ + runtime = os.getenv("AGENTA_AGENT_RUNTIME", "pi").lower() + url = os.getenv("AGENTA_AGENT_PI_URL") + cwd = str(wrapper_dir()) + use_rivet = ( + runtime == "rivet" + or selection.harness not in ("pi", "agenta") + or selection.sandbox != "local" + ) + if use_rivet: + return RivetBackend(sandbox=selection.sandbox, url=url, cwd=cwd) + return InProcessPiBackend(url=url, cwd=cwd) + + +async def _agent( + inputs: Optional[Dict[str, Any]] = None, + messages: Optional[List[Any]] = None, + parameters: Optional[Dict] = None, + stream: Optional[bool] = None, + session_id: Optional[str] = None, +): + params = parameters or {} + + agent_config = AgentConfig.from_params(params, defaults=_default_agent_config()) + selection = RunSelection.from_params( + params, + default_harness=os.getenv("AGENTA_AGENT_HARNESS", "pi"), + default_sandbox=os.getenv("AGENTA_AGENT_SANDBOX", "local"), + ) + + msgs = to_messages(messages or (inputs or {}).get("messages") or []) + resources = await resolve_agent_resources( + tools=agent_config.tools, + mcp_servers=agent_config.mcp_servers, + ) + + session_config = SessionConfig( + agent=agent_config, + secrets=await resolve_harness_secrets(), + permission_policy=selection.permission_policy, + trace=trace_context(), + session_id=session_id, + builtin_names=resources.tools.builtin_names, + tool_specs=resources.tools.tool_specs, + tool_callback=resources.tools.tool_callback, + mcp_servers=resources.mcp_servers, + ) + + # The harness validates that the chosen backend can drive it. Unsupported combinations + # such as `agenta` on rivet fail here instead of silently changing runtime behavior. + # setup/cleanup own the backend lifecycle; prompt/stream run one cold turn. + harness = make_harness(selection.harness, Environment(select_backend(selection))) + + # The `/messages` SSE path sets `stream`: return the Vercel UI Message Stream as an async + # generator (the normalizer turns it into a streaming response). `/invoke` and the + # `/messages` JSON path leave it unset and take the batch path below. + if stream: + return _agent_vercel_stream(harness, session_config, msgs) + + await harness.setup() + try: + result = await harness.prompt(session_config, msgs) + finally: + await harness.cleanup() + + record_usage(result.usage) + return {"role": "assistant", "content": result.output} + + +async def _agent_vercel_stream(harness, session_config, msgs): + """Run one streaming turn and yield Vercel UI Message Stream parts. + + Owns the environment lifecycle (``setup`` / ``cleanup``); the per-turn session is torn + down by the ``AgentRun``'s own cleanup hook when the stream drains. The ``session_id`` is + stamped onto the stream's ``start`` part by the endpoint, so it is not threaded here. + """ + await harness.setup() + try: + run = await harness.stream(session_config, msgs) + async for part in agent_run_to_vercel_parts(run): + yield part + try: + record_usage(run.result().usage) + except Exception: # result unavailable on a failed/aborted stream + pass + finally: + await harness.cleanup() + + +def create_agent_app(): + app = ag.create_app() + # No builtin URI yet: registering the agent as a first-class workflow type + # (`agenta:builtin:agent:v0`) is still future work. Here we register the handler + # directly, so it gets an auto URI (`user:custom:...`) and runs locally. + routed = ag.workflow(schemas=AGENT_SCHEMAS)(_agent) + # is_agent gates the agent-only `/messages` + `/load-session` routes (next to /invoke). + ag.route("/", app=app, flags={"is_chat": True, "is_agent": True})(routed) + return app + + +agent_v0_app = create_agent_app() diff --git a/services/oss/src/agent/client.py b/services/oss/src/agent/client.py new file mode 100644 index 0000000000..59ec7969b4 --- /dev/null +++ b/services/oss/src/agent/client.py @@ -0,0 +1,63 @@ +"""Access to the Agenta backend from inside a harness run. + +Resolving the backend base URL and the caller-scoped credential is shared by the tool +resolver and the secret resolver, so it lives here. The credential reuses the same +propagation the OTLP export rides on, so an agent run calls ``/tools/resolve``, +``/tools/call``, and ``/secrets/`` as the caller, not with broader rights. +""" + +import os +from typing import Optional + +import agenta as ag +from agenta.sdk.engines.tracing.propagation import inject + +# Budget for a backend round-trip (the tool catalog/connection check, the vault fetch). +TOOLS_TIMEOUT = float(os.getenv("AGENTA_AGENT_TOOLS_TIMEOUT", "30")) + + +def agenta_api_base() -> Optional[str]: + """Resolve the Agenta backend base URL (``.../api``). + + Prefers an explicit override, then derives it from the OTLP endpoint the SDK is + configured with (``{host}/api/otlp/v1/traces``), then falls back to env. Returns + ``None`` when nothing is configured; callers only need this when tools or secrets apply. + """ + override = os.getenv("AGENTA_AGENT_TOOLS_API_URL") + if override: + return override.rstrip("/") + + try: + otlp_url = ag.tracing.otlp_url + except Exception: # pylint: disable=broad-except + otlp_url = None + if otlp_url and "/otlp/" in otlp_url: + return otlp_url.split("/otlp/", 1)[0].rstrip("/") + + api_url = os.getenv("AGENTA_API_URL") + if api_url: + return api_url.rstrip("/") + + return None + + +def request_authorization() -> Optional[str]: + """The project-scoped credential to call the Agenta backend. + + Reuses the same propagation the OTLP credential rides on (the caller's Authorization), + falling back to the service's own API key the way the tracing sidecar does. Scoping to + the caller keeps an agent run from invoking tools the user could not (WP-7 risk: + RUN_TOOLS scoping). + """ + try: + authorization = inject({}).get("Authorization") + except Exception: # pylint: disable=broad-except + authorization = None + if authorization: + return authorization + + api_key = os.getenv("AGENTA_API_KEY") + if api_key: + return f"ApiKey {api_key}" + + return None diff --git a/services/oss/src/agent/config.py b/services/oss/src/agent/config.py new file mode 100644 index 0000000000..b8efb693dc --- /dev/null +++ b/services/oss/src/agent/config.py @@ -0,0 +1,72 @@ +"""Hardcoded MVP agent config, read from ``services/agent/config``. + +The config (AGENTS.md text, model, tools) lives in editable files so changing the +agent does not need a code change. Paths can be overridden with env vars for Docker +or alternate layouts. +""" + +import json +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, List, Optional + +# services/oss/src/agent/config.py -> parents[3] == services/ +_SERVICES_DIR = Path(__file__).resolve().parents[3] +_DEFAULT_AGENT_DIR = _SERVICES_DIR / "agent" + +# Fallback config used when the editable files are missing or a field is absent. +# Kept in sync with the catalog template and the `/inspect` schema defaults +# (schemas.py: _DEFAULT_MODEL / _DEFAULT_AGENTS_MD). +DEFAULT_MODEL = "gpt-5.5" +DEFAULT_AGENTS_MD = ( + "You are a friendly hello-world agent running on the Agenta agent service.\n\n" + "- Greet the user warmly.\n" + "- Answer the user's message in one or two short sentences." +) + + +@dataclass +class AgentConfig: + agents_md: str + model: Optional[str] = None + # Provider-agnostic tool references (WP-7). Each entry is either a plain string + # (a Pi built-in name, normalized to a ``builtin`` ref downstream) or a + # discriminated dict (``{"type": "composio", ...}``). Resolution happens in the + # backend at invoke time; the service just forwards the list. + tools: List[Any] = field(default_factory=list) + + +def wrapper_dir() -> Path: + """Directory of the TypeScript Pi wrapper (where the command runs).""" + override = os.getenv("AGENTA_AGENT_WRAPPER_DIR") + return Path(override) if override else _DEFAULT_AGENT_DIR + + +def config_dir() -> Path: + """Directory holding AGENTS.md and agent.json.""" + override = os.getenv("AGENTA_AGENT_CONFIG_DIR") + return Path(override) if override else (_DEFAULT_AGENT_DIR / "config") + + +def load_config() -> AgentConfig: + base = config_dir() + + # Read the editable AGENTS.md when present; otherwise fall back to the default + # instructions so a fresh checkout (or Docker layout) still runs. + agents_md = DEFAULT_AGENTS_MD + agents_path = base / "AGENTS.md" + if agents_path.exists(): + text = agents_path.read_text(encoding="utf-8").strip() + if text: + agents_md = text + + model: str = DEFAULT_MODEL + tools: List[str] = [] + meta_path = base / "agent.json" + if meta_path.exists(): + meta = json.loads(meta_path.read_text(encoding="utf-8")) + model = meta.get("model") or DEFAULT_MODEL + tools = meta.get("tools", []) or [] + + return AgentConfig(agents_md=agents_md, model=model, tools=tools) diff --git a/services/oss/src/agent/schemas.py b/services/oss/src/agent/schemas.py new file mode 100644 index 0000000000..7047734a01 --- /dev/null +++ b/services/oss/src/agent/schemas.py @@ -0,0 +1,82 @@ +"""JSON schemas the agent workflow advertises via ``/inspect``. + +The agent self-describes its interface here instead of registering a static SDK +interface. The shape mirrors the chat workflow (messages in, a single assistant +message out) so the playground renders a chat box and POSTs `data.inputs.messages`. + +Kept in its own module so it composes into the workflow registration with a one-line +change and stays out of the handler logic. +""" + +_SCHEMA = "https://json-schema.org/draft/2020-12/schema" + +# Default config the playground pre-fills and the agent falls back to. Kept in sync +# with the catalog template and ``config.py`` (DEFAULT_MODEL / DEFAULT_AGENTS_MD). +_DEFAULT_MODEL = "gpt-5.5" +_DEFAULT_AGENTS_MD = ( + "You are a friendly hello-world agent running on the Agenta agent service.\n\n" + "- Greet the user warmly.\n" + "- Answer the user's message in one or two short sentences." +) + +# Inputs: a chat-style message list. `x-ag-type-ref: messages` is what marks the +# workflow as chat to the playground (same marker the builtin chat service uses). +AGENT_INPUTS_SCHEMA = { + "$schema": _SCHEMA, + "type": "object", + "additionalProperties": True, + "properties": { + "messages": { + "x-ag-type-ref": "messages", + "type": "array", + "description": "Ordered list of normalized chat messages.", + }, + }, +} + +# The agent config element: one composite control the playground renders for the whole +# agent config, instead of reusing `prompt-template` plus loose params. The field shape is +# the `agent_config` catalog type (AgentConfigSchema in agenta.sdk.utils.types), so this is a +# thin `x-ag-type-ref` the playground resolves against `/workflows/catalog/types/agent_config` +# and dispatches to the AgentConfigControl (web/packages/agenta-entity-ui/.../AgentConfigControl.tsx). +# The catalog type keeps the typed tools/mcp_servers shape in one place; this schema only +# carries the default that the playground pre-fills. The agent handler reads it from +# `parameters.agent` in app.py. +_DEFAULT_AGENT_CONFIG = { + "agents_md": _DEFAULT_AGENTS_MD, + "model": _DEFAULT_MODEL, + "tools": [], + "mcp_servers": [], + "harness": "pi", + "sandbox": "local", + "permission_policy": "auto", +} + +AGENT_CONFIG_SCHEMA = { + "type": "object", + "x-ag-type-ref": "agent_config", + "title": "Agent", + "description": "The agent's instructions, model, tools, MCP servers, and runtime.", + "default": _DEFAULT_AGENT_CONFIG, +} + +AGENT_PARAMETERS_SCHEMA = { + "$schema": _SCHEMA, + "type": "object", + "additionalProperties": True, + "properties": {"agent": AGENT_CONFIG_SCHEMA}, +} + +# Outputs: the final assistant message. +AGENT_OUTPUTS_SCHEMA = { + "$schema": _SCHEMA, + "x-ag-type-ref": "message", + "type": "object", + "description": "Final assistant message returned by the agent.", +} + +AGENT_SCHEMAS = { + "inputs": AGENT_INPUTS_SCHEMA, + "parameters": AGENT_PARAMETERS_SCHEMA, + "outputs": AGENT_OUTPUTS_SCHEMA, +} diff --git a/services/oss/src/agent/secrets.py b/services/oss/src/agent/secrets.py new file mode 100644 index 0000000000..7bd3096f35 --- /dev/null +++ b/services/oss/src/agent/secrets.py @@ -0,0 +1,72 @@ +"""Resolve provider API keys from the project vault into harness env vars. + +The agent authenticates the harness with the same provider keys the project configured for +LLM access. We fetch the project's vault ``provider_key`` secrets from the backend (the +same backend + caller credential the tool resolver uses) and inject each as its standard +env var, so the harness uses whichever its model needs. Empty when the vault has none, in +which case the harness falls back to its own login / OAuth (see ``runRivet``). +""" + +from typing import Dict + +import httpx + +from agenta.sdk.utils.logging import get_module_logger + +from oss.src.agent.client import ( + TOOLS_TIMEOUT, + agenta_api_base, + request_authorization, +) + +log = get_module_logger(__name__) + +# Map a vault standard-provider kind to the env var the harness (Pi/Claude/litellm) reads. +# Only providers an agent harness can use are listed. +_PROVIDER_ENV_VARS = { + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "gemini": "GEMINI_API_KEY", + "mistral": "MISTRAL_API_KEY", + "mistralai": "MISTRAL_API_KEY", + "groq": "GROQ_API_KEY", + "together_ai": "TOGETHERAI_API_KEY", + "openrouter": "OPENROUTER_API_KEY", +} + + +async def resolve_harness_secrets() -> Dict[str, str]: + """Fetch the project vault's provider keys as ``{ENV_VAR: key}``. Best-effort. + + The SDK's per-request secret context does not propagate to this custom route, so we + resolve here rather than reading it. + """ + api_base = agenta_api_base() + if not api_base: + return {} + headers = {"Content-Type": "application/json"} + authorization = request_authorization() + if authorization: + headers["Authorization"] = authorization + + try: + async with httpx.AsyncClient(timeout=TOOLS_TIMEOUT) as client: + response = await client.get(f"{api_base}/secrets/", headers=headers) + if response.status_code >= 400: + log.warning("agent: vault secrets fetch HTTP %s", response.status_code) + return {} + secrets = response.json() or [] + except Exception: # pylint: disable=broad-except + log.warning("agent: vault secrets fetch failed", exc_info=True) + return {} + + env: Dict[str, str] = {} + for secret in secrets: + if not isinstance(secret, dict) or secret.get("kind") != "provider_key": + continue + data = secret.get("data") or {} + env_var = _PROVIDER_ENV_VARS.get(str(data.get("kind", "")).lower()) + key = (data.get("provider") or {}).get("key") + if env_var and key: + env.setdefault(env_var, key) + return env diff --git a/services/oss/src/agent/tools/__init__.py b/services/oss/src/agent/tools/__init__.py new file mode 100644 index 0000000000..e3b68d6167 --- /dev/null +++ b/services/oss/src/agent/tools/__init__.py @@ -0,0 +1,21 @@ +"""Agent-service composition and adapters for tool resolution.""" + +from .gateway import AgentaGatewayToolResolver, _to_gateway_reference +from .resolver import ( + ResolvedAgentResources, + resolve_agent_resources, + resolve_mcp_servers, + resolve_tools, +) +from .secrets import VaultToolSecretProvider + +_gateway_ref = _to_gateway_reference + +__all__ = [ + "AgentaGatewayToolResolver", + "VaultToolSecretProvider", + "ResolvedAgentResources", + "resolve_agent_resources", + "resolve_tools", + "resolve_mcp_servers", +] diff --git a/services/oss/src/agent/tools/gateway.py b/services/oss/src/agent/tools/gateway.py new file mode 100644 index 0000000000..adbea3465f --- /dev/null +++ b/services/oss/src/agent/tools/gateway.py @@ -0,0 +1,195 @@ +"""Agenta HTTP adapter for server-bound gateway tools.""" + +from __future__ import annotations + +from typing import Any, Dict, Sequence + +import httpx + +from agenta.sdk.agents.tools import ( + CallbackToolSpec, + GatewayToolConfig, + GatewayToolResolution, + GatewayToolResolutionError, + ToolCallback, + UnsupportedToolProviderError, +) +from agenta.sdk.utils.logging import get_module_logger + +from oss.src.agent.client import ( + TOOLS_TIMEOUT, + agenta_api_base, + request_authorization, +) + +log = get_module_logger(__name__) + + +def _normalize_reference(reference: str) -> str: + return reference.replace("__", ".") + + +def _to_gateway_reference(tool_config: GatewayToolConfig) -> Dict[str, Any]: + reference: Dict[str, Any] = { + "type": "gateway", + "provider": tool_config.provider, + "integration": tool_config.integration, + "action": tool_config.action, + "connection": tool_config.connection, + } + if tool_config.name: + reference["name"] = tool_config.name + return reference + + +class AgentaGatewayToolResolver: + async def resolve( + self, + tools: Sequence[GatewayToolConfig], + ) -> GatewayToolResolution: + for tool_config in tools: + if tool_config.provider != "composio": + raise UnsupportedToolProviderError(tool_config.provider) + + api_base = agenta_api_base() + if not api_base: + error = GatewayToolResolutionError( + "Agent has gateway tools configured but the Agenta API base URL " + "is unknown. Set AGENTA_AGENT_TOOLS_API_URL or AGENTA_API_URL." + ) + log.warning("agent: gateway tool resolution failed: %s", error) + raise error + + authorization = request_authorization() + headers = {"Content-Type": "application/json"} + if authorization: + headers["Authorization"] = authorization + + references = [_to_gateway_reference(tool_config) for tool_config in tools] + configs_by_reference: dict[str, GatewayToolConfig] = {} + for tool_config in tools: + reference = _normalize_reference(tool_config.reference) + if reference in configs_by_reference: + error = GatewayToolResolutionError( + f"Duplicate gateway reference: {reference}", + reference=reference, + ) + log.warning("agent: %s", error) + raise error + configs_by_reference[reference] = tool_config + + try: + async with httpx.AsyncClient(timeout=TOOLS_TIMEOUT) as client: + response = await client.post( + f"{api_base}/tools/resolve", + json={"tools": references}, + headers=headers, + ) + except httpx.HTTPError as exc: + log.warning( + "agent: gateway tool resolution request failed for %d tool(s)", + len(tools), + exc_info=True, + ) + raise GatewayToolResolutionError( + "Gateway tool resolution request failed", + ref_count=len(tools), + ) from exc + + if response.status_code >= 400: + error = GatewayToolResolutionError( + f"Gateway tool resolution failed (HTTP {response.status_code})", + status=response.status_code, + ref_count=len(tools), + ) + log.warning("agent: %s", error) + raise error + + try: + payload = response.json() or {} + except ValueError as exc: + log.warning( + "agent: gateway tool resolution returned invalid JSON", + exc_info=True, + ) + raise GatewayToolResolutionError( + "Gateway tool resolution returned invalid JSON", + ref_count=len(tools), + ) from exc + + raw_specs = payload.get("custom") if isinstance(payload, dict) else None + if not isinstance(raw_specs, list): + raw_specs = [] + if len(raw_specs) != len(tools): + error = GatewayToolResolutionError( + f"Gateway tool resolution returned {len(raw_specs)} spec(s) for " + f"{len(tools)} ref(s); expected one per ref.", + ref_count=len(tools), + spec_count=len(raw_specs), + ) + log.warning("agent: %s", error) + raise error + + specs_by_reference: dict[str, dict[str, Any]] = {} + for raw_spec in raw_specs: + if not isinstance(raw_spec, dict): + error = GatewayToolResolutionError( + "Gateway tool resolution returned a non-object spec" + ) + log.warning("agent: %s", error) + raise error + call_ref = raw_spec.get("call_ref") + if not call_ref: + error = GatewayToolResolutionError( + "Gateway tool resolution returned an incomplete spec " + f"(name={raw_spec.get('name')!r}, call_ref={call_ref!r})" + ) + log.warning("agent: %s", error) + raise error + reference = _normalize_reference(str(call_ref)) + if reference in specs_by_reference: + error = GatewayToolResolutionError( + f"Gateway tool resolution returned duplicate ref: {reference}", + reference=reference, + ) + log.warning("agent: %s", error) + raise error + specs_by_reference[reference] = raw_spec + + tool_specs: list[CallbackToolSpec] = [] + for reference, tool_config in configs_by_reference.items(): + raw_spec = specs_by_reference.get(reference) + if raw_spec is None: + error = GatewayToolResolutionError( + f"Gateway tool resolution did not return ref: {reference}", + reference=reference, + ) + log.warning("agent: %s", error) + raise error + name = raw_spec.get("name") + if not name: + error = GatewayToolResolutionError( + f"Gateway tool resolution returned an incomplete spec for {reference}", + reference=reference, + ) + log.warning("agent: %s", error) + raise error + tool_specs.append( + CallbackToolSpec( + name=str(name), + description=raw_spec.get("description") or str(name), + input_schema=raw_spec.get("input_schema") + or {"type": "object", "properties": {}}, + call_ref=str(raw_spec["call_ref"]), + needs_approval=tool_config.needs_approval, + render=tool_config.render, + ) + ) + + return GatewayToolResolution( + tool_specs=tool_specs, + tool_callback=ToolCallback( + endpoint=f"{api_base}/tools/call", + authorization=authorization, + ), + ) diff --git a/services/oss/src/agent/tools/resolver.py b/services/oss/src/agent/tools/resolver.py new file mode 100644 index 0000000000..8d56e92906 --- /dev/null +++ b/services/oss/src/agent/tools/resolver.py @@ -0,0 +1,85 @@ +"""Composition of SDK tool and MCP resolvers for the agent service.""" + +from __future__ import annotations + +import os +from typing import Any, Sequence + +from pydantic import BaseModel, ConfigDict, Field + +from agenta.sdk.agents.mcp import ( + MCPResolver, + ResolvedMCPServer, + parse_mcp_server_configs, +) +from agenta.sdk.agents.tools import ( + MissingSecretPolicy, + ResolvedToolSet, + ToolConfig, + ToolResolver, + coerce_tool_configs, +) +from agenta.sdk.utils.constants import TRUTHY + +from .gateway import AgentaGatewayToolResolver +from .secrets import VaultToolSecretProvider + + +class ResolvedAgentResources(BaseModel): + model_config = ConfigDict(frozen=True) + + tools: ResolvedToolSet = Field(default_factory=ResolvedToolSet) + mcp_servers: list[ResolvedMCPServer] = Field(default_factory=list) + + +def _mcp_enabled() -> bool: + return os.getenv("AGENTA_AGENT_ENABLE_MCP", "").strip().lower() in TRUTHY + + +async def resolve_agent_resources( + *, + tools: Sequence[Any], + mcp_servers: Sequence[Any], +) -> ResolvedAgentResources: + tool_configs: list[ToolConfig] = coerce_tool_configs(tools).tool_configs + secret_provider = VaultToolSecretProvider() + resolved_tools = await ToolResolver( + secret_provider=secret_provider, + gateway_resolver=AgentaGatewayToolResolver(), + missing_secret_policy=MissingSecretPolicy.ERROR, + ).resolve(tool_configs) + + resolved_mcp_servers: list[ResolvedMCPServer] = [] + if _mcp_enabled(): + resolved_mcp_servers = await MCPResolver( + secret_provider=secret_provider, + missing_secret_policy=MissingSecretPolicy.ERROR, + ).resolve(parse_mcp_server_configs(mcp_servers)) + + return ResolvedAgentResources( + tools=resolved_tools, + mcp_servers=resolved_mcp_servers, + ) + + +async def resolve_tools(tools: Sequence[Any]) -> ResolvedToolSet: + """Compatibility wrapper for callers resolving tools without MCP.""" + return ( + await resolve_agent_resources( + tools=tools, + mcp_servers=[], + ) + ).tools + + +async def resolve_mcp_servers( + mcp_servers: Sequence[Any], +) -> list[dict[str, Any]]: + """Compatibility wrapper returning the previous wire-dictionary shape.""" + if not _mcp_enabled(): + return [] + resources = await resolve_agent_resources( + tools=[], + mcp_servers=mcp_servers, + ) + return [server.to_wire() for server in resources.mcp_servers] diff --git a/services/oss/src/agent/tools/secrets.py b/services/oss/src/agent/tools/secrets.py new file mode 100644 index 0000000000..59aa10e9f0 --- /dev/null +++ b/services/oss/src/agent/tools/secrets.py @@ -0,0 +1,65 @@ +"""Vault-backed secret provider for agent tools and MCP servers.""" + +from __future__ import annotations + +from typing import Mapping, Sequence + +import httpx + +from agenta.sdk.utils.logging import get_module_logger + +from oss.src.agent.client import ( + TOOLS_TIMEOUT, + agenta_api_base, + request_authorization, +) + +log = get_module_logger(__name__) + + +async def resolve_named_secrets(names: Sequence[str]) -> dict[str, str]: + """Resolve project vault secrets by name for tool and MCP environments.""" + if not names: + return {} + + api_base = agenta_api_base() + if not api_base: + return {} + + headers = {"Content-Type": "application/json"} + authorization = request_authorization() + if authorization: + headers["Authorization"] = authorization + + try: + async with httpx.AsyncClient(timeout=TOOLS_TIMEOUT) as client: + response = await client.post( + f"{api_base}/secrets/resolve", + json={"names": list(names)}, + headers=headers, + ) + if response.status_code >= 400: + log.warning( + "agent: named-secret resolve HTTP %s for %s", + response.status_code, + names, + ) + return {} + data = response.json() or {} + except Exception: # pylint: disable=broad-except + log.warning("agent: named-secret resolve failed for %s", names, exc_info=True) + return {} + + resolved = data.get("secrets") if isinstance(data, dict) else None + resolved = resolved if isinstance(resolved, dict) else {} + missing = [name for name in names if name not in resolved] + if missing: + log.warning("agent: unresolved named secret(s): %s", missing) + return { + str(key): str(value) for key, value in resolved.items() if value is not None + } + + +class VaultToolSecretProvider: + async def get_many(self, names: Sequence[str]) -> Mapping[str, str]: + return await resolve_named_secrets(names) diff --git a/services/oss/src/agent/tracing.py b/services/oss/src/agent/tracing.py new file mode 100644 index 0000000000..7069381a35 --- /dev/null +++ b/services/oss/src/agent/tracing.py @@ -0,0 +1,85 @@ +"""OpenTelemetry glue: thread the workflow trace into the run, record the run's usage. + +The handler runs inside the instrumented ``/invoke`` span, so threading its trace context +into the harness makes the agent's spans children of that span (same trace), and stamping +the run's token/cost totals onto it shows the run's usage even though the harness exports +its span tree in a separate OTLP batch. +""" + +import os +from typing import Any, Dict, Optional + +from opentelemetry import trace as otel_trace + +import agenta as ag +from agenta.sdk.engines.tracing.propagation import inject +from agenta.sdk.utils.logging import get_module_logger + +from agenta.sdk.agents import TraceContext + +log = get_module_logger(__name__) + +_CAPTURE_CONTENT = os.getenv("AGENTA_AGENT_CAPTURE_CONTENT", "true").lower() not in ( + "0", + "false", + "no", +) + + +def trace_context() -> Optional[TraceContext]: + """Capture the active workflow span's trace context for the harness. + + Threading the ``/invoke`` span's ``traceparent`` into the run makes the agent's spans + children of that span, so the whole run shows up under the response's ``trace_id`` the + way completion/chat nest their LLM spans. Best-effort: any failure returns ``None`` and + the run is traced standalone (or not at all) using the runner's env config. + """ + try: + headers = inject({}) + + traceparent = headers.get("traceparent") + if not traceparent: + return None + + endpoint = None + try: + endpoint = ag.tracing.otlp_url + except Exception: # pylint: disable=broad-except + endpoint = None + + return TraceContext( + traceparent=traceparent, + baggage=headers.get("baggage"), + endpoint=endpoint, + authorization=headers.get("Authorization"), + capture_content=_CAPTURE_CONTENT, + ) + except Exception: # pylint: disable=broad-except + log.warning("agent: failed to capture trace context", exc_info=True) + return None + + +def record_usage(usage: Optional[Dict[str, Any]]) -> None: + """Stamp the agent's token/cost totals onto the active ``/invoke`` workflow span. + + The harness emits its own span tree (turns, LLM, tools) in a separate OTLP batch, so + Agenta's per-batch cumulative roll-up cannot bridge the totals onto the workflow span. + Setting ``gen_ai.usage.*`` here records them directly on that span (the root of its + batch), so the trace shows the run's tokens and cost. Best-effort. + """ + if not usage or not usage.get("total"): + return + try: + span = otel_trace.get_current_span() + input_tokens = int(usage.get("input") or 0) + output_tokens = int(usage.get("output") or 0) + span.set_attribute("gen_ai.usage.input_tokens", input_tokens) + span.set_attribute("gen_ai.usage.output_tokens", output_tokens) + span.set_attribute("gen_ai.usage.prompt_tokens", input_tokens) + span.set_attribute("gen_ai.usage.completion_tokens", output_tokens) + span.set_attribute("gen_ai.usage.total_tokens", int(usage.get("total") or 0)) + cost = usage.get("cost") + if cost: + span.set_attribute("gen_ai.usage.cost", float(cost)) + except Exception: # pylint: disable=broad-except + log.warning("agent: failed to record usage on workflow span", exc_info=True) diff --git a/services/oss/tests/pytest/integration/__init__.py b/services/oss/tests/pytest/integration/__init__.py new file mode 100644 index 0000000000..a78ea06af0 --- /dev/null +++ b/services/oss/tests/pytest/integration/__init__.py @@ -0,0 +1 @@ +# Integration tests package. diff --git a/services/oss/tests/pytest/integration/agent/__init__.py b/services/oss/tests/pytest/integration/agent/__init__.py new file mode 100644 index 0000000000..da89fd87e0 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/__init__.py @@ -0,0 +1 @@ +# Integration tests for the agent workflow service (httpx boundary mocked, no live backend). diff --git a/services/oss/tests/pytest/integration/agent/conftest.py b/services/oss/tests/pytest/integration/agent/conftest.py new file mode 100644 index 0000000000..dfc223343c --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/conftest.py @@ -0,0 +1,76 @@ +"""Integration fixtures: a fake httpx client for the tool/secret resolvers. + +These tests wire the real resolver code against a mocked HTTP boundary (no live backend, no +respx/pytest-httpx dependency). ``install_http`` patches, in a given resolver module, the two +``client`` helpers (``agenta_api_base`` / ``request_authorization``) plus ``httpx.AsyncClient``, +and returns a ``capture`` dict the test can assert the outgoing request against. +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +import pytest + + +class _FakeResponse: + def __init__(self, status_code: int, payload: Any, text: Optional[str]) -> None: + self.status_code = status_code + self._payload = payload if payload is not None else {} + self.text = text if text is not None else json.dumps(self._payload) + + def json(self) -> Any: + return self._payload + + +def _fake_async_client(*, response, raises, capture: Dict[str, Any]): + class _Client: + def __init__(self, *args, **kwargs) -> None: + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return False + + async def post(self, url, json=None, headers=None): + capture.update(method="POST", url=url, json=json, headers=headers) + if raises: + raise raises + return response + + async def get(self, url, headers=None): + capture.update(method="GET", url=url, headers=headers) + if raises: + raise raises + return response + + return _Client + + +@pytest.fixture +def install_http(monkeypatch): + def _install( + module, + *, + status: int = 200, + payload: Any = None, + text: Optional[str] = None, + raises: Optional[BaseException] = None, + api_base: Optional[str] = "https://api.x/api", + authorization: Optional[str] = "Access tok", + ) -> Dict[str, Any]: + capture: Dict[str, Any] = {} + monkeypatch.setattr(module, "agenta_api_base", lambda: api_base) + monkeypatch.setattr(module, "request_authorization", lambda: authorization) + response = _FakeResponse(status, payload, text) + monkeypatch.setattr( + module.httpx, + "AsyncClient", + _fake_async_client(response=response, raises=raises, capture=capture), + ) + return capture + + return _install diff --git a/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py b/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py new file mode 100644 index 0000000000..8eb4f45a01 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py @@ -0,0 +1,64 @@ +"""``resolve_harness_secrets`` against a mocked ``GET /secrets/``. + +Best-effort by design: it maps only ``provider_key`` vault entries to env vars, dedupes by +env var, and returns ``{}`` on any error rather than failing the run. +""" + +from __future__ import annotations + +import pytest + +from oss.src.agent import secrets +from oss.src.agent.secrets import resolve_harness_secrets + +pytestmark = pytest.mark.integration + + +async def test_no_api_base_returns_empty(install_http): + install_http(secrets, api_base=None) + assert await resolve_harness_secrets() == {} + + +async def test_maps_only_provider_keys_with_dedupe(install_http): + install_http( + secrets, + status=200, + payload=[ + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-1"}}, + }, + # duplicate env var -> first one wins (setdefault). + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-2"}}, + }, + { + "kind": "provider_key", + "data": {"kind": "anthropic", "provider": {"key": "sk-ant"}}, + }, + # not a provider key -> ignored. + {"kind": "other", "data": {"kind": "openai", "provider": {"key": "x"}}}, + # unmapped provider -> ignored. + { + "kind": "provider_key", + "data": {"kind": "made_up", "provider": {"key": "y"}}, + }, + # missing key -> ignored. + {"kind": "provider_key", "data": {"kind": "groq", "provider": {}}}, + ], + ) + + env = await resolve_harness_secrets() + + assert env == {"OPENAI_API_KEY": "sk-1", "ANTHROPIC_API_KEY": "sk-ant"} + + +async def test_http_error_returns_empty(install_http): + install_http(secrets, status=400) + assert await resolve_harness_secrets() == {} + + +async def test_network_exception_returns_empty(install_http): + install_http(secrets, raises=RuntimeError("network down")) + assert await resolve_harness_secrets() == {} diff --git a/services/oss/tests/pytest/integration/agent/tools/__init__.py b/services/oss/tests/pytest/integration/agent/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/__init__.py @@ -0,0 +1 @@ + diff --git a/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py b/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py new file mode 100644 index 0000000000..1664e8fcfe --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import httpx +import pytest + +from agenta.sdk.agents import ( + GatewayToolResolutionError, + ToolCallback, +) + +from oss.src.agent.tools import resolve_tools +from oss.src.agent.tools import gateway + +pytestmark = pytest.mark.integration + +_GATEWAY = { + "type": "gateway", + "provider": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", +} + + +async def test_no_gateway_short_circuits_without_http(install_http): + capture = install_http(gateway, raises=AssertionError("must not call HTTP")) + resolved = await resolve_tools(["read"]) + assert resolved.builtin_names == ["read"] + assert capture == {} + + +async def test_missing_api_base_raises_typed_error(install_http): + install_http(gateway, api_base=None) + with pytest.raises(GatewayToolResolutionError, match="API base URL"): + await resolve_tools([_GATEWAY]) + + +async def test_gateway_metadata_and_description_fallback_are_preserved(install_http): + capture = install_http( + gateway, + payload={ + "custom": [ + { + "name": "get_user", + "description": None, + "input_schema": {"type": "object"}, + "call_ref": "tools.composio.github.GET_USER.c1", + } + ] + }, + ) + resolved = await resolve_tools( + [ + { + **_GATEWAY, + "needs_approval": True, + "render": {"kind": "component", "component": "User"}, + } + ] + ) + spec = resolved.tool_specs[0] + assert spec.description == "get_user" + assert spec.needs_approval is True + assert spec.render == {"kind": "component", "component": "User"} + assert spec.to_wire()["needsApproval"] is True + assert isinstance(resolved.tool_callback, ToolCallback) + assert capture["json"]["tools"][0]["type"] == "gateway" + + +async def test_gateway_specs_are_joined_by_call_ref_not_position(install_http): + install_http( + gateway, + payload={ + "custom": [ + { + "name": "second", + "description": "Second", + "input_schema": {}, + "call_ref": "tools.composio.github.SECOND.c2", + }, + { + "name": "first", + "description": "First", + "input_schema": {}, + "call_ref": "tools.composio.github.FIRST.c1", + }, + ] + }, + ) + resolved = await resolve_tools( + [ + { + **_GATEWAY, + "action": "FIRST", + "connection": "c1", + "needs_approval": True, + }, + { + **_GATEWAY, + "action": "SECOND", + "connection": "c2", + "render": {"kind": "component", "component": "Second"}, + }, + ] + ) + first, second = resolved.tool_specs + assert first.name == "first" + assert first.needs_approval is True + assert first.render is None + assert second.name == "second" + assert second.needs_approval is False + assert second.render == {"kind": "component", "component": "Second"} + + +async def test_transport_failure_is_logged_and_normalized( + install_http, + monkeypatch, +): + warnings = [] + monkeypatch.setattr( + gateway, + "log", + type( + "Log", + (), + {"warning": lambda self, *args, **kwargs: warnings.append(args)}, + )(), + ) + request = httpx.Request("POST", "https://api.x/api/tools/resolve") + install_http(gateway, raises=httpx.ConnectError("offline", request=request)) + with pytest.raises(GatewayToolResolutionError) as caught: + await resolve_tools([_GATEWAY]) + assert isinstance(caught.value.__cause__, httpx.ConnectError) + assert warnings + assert "gateway tool resolution request failed" in warnings[0][0].lower() + + +@pytest.mark.parametrize( + ("payload", "message"), + [ + ({"custom": []}, "expected one per ref"), + ( + { + "custom": [ + { + "name": "get_user", + "description": "x", + "input_schema": {}, + } + ] + }, + "incomplete spec", + ), + ], +) +async def test_invalid_gateway_response_fails_fast( + install_http, + payload, + message, +): + install_http(gateway, payload=payload) + with pytest.raises(GatewayToolResolutionError, match=message): + await resolve_tools([_GATEWAY]) + + +async def test_http_status_failure_is_typed(install_http): + install_http(gateway, status=400, text="bad request") + with pytest.raises(GatewayToolResolutionError) as caught: + await resolve_tools([_GATEWAY]) + assert caught.value.status == 400 diff --git a/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py b/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py new file mode 100644 index 0000000000..2aea5678fb --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import pytest + +from oss.src.agent.tools import secrets + +pytestmark = pytest.mark.integration + + +async def test_named_secrets_are_resolved_by_tools_adapter(install_http): + capture = install_http( + secrets, + payload={"secrets": {"TOKEN": "value", "EMPTY": None}}, + ) + + resolved = await secrets.resolve_named_secrets(["TOKEN", "EMPTY", "MISSING"]) + + assert resolved == {"TOKEN": "value"} + assert capture == { + "method": "POST", + "url": "https://api.x/api/secrets/resolve", + "json": {"names": ["TOKEN", "EMPTY", "MISSING"]}, + "headers": { + "Content-Type": "application/json", + "Authorization": "Access tok", + }, + } + + +async def test_named_secrets_without_api_base_return_empty(install_http): + install_http(secrets, api_base=None) + + assert await secrets.resolve_named_secrets(["TOKEN"]) == {} + + +async def test_named_secret_http_failure_returns_empty(install_http): + install_http(secrets, status=500) + + assert await secrets.resolve_named_secrets(["TOKEN"]) == {} diff --git a/services/oss/tests/pytest/unit/__init__.py b/services/oss/tests/pytest/unit/__init__.py new file mode 100644 index 0000000000..1a351fabba --- /dev/null +++ b/services/oss/tests/pytest/unit/__init__.py @@ -0,0 +1 @@ +# Unit tests package. diff --git a/services/oss/tests/pytest/unit/agent/__init__.py b/services/oss/tests/pytest/unit/agent/__init__.py new file mode 100644 index 0000000000..5da5a3fd3b --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/__init__.py @@ -0,0 +1 @@ +# Unit tests for the agent workflow service (oss.src.agent). diff --git a/services/oss/tests/pytest/unit/agent/conftest.py b/services/oss/tests/pytest/unit/agent/conftest.py new file mode 100644 index 0000000000..f84c7b29df --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/conftest.py @@ -0,0 +1,112 @@ +"""Fakes for the agent service unit tests. + +A local, minimal ``FakeBackend`` (≈ the SDK's) so the ``/invoke`` handler can run end-to-end +in-process with no runner, no LLM, and no network. It implements the real ``Backend`` / +``Sandbox`` / ``Session`` ports, so the port contract keeps it honest across the two suites. + +This conftest is scoped to ``unit/agent/`` so the handler tests do not pull the acceptance +suite's account / live-API fixtures from the services root conftest. +""" + +from __future__ import annotations + +from typing import Dict, Mapping, Optional, Sequence + +import pytest + +from agenta.sdk.agents import AgentResult, HarnessType +from agenta.sdk.agents.interfaces import Backend, Sandbox, Session +from agenta.sdk.agents.streaming import AgentRun + + +class _FakeSandbox(Sandbox): + def __init__(self) -> None: + self.files: Dict[str, bytes] = {} + self.destroyed = False + + async def add_files(self, files: Mapping[str, bytes]) -> None: + self.files.update(files) + + async def destroy(self) -> None: + self.destroyed = True + + +class _FakeSession(Session): + def __init__(self, result: AgentResult) -> None: + self._result = result + self.destroyed = False + + @property + def id(self) -> Optional[str]: + return self._result.session_id + + async def prompt(self, messages, *, on_event=None) -> AgentResult: + return self._result + + def stream(self, messages) -> AgentRun: + result = self._result + + async def _records(): + yield { + "kind": "result", + "result": {"ok": True, "output": result.output}, + } + + return AgentRun(_records()) + + async def destroy(self) -> None: + self.destroyed = True + + +class FakeBackend(Backend): + """Echoes a fixed result, regardless of harness. Records lifecycle for assertions. + + Crucially it also records the *harness-shaped* config each ``create_session`` receives + (the ``PiAgentConfig`` / ``ClaudeAgentConfig`` / ``AgentaAgentConfig`` the harness + produced). This is the backend boundary where per-harness translation surfaces, so a + handler test can assert the response body is identical across harnesses *and* that the + translated configs diverge as designed (Pi keeps built-ins and forces auto; Claude drops + built-ins and honors the policy; Agenta unions forced tools and carries skills). + """ + + def __init__( + self, + *, + result: Optional[AgentResult] = None, + supported: Sequence[HarnessType] = ( + HarnessType.PI, + HarnessType.CLAUDE, + HarnessType.AGENTA, + ), + ) -> None: + self.supported_harnesses = frozenset(supported) + self._result = result if result is not None else AgentResult(output="echo") + self.setup_calls = 0 + self.shutdown_calls = 0 + # Every harness-shaped config that reached the backend boundary, in call order. + self.created_configs: list = [] + self.created_session_ids: list[Optional[str]] = [] + + async def setup(self) -> None: + self.setup_calls += 1 + + async def shutdown(self) -> None: + self.shutdown_calls += 1 + + async def create_sandbox(self) -> _FakeSandbox: + return _FakeSandbox() + + async def create_session( + self, sandbox, config, *, harness, secrets=None, trace=None, session_id=None + ) -> _FakeSession: + self.created_configs.append(config) + self.created_session_ids.append(session_id) + return _FakeSession(self._result) + + +@pytest.fixture +def fake_backend(): + def _make(**kwargs) -> FakeBackend: + return FakeBackend(**kwargs) + + return _make diff --git a/services/oss/tests/pytest/unit/agent/test_invoke_handler.py b/services/oss/tests/pytest/unit/agent/test_invoke_handler.py new file mode 100644 index 0000000000..a562eff74f --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_invoke_handler.py @@ -0,0 +1,204 @@ +"""The ``/invoke`` handler (`_agent`) end-to-end in-process. + +Runs the real parse -> resolve -> harness -> record path with a ``FakeBackend`` and the +network-touching helpers stubbed. No runner, no LLM, no HTTP. This is where the cross-harness +"byte-identical response body" guarantee is locked at the Python layer. +""" + +from __future__ import annotations + +import pytest + +from agenta.sdk.agents import ( + AgentConfig, + AgentResult, + GatewayToolResolutionError, + ResolvedToolSet, +) +from agenta.sdk.agents.adapters.agenta_builtins import AGENTA_FORCED_SKILLS + +from oss.src.agent import app +from oss.src.agent.tools import ResolvedAgentResources + + +def _patch_handler(monkeypatch, backend, *, builtins=(), tool_callback=None): + """Stub the network-touching helpers and pin one ``backend`` for the run. + + ``builtins`` are the resolved built-in tool names ``resolve_tools`` hands back, so a turn + can carry a real tool list and the per-harness translation has something to diverge on. + Returns the ``recorded`` dict the usage hook writes into. + """ + recorded = {} + + async def _resources(*, tools, mcp_servers): + return ResolvedAgentResources( + tools=ResolvedToolSet( + builtin_names=list(builtins), + tool_callback=tool_callback, + ) + ) + + async def _no_secrets(): + return {} + + monkeypatch.setattr(app, "resolve_agent_resources", _resources) + monkeypatch.setattr(app, "resolve_harness_secrets", _no_secrets) + monkeypatch.setattr(app, "trace_context", lambda: None) + monkeypatch.setattr( + app, "record_usage", lambda usage: recorded.__setitem__("usage", usage) + ) + monkeypatch.setattr(app, "select_backend", lambda selection: backend) + monkeypatch.setattr( + app, "_default_agent_config", lambda: AgentConfig(instructions="x", model="m") + ) + return recorded + + +@pytest.fixture +def patched(monkeypatch, fake_backend): + backend = fake_backend(result=AgentResult(output="echo", usage={"total": 15})) + recorded = _patch_handler(monkeypatch, backend) + return backend, recorded + + +async def _invoke(harness="pi", **agent): + return await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": harness, **agent}}, + ) + + +async def test_invoke_returns_assistant_message(patched): + assert await _invoke("pi") == {"role": "assistant", "content": "echo"} + + +async def test_invoke_records_usage(patched): + _, recorded = patched + await _invoke("pi") + assert recorded["usage"] == {"total": 15} + + +async def test_invoke_runs_backend_lifecycle(patched): + backend, _ = patched + await _invoke("pi") + assert backend.setup_calls == 1 + assert backend.shutdown_calls == 1 # cleanup() tears the backend down + + +async def test_messages_session_id_reaches_session_config(patched): + backend, _ = patched + + await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": "pi"}}, + session_id="sess_request", + ) + + assert backend.created_session_ids == ["sess_request"] + + +async def test_invoke_cross_harness_same_body_divergent_configs( + monkeypatch, fake_backend +): + """The real cross-harness guarantee, exercised through the handler — not stubbed. + + The earlier version of this test pinned a single echoing backend and asserted + ``pi == agenta == claude`` on the echoed constant. That passes no matter how badly the + per-harness translation diverges, because the translation never ran. Here the same turn + runs as pi / agenta / claude against a backend that records the *harness-shaped* config it + receives, so we can assert two distinct things: + + 1. the response body is byte-identical across the three harnesses (the response-layer + guarantee), and + 2. the config that reached the backend boundary diverged exactly as designed — proving + the handler actually drove ``PiHarness`` / ``ClaudeHarness`` / ``AgentaHarness``, + each producing its own config. + + The turn carries a built-in tool (``web_search``) and a ``deny`` policy so the divergence + is observable: Claude drops Pi built-ins and honors the policy; Pi keeps them and forces + ``auto``; Agenta unions the forced tools and ships skills. + """ + backend = fake_backend(result=AgentResult(output="echo", usage={"total": 15})) + _patch_handler(monkeypatch, backend, builtins=["web_search"]) + + bodies = [ + await _invoke(harness, permission_policy="deny") + for harness in ("pi", "agenta", "claude") + ] + pi_body, agenta_body, claude_body = bodies + + # (1) Response-layer guarantee: identical body regardless of harness. + assert ( + pi_body + == agenta_body + == claude_body + == { + "role": "assistant", + "content": "echo", + } + ) + + # (2) The three harness-shaped configs that reached the backend boundary, in call order. + assert len(backend.created_configs) == 3 + pi_cfg, agenta_cfg, claude_cfg = backend.created_configs + pi_wire = pi_cfg.wire_tools() + agenta_wire = agenta_cfg.wire_tools() + claude_wire = claude_cfg.wire_tools() + + # Pi keeps its built-in tool natively and never gates tool use (policy forced to auto, + # the author's `deny` notwithstanding). + assert pi_wire["tools"] == ["web_search"] + assert pi_wire["permissionPolicy"] == "auto" + assert "skills" not in pi_wire + + # Claude has no Pi built-ins (the `web_search` name is dropped) and honors the policy. + assert claude_wire["tools"] == [] + assert claude_wire["permissionPolicy"] == "deny" + assert "skills" not in claude_wire + + # Agenta is Pi-with-an-opinion: it unions the forced tools onto the author's set, forces + # auto like Pi, and ships the forced skills. + assert agenta_wire["tools"] == ["web_search", "read", "bash"] + assert agenta_wire["permissionPolicy"] == "auto" + assert agenta_wire["skills"] == list(AGENTA_FORCED_SKILLS) + + # The configs genuinely differ at the boundary; the body's sameness is not a tautology. + assert pi_wire != claude_wire + assert agenta_wire != pi_wire + + +async def test_stream_tool_resolution_failure_is_raised_before_backend_setup( + monkeypatch, +): + async def _failure(*, tools, mcp_servers): + raise GatewayToolResolutionError("gateway unavailable") + + monkeypatch.setattr(app, "resolve_agent_resources", _failure) + monkeypatch.setattr( + app, + "_default_agent_config", + lambda: AgentConfig( + tools=[ + { + "type": "gateway", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + } + ] + ), + ) + monkeypatch.setattr( + app, + "select_backend", + lambda _selection: (_ for _ in ()).throw( + AssertionError("backend must not be selected") + ), + ) + + with pytest.raises(GatewayToolResolutionError, match="gateway unavailable"): + await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": "pi"}}, + stream=True, + ) diff --git a/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py b/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py new file mode 100644 index 0000000000..54104e3bb0 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py @@ -0,0 +1,24 @@ +"""Provider-key -> harness env-var mapping. + +The harness authenticates with the project's vault provider keys, injected as the env vars +each provider's SDK reads. If a name here drifts from what the harness expects, auth fails +silently and the run falls back to login/OAuth, so the table is worth a guard. +""" + +from __future__ import annotations + +from oss.src.agent.secrets import _PROVIDER_ENV_VARS + + +def test_standard_providers_map_to_expected_env_vars(): + assert _PROVIDER_ENV_VARS["openai"] == "OPENAI_API_KEY" + assert _PROVIDER_ENV_VARS["anthropic"] == "ANTHROPIC_API_KEY" + assert _PROVIDER_ENV_VARS["gemini"] == "GEMINI_API_KEY" + assert _PROVIDER_ENV_VARS["groq"] == "GROQ_API_KEY" + assert _PROVIDER_ENV_VARS["together_ai"] == "TOGETHERAI_API_KEY" + assert _PROVIDER_ENV_VARS["openrouter"] == "OPENROUTER_API_KEY" + + +def test_both_mistral_spellings_share_one_env_var(): + assert _PROVIDER_ENV_VARS["mistral"] == "MISTRAL_API_KEY" + assert _PROVIDER_ENV_VARS["mistralai"] == "MISTRAL_API_KEY" diff --git a/services/oss/tests/pytest/unit/agent/test_select_backend.py b/services/oss/tests/pytest/unit/agent/test_select_backend.py new file mode 100644 index 0000000000..e998428d99 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_select_backend.py @@ -0,0 +1,61 @@ +"""``select_backend``: the engine-routing decision. + +The harness and sandbox are orthogonal playground choices; this locks how they (plus the +``AGENTA_AGENT_RUNTIME`` deployment override) map to an engine. ``pi`` and ``agenta`` stay on +the in-process Pi backend locally; anything else routes to rivet. The transport (HTTP sidecar +vs subprocess) follows ``AGENTA_AGENT_PI_URL``. +""" + +from __future__ import annotations + +import pytest + +from agenta.sdk.agents import InProcessPiBackend, RivetBackend, RunSelection + +from oss.src.agent.app import select_backend + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + # Start every case from a known-empty deployment environment. + monkeypatch.delenv("AGENTA_AGENT_RUNTIME", raising=False) + monkeypatch.delenv("AGENTA_AGENT_PI_URL", raising=False) + + +def _sel(harness="pi", sandbox="local"): + return RunSelection(harness=harness, sandbox=sandbox) + + +def test_pi_local_uses_in_process(): + assert isinstance(select_backend(_sel("pi", "local")), InProcessPiBackend) + + +def test_agenta_local_uses_in_process(): + # Agenta is Pi with an opinion, so it stays on the in-process Pi backend. + assert isinstance(select_backend(_sel("agenta", "local")), InProcessPiBackend) + + +def test_claude_routes_to_rivet(): + assert isinstance(select_backend(_sel("claude", "local")), RivetBackend) + + +def test_non_local_sandbox_routes_to_rivet(): + backend = select_backend(_sel("pi", "daytona")) + assert isinstance(backend, RivetBackend) + assert backend._sandbox == "daytona" # the sandbox axis is threaded through + + +def test_runtime_override_forces_rivet(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_RUNTIME", "rivet") + assert isinstance(select_backend(_sel("pi", "local")), RivetBackend) + + +def test_pi_url_selects_http_transport(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_PI_URL", "http://agent-pi:8765") + backend = select_backend(_sel("pi", "local")) + assert backend._url == "http://agent-pi:8765" + + +def test_no_pi_url_uses_subprocess_transport(): + # Unset URL means the backend will spawn the runner CLI rather than POST to a sidecar. + assert select_backend(_sel("pi", "local"))._url is None diff --git a/services/oss/tests/pytest/unit/agent/tools/__init__.py b/services/oss/tests/pytest/unit/agent/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/__init__.py @@ -0,0 +1 @@ + diff --git a/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py b/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py new file mode 100644 index 0000000000..3ca0690980 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py @@ -0,0 +1,19 @@ +from agenta.sdk.agents import GatewayToolConfig + +from oss.src.agent.tools import _to_gateway_reference + + +def test_gateway_reference_uses_canonical_sdk_shape(): + assert _to_gateway_reference( + GatewayToolConfig( + integration="github", + action="GET_USER", + connection="c1", + ) + ) == { + "type": "gateway", + "provider": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + } diff --git a/services/oss/tests/pytest/unit/agent/tools/test_resolution.py b/services/oss/tests/pytest/unit/agent/tools/test_resolution.py new file mode 100644 index 0000000000..92ef5368d7 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/test_resolution.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import pytest + +from agenta.sdk.agents import ( + CodeToolSpec, + MissingMCPSecretError, + MissingToolSecretError, +) + +from oss.src.agent.tools import resolve_mcp_servers, resolve_tools +from oss.src.agent.tools import resolver as resolver_module +from oss.src.agent.tools import secrets as secrets_module + + +async def test_resolve_tools_builds_local_specs_with_scoped_secrets(monkeypatch): + async def _named_secrets(names): + assert names == ["TOKEN"] + return {"TOKEN": "secret"} + + monkeypatch.setattr(secrets_module, "resolve_named_secrets", _named_secrets) + resolved = await resolve_tools( + [ + "read", + { + "type": "code", + "name": "calc", + "script": "...", + "secrets": ["TOKEN"], + }, + { + "type": "client", + "name": "pick", + }, + ] + ) + assert resolved.builtin_names == ["read"] + code = next(spec for spec in resolved.tool_specs if spec.name == "calc") + assert isinstance(code, CodeToolSpec) + assert code.env == {"TOKEN": "secret"} + assert ( + next(spec for spec in resolved.tool_specs if spec.name == "pick").kind + == "client" + ) + + +async def test_missing_tool_secret_is_not_silently_omitted(monkeypatch): + async def _named_secrets(_names): + return {} + + monkeypatch.setattr(secrets_module, "resolve_named_secrets", _named_secrets) + with pytest.raises(MissingToolSecretError): + await resolve_tools( + [ + { + "type": "code", + "name": "calc", + "script": "...", + "secrets": ["TOKEN"], + } + ] + ) + + +async def test_mcp_is_disabled_at_service_composition_by_default(monkeypatch): + monkeypatch.delenv("AGENTA_AGENT_ENABLE_MCP", raising=False) + assert await resolve_mcp_servers([{"name": "github", "command": "npx"}]) == [] + + +async def test_missing_mcp_secret_is_explicit_when_enabled(monkeypatch): + monkeypatch.setattr(resolver_module, "_mcp_enabled", lambda: True) + + async def _named_secrets(_names): + return {} + + monkeypatch.setattr(secrets_module, "resolve_named_secrets", _named_secrets) + with pytest.raises(MissingMCPSecretError): + await resolve_mcp_servers( + [ + { + "name": "github", + "command": "npx", + "secrets": {"GITHUB_TOKEN": "missing"}, + } + ] + ) From 78b85ae7e7d68ac117839949230ae23c48398dec Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Fri, 19 Jun 2026 22:12:35 +0200 Subject: [PATCH 2/2] test(agent): cover missing runner assets --- .../pytest/unit/agent/test_select_backend.py | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/services/oss/tests/pytest/unit/agent/test_select_backend.py b/services/oss/tests/pytest/unit/agent/test_select_backend.py index e998428d99..4f65bb29f3 100644 --- a/services/oss/tests/pytest/unit/agent/test_select_backend.py +++ b/services/oss/tests/pytest/unit/agent/test_select_backend.py @@ -8,18 +8,34 @@ from __future__ import annotations +from pathlib import Path + import pytest -from agenta.sdk.agents import InProcessPiBackend, RivetBackend, RunSelection +from agenta.sdk.agents import ( + AgentRunnerConfigurationError, + InProcessPiBackend, + RivetBackend, + RunSelection, +) from oss.src.agent.app import select_backend +@pytest.fixture +def runner_wrapper(tmp_path: Path) -> Path: + cli = tmp_path / "src" / "cli.ts" + cli.parent.mkdir() + cli.write_text("console.log('runner')\n", encoding="utf-8") + return tmp_path + + @pytest.fixture(autouse=True) -def _clean_env(monkeypatch): +def _clean_env(monkeypatch, runner_wrapper: Path): # Start every case from a known-empty deployment environment. monkeypatch.delenv("AGENTA_AGENT_RUNTIME", raising=False) monkeypatch.delenv("AGENTA_AGENT_PI_URL", raising=False) + monkeypatch.setenv("AGENTA_AGENT_WRAPPER_DIR", str(runner_wrapper)) def _sel(harness="pi", sandbox="local"): @@ -59,3 +75,12 @@ def test_pi_url_selects_http_transport(monkeypatch): def test_no_pi_url_uses_subprocess_transport(): # Unset URL means the backend will spawn the runner CLI rather than POST to a sidecar. assert select_backend(_sel("pi", "local"))._url is None + + +def test_no_pi_url_requires_runner_assets(monkeypatch, tmp_path: Path): + missing_wrapper = tmp_path / "missing-wrapper" + missing_wrapper.mkdir() + monkeypatch.setenv("AGENTA_AGENT_WRAPPER_DIR", str(missing_wrapper)) + + with pytest.raises(AgentRunnerConfigurationError, match="src/cli.ts"): + select_backend(_sel("pi", "local"))