diff --git a/api/ee/src/core/access/permissions/types.py b/api/ee/src/core/access/permissions/types.py index c3ab36b719..6cf7ee1647 100644 --- a/api/ee/src/core/access/permissions/types.py +++ b/api/ee/src/core/access/permissions/types.py @@ -190,6 +190,11 @@ class Permission(str, Enum): EDIT_TOOLS = "edit_tools" RUN_TOOLS = "run_tools" + # Triggers + VIEW_TRIGGERS = "view_triggers" + EDIT_TRIGGERS = "edit_triggers" + RUN_TRIGGERS = "run_triggers" + @classmethod def default_permissions(cls, role): VIEWER_PERMISSIONS = [ @@ -217,6 +222,7 @@ def default_permissions(cls, role): cls.VIEW_EVALUATION_METRICS, cls.VIEW_EVALUATION_QUEUES, cls.VIEW_TOOLS, + cls.VIEW_TRIGGERS, ] ANNOTATOR_PERMISSIONS = VIEWER_PERMISSIONS + [ cls.CREATE_EVALUATION, @@ -230,6 +236,7 @@ def default_permissions(cls, role): cls.EDIT_EVALUATION_QUEUES, cls.EDIT_SPANS, cls.RUN_TOOLS, + cls.RUN_TRIGGERS, ] EDITOR_PERMISSIONS = ANNOTATOR_PERMISSIONS + [ cls.EDIT_APPLICATIONS, @@ -251,6 +258,7 @@ def default_permissions(cls, role): cls.EDIT_TESTSETS, cls.EDIT_INVOCATIONS, cls.EDIT_TOOLS, + cls.EDIT_TRIGGERS, ] DEVELOPER_PERMISSIONS = EDITOR_PERMISSIONS + [ cls.VIEW_API_KEYS, diff --git a/api/ee/tests/pytest/acceptance/triggers/__init__.py b/api/ee/tests/pytest/acceptance/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py b/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py new file mode 100644 index 0000000000..3ec4a7e956 --- /dev/null +++ b/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py @@ -0,0 +1,160 @@ +"""EE acceptance tests for the triggers events catalog. + +Mirrors the OSS suite (oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py) +but exercises /triggers/catalog/* as a business-plan, developer-role account. +Under EE the catalog is gated on the VIEW_TRIGGERS permission; a developer role +carries VIEW_TRIGGERS, so this verifies the endpoint behaves once the gate is +satisfied. + +Provider-catalog reads need no Composio credentials (empty catalog is valid). +Event browse / config-schema fetch make real Composio calls and are gated on +COMPOSIO_API_KEY being present in the runner's environment. + +Requires a running API. +""" + +import os +from uuid import uuid4 + +import pytest +import requests + +from utils.constants import BASE_TIMEOUT + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +def _create_developer_business_account(admin_api): + uid = uuid4().hex[:12] + email = f"triggers-dev-{uid}@test.agenta.ai" + resp = admin_api( + "POST", + "/admin/simple/accounts/", + json={ + "accounts": { + "u": { + "user": {"email": email}, + "options": { + "create_api_keys": True, + "return_api_keys": True, + "seed_defaults": False, + }, + "subscription": {"plan": "cloud_v0_business"}, + "organization_memberships": [ + { + "organization_ref": {"ref": "org"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "workspace_memberships": [ + { + "workspace_ref": {"ref": "wrk"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "project_memberships": [ + { + "project_ref": {"ref": "prj"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + } + } + }, + ) + assert resp.status_code == 200, resp.text + account = resp.json()["accounts"]["u"] + return { + "email": email, + "credentials": f"ApiKey {account['api_keys']['key']}", + } + + +def _delete_account_by_email(admin_api, *, email): + resp = admin_api( + "DELETE", + "/admin/simple/accounts/", + json={"accounts": {"u": {"user": {"email": email}}}, "confirm": "delete"}, + ) + assert resp.status_code == 204, resp.text + + +@pytest.fixture(scope="class") +def triggers_api(admin_api, ag_env): + account = _create_developer_business_account(admin_api) + + def _request(method: str, endpoint: str, **kwargs): + headers = kwargs.pop("headers", {}) + headers.setdefault("Authorization", account["credentials"]) + return requests.request( + method=method, + url=f"{ag_env['api_url']}{endpoint}", + headers=headers, + timeout=BASE_TIMEOUT, + **kwargs, + ) + + yield _request + + _delete_account_by_email(admin_api, email=account["email"]) + + +class TestTriggersCatalogProviders: + def test_list_providers_returns_200(self, triggers_api): + response = triggers_api("GET", "/triggers/catalog/providers/") + assert response.status_code == 200 + + def test_list_providers_response_shape(self, triggers_api): + body = triggers_api("GET", "/triggers/catalog/providers/").json() + assert "count" in body + assert "providers" in body + assert isinstance(body["providers"], list) + assert body["count"] == len(body["providers"]) + + @pytest.mark.skipif( + _COMPOSIO_ENABLED, + reason="catalog is non-empty when Composio is enabled", + ) + def test_list_providers_empty_when_composio_disabled(self, triggers_api): + body = triggers_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == 0 + assert body["providers"] == [] + + +@_requires_composio +class TestTriggersCatalogEvents: + def test_browse_events_returns_200(self, triggers_api): + response = triggers_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ) + assert response.status_code == 200 + body = response.json() + assert "events" in body + assert isinstance(body["events"], list) + + def test_fetch_event_config_schema(self, triggers_api): + listing = triggers_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ).json() + if not listing["events"]: + pytest.skip("no github events available from Composio") + + event_key = listing["events"][0]["key"] + response = triggers_api( + "GET", + f"/triggers/catalog/providers/composio/integrations/github/events/{event_key}", + ) + assert response.status_code == 200 + event = response.json()["event"] + assert event["key"] == event_key + assert "trigger_config" in event diff --git a/api/entrypoints/routers.py b/api/entrypoints/routers.py index e7c0cf9d55..badfdcf0a0 100644 --- a/api/entrypoints/routers.py +++ b/api/entrypoints/routers.py @@ -144,6 +144,10 @@ from oss.src.core.tools.registry import ToolsGatewayRegistry from oss.src.core.tools.service import ToolsService from oss.src.apis.fastapi.tools.router import ToolsRouter +from oss.src.core.triggers.providers.composio import ComposioTriggersAdapter +from oss.src.core.triggers.registry import TriggersGatewayRegistry +from oss.src.core.triggers.service import TriggersService +from oss.src.apis.fastapi.triggers.router import TriggersRouter from oss.src.apis.fastapi.shared.utils import SupportHeadersMiddleware @@ -217,6 +221,9 @@ async def lifespan(*args, **kwargs): for adapter in _composio_connections_adapters.values(): await adapter.close() + for adapter in _composio_triggers_adapters.values(): + await adapter.close() + await _transactions_engine.close() await _analytics_engine.close() await _streams_engine.close() @@ -310,6 +317,11 @@ async def lifespan(*args, **kwargs): "description": "External tool connections and OAuth integrations available to applications.", }, # -- + { + "name": "Triggers", + "description": "Inbound provider event triggers and their watchable event catalog.", + }, + # -- { "name": "Folders", "description": "Organize applications and other resources into folder hierarchies.", @@ -618,6 +630,22 @@ async def lifespan(*args, **kwargs): adapter_registry=tools_adapter_registry, ) +# Triggers adapter + service +_composio_triggers_adapters = {} +if env.composio.enabled: + _composio_triggers_adapters["composio"] = ComposioTriggersAdapter( + api_key=env.composio.api_key, # type: ignore[arg-type] # guarded by .enabled + api_url=env.composio.api_url, + ) + +triggers_adapter_registry = TriggersGatewayRegistry( + adapters=_composio_triggers_adapters, +) + +triggers_service = TriggersService( + adapter_registry=triggers_adapter_registry, +) + _t_services_done = time.perf_counter() - _t_services print(f"[STARTUP] Service initialization completed (+{_t_services_done:.3f}s)") _t_routers = time.perf_counter() @@ -732,6 +760,10 @@ async def lifespan(*args, **kwargs): tools_service=tools_service, ) +triggers = TriggersRouter( + triggers_service=triggers_service, +) + simple_traces = SimpleTracesRouter( simple_traces_service=simple_traces_service, ) @@ -1099,6 +1131,19 @@ async def lifespan(*args, **kwargs): include_in_schema=False, ) +app.include_router( + router=triggers.router, + prefix="/triggers", + tags=["Triggers"], +) + +app.include_router( + router=triggers.router, + prefix="/preview/triggers", + tags=["Triggers"], + include_in_schema=False, +) + app.include_router( router=evaluations.admin_router, prefix="/admin/evaluations", diff --git a/api/oss/src/apis/fastapi/triggers/__init__.py b/api/oss/src/apis/fastapi/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/apis/fastapi/triggers/models.py b/api/oss/src/apis/fastapi/triggers/models.py new file mode 100644 index 0000000000..f1bd9b73d7 --- /dev/null +++ b/api/oss/src/apis/fastapi/triggers/models.py @@ -0,0 +1,36 @@ +from typing import List, Optional + +from pydantic import BaseModel, Field + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) + + +# --------------------------------------------------------------------------- +# Trigger Catalog +# --------------------------------------------------------------------------- + + +class TriggerCatalogProviderResponse(BaseModel): + count: int = 0 + provider: Optional[TriggerCatalogProvider] = None + + +class TriggerCatalogProvidersResponse(BaseModel): + count: int = 0 + providers: List[TriggerCatalogProvider] = Field(default_factory=list) + + +class TriggerCatalogEventResponse(BaseModel): + count: int = 0 + event: Optional[TriggerCatalogEventDetails] = None + + +class TriggerCatalogEventsResponse(BaseModel): + count: int = 0 + total: int = 0 + cursor: Optional[str] = None + events: List[TriggerCatalogEvent] = Field(default_factory=list) diff --git a/api/oss/src/apis/fastapi/triggers/router.py b/api/oss/src/apis/fastapi/triggers/router.py new file mode 100644 index 0000000000..91ae7ff2e0 --- /dev/null +++ b/api/oss/src/apis/fastapi/triggers/router.py @@ -0,0 +1,324 @@ +from functools import wraps +from typing import Optional + +import httpx +from fastapi import APIRouter, HTTPException, Query, Request, status +from fastapi.responses import JSONResponse + +from oss.src.utils.exceptions import intercept_exceptions +from oss.src.utils.logging import get_module_logger +from oss.src.utils.caching import get_cache, set_cache +from oss.src.utils.common import is_ee + +from oss.src.apis.fastapi.triggers.models import ( + TriggerCatalogEventResponse, + TriggerCatalogEventsResponse, + TriggerCatalogProviderResponse, + TriggerCatalogProvidersResponse, +) +from oss.src.core.triggers.exceptions import AdapterError +from oss.src.core.triggers.service import TriggersService + + +if is_ee(): + from ee.src.core.access.permissions.types import Permission + from ee.src.core.access.permissions.service import ( + check_action_access, + FORBIDDEN_EXCEPTION, + ) + +log = get_module_logger(__name__) + + +def handle_adapter_exceptions(): + """Map unknown providers to 404 and upstream 401 failures to 424.""" + + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except ProviderNotFoundError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e), + ) from e + except AdapterError as e: + cause = e.__cause__ + if not ( + isinstance(cause, httpx.HTTPStatusError) + and cause.response is not None + and cause.response.status_code == status.HTTP_401_UNAUTHORIZED + ): + raise + + raise HTTPException( + status_code=status.HTTP_424_FAILED_DEPENDENCY, + detail=e.message, + ) from e + + return wrapper + + return decorator + + +class TriggersRouter: + def __init__( + self, + *, + triggers_service: TriggersService, + ): + self.triggers_service = triggers_service + + self.router = APIRouter() + + # --- Trigger Catalog --- + self.router.add_api_route( + "/catalog/providers/", + self.list_providers, + methods=["GET"], + operation_id="list_trigger_providers", + response_model=TriggerCatalogProvidersResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}", + self.get_provider, + methods=["GET"], + operation_id="fetch_trigger_provider", + response_model=TriggerCatalogProviderResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}/integrations/{integration_key}/events/", + self.list_events, + methods=["GET"], + operation_id="list_trigger_events", + response_model=TriggerCatalogEventsResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}/integrations/{integration_key}/events/{event_key}", + self.get_event, + methods=["GET"], + operation_id="fetch_trigger_event", + response_model=TriggerCatalogEventResponse, + response_model_exclude_none=True, + ) + + # ----------------------------------------------------------------------- + # Trigger Catalog + # ----------------------------------------------------------------------- + + @intercept_exceptions() + @handle_adapter_exceptions() + async def list_providers( + self, + request: Request, + ) -> TriggerCatalogProvidersResponse: + if is_ee(): + has_permission = await check_action_access( + project_id=request.state.project_id, + user_uid=request.state.user_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cached = await get_cache( + project_id=None, # catalog is global; not per-project + namespace="triggers:catalog:providers", + key={}, + model=TriggerCatalogProvidersResponse, + ) + if cached: + return cached + + providers = await self.triggers_service.list_providers() + items = list(providers) + + response = TriggerCatalogProvidersResponse( + count=len(items), + providers=items, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:providers", + key={}, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def get_provider( + self, + request: Request, + provider_key: str, + ) -> TriggerCatalogProviderResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = {"provider_key": provider_key} + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:provider", + key=cache_key, + model=TriggerCatalogProviderResponse, + ) + if cached: + return cached + + provider = await self.triggers_service.get_provider( + provider_key=provider_key, + ) + if not provider: + return JSONResponse( + status_code=404, + content={"detail": "Provider not found"}, + ) + + response = TriggerCatalogProviderResponse( + count=1, + provider=provider, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:provider", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def list_events( + self, + request: Request, + provider_key: str, + integration_key: str, + *, + query: Optional[str] = Query(default=None), + limit: Optional[int] = Query(default=None), + cursor: Optional[str] = Query(default=None), + ) -> TriggerCatalogEventsResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = { + "provider_key": provider_key, + "integration_key": integration_key, + "query": query, + "limit": limit, + "cursor": cursor, + } + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:events", + key=cache_key, + model=TriggerCatalogEventsResponse, + ) + if cached: + return cached + + events, next_cursor, total = await self.triggers_service.list_events( + provider_key=provider_key, + integration_key=integration_key, + query=query, + limit=limit, + cursor=cursor, + ) + items = list(events) + + response = TriggerCatalogEventsResponse( + count=len(items), + total=total, + cursor=next_cursor, + events=items, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:events", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def get_event( + self, + request: Request, + provider_key: str, + integration_key: str, + event_key: str, + ) -> TriggerCatalogEventResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = { + "provider_key": provider_key, + "integration_key": integration_key, + "event_key": event_key, + } + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:event", + key=cache_key, + model=TriggerCatalogEventResponse, + ) + if cached: + return cached + + event = await self.triggers_service.get_event( + provider_key=provider_key, + integration_key=integration_key, + event_key=event_key, + ) + if not event: + return JSONResponse( + status_code=404, + content={"detail": "Event not found"}, + ) + + response = TriggerCatalogEventResponse( + count=1, + event=event, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:event", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response diff --git a/api/oss/src/core/tools/interfaces.py b/api/oss/src/core/tools/interfaces.py index 0e459619e6..0a61d59ee9 100644 --- a/api/oss/src/core/tools/interfaces.py +++ b/api/oss/src/core/tools/interfaces.py @@ -20,53 +20,53 @@ class ToolsGatewayInterface(ABC): @abstractmethod async def list_providers(self) -> List[ToolCatalogProvider]: ... - - @abstractmethod - async def list_integrations( - self, - *, - search: Optional[str] = None, - sort_by: Optional[str] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: - """Returns (items, next_cursor, total_items).""" - ... - - @abstractmethod - async def get_integration( - self, - *, - integration_key: str, - ) -> Optional[ToolCatalogIntegration]: ... - - @abstractmethod - async def list_actions( - self, - *, - integration_key: str, - query: Optional[str] = None, - categories: Optional[List[str]] = None, - important: Optional[bool] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: - """Returns (items, next_cursor, total_items).""" - ... - - @abstractmethod - async def get_action( - self, - *, - integration_key: str, + + @abstractmethod + async def list_integrations( + self, + *, + search: Optional[str] = None, + sort_by: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_integration( + self, + *, + integration_key: str, + ) -> Optional[ToolCatalogIntegration]: ... + + @abstractmethod + async def list_actions( + self, + *, + integration_key: str, + query: Optional[str] = None, + categories: Optional[List[str]] = None, + important: Optional[bool] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_action( + self, + *, + integration_key: str, action_key: str, ) -> Optional[ToolCatalogActionDetails]: ... @abstractmethod async def execute( self, - *, - request: ToolExecutionRequest, - ) -> ToolExecutionResponse: - """Execute a tool action.""" - ... + *, + request: ToolExecutionRequest, + ) -> ToolExecutionResponse: + """Execute a tool action.""" + ... diff --git a/api/oss/src/core/tools/service.py b/api/oss/src/core/tools/service.py index feb97b475c..df680b21b2 100644 --- a/api/oss/src/core/tools/service.py +++ b/api/oss/src/core/tools/service.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from uuid import UUID from oss.src.utils.logging import get_module_logger @@ -18,9 +18,9 @@ log = get_module_logger(__name__) - - -class ToolsService: + + +class ToolsService: def __init__( self, *, @@ -31,95 +31,95 @@ def __init__( self.adapter_registry = adapter_registry # ----------------------------------------------------------------------- - # Catalog browse - # ----------------------------------------------------------------------- - - async def list_providers(self) -> List[ToolCatalogProvider]: - """Return all providers across registered adapters.""" - results: List[ToolCatalogProvider] = [] - for _key, adapter in self.adapter_registry.items(): - providers = await adapter.list_providers() - results.extend(providers) - return results - - async def get_provider( - self, - *, - provider_key: str, - ) -> Optional[ToolCatalogProvider]: - """Return a single provider by key, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - providers = await adapter.list_providers() - for p in providers: - if p.key == provider_key: - return p - return None - - async def list_integrations( - self, - *, - provider_key: str, - # - search: Optional[str] = None, - sort_by: Optional[str] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: - """List integrations for a provider with optional filtering and pagination.""" - adapter = self.adapter_registry.get(provider_key) - integrations, next_cursor, total = await adapter.list_integrations( - search=search, - sort_by=sort_by, - limit=limit, - cursor=cursor, - ) - return integrations, next_cursor, total - - async def get_integration( - self, - *, - provider_key: str, - integration_key: str, - ) -> Optional[ToolCatalogIntegration]: - """Return a single integration by key, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.get_integration(integration_key=integration_key) - - async def list_actions( - self, - *, - provider_key: str, - integration_key: str, - # - query: Optional[str] = None, - categories: Optional[List[str]] = None, - important: Optional[bool] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: - """List actions for an integration with optional search and pagination.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.list_actions( - integration_key=integration_key, - query=query, - categories=categories, - important=important, - limit=limit, - cursor=cursor, - ) - - async def get_action( - self, - *, - provider_key: str, - integration_key: str, - action_key: str, - ) -> Optional[ToolCatalogActionDetails]: - """Return full action detail including input/output schema, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.get_action( - integration_key=integration_key, - action_key=action_key, + # Catalog browse + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[ToolCatalogProvider]: + """Return all providers across registered adapters.""" + results: List[ToolCatalogProvider] = [] + for _key, adapter in self.adapter_registry.items(): + providers = await adapter.list_providers() + results.extend(providers) + return results + + async def get_provider( + self, + *, + provider_key: str, + ) -> Optional[ToolCatalogProvider]: + """Return a single provider by key, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + providers = await adapter.list_providers() + for p in providers: + if p.key == provider_key: + return p + return None + + async def list_integrations( + self, + *, + provider_key: str, + # + search: Optional[str] = None, + sort_by: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: + """List integrations for a provider with optional filtering and pagination.""" + adapter = self.adapter_registry.get(provider_key) + integrations, next_cursor, total = await adapter.list_integrations( + search=search, + sort_by=sort_by, + limit=limit, + cursor=cursor, + ) + return integrations, next_cursor, total + + async def get_integration( + self, + *, + provider_key: str, + integration_key: str, + ) -> Optional[ToolCatalogIntegration]: + """Return a single integration by key, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_integration(integration_key=integration_key) + + async def list_actions( + self, + *, + provider_key: str, + integration_key: str, + # + query: Optional[str] = None, + categories: Optional[List[str]] = None, + important: Optional[bool] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: + """List actions for an integration with optional search and pagination.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.list_actions( + integration_key=integration_key, + query=query, + categories=categories, + important=important, + limit=limit, + cursor=cursor, + ) + + async def get_action( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + ) -> Optional[ToolCatalogActionDetails]: + """Return full action detail including input/output schema, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_action( + integration_key=integration_key, + action_key=action_key, ) # ----------------------------------------------------------------------- @@ -127,10 +127,10 @@ async def get_action( # ----------------------------------------------------------------------- async def query_connections( - self, - *, - project_id: UUID, - # + self, + *, + project_id: UUID, + # provider_key: Optional[str] = None, integration_key: Optional[str] = None, is_active: Optional[bool] = True, @@ -153,10 +153,10 @@ async def list_connections( project_id=project_id, provider_key=provider_key, integration_key=integration_key, - ) - - async def get_connection( - self, + ) + + async def get_connection( + self, *, project_id: UUID, connection_id: UUID, @@ -198,12 +198,12 @@ async def create_connection( project_id=project_id, user_id=user_id, # - connection_create=connection_create, - ) - - async def delete_connection( - self, - *, + connection_create=connection_create, + ) + + async def delete_connection( + self, + *, project_id: UUID, connection_id: UUID, ) -> bool: @@ -211,9 +211,9 @@ async def delete_connection( project_id=project_id, connection_id=connection_id, ) - - async def revoke_connection( - self, + + async def revoke_connection( + self, *, project_id: UUID, connection_id: UUID, @@ -226,7 +226,7 @@ async def revoke_connection( async def refresh_connection( self, *, - project_id: UUID, + project_id: UUID, connection_id: UUID, # force: bool = False, @@ -239,27 +239,27 @@ async def refresh_connection( # ----------------------------------------------------------------------- # Tool execution - # ----------------------------------------------------------------------- - - async def execute_tool( - self, - *, - provider_key: str, - integration_key: str, - action_key: str, - provider_connection_id: str, - user_id: Optional[str] = None, - arguments: Dict[str, Any], - ) -> ToolExecutionResponse: - """Execute a tool action using the provider adapter.""" - adapter = self.adapter_registry.get(provider_key) - - return await adapter.execute( - request=ToolExecutionRequest( - integration_key=integration_key, - action_key=action_key, - provider_connection_id=provider_connection_id, - user_id=user_id, - arguments=arguments, - ), - ) + # ----------------------------------------------------------------------- + + async def execute_tool( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + provider_connection_id: str, + user_id: Optional[str] = None, + arguments: Dict[str, Any], + ) -> ToolExecutionResponse: + """Execute a tool action using the provider adapter.""" + adapter = self.adapter_registry.get(provider_key) + + return await adapter.execute( + request=ToolExecutionRequest( + integration_key=integration_key, + action_key=action_key, + provider_connection_id=provider_connection_id, + user_id=user_id, + arguments=arguments, + ), + ) diff --git a/api/oss/src/core/triggers/__init__.py b/api/oss/src/core/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/core/triggers/dtos.py b/api/oss/src/core/triggers/dtos.py new file mode 100644 index 0000000000..f724089a38 --- /dev/null +++ b/api/oss/src/core/triggers/dtos.py @@ -0,0 +1,49 @@ +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# Trigger Enums +# --------------------------------------------------------------------------- + + +class TriggerProviderKind(str, Enum): + COMPOSIO = "composio" + + +# --------------------------------------------------------------------------- +# Trigger Catalog +# +# The catalog leaf is an **event** (Composio "trigger type"), the analogue of a +# tools **action**. An event carries a ``trigger_config`` JSON Schema, the +# analogue of an action's ``input_parameters``. +# --------------------------------------------------------------------------- + + +class TriggerCatalogEvent(BaseModel): + key: str + # + name: str + description: Optional[str] = None + # + provider: Optional[str] = None + integration: Optional[str] = None + # + categories: List[str] = Field(default_factory=list) + logo: Optional[str] = None + + +class TriggerCatalogEventDetails(TriggerCatalogEvent): + # FROZEN (WS-PRE): the Event DTO carries the event's trigger_config JSON Schema + # — the inbound analogue of an action's input_parameters. + trigger_config: Optional[Dict[str, Any]] = None + payload: Optional[Dict[str, Any]] = None + + +class TriggerCatalogProvider(BaseModel): + key: TriggerProviderKind + # + name: str + description: Optional[str] = None diff --git a/api/oss/src/core/triggers/exceptions.py b/api/oss/src/core/triggers/exceptions.py new file mode 100644 index 0000000000..473b4094a4 --- /dev/null +++ b/api/oss/src/core/triggers/exceptions.py @@ -0,0 +1,36 @@ +from typing import Optional + + +class TriggersError(Exception): + """Base exception for the triggers domain.""" + + def __init__(self, message: str = "Triggers error"): + self.message = message + super().__init__(self.message) + + +class ProviderNotFoundError(TriggersError): + """Raised when the requested provider_key has no registered adapter.""" + + def __init__(self, provider_key: str): + self.provider_key = provider_key + super().__init__(f"Provider not found: {provider_key}") + + +class AdapterError(TriggersError): + """Raised when an adapter operation fails.""" + + def __init__( + self, + *, + provider_key: str, + operation: str, + detail: Optional[str] = None, + ): + self.provider_key = provider_key + self.operation = operation + self.detail = detail + msg = f"Adapter error ({provider_key}.{operation})" + if detail: + msg += f": {detail}" + super().__init__(msg) diff --git a/api/oss/src/core/triggers/interfaces.py b/api/oss/src/core/triggers/interfaces.py new file mode 100644 index 0000000000..2b07ca835f --- /dev/null +++ b/api/oss/src/core/triggers/interfaces.py @@ -0,0 +1,75 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional, Tuple +from uuid import UUID + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) + + +class TriggersGatewayInterface(ABC): + """Port for external trigger providers (Composio, ...). + + FROZEN (WS-PRE) — consumed by WS3 (subscriptions) and WS5 (web catalog). + The catalog reads (``list_events``/``get_event``) back the events catalog; + the subscription verbs build/manage the provider-side trigger instance + (``ti_*``) that WP3 stores on a local subscription row. + """ + + @abstractmethod + async def list_providers(self) -> List[TriggerCatalogProvider]: ... + + @abstractmethod + async def list_events( + self, + *, + integration_key: str, + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_event( + self, + *, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Return one event's detail, carrying its trigger_config JSON Schema.""" + ... + + @abstractmethod + async def create_subscription( + self, + *, + project_id: UUID, + event_key: str, + connected_account_id: str, + trigger_config: Dict[str, Any], + ) -> str: + """Create the provider-side trigger instance; returns its id (``ti_*``).""" + ... + + @abstractmethod + async def set_subscription_status( + self, + *, + trigger_id: str, + enabled: bool, + ) -> None: + """Enable or disable the provider-side trigger instance.""" + ... + + @abstractmethod + async def delete_subscription( + self, + *, + trigger_id: str, + ) -> None: + """Permanently delete the provider-side trigger instance.""" + ... diff --git a/api/oss/src/core/triggers/providers/__init__.py b/api/oss/src/core/triggers/providers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/core/triggers/providers/composio/__init__.py b/api/oss/src/core/triggers/providers/composio/__init__.py new file mode 100644 index 0000000000..9841fc07c1 --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/__init__.py @@ -0,0 +1,18 @@ +# Avoid importing adapter here to prevent SDK dependency issues in standalone scripts. +# Import directly when needed: +# from oss.src.core.triggers.providers.composio.adapter import ComposioTriggersAdapter + +__all__ = [ + "ComposioTriggersAdapter", +] + + +def __getattr__(name): + """Lazy import to avoid SDK dependency on module import.""" + if name == "ComposioTriggersAdapter": + from oss.src.core.triggers.providers.composio.adapter import ( + ComposioTriggersAdapter, + ) + + return ComposioTriggersAdapter + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/api/oss/src/core/triggers/providers/composio/adapter.py b/api/oss/src/core/triggers/providers/composio/adapter.py new file mode 100644 index 0000000000..20fd9dd212 --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/adapter.py @@ -0,0 +1,187 @@ +from typing import Any, Dict, List, Optional +from uuid import UUID + +import httpx + +from oss.src.utils.logging import get_module_logger + +from oss.src.core.triggers.dtos import ( + TriggerCatalogProvider, + TriggerProviderKind, +) +from oss.src.core.triggers.interfaces import TriggersGatewayInterface +from oss.src.core.triggers.exceptions import AdapterError +from oss.src.core.triggers.providers.composio.catalog import ( + ComposioTriggersCatalogClient, +) + + +log = get_module_logger(__name__) + +COMPOSIO_DEFAULT_API_URL = "https://backend.composio.dev/api/v3" + + +class ComposioTriggersAdapter(ComposioTriggersCatalogClient, TriggersGatewayInterface): + """Composio V3 triggers adapter — uses httpx directly (no SDK). + + Modeled on ``ComposioToolsAdapter``: own httpx client, ``_get/_post/_delete`` + helpers, slug passthrough. Catalog operations (list/get events) come from + ``ComposioTriggersCatalogClient``; subscription (trigger-instance) management + is implemented here and consumed by WP3. + + REST paths (E5 — verified vs the live Composio API reference): + list events GET /triggers_types?toolkit_slugs={i} + get event GET /triggers_types/{slug} + create/upsert POST /trigger_instances/{slug}/upsert + enable/disable PATCH /trigger_instances/manage/{trigger_id} + delete DELETE /trigger_instances/manage/{trigger_id} + """ + + def __init__( + self, + *, + api_key: str, + api_url: str = COMPOSIO_DEFAULT_API_URL, + ): + self.api_key = api_key + self.api_url = api_url.rstrip("/") + # Shared client — one connection pool for the adapter's lifetime. + # Call close() on shutdown (wired in entrypoints/routers.py lifespan). + self._client = httpx.AsyncClient(timeout=30.0) + + async def close(self) -> None: + """Close the shared HTTP client and release connection pool resources.""" + await self._client.aclose() + + def _headers(self) -> Dict[str, str]: + return { + "x-api-key": self.api_key, + "Content-Type": "application/json", + } + + async def _post( + self, + path: str, + *, + json: Optional[Dict[str, Any]] = None, + ) -> Any: + resp = await self._client.post( + f"{self.api_url}{path}", + headers=self._headers(), + json=json or {}, + ) + if not resp.is_success: + log.error("Composio POST %s → %s: %s", path, resp.status_code, resp.text) + resp.raise_for_status() + return resp.json() + + async def _patch( + self, + path: str, + *, + json: Optional[Dict[str, Any]] = None, + ) -> Any: + resp = await self._client.patch( + f"{self.api_url}{path}", + headers=self._headers(), + json=json or {}, + ) + if not resp.is_success: + log.error("Composio PATCH %s → %s: %s", path, resp.status_code, resp.text) + resp.raise_for_status() + return resp.json() + + async def _delete(self, path: str) -> bool: + resp = await self._client.delete( + f"{self.api_url}{path}", + headers=self._headers(), + ) + resp.raise_for_status() + return True + + # ----------------------------------------------------------------------- + # Catalog — provider listing + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[TriggerCatalogProvider]: + return [ + TriggerCatalogProvider( + key=TriggerProviderKind.COMPOSIO, + name="Composio", + description="Third-party event triggers via Composio", + ) + ] + + # list_events and get_event are inherited from ComposioTriggersCatalogClient + # and satisfy the TriggersGatewayInterface catalog contract. + + # ----------------------------------------------------------------------- + # Subscriptions (provider-side trigger instances — ti_*) — consumed by WP3 + # ----------------------------------------------------------------------- + + async def create_subscription( + self, + *, + project_id: UUID, + event_key: str, + connected_account_id: str, + trigger_config: Dict[str, Any], + ) -> str: + """Create/upsert the provider-side trigger instance; return its id (ti_*).""" + payload: Dict[str, Any] = { + "connected_account_id": connected_account_id, + "trigger_config": trigger_config or {}, + } + try: + result = await self._post( + f"/trigger_instances/{event_key}/upsert", + json=payload, + ) + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="create_subscription", + detail=str(e), + ) from e + + trigger_id = result.get("trigger_id") or result.get("id") + if not trigger_id: + raise AdapterError( + provider_key="composio", + operation="create_subscription", + detail=f"No trigger_id in upsert response for event '{event_key}'", + ) + return trigger_id + + async def set_subscription_status( + self, + *, + trigger_id: str, + enabled: bool, + ) -> None: + status = "enable" if enabled else "disable" + try: + await self._patch( + f"/trigger_instances/manage/{trigger_id}", + json={"status": status}, + ) + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="set_subscription_status", + detail=str(e), + ) from e + + async def delete_subscription( + self, + *, + trigger_id: str, + ) -> None: + try: + await self._delete(f"/trigger_instances/manage/{trigger_id}") + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="delete_subscription", + detail=str(e), + ) from e diff --git a/api/oss/src/core/triggers/providers/composio/catalog.py b/api/oss/src/core/triggers/providers/composio/catalog.py new file mode 100644 index 0000000000..f773fab8ec --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/catalog.py @@ -0,0 +1,188 @@ +"""Composio triggers catalog operations — mixin for ComposioTriggersAdapter. + +Provides catalog HTTP calls (list events, get one event) backed by +``self._client``, ``self.api_key``, and ``self.api_url`` which must be supplied +by the concrete subclass (ComposioTriggersAdapter). + +Mirrors ``core/tools/providers/composio/catalog.py`` with ``action → event``: +the tools "action" leaf becomes the triggers "event" leaf (a Composio *trigger +type*), and an action's ``input_parameters`` schema becomes an event's +``trigger_config`` schema. The ``cursor`` value is Composio's native +``next_cursor`` string, passed through as-is. +""" + +from typing import Any, Dict, List, Optional, Tuple + +import httpx + +from oss.src.utils.logging import get_module_logger +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, +) +from oss.src.core.triggers.exceptions import AdapterError + + +log = get_module_logger(__name__) + +DEFAULT_PAGE_SIZE = 20 +MAX_PAGE_SIZE = 1000 + + +class ComposioTriggersCatalogClient: + """Catalog mixin for ComposioTriggersAdapter — cursor-based pagination. + + Subclass must set ``self.api_key``, ``self.api_url``, and ``self._client`` + (an ``httpx.AsyncClient``) before calling any method. + """ + + # Annotated for type-checkers; filled in by ComposioTriggersAdapter.__init__ + api_key: str + api_url: str + _client: httpx.AsyncClient + + async def list_events( + self, + *, + integration_key: str, + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """Fetch one page of events (Composio trigger types) for an integration. + + E5 (verified vs live Composio API reference): GET /triggers_types, + filtered by ``toolkit_slugs``. + """ + page_limit = min(limit, MAX_PAGE_SIZE) if limit else DEFAULT_PAGE_SIZE + + params: Dict[str, Any] = { + "toolkit_slugs": integration_key, + "limit": page_limit, + } + if query: + params["query"] = query + if cursor: + params["cursor"] = cursor + + try: + resp = await self._client.get( + f"{self.api_url}/triggers_types", + headers={"x-api-key": self.api_key, "Content-Type": "application/json"}, + params=params, + timeout=30.0, + ) + resp.raise_for_status() + data = resp.json() + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="list_events", + detail=str(e), + ) from e + + items_raw: List[Dict[str, Any]] = ( + data.get("items", []) if isinstance(data, dict) else data + ) + next_cursor: Optional[str] = ( + data.get("next_cursor") if isinstance(data, dict) else None + ) + total_items: int = ( + data.get("total_items", len(items_raw)) + if isinstance(data, dict) + else len(items_raw) + ) + + items = [_parse_event(item, integration_key) for item in items_raw] + + log.debug( + "[composio] list_events(%s) cursor=%s items=%d total=%d next=%s", + integration_key, + cursor, + len(items), + total_items, + next_cursor, + ) + + return items, next_cursor, total_items + + async def get_event( + self, + *, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Fetch one event (trigger type) by slug, with its trigger_config schema. + + E5 (verified vs live Composio API reference): GET /triggers_types/{slug}. + Returns None when the event does not exist (404). + """ + try: + resp = await self._client.get( + f"{self.api_url}/triggers_types/{event_key}", + headers={"x-api-key": self.api_key, "Content-Type": "application/json"}, + timeout=15.0, + ) + if resp.status_code == 404: + return None + resp.raise_for_status() + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + raise AdapterError( + provider_key="composio", + operation="get_event", + detail=str(e), + ) from e + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="get_event", + detail=str(e), + ) from e + + return _parse_event_detail(resp.json(), integration_key) + + +# --------------------------------------------------------------------------- +# Parsers (module-level — no instance state needed) +# --------------------------------------------------------------------------- + + +def _toolkit_slug(item: Dict[str, Any], fallback: str) -> str: + toolkit = item.get("toolkit") + if isinstance(toolkit, dict): + return toolkit.get("slug") or toolkit.get("name") or fallback + if isinstance(toolkit, str): + return toolkit + return fallback + + +def _parse_event(item: Dict[str, Any], integration_key: str) -> TriggerCatalogEvent: + return TriggerCatalogEvent( + key=item.get("slug", ""), + name=item.get("name", ""), + description=item.get("description"), + provider="composio", + integration=_toolkit_slug(item, integration_key), + ) + + +def _parse_event_detail( + item: Dict[str, Any], + integration_key: str, +) -> TriggerCatalogEventDetails: + # The event's required config is the JSON Schema under "config" — the inbound + # analogue of an action's "input_parameters". + trigger_config = item.get("config") or item.get("trigger_config") + payload = item.get("payload") + + return TriggerCatalogEventDetails( + key=item.get("slug", ""), + name=item.get("name", ""), + description=item.get("description"), + provider="composio", + integration=_toolkit_slug(item, integration_key), + trigger_config=trigger_config, + payload=payload, + ) diff --git a/api/oss/src/core/triggers/registry.py b/api/oss/src/core/triggers/registry.py new file mode 100644 index 0000000000..4e641f6202 --- /dev/null +++ b/api/oss/src/core/triggers/registry.py @@ -0,0 +1,27 @@ +from typing import Dict, ItemsView + +from oss.src.core.triggers.interfaces import TriggersGatewayInterface +from oss.src.core.triggers.exceptions import ProviderNotFoundError + + +class TriggersGatewayRegistry: + """Dispatches to the correct adapter based on provider_key.""" + + def __init__( + self, + *, + adapters: Dict[str, TriggersGatewayInterface], + ): + self._adapters = adapters + + def get(self, provider_key: str) -> TriggersGatewayInterface: + adapter = self._adapters.get(provider_key) + if not adapter: + raise ProviderNotFoundError(provider_key) + return adapter + + def keys(self) -> list[str]: + return list(self._adapters.keys()) + + def items(self) -> ItemsView[str, TriggersGatewayInterface]: + return self._adapters.items() diff --git a/api/oss/src/core/triggers/service.py b/api/oss/src/core/triggers/service.py new file mode 100644 index 0000000000..1056144ea4 --- /dev/null +++ b/api/oss/src/core/triggers/service.py @@ -0,0 +1,90 @@ +from typing import List, Optional, Tuple + +from oss.src.utils.logging import get_module_logger + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) +from oss.src.core.triggers.registry import TriggersGatewayRegistry + + +log = get_module_logger(__name__) + + +class TriggersService: + """Triggers domain orchestration. + + WP1 scope is the read-only events catalog. Subscriptions/deliveries CRUD and + ingress/dispatch land in later WPs (WP3/WP4) and will extend this service. + """ + + def __init__( + self, + *, + adapter_registry: TriggersGatewayRegistry, + ): + self.adapter_registry = adapter_registry + + # ----------------------------------------------------------------------- + # Catalog browse + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[TriggerCatalogProvider]: + """Return all providers across registered adapters.""" + results: List[TriggerCatalogProvider] = [] + for _key, adapter in self.adapter_registry.items(): + providers = await adapter.list_providers() + results.extend(providers) + return results + + async def get_provider( + self, + *, + provider_key: str, + ) -> Optional[TriggerCatalogProvider]: + """Return a single provider by key. + + Raises ``ProviderNotFoundError`` for an unregistered key (mapped to 404 + at the router); returns None when the adapter has no matching provider. + """ + adapter = self.adapter_registry.get(provider_key) + providers = await adapter.list_providers() + for p in providers: + if p.key == provider_key: + return p + return None + + async def list_events( + self, + *, + provider_key: str, + integration_key: str, + # + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """List events for an integration with optional search and pagination.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.list_events( + integration_key=integration_key, + query=query, + limit=limit, + cursor=cursor, + ) + + async def get_event( + self, + *, + provider_key: str, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Return full event detail including its trigger_config schema, or None.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_event( + integration_key=integration_key, + event_key=event_key, + ) diff --git a/api/oss/src/core/webhooks/delivery.py b/api/oss/src/core/webhooks/delivery.py index 280c3e1a8b..9ca44f3e87 100644 --- a/api/oss/src/core/webhooks/delivery.py +++ b/api/oss/src/core/webhooks/delivery.py @@ -8,7 +8,7 @@ import httpx -from agenta.sdk.utils.resolvers import resolve_json_selector +from agenta.sdk.utils.resolvers import resolve_target_fields from oss.src.core.webhooks.types import ( EVENT_CONTEXT_FIELDS, @@ -23,8 +23,6 @@ log = get_module_logger(__name__) -MAX_RESOLVE_DEPTH = 10 - NON_OVERRIDABLE_HEADERS = { "content-type", "content-length", @@ -92,29 +90,6 @@ def _merge_headers( return merged -def resolve_payload_fields( - fields: Any, - context: Dict[str, Any], - *, - _depth: int = 0, -) -> Any: - if _depth > MAX_RESOLVE_DEPTH: - return None - if isinstance(fields, dict): - return { - k: resolve_payload_fields(v, context, _depth=_depth + 1) - for k, v in fields.items() - } - if isinstance(fields, list): - return [ - resolve_payload_fields(item, context, _depth=_depth + 1) for item in fields - ] - try: - return resolve_json_selector(fields, context) - except Exception: - return None - - def prepare_webhook_request( *, project_id: UUID, @@ -147,7 +122,7 @@ def prepare_webhook_request( } resolved_fields = payload_fields if payload_fields is not None else "$" - payload = resolve_payload_fields(resolved_fields, context) + payload = resolve_target_fields(resolved_fields, context) base_data = WebhookDeliveryData( event_type=typed_event_type, diff --git a/api/oss/src/dbs/postgres/triggers/__init__.py b/api/oss/src/dbs/postgres/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/tests/pytest/acceptance/triggers/__init__.py b/api/oss/tests/pytest/acceptance/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py b/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py new file mode 100644 index 0000000000..bdfa94897c --- /dev/null +++ b/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py @@ -0,0 +1,78 @@ +"""Acceptance tests for GET /triggers/catalog/* endpoints (events catalog). + +The provider-catalog endpoints are reachable without any external API key: an +empty catalog is a valid response (no Composio adapter is registered when +``env.composio`` is unset). The event-browse / config-schema fetch make real +Composio calls, so those tests are gated on COMPOSIO_API_KEY being present in +the runner's environment (the same env the API reads). +""" + +import os + +import pytest + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +class TestTriggersCatalogProviders: + def test_list_providers_returns_200(self, authed_api): + response = authed_api("GET", "/triggers/catalog/providers/") + assert response.status_code == 200 + + def test_list_providers_response_shape(self, authed_api): + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert "count" in body + assert "providers" in body + assert isinstance(body["providers"], list) + + def test_list_providers_count_matches_list(self, authed_api): + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == len(body["providers"]) + + @pytest.mark.skipif( + _COMPOSIO_ENABLED, + reason="catalog is non-empty when Composio is enabled", + ) + def test_list_providers_empty_when_composio_disabled(self, authed_api): + """With env.composio unset, no adapter is registered → empty catalog.""" + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == 0 + assert body["providers"] == [] + + +@_requires_composio +class TestTriggersCatalogEvents: + def test_browse_events_returns_200(self, authed_api): + response = authed_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ) + assert response.status_code == 200 + body = response.json() + assert "events" in body + assert isinstance(body["events"], list) + + def test_fetch_event_config_schema(self, authed_api): + """A single event carries its trigger_config JSON Schema.""" + listing = authed_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ).json() + if not listing["events"]: + pytest.skip("no github events available from Composio") + + event_key = listing["events"][0]["key"] + response = authed_api( + "GET", + f"/triggers/catalog/providers/composio/integrations/github/events/{event_key}", + ) + assert response.status_code == 200 + event = response.json()["event"] + assert event["key"] == event_key + # trigger_config is the inbound analogue of an action's input_parameters + assert "trigger_config" in event diff --git a/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py b/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py index 1ca605df49..c479f6afdb 100644 --- a/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py +++ b/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py @@ -5,11 +5,14 @@ from unittest.mock import patch -from oss.src.core.webhooks.delivery import ( +from agenta.sdk.utils.resolvers import ( MAX_RESOLVE_DEPTH, + resolve_target_fields, +) + +from oss.src.core.webhooks.delivery import ( NON_OVERRIDABLE_HEADERS, _merge_headers, - resolve_payload_fields, ) from oss.src.core.webhooks.types import ( EVENT_CONTEXT_FIELDS, @@ -35,18 +38,18 @@ "scope": {"project_id": "proj-1"}, } -_RESOLVE_PATH = "oss.src.core.webhooks.delivery.resolve_json_selector" +_RESOLVE_PATH = "agenta.sdk.utils.resolvers.resolve_json_selector" # --------------------------------------------------------------------------- -# resolve_payload_fields +# resolve_target_fields # --------------------------------------------------------------------------- -class TestResolvePayloadFields: +class TestResolveTargetFields: def test_dict_recurses_into_values(self): with patch(_RESOLVE_PATH, side_effect=lambda expr, ctx: f"resolved:{expr}"): - result = resolve_payload_fields( + result = resolve_target_fields( {"key": "$.event.event_id"}, _MOCK_CONTEXT, ) @@ -54,7 +57,7 @@ def test_dict_recurses_into_values(self): def test_list_recurses_into_items(self): with patch(_RESOLVE_PATH, side_effect=lambda expr, ctx: f"resolved:{expr}"): - result = resolve_payload_fields( + result = resolve_target_fields( ["$.event.event_id", "$.scope.project_id"], _MOCK_CONTEXT, ) @@ -65,12 +68,12 @@ def test_list_recurses_into_items(self): def test_primitive_delegates_to_resolve_json_selector(self): with patch(_RESOLVE_PATH, return_value="abc123") as mock_resolve: - result = resolve_payload_fields("$.event.event_id", _MOCK_CONTEXT) + result = resolve_target_fields("$.event.event_id", _MOCK_CONTEXT) assert result == "abc123" mock_resolve.assert_called_once_with("$.event.event_id", _MOCK_CONTEXT) def test_depth_exceeds_limit_returns_none(self): - result = resolve_payload_fields( + result = resolve_target_fields( "$.event.event_id", _MOCK_CONTEXT, _depth=MAX_RESOLVE_DEPTH + 1, @@ -79,7 +82,7 @@ def test_depth_exceeds_limit_returns_none(self): def test_depth_at_limit_still_resolves(self): with patch(_RESOLVE_PATH, return_value="ok"): - result = resolve_payload_fields( + result = resolve_target_fields( "$.event.event_id", _MOCK_CONTEXT, _depth=MAX_RESOLVE_DEPTH, @@ -88,7 +91,7 @@ def test_depth_at_limit_still_resolves(self): def test_resolve_error_returns_none(self): with patch(_RESOLVE_PATH, side_effect=ValueError("bad selector")): - result = resolve_payload_fields("$.bad[", _MOCK_CONTEXT) + result = resolve_target_fields("$.bad[", _MOCK_CONTEXT) assert result is None def test_error_leaf_in_dict_does_not_affect_other_keys(self): @@ -98,7 +101,7 @@ def side_effect(expr, ctx): return "good" with patch(_RESOLVE_PATH, side_effect=side_effect): - result = resolve_payload_fields( + result = resolve_target_fields( {"ok": "$.event.event_id", "bad": "$.bad["}, _MOCK_CONTEXT, ) @@ -106,14 +109,14 @@ def side_effect(expr, ctx): def test_dollar_selector_resolves_full_context(self): with patch(_RESOLVE_PATH, return_value=_MOCK_CONTEXT) as mock_resolve: - result = resolve_payload_fields("$", _MOCK_CONTEXT) + result = resolve_target_fields("$", _MOCK_CONTEXT) assert result == _MOCK_CONTEXT mock_resolve.assert_called_once_with("$", _MOCK_CONTEXT) def test_nested_dict_depth_tracking(self): # Three levels deep should still work (depth starts at 0) with patch(_RESOLVE_PATH, return_value="leaf"): - result = resolve_payload_fields( + result = resolve_target_fields( {"a": {"b": {"c": "$.event.event_id"}}}, _MOCK_CONTEXT, ) diff --git a/sdks/python/agenta/sdk/utils/resolvers.py b/sdks/python/agenta/sdk/utils/resolvers.py index b7b51ed5c4..a512a27489 100644 --- a/sdks/python/agenta/sdk/utils/resolvers.py +++ b/sdks/python/agenta/sdk/utils/resolvers.py @@ -12,6 +12,8 @@ log = get_module_logger(__name__) +MAX_RESOLVE_DEPTH = 10 + # ========= Scheme detection ========= @@ -132,3 +134,33 @@ def resolve_json_selector(value: Any, data: Dict[str, Any]) -> Any: log.debug("Failed to resolve JSON selector %r: %s", value, exc) return None return value + + +def resolve_target_fields( + template: Any, + context: Dict[str, Any], + *, + _depth: int = 0, +) -> Any: + """Resolve a template into a target by resolving its selector leaves. + + Walks ``template`` (arbitrary JSON); each leaf is passed through + ``resolve_json_selector`` against *context* (``$``/``/`` selectors resolved, + everything else returned literally). Null-on-miss, depth-capped at + ``MAX_RESOLVE_DEPTH``. + """ + if _depth > MAX_RESOLVE_DEPTH: + return None + if isinstance(template, dict): + return { + k: resolve_target_fields(v, context, _depth=_depth + 1) + for k, v in template.items() + } + if isinstance(template, list): + return [ + resolve_target_fields(item, context, _depth=_depth + 1) for item in template + ] + try: + return resolve_json_selector(template, context) + except Exception: + return None diff --git a/sdks/python/oss/tests/pytest/unit/test_resolvers.py b/sdks/python/oss/tests/pytest/unit/test_resolvers.py new file mode 100644 index 0000000000..281b8ef013 --- /dev/null +++ b/sdks/python/oss/tests/pytest/unit/test_resolvers.py @@ -0,0 +1,125 @@ +"""Unit tests for the shared selector-resolution helpers. + +Pure logic, no network or database. These live in ``agenta.sdk.utils.resolvers`` +so API-side code (webhook delivery, trigger dispatch) can reuse them; this suite +gives the SDK home its own coverage instead of relying on the api-side callers. +""" + +from agenta.sdk.utils.resolvers import ( + MAX_RESOLVE_DEPTH, + detect_scheme, + resolve_dot_notation, + resolve_json_selector, + resolve_target_fields, +) + +_CONTEXT = { + "event": { + "data": {"issue": {"number": 7}}, + "type": "github.issue.opened", + "timestamp": "2024-01-01T00:00:00Z", + }, + "subscription": {"id": "sub-1", "name": "watch"}, + "scope": {"project_id": "proj-1"}, +} + + +class TestDetectScheme: + def test_json_path(self): + assert detect_scheme("$.event.type") == "json-path" + + def test_json_pointer(self): + assert detect_scheme("/event/type") == "json-pointer" + + def test_dot_notation(self): + assert detect_scheme("event.type") == "dot-notation" + + +class TestResolveJsonSelector: + def test_json_path_leaf(self): + assert resolve_json_selector("$.event.type", _CONTEXT) == "github.issue.opened" + + def test_json_pointer_leaf(self): + assert resolve_json_selector("/scope/project_id", _CONTEXT) == "proj-1" + + def test_nested_path(self): + assert resolve_json_selector("$.event.data.issue.number", _CONTEXT) == 7 + + def test_plain_string_returned_literally(self): + assert resolve_json_selector("just a string", _CONTEXT) == "just a string" + + def test_non_string_returned_literally(self): + assert resolve_json_selector(42, _CONTEXT) == 42 + + def test_missing_path_returns_none(self): + assert resolve_json_selector("$.event.nope", _CONTEXT) is None + + def test_malformed_path_returns_none(self): + assert resolve_json_selector("$.bad[", _CONTEXT) is None + + +class TestResolveDotNotation: + def test_literal_key_with_dots(self): + assert resolve_dot_notation("a.b", {"a.b": "literal"}) == "literal" + + def test_nested_traversal(self): + assert resolve_dot_notation("a.b", {"a": {"b": "nested"}}) == "nested" + + def test_empty_expr_raises_keyerror(self): + try: + resolve_dot_notation("", {}) + assert False, "expected KeyError" + except KeyError: + pass + + def test_bracket_syntax_raises_valueerror(self): + try: + resolve_dot_notation("a[0]", {"a": [1]}) + assert False, "expected ValueError" + except ValueError: + pass + + +class TestResolveTargetFields: + def test_whole_context_passthrough(self): + assert resolve_target_fields("$", _CONTEXT) == _CONTEXT + + def test_dict_template_resolves_each_leaf(self): + template = {"number": "$.event.data.issue.number", "kind": "$.event.type"} + assert resolve_target_fields(template, _CONTEXT) == { + "number": 7, + "kind": "github.issue.opened", + } + + def test_list_template_resolves_each_item(self): + assert resolve_target_fields(["$.scope.project_id", "literal"], _CONTEXT) == [ + "proj-1", + "literal", + ] + + def test_nested_structure(self): + template = {"outer": {"inner": ["$.subscription.id"]}} + assert resolve_target_fields(template, _CONTEXT) == { + "outer": {"inner": ["sub-1"]} + } + + def test_missing_leaf_becomes_none_without_dropping_siblings(self): + template = {"ok": "$.event.type", "miss": "$.event.nope"} + assert resolve_target_fields(template, _CONTEXT) == { + "ok": "github.issue.opened", + "miss": None, + } + + def test_depth_over_limit_returns_none(self): + assert ( + resolve_target_fields( + "$.event.type", _CONTEXT, _depth=MAX_RESOLVE_DEPTH + 1 + ) + is None + ) + + def test_depth_at_limit_still_resolves(self): + assert ( + resolve_target_fields("$.event.type", _CONTEXT, _depth=MAX_RESOLVE_DEPTH) + == "github.issue.opened" + )