diff --git a/api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py b/api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py new file mode 100644 index 0000000000..d68b42acaa --- /dev/null +++ b/api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py @@ -0,0 +1,226 @@ +"""EE acceptance tests for /triggers/subscriptions/* and /triggers/deliveries/*. + +Mirrors the OSS suite but exercises the routes as a business-plan, +developer-role account. Subscription CRUD is gated on EDIT_TRIGGERS and reads on +VIEW_TRIGGERS; a developer role carries both, so this verifies the routes behave +once the gate is satisfied. + +The read/query surfaces are DB-only (no Composio needed). The full create -> +list -> disable -> delete roundtrip, including the C7 invariant (deleting a +subscription leaves the shared connection intact), mints a provider-side trigger +instance and is gated on COMPOSIO_API_KEY. + +Requires a running API. +""" + +import os +from uuid import uuid4 + +import pytest +import requests + +from utils.constants import BASE_TIMEOUT + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +def _create_developer_business_account(admin_api): + uid = uuid4().hex[:12] + email = f"triggers-sub-dev-{uid}@test.agenta.ai" + resp = admin_api( + "POST", + "/admin/simple/accounts/", + json={ + "accounts": { + "u": { + "user": {"email": email}, + "options": { + "create_api_keys": True, + "return_api_keys": True, + "seed_defaults": False, + }, + "subscription": {"plan": "cloud_v0_business"}, + "organization_memberships": [ + { + "organization_ref": {"ref": "org"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "workspace_memberships": [ + { + "workspace_ref": {"ref": "wrk"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "project_memberships": [ + { + "project_ref": {"ref": "prj"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + } + } + }, + ) + assert resp.status_code == 200, resp.text + account = resp.json()["accounts"]["u"] + return { + "email": email, + "credentials": f"ApiKey {account['api_keys']['key']}", + } + + +def _delete_account_by_email(admin_api, *, email): + resp = admin_api( + "DELETE", + "/admin/simple/accounts/", + json={"accounts": {"u": {"user": {"email": email}}}, "confirm": "delete"}, + ) + assert resp.status_code == 204, resp.text + + +@pytest.fixture(scope="class") +def triggers_api(admin_api, ag_env): + account = _create_developer_business_account(admin_api) + + def _request(method: str, endpoint: str, **kwargs): + headers = kwargs.pop("headers", {}) + headers.setdefault("Authorization", account["credentials"]) + return requests.request( + method=method, + url=f"{ag_env['api_url']}{endpoint}", + headers=headers, + timeout=BASE_TIMEOUT, + **kwargs, + ) + + yield _request + + _delete_account_by_email(admin_api, email=account["email"]) + + +# --------------------------------------------------------------------------- +# DB-only: reads, queries, 404s +# --------------------------------------------------------------------------- + + +class TestTriggerSubscriptionsReads: + def test_list_subscriptions_returns_200_empty(self, triggers_api): + response = triggers_api("GET", "/triggers/subscriptions/") + assert response.status_code == 200 + body = response.json() + assert "count" in body + assert isinstance(body["subscriptions"], list) + assert body["count"] == len(body["subscriptions"]) + + def test_query_subscriptions_returns_200(self, triggers_api): + response = triggers_api("POST", "/triggers/subscriptions/query", json={}) + assert response.status_code == 200 + body = response.json() + assert body["count"] == len(body["subscriptions"]) + + def test_fetch_unknown_subscription_returns_404(self, triggers_api): + response = triggers_api("GET", f"/triggers/subscriptions/{uuid4()}") + assert response.status_code == 404 + + def test_delete_unknown_subscription_returns_404(self, triggers_api): + response = triggers_api("DELETE", f"/triggers/subscriptions/{uuid4()}") + assert response.status_code == 404 + + +class TestTriggerDeliveriesReads: + def test_list_deliveries_returns_200_empty(self, triggers_api): + response = triggers_api("GET", "/triggers/deliveries") + assert response.status_code == 200 + body = response.json() + assert isinstance(body["deliveries"], list) + assert body["count"] == len(body["deliveries"]) + + def test_query_deliveries_returns_200(self, triggers_api): + response = triggers_api("POST", "/triggers/deliveries/query", json={}) + assert response.status_code == 200 + body = response.json() + assert body["count"] == len(body["deliveries"]) + + def test_fetch_unknown_delivery_returns_404(self, triggers_api): + response = triggers_api("GET", f"/triggers/deliveries/{uuid4()}") + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Full lifecycle (needs Composio) — C7 invariant included +# --------------------------------------------------------------------------- + + +@_requires_composio +class TestTriggerSubscriptionsLifecycle: + def _create_connection(self, triggers_api): + slug = f"acc-{uuid4().hex[:8]}" + create = triggers_api( + "POST", + "/tools/connections/", + json={ + "connection": { + "slug": slug, + "provider_key": "composio", + "integration_key": "github", + "data": {"auth_scheme": "oauth"}, + } + }, + ) + assert create.status_code == 200, create.text + return create.json()["connection"]["id"] + + def test_create_list_disable_delete_keeps_connection(self, triggers_api): + connection_id = self._create_connection(triggers_api) + + create = triggers_api( + "POST", + "/triggers/subscriptions/", + json={ + "subscription": { + "name": f"sub-{uuid4().hex[:8]}", + "connection_id": connection_id, + "data": { + "event_key": "GITHUB_STAR_ADDED_EVENT", + "trigger_config": {}, + "inputs_fields": {"repo": "$.event.data.repository"}, + "references": {"workflow": {"slug": "triage"}}, + }, + } + }, + ) + assert create.status_code == 200, create.text + sub = create.json()["subscription"] + subscription_id = sub["id"] + assert sub["connection_id"] == connection_id + assert sub["data"]["ti_id"] is not None + + listing = triggers_api("GET", "/triggers/subscriptions/").json() + assert any(s["id"] == subscription_id for s in listing["subscriptions"]) + + revoke = triggers_api( + "POST", f"/triggers/subscriptions/{subscription_id}/revoke" + ) + assert revoke.status_code == 200, revoke.text + assert revoke.json()["subscription"]["enabled"] is False + + delete = triggers_api("DELETE", f"/triggers/subscriptions/{subscription_id}") + assert delete.status_code == 204 + + fetch = triggers_api("GET", f"/triggers/subscriptions/{subscription_id}") + assert fetch.status_code == 404 + + # C7: deleting the subscription must NOT delete/revoke the connection. + conn = triggers_api("GET", f"/tools/connections/{connection_id}") + assert conn.status_code == 200, conn.text + + triggers_api("DELETE", f"/tools/connections/{connection_id}") diff --git a/api/entrypoints/routers.py b/api/entrypoints/routers.py index badfdcf0a0..1db32a85f3 100644 --- a/api/entrypoints/routers.py +++ b/api/entrypoints/routers.py @@ -144,6 +144,7 @@ from oss.src.core.tools.registry import ToolsGatewayRegistry from oss.src.core.tools.service import ToolsService from oss.src.apis.fastapi.tools.router import ToolsRouter +from oss.src.dbs.postgres.triggers.dao import TriggersDAO from oss.src.core.triggers.providers.composio import ComposioTriggersAdapter from oss.src.core.triggers.registry import TriggersGatewayRegistry from oss.src.core.triggers.service import TriggersService @@ -642,8 +643,12 @@ async def lifespan(*args, **kwargs): adapters=_composio_triggers_adapters, ) +triggers_dao = TriggersDAO(engine=_transactions_engine) + triggers_service = TriggersService( adapter_registry=triggers_adapter_registry, + triggers_dao=triggers_dao, + connections_service=connections_service, ) _t_services_done = time.perf_counter() - _t_services diff --git a/api/oss/databases/postgres/migrations/core_oss/versions/oss000000003_add_trigger_subscriptions_and_deliveries.py b/api/oss/databases/postgres/migrations/core_oss/versions/oss000000003_add_trigger_subscriptions_and_deliveries.py new file mode 100644 index 0000000000..c755fbebcc --- /dev/null +++ b/api/oss/databases/postgres/migrations/core_oss/versions/oss000000003_add_trigger_subscriptions_and_deliveries.py @@ -0,0 +1,179 @@ +"""add trigger_subscriptions and trigger_deliveries tables + +The two-table heart of the gateway-triggers domain (WP3), modeled on +webhook_subscriptions + webhook_deliveries. A subscription FKs the shared +gateway_connections row (many subscriptions per connection); a delivery dedups +on the provider event id (metadata.id) per subscription (I4). Authored once in +the shared core_oss chain so it runs in BOTH editions. + +Revision ID: oss000000003 +Revises: oss000000002 +Create Date: 2026-06-18 00:00:01.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision: str = "oss000000003" +down_revision: Union[str, None] = "oss000000002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # -- TRIGGER SUBSCRIPTIONS -------------------------------------------------- + op.create_table( + "trigger_subscriptions", + sa.Column("project_id", sa.UUID(), nullable=False), + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("connection_id", sa.UUID(), nullable=False), + sa.Column("name", sa.String(), nullable=True), + sa.Column("description", sa.String(), nullable=True), + sa.Column("data", postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column( + "flags", + postgresql.JSONB(none_as_null=True, astext_type=sa.Text()), + nullable=True, + ), + sa.Column("meta", postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column( + "tags", + postgresql.JSONB(none_as_null=True, astext_type=sa.Text()), + nullable=True, + ), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + server_default=sa.text("CURRENT_TIMESTAMP"), + nullable=False, + ), + sa.Column( + "updated_at", + sa.TIMESTAMP(timezone=True), + server_onupdate=sa.text("CURRENT_TIMESTAMP"), + nullable=True, + ), + sa.Column("deleted_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("created_by_id", sa.UUID(), nullable=True), + sa.Column("updated_by_id", sa.UUID(), nullable=True), + sa.Column("deleted_by_id", sa.UUID(), nullable=True), + sa.ForeignKeyConstraint(["project_id"], ["projects.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint( + ["project_id", "connection_id"], + ["gateway_connections.project_id", "gateway_connections.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("project_id", "id"), + ) + + op.create_index( + "ix_trigger_subscriptions_project_id_created_at", + "trigger_subscriptions", + ["project_id", "created_at"], + unique=False, + ) + op.create_index( + "ix_trigger_subscriptions_project_id_deleted_at", + "trigger_subscriptions", + ["project_id", "deleted_at"], + unique=False, + ) + op.create_index( + "ix_trigger_subscriptions_connection_id", + "trigger_subscriptions", + ["project_id", "connection_id"], + unique=False, + ) + + # -- TRIGGER DELIVERIES ----------------------------------------------------- + op.create_table( + "trigger_deliveries", + sa.Column("project_id", sa.UUID(), nullable=False), + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("subscription_id", sa.UUID(), nullable=False), + sa.Column("event_id", sa.String(), nullable=False), + sa.Column( + "status", + postgresql.JSONB(none_as_null=True, astext_type=sa.Text()), + nullable=True, + ), + sa.Column("data", postgresql.JSON(astext_type=sa.Text()), nullable=True), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + server_default=sa.text("CURRENT_TIMESTAMP"), + nullable=False, + ), + sa.Column( + "updated_at", + sa.TIMESTAMP(timezone=True), + server_onupdate=sa.text("CURRENT_TIMESTAMP"), + nullable=True, + ), + sa.Column("deleted_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("created_by_id", sa.UUID(), nullable=True), + sa.Column("updated_by_id", sa.UUID(), nullable=True), + sa.Column("deleted_by_id", sa.UUID(), nullable=True), + sa.ForeignKeyConstraint(["project_id"], ["projects.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint( + ["project_id", "subscription_id"], + ["trigger_subscriptions.project_id", "trigger_subscriptions.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("project_id", "id"), + ) + + op.create_index( + "ix_trigger_deliveries_project_id_created_at", + "trigger_deliveries", + ["project_id", "created_at"], + unique=False, + ) + op.create_index( + "ix_trigger_deliveries_subscription_id_created_at", + "trigger_deliveries", + ["subscription_id", "created_at"], + unique=False, + ) + op.create_index( + "ix_trigger_deliveries_subscription_id_event_id", + "trigger_deliveries", + ["project_id", "subscription_id", "event_id"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index( + "ix_trigger_deliveries_subscription_id_event_id", + table_name="trigger_deliveries", + ) + op.drop_index( + "ix_trigger_deliveries_subscription_id_created_at", + table_name="trigger_deliveries", + ) + op.drop_index( + "ix_trigger_deliveries_project_id_created_at", + table_name="trigger_deliveries", + ) + op.drop_table("trigger_deliveries") + + op.drop_index( + "ix_trigger_subscriptions_connection_id", + table_name="trigger_subscriptions", + ) + op.drop_index( + "ix_trigger_subscriptions_project_id_deleted_at", + table_name="trigger_subscriptions", + ) + op.drop_index( + "ix_trigger_subscriptions_project_id_created_at", + table_name="trigger_subscriptions", + ) + op.drop_table("trigger_subscriptions") diff --git a/api/oss/src/apis/fastapi/triggers/models.py b/api/oss/src/apis/fastapi/triggers/models.py index f1bd9b73d7..9d671ac49d 100644 --- a/api/oss/src/apis/fastapi/triggers/models.py +++ b/api/oss/src/apis/fastapi/triggers/models.py @@ -2,10 +2,17 @@ from pydantic import BaseModel, Field +from oss.src.core.shared.dtos import Windowing from oss.src.core.triggers.dtos import ( TriggerCatalogEvent, TriggerCatalogEventDetails, TriggerCatalogProvider, + TriggerDelivery, + TriggerDeliveryQuery, + TriggerSubscription, + TriggerSubscriptionCreate, + TriggerSubscriptionEdit, + TriggerSubscriptionQuery, ) @@ -34,3 +41,53 @@ class TriggerCatalogEventsResponse(BaseModel): total: int = 0 cursor: Optional[str] = None events: List[TriggerCatalogEvent] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Trigger Subscriptions +# --------------------------------------------------------------------------- + + +class TriggerSubscriptionCreateRequest(BaseModel): + subscription: TriggerSubscriptionCreate + + +class TriggerSubscriptionEditRequest(BaseModel): + subscription: TriggerSubscriptionEdit + + +class TriggerSubscriptionQueryRequest(BaseModel): + subscription: Optional[TriggerSubscriptionQuery] = None + + windowing: Optional[Windowing] = None + + +class TriggerSubscriptionResponse(BaseModel): + count: int = 0 + subscription: Optional[TriggerSubscription] = None + + +class TriggerSubscriptionsResponse(BaseModel): + count: int = 0 + subscriptions: List[TriggerSubscription] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Trigger Deliveries +# --------------------------------------------------------------------------- + + +class TriggerDeliveryQueryRequest(BaseModel): + delivery: Optional[TriggerDeliveryQuery] = None + + windowing: Optional[Windowing] = None + + +class TriggerDeliveryResponse(BaseModel): + count: int = 0 + delivery: Optional[TriggerDelivery] = None + + +class TriggerDeliveriesResponse(BaseModel): + count: int = 0 + deliveries: List[TriggerDelivery] = Field(default_factory=list) diff --git a/api/oss/src/apis/fastapi/triggers/router.py b/api/oss/src/apis/fastapi/triggers/router.py index 91ae7ff2e0..a1e5281e8f 100644 --- a/api/oss/src/apis/fastapi/triggers/router.py +++ b/api/oss/src/apis/fastapi/triggers/router.py @@ -1,5 +1,6 @@ from functools import wraps from typing import Optional +from uuid import UUID import httpx from fastapi import APIRouter, HTTPException, Query, Request, status @@ -15,8 +16,21 @@ TriggerCatalogEventsResponse, TriggerCatalogProviderResponse, TriggerCatalogProvidersResponse, + TriggerDeliveriesResponse, + TriggerDeliveryQueryRequest, + TriggerDeliveryResponse, + TriggerSubscriptionCreateRequest, + TriggerSubscriptionEditRequest, + TriggerSubscriptionQueryRequest, + TriggerSubscriptionResponse, + TriggerSubscriptionsResponse, +) +from oss.src.core.triggers.exceptions import ( + AdapterError, + ConnectionNotFoundError, + ProviderNotFoundError, + SubscriptionNotFoundError, ) -from oss.src.core.triggers.exceptions import AdapterError from oss.src.core.triggers.service import TriggersService @@ -106,6 +120,107 @@ def __init__( response_model_exclude_none=True, ) + # --- Trigger Subscriptions --- + self.router.add_api_route( + "/subscriptions/", + self.create_subscription, + methods=["POST"], + operation_id="create_trigger_subscription", + response_model=TriggerSubscriptionResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/", + self.list_subscriptions, + methods=["GET"], + operation_id="list_trigger_subscriptions", + response_model=TriggerSubscriptionsResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/query", + self.query_subscriptions, + methods=["POST"], + operation_id="query_trigger_subscriptions", + response_model=TriggerSubscriptionsResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/{subscription_id}/refresh", + self.refresh_subscription, + methods=["POST"], + operation_id="refresh_trigger_subscription", + response_model=TriggerSubscriptionResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/{subscription_id}/revoke", + self.revoke_subscription, + methods=["POST"], + operation_id="revoke_trigger_subscription", + response_model=TriggerSubscriptionResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/{subscription_id}", + self.fetch_subscription, + methods=["GET"], + operation_id="fetch_trigger_subscription", + response_model=TriggerSubscriptionResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/{subscription_id}", + self.edit_subscription, + methods=["PUT"], + operation_id="edit_trigger_subscription", + response_model=TriggerSubscriptionResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/subscriptions/{subscription_id}", + self.delete_subscription, + methods=["DELETE"], + operation_id="delete_trigger_subscription", + status_code=status.HTTP_204_NO_CONTENT, + ) + + # --- Trigger Deliveries --- + self.router.add_api_route( + "/deliveries", + self.list_deliveries, + methods=["GET"], + operation_id="list_trigger_deliveries", + response_model=TriggerDeliveriesResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/deliveries/query", + self.query_deliveries, + methods=["POST"], + operation_id="query_trigger_deliveries", + response_model=TriggerDeliveriesResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + self.router.add_api_route( + "/deliveries/{delivery_id}", + self.fetch_delivery, + methods=["GET"], + operation_id="fetch_trigger_delivery", + response_model=TriggerDeliveryResponse, + response_model_exclude_none=True, + status_code=status.HTTP_200_OK, + ) + # ----------------------------------------------------------------------- # Trigger Catalog # ----------------------------------------------------------------------- @@ -322,3 +437,277 @@ async def get_event( ) return response + + # ----------------------------------------------------------------------- + # Trigger Subscriptions + # ----------------------------------------------------------------------- + + async def _check(self, request: Request, permission) -> None: + if is_ee(): + has_permission = await check_action_access( + user_uid=str(request.state.user_id), + project_id=str(request.state.project_id), + permission=permission, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + @intercept_exceptions() + @handle_adapter_exceptions() + async def create_subscription( + self, + request: Request, + *, + body: TriggerSubscriptionCreateRequest, + ) -> TriggerSubscriptionResponse: + await self._check(request, Permission.EDIT_TRIGGERS if is_ee() else None) + + try: + subscription = await self.triggers_service.create_subscription( + project_id=UUID(request.state.project_id), + user_id=UUID(str(request.state.user_id)), + # + subscription=body.subscription, + ) + except ConnectionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + + return TriggerSubscriptionResponse( + count=1 if subscription else 0, + subscription=subscription, + ) + + @intercept_exceptions() + async def list_subscriptions( + self, + request: Request, + ) -> TriggerSubscriptionsResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + subscriptions = await self.triggers_service.query_subscriptions( + project_id=UUID(request.state.project_id), + ) + + return TriggerSubscriptionsResponse( + count=len(subscriptions), + subscriptions=subscriptions, + ) + + @intercept_exceptions() + async def query_subscriptions( + self, + request: Request, + *, + body: TriggerSubscriptionQueryRequest, + ) -> TriggerSubscriptionsResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + subscriptions = await self.triggers_service.query_subscriptions( + project_id=UUID(request.state.project_id), + # + subscription=body.subscription, + # + windowing=body.windowing, + ) + + return TriggerSubscriptionsResponse( + count=len(subscriptions), + subscriptions=subscriptions, + ) + + @intercept_exceptions() + async def fetch_subscription( + self, + request: Request, + *, + subscription_id: UUID, + ) -> TriggerSubscriptionResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + subscription = await self.triggers_service.fetch_subscription( + project_id=UUID(request.state.project_id), + # + subscription_id=subscription_id, + ) + if not subscription: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Trigger subscription not found", + ) + + return TriggerSubscriptionResponse( + count=1, + subscription=subscription, + ) + + @intercept_exceptions() + @handle_adapter_exceptions() + async def edit_subscription( + self, + request: Request, + *, + subscription_id: UUID, + body: TriggerSubscriptionEditRequest, + ) -> TriggerSubscriptionResponse: + await self._check(request, Permission.EDIT_TRIGGERS if is_ee() else None) + + if str(subscription_id) != str(body.subscription.id): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Path subscription_id does not match body id", + ) + + subscription = await self.triggers_service.edit_subscription( + project_id=UUID(request.state.project_id), + user_id=UUID(str(request.state.user_id)), + # + subscription=body.subscription, + ) + if not subscription: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Trigger subscription not found", + ) + + return TriggerSubscriptionResponse( + count=1, + subscription=subscription, + ) + + @intercept_exceptions() + @handle_adapter_exceptions() + async def delete_subscription( + self, + request: Request, + *, + subscription_id: UUID, + ) -> None: + await self._check(request, Permission.EDIT_TRIGGERS if is_ee() else None) + + deleted = await self.triggers_service.delete_subscription( + project_id=UUID(request.state.project_id), + # + subscription_id=subscription_id, + ) + if not deleted: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Trigger subscription not found", + ) + + @intercept_exceptions() + @handle_adapter_exceptions() + async def refresh_subscription( + self, + request: Request, + *, + subscription_id: UUID, + ) -> TriggerSubscriptionResponse: + await self._check(request, Permission.EDIT_TRIGGERS if is_ee() else None) + + try: + subscription = await self.triggers_service.refresh_subscription( + project_id=UUID(request.state.project_id), + user_id=UUID(str(request.state.user_id)), + # + subscription_id=subscription_id, + ) + except SubscriptionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + + return TriggerSubscriptionResponse( + count=1, + subscription=subscription, + ) + + @intercept_exceptions() + @handle_adapter_exceptions() + async def revoke_subscription( + self, + request: Request, + *, + subscription_id: UUID, + ) -> TriggerSubscriptionResponse: + await self._check(request, Permission.EDIT_TRIGGERS if is_ee() else None) + + try: + subscription = await self.triggers_service.revoke_subscription( + project_id=UUID(request.state.project_id), + user_id=UUID(str(request.state.user_id)), + # + subscription_id=subscription_id, + ) + except SubscriptionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + + return TriggerSubscriptionResponse( + count=1, + subscription=subscription, + ) + + # ----------------------------------------------------------------------- + # Trigger Deliveries + # ----------------------------------------------------------------------- + + @intercept_exceptions() + async def list_deliveries( + self, + request: Request, + ) -> TriggerDeliveriesResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + deliveries = await self.triggers_service.query_deliveries( + project_id=UUID(request.state.project_id), + ) + + return TriggerDeliveriesResponse( + count=len(deliveries), + deliveries=deliveries, + ) + + @intercept_exceptions() + async def query_deliveries( + self, + request: Request, + *, + body: TriggerDeliveryQueryRequest, + ) -> TriggerDeliveriesResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + deliveries = await self.triggers_service.query_deliveries( + project_id=UUID(request.state.project_id), + # + delivery=body.delivery, + # + windowing=body.windowing, + ) + + return TriggerDeliveriesResponse( + count=len(deliveries), + deliveries=deliveries, + ) + + @intercept_exceptions() + async def fetch_delivery( + self, + request: Request, + *, + delivery_id: UUID, + ) -> TriggerDeliveryResponse: + await self._check(request, Permission.VIEW_TRIGGERS if is_ee() else None) + + delivery = await self.triggers_service.fetch_delivery( + project_id=UUID(request.state.project_id), + # + delivery_id=delivery_id, + ) + if not delivery: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Trigger delivery not found", + ) + + return TriggerDeliveryResponse( + count=1, + delivery=delivery, + ) diff --git a/api/oss/src/core/triggers/dtos.py b/api/oss/src/core/triggers/dtos.py index f724089a38..b2a302d6b9 100644 --- a/api/oss/src/core/triggers/dtos.py +++ b/api/oss/src/core/triggers/dtos.py @@ -1,8 +1,19 @@ from enum import Enum from typing import Any, Dict, List, Optional +from uuid import UUID from pydantic import BaseModel, Field +from oss.src.core.shared.dtos import ( + Header, + Identifier, + Lifecycle, + Metadata, + Reference, + Selector, + Status, +) + # --------------------------------------------------------------------------- # Trigger Enums @@ -47,3 +58,126 @@ class TriggerCatalogProvider(BaseModel): # name: str description: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Context allowlists (mapping; see mapping.md §3) +# +# The inbound analogue of webhooks' EVENT_CONTEXT_FIELDS / SUBSCRIPTION_CONTEXT_FIELDS. +# A subscription's inputs_fields template may only reference these context keys; +# ca_*/secrets/connection internals are never exposed. +# --------------------------------------------------------------------------- + +TRIGGER_EVENT_FIELDS = { + "data", + "type", + "timestamp", + "metadata", +} + +SUBSCRIPTION_CONTEXT_FIELDS = { + "id", + "name", + "tags", + "meta", + "created_at", + "updated_at", +} + + +# --------------------------------------------------------------------------- +# Trigger Subscriptions +# +# A standing watch on one provider event. Mirrors a webhook subscription +# (subscribe-to-events lifecycle, CRUD) + FK to the shared gateway_connections +# row + a bound workflow reference. The provider-side trigger instance id +# (``ti_*``) lives on the row alongside its ``trigger_config``. +# --------------------------------------------------------------------------- + + +class TriggerSubscriptionData(BaseModel): + event_key: str + # + ti_id: Optional[str] = None + trigger_config: Optional[Dict[str, Any]] = None + # + # MAPPING — inputs-only template resolved into WorkflowServiceRequest.data.inputs. + inputs_fields: Optional[Dict[str, Any]] = None + # + # DESTINATION — the bound workflow, by reference (the /retrieve shape). + references: Optional[Dict[str, Reference]] = None + selector: Optional[Selector] = None + + +class TriggerSubscription(Identifier, Lifecycle, Header, Metadata): + connection_id: UUID + # + data: TriggerSubscriptionData + # + enabled: bool = True + valid: bool = True + + +class TriggerSubscriptionCreate(Header, Metadata): + connection_id: UUID + # + data: TriggerSubscriptionData + + +class TriggerSubscriptionEdit(Identifier, Header, Metadata): + connection_id: UUID + # + data: TriggerSubscriptionData + # + enabled: bool = True + valid: bool = True + + +class TriggerSubscriptionQuery(BaseModel): + name: Optional[str] = None + connection_id: Optional[UUID] = None + event_key: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Trigger Deliveries +# +# One audit row per inbound event dispatched to its workflow — the inbound dual +# of webhook_deliveries. ``event_id`` is the I4 dedup key (provider metadata.id), +# unique per subscription. +# --------------------------------------------------------------------------- + + +class TriggerDeliveryData(BaseModel): + event_key: Optional[str] = None + # + references: Optional[Dict[str, Reference]] = None + inputs: Optional[Dict[str, Any]] = None + # + result: Optional[Dict[str, Any]] = None + error: Optional[str] = None + + +class TriggerDelivery(Identifier, Lifecycle): + status: Status + + data: Optional[TriggerDeliveryData] = None + + subscription_id: UUID + event_id: str + + +class TriggerDeliveryCreate(Identifier): + status: Status + + data: Optional[TriggerDeliveryData] = None + + subscription_id: UUID + event_id: str + + +class TriggerDeliveryQuery(BaseModel): + status: Optional[Status] = None + + subscription_id: Optional[UUID] = None + event_id: Optional[str] = None diff --git a/api/oss/src/core/triggers/exceptions.py b/api/oss/src/core/triggers/exceptions.py index 473b4094a4..092144ceff 100644 --- a/api/oss/src/core/triggers/exceptions.py +++ b/api/oss/src/core/triggers/exceptions.py @@ -17,6 +17,22 @@ def __init__(self, provider_key: str): super().__init__(f"Provider not found: {provider_key}") +class SubscriptionNotFoundError(TriggersError): + """Raised when a subscription_id does not exist in the project.""" + + def __init__(self, *, subscription_id: str): + self.subscription_id = subscription_id + super().__init__(f"Trigger subscription not found: {subscription_id}") + + +class ConnectionNotFoundError(TriggersError): + """Raised when a subscription references a connection that does not exist.""" + + def __init__(self, *, connection_id: str): + self.connection_id = connection_id + super().__init__(f"Connection not found: {connection_id}") + + class AdapterError(TriggersError): """Raised when an adapter operation fails.""" diff --git a/api/oss/src/core/triggers/interfaces.py b/api/oss/src/core/triggers/interfaces.py index 2b07ca835f..a7280c90ea 100644 --- a/api/oss/src/core/triggers/interfaces.py +++ b/api/oss/src/core/triggers/interfaces.py @@ -2,10 +2,18 @@ from typing import Any, Dict, List, Optional, Tuple from uuid import UUID +from oss.src.core.shared.dtos import Windowing from oss.src.core.triggers.dtos import ( TriggerCatalogEvent, TriggerCatalogEventDetails, TriggerCatalogProvider, + TriggerDelivery, + TriggerDeliveryCreate, + TriggerDeliveryQuery, + TriggerSubscription, + TriggerSubscriptionCreate, + TriggerSubscriptionEdit, + TriggerSubscriptionQuery, ) @@ -73,3 +81,114 @@ async def delete_subscription( ) -> None: """Permanently delete the provider-side trigger instance.""" ... + + +class TriggersDAOInterface(ABC): + """Persistence contract for the triggers domain (subscriptions + deliveries).""" + + # --- subscriptions ------------------------------------------------------ # + + @abstractmethod + async def create_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionCreate, + # + ti_id: str, + ) -> TriggerSubscription: ... + + @abstractmethod + async def fetch_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> Optional[TriggerSubscription]: ... + + @abstractmethod + async def edit_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionEdit, + ) -> Optional[TriggerSubscription]: ... + + @abstractmethod + async def delete_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> bool: ... + + @abstractmethod + async def query_subscriptions( + self, + *, + project_id: UUID, + # + subscription: Optional[TriggerSubscriptionQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerSubscription]: ... + + @abstractmethod + async def get_subscription_by_trigger_id( + self, + *, + trigger_id: str, + ) -> Optional[TriggerSubscription]: + """FROZEN (WP4): resolve an inbound event's ``ti_*`` to its local row.""" + ... + + # --- deliveries --------------------------------------------------------- # + + @abstractmethod + async def write_delivery( + self, + *, + project_id: UUID, + user_id: Optional[UUID], + # + delivery: TriggerDeliveryCreate, + ) -> TriggerDelivery: + """FROZEN (WP4): upsert a delivery row (idempotent on event_id).""" + ... + + @abstractmethod + async def fetch_delivery( + self, + *, + project_id: UUID, + # + delivery_id: UUID, + ) -> Optional[TriggerDelivery]: ... + + @abstractmethod + async def query_deliveries( + self, + *, + project_id: UUID, + # + delivery: Optional[TriggerDeliveryQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerDelivery]: ... + + @abstractmethod + async def dedup_seen( + self, + *, + project_id: UUID, + subscription_id: UUID, + event_id: str, + ) -> bool: + """FROZEN (WP4): True if a delivery for this event_id already exists (I4).""" + ... diff --git a/api/oss/src/core/triggers/service.py b/api/oss/src/core/triggers/service.py index 1056144ea4..349f5fd889 100644 --- a/api/oss/src/core/triggers/service.py +++ b/api/oss/src/core/triggers/service.py @@ -1,13 +1,28 @@ from typing import List, Optional, Tuple +from uuid import UUID from oss.src.utils.logging import get_module_logger +from oss.src.core.gateway.connections.service import ConnectionsService from oss.src.core.triggers.dtos import ( TriggerCatalogEvent, TriggerCatalogEventDetails, TriggerCatalogProvider, + TriggerDelivery, + TriggerDeliveryCreate, + TriggerDeliveryQuery, + TriggerSubscription, + TriggerSubscriptionCreate, + TriggerSubscriptionEdit, + TriggerSubscriptionQuery, ) +from oss.src.core.triggers.exceptions import ( + ConnectionNotFoundError, + SubscriptionNotFoundError, +) +from oss.src.core.triggers.interfaces import TriggersDAOInterface from oss.src.core.triggers.registry import TriggersGatewayRegistry +from oss.src.core.shared.dtos import Windowing log = get_module_logger(__name__) @@ -16,16 +31,22 @@ class TriggersService: """Triggers domain orchestration. - WP1 scope is the read-only events catalog. Subscriptions/deliveries CRUD and - ingress/dispatch land in later WPs (WP3/WP4) and will extend this service. + Covers the read-only events catalog (WP1) and subscription/delivery + CRUD (WP3). Subscriptions bind a provider event to a workflow on top of a + shared gateway connection; the provider-side trigger instance (``ti_*``) is + minted/managed through the adapter, never the catalog routes. """ def __init__( self, *, adapter_registry: TriggersGatewayRegistry, + triggers_dao: Optional[TriggersDAOInterface] = None, + connections_service: Optional[ConnectionsService] = None, ): self.adapter_registry = adapter_registry + self.dao = triggers_dao + self.connections_service = connections_service # ----------------------------------------------------------------------- # Catalog browse @@ -88,3 +109,282 @@ async def get_event( integration_key=integration_key, event_key=event_key, ) + + # ----------------------------------------------------------------------- + # Subscriptions + # ----------------------------------------------------------------------- + + async def _require_connection( + self, + *, + project_id: UUID, + connection_id: UUID, + ): + connection = await self.connections_service.get_connection( + project_id=project_id, + connection_id=connection_id, + ) + if not connection: + raise ConnectionNotFoundError(connection_id=str(connection_id)) + return connection + + async def create_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionCreate, + ) -> TriggerSubscription: + """Mint the provider-side ``ti_*`` on a shared connection, then persist.""" + connection = await self._require_connection( + project_id=project_id, + connection_id=subscription.connection_id, + ) + + adapter = self.adapter_registry.get(connection.provider_key.value) + + ti_id = await adapter.create_subscription( + project_id=project_id, + event_key=subscription.data.event_key, + connected_account_id=connection.provider_connection_id, + trigger_config=subscription.data.trigger_config or {}, + ) + + return await self.dao.create_subscription( + project_id=project_id, + user_id=user_id, + # + subscription=subscription, + # + ti_id=ti_id, + ) + + async def fetch_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> Optional[TriggerSubscription]: + return await self.dao.fetch_subscription( + project_id=project_id, + subscription_id=subscription_id, + ) + + async def query_subscriptions( + self, + *, + project_id: UUID, + # + subscription: Optional[TriggerSubscriptionQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerSubscription]: + return await self.dao.query_subscriptions( + project_id=project_id, + subscription=subscription, + windowing=windowing, + ) + + async def edit_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionEdit, + ) -> Optional[TriggerSubscription]: + """Full-PUT edit. Reflects the enabled flag onto the provider ``ti_*``.""" + existing = await self.dao.fetch_subscription( + project_id=project_id, + subscription_id=subscription.id, + ) + if existing is None: + return None + + ti_id = existing.data.ti_id + if ti_id is not None and subscription.enabled != existing.enabled: + connection = await self._require_connection( + project_id=project_id, + connection_id=existing.connection_id, + ) + adapter = self.adapter_registry.get(connection.provider_key.value) + await adapter.set_subscription_status( + trigger_id=ti_id, + enabled=subscription.enabled, + ) + + return await self.dao.edit_subscription( + project_id=project_id, + user_id=user_id, + subscription=subscription, + ) + + async def delete_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> bool: + """Delete the local row and the provider ``ti_*``. + + Deleting a subscription must NOT revoke the shared connection (C7): the + adapter call below targets only the trigger instance, never the ``ca_*``. + """ + existing = await self.dao.fetch_subscription( + project_id=project_id, + subscription_id=subscription_id, + ) + if existing is None: + return False + + ti_id = existing.data.ti_id + if ti_id is not None: + connection = await self.connections_service.get_connection( + project_id=project_id, + connection_id=existing.connection_id, + ) + if connection is not None: + adapter = self.adapter_registry.get(connection.provider_key.value) + try: + await adapter.delete_subscription(trigger_id=ti_id) + except Exception: + log.warning( + "Failed to delete provider trigger %s; proceeding with local delete", + ti_id, + ) + + return await self.dao.delete_subscription( + project_id=project_id, + subscription_id=subscription_id, + ) + + async def refresh_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription_id: UUID, + ) -> TriggerSubscription: + """Re-enable the provider ``ti_*`` and mark the row enabled+valid.""" + return await self._set_enabled( + project_id=project_id, + user_id=user_id, + subscription_id=subscription_id, + enabled=True, + ) + + async def revoke_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription_id: UUID, + ) -> TriggerSubscription: + """Disable the provider ``ti_*`` and mark the row disabled. + + Local + provider trigger-instance only; the shared connection is never + touched (C7). + """ + return await self._set_enabled( + project_id=project_id, + user_id=user_id, + subscription_id=subscription_id, + enabled=False, + ) + + async def _set_enabled( + self, + *, + project_id: UUID, + user_id: UUID, + subscription_id: UUID, + enabled: bool, + ) -> TriggerSubscription: + existing = await self.dao.fetch_subscription( + project_id=project_id, + subscription_id=subscription_id, + ) + if existing is None: + raise SubscriptionNotFoundError(subscription_id=str(subscription_id)) + + ti_id = existing.data.ti_id + if ti_id is not None: + connection = await self._require_connection( + project_id=project_id, + connection_id=existing.connection_id, + ) + adapter = self.adapter_registry.get(connection.provider_key.value) + await adapter.set_subscription_status( + trigger_id=ti_id, + enabled=enabled, + ) + + edit = TriggerSubscriptionEdit( + id=existing.id, + connection_id=existing.connection_id, + name=existing.name, + description=existing.description, + tags=existing.tags, + meta=existing.meta, + data=existing.data, + enabled=enabled, + valid=existing.valid, + ) + + updated = await self.dao.edit_subscription( + project_id=project_id, + user_id=user_id, + subscription=edit, + ) + + return updated or existing + + # ----------------------------------------------------------------------- + # Deliveries + # ----------------------------------------------------------------------- + + async def fetch_delivery( + self, + *, + project_id: UUID, + # + delivery_id: UUID, + ) -> Optional[TriggerDelivery]: + return await self.dao.fetch_delivery( + project_id=project_id, + delivery_id=delivery_id, + ) + + async def query_deliveries( + self, + *, + project_id: UUID, + # + delivery: Optional[TriggerDeliveryQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerDelivery]: + return await self.dao.query_deliveries( + project_id=project_id, + delivery=delivery, + windowing=windowing, + ) + + async def write_delivery( + self, + *, + project_id: UUID, + user_id: Optional[UUID] = None, + # + delivery: TriggerDeliveryCreate, + ) -> TriggerDelivery: + return await self.dao.write_delivery( + project_id=project_id, + user_id=user_id, + delivery=delivery, + ) diff --git a/api/oss/src/dbs/postgres/triggers/dao.py b/api/oss/src/dbs/postgres/triggers/dao.py new file mode 100644 index 0000000000..b3a1a51e3c --- /dev/null +++ b/api/oss/src/dbs/postgres/triggers/dao.py @@ -0,0 +1,378 @@ +from datetime import datetime, timezone +from typing import List, Optional +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert + +from oss.src.core.shared.dtos import Windowing +from oss.src.core.triggers.dtos import ( + TriggerDelivery, + TriggerDeliveryCreate, + TriggerDeliveryQuery, + TriggerSubscription, + TriggerSubscriptionCreate, + TriggerSubscriptionEdit, + TriggerSubscriptionQuery, +) +from oss.src.core.triggers.interfaces import TriggersDAOInterface + +from oss.src.dbs.postgres.shared.engine import ( + TransactionsEngine, + get_transactions_engine, +) +from oss.src.dbs.postgres.shared.utils import apply_windowing +from oss.src.dbs.postgres.triggers.dbes import ( + TriggerDeliveryDBE, + TriggerSubscriptionDBE, +) +from oss.src.dbs.postgres.triggers.mappings import ( + map_delivery_dbe_to_dto, + map_delivery_dto_to_dbe_create, + map_subscription_dbe_to_dto, + map_subscription_dto_to_dbe_create, + map_subscription_dto_to_dbe_edit, +) + + +class TriggersDAO(TriggersDAOInterface): + def __init__(self, engine: TransactionsEngine = None): + if engine is None: + engine = get_transactions_engine() + self.engine = engine + + # --- SUBSCRIPTIONS ------------------------------------------------------ # + + async def create_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionCreate, + # + ti_id: str, + ) -> TriggerSubscription: + subscription_dbe = map_subscription_dto_to_dbe_create( + project_id=project_id, + user_id=user_id, + # + subscription=subscription, + # + ti_id=ti_id, + ) + + async with self.engine.session() as session: + session.add(subscription_dbe) + + await session.commit() + + await session.refresh(subscription_dbe) + + return map_subscription_dbe_to_dto( + subscription_dbe=subscription_dbe, + ) + + async def fetch_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> Optional[TriggerSubscription]: + async with self.engine.session() as session: + stmt = select(TriggerSubscriptionDBE).where( + TriggerSubscriptionDBE.project_id == project_id, + TriggerSubscriptionDBE.id == subscription_id, + ) + + result = await session.execute(stmt) + + subscription_dbe = result.scalar_one_or_none() + + if not subscription_dbe: + return None + + return map_subscription_dbe_to_dto( + subscription_dbe=subscription_dbe, + ) + + async def edit_subscription( + self, + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionEdit, + ) -> Optional[TriggerSubscription]: + async with self.engine.session() as session: + stmt = select(TriggerSubscriptionDBE).where( + TriggerSubscriptionDBE.id == subscription.id, + TriggerSubscriptionDBE.project_id == project_id, + ) + + result = await session.execute(stmt) + + subscription_dbe = result.scalar_one_or_none() + + if not subscription_dbe: + return None + + map_subscription_dto_to_dbe_edit( + subscription_dbe=subscription_dbe, + # + user_id=user_id, + # + subscription=subscription, + ) + + await session.commit() + + await session.refresh(subscription_dbe) + + return map_subscription_dbe_to_dto( + subscription_dbe=subscription_dbe, + ) + + async def delete_subscription( + self, + *, + project_id: UUID, + # + subscription_id: UUID, + ) -> bool: + async with self.engine.session() as session: + stmt = select(TriggerSubscriptionDBE).where( + TriggerSubscriptionDBE.project_id == project_id, + TriggerSubscriptionDBE.id == subscription_id, + ) + + result = await session.execute(stmt) + + subscription_dbe = result.scalar_one_or_none() + + if not subscription_dbe: + return False + + await session.delete(subscription_dbe) + + await session.commit() + + return True + + async def query_subscriptions( + self, + *, + project_id: UUID, + # + subscription: Optional[TriggerSubscriptionQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerSubscription]: + async with self.engine.session() as session: + stmt = select(TriggerSubscriptionDBE).filter( + TriggerSubscriptionDBE.project_id == project_id, + ) + + if subscription: + if subscription.name is not None: + stmt = stmt.filter( + TriggerSubscriptionDBE.name.ilike(f"%{subscription.name}%"), + ) + + if subscription.connection_id is not None: + stmt = stmt.filter( + TriggerSubscriptionDBE.connection_id + == subscription.connection_id, + ) + + if subscription.event_key is not None: + stmt = stmt.filter( + TriggerSubscriptionDBE.data["event_key"].astext + == subscription.event_key, + ) + + if windowing: + stmt = apply_windowing( + stmt=stmt, + DBE=TriggerSubscriptionDBE, + attribute="id", + order="descending", + windowing=windowing, + ) + + result = await session.execute(stmt) + + return [ + map_subscription_dbe_to_dto(subscription_dbe=dbe) + for dbe in result.scalars().all() + ] + + async def get_subscription_by_trigger_id( + self, + *, + trigger_id: str, + ) -> Optional[TriggerSubscription]: + async with self.engine.session() as session: + stmt = ( + select(TriggerSubscriptionDBE) + .filter( + TriggerSubscriptionDBE.data["ti_id"].astext == trigger_id, + ) + .limit(1) + ) + + result = await session.execute(stmt) + + subscription_dbe = result.scalars().first() + + if not subscription_dbe: + return None + + return map_subscription_dbe_to_dto( + subscription_dbe=subscription_dbe, + ) + + # --- DELIVERIES --------------------------------------------------------- # + + async def write_delivery( + self, + *, + project_id: UUID, + user_id: Optional[UUID], + # + delivery: TriggerDeliveryCreate, + ) -> TriggerDelivery: + delivery_dbe = map_delivery_dto_to_dbe_create( + project_id=project_id, + user_id=user_id, + # + delivery=delivery, + ) + + async with self.engine.session() as session: + values = { + c.name: getattr(delivery_dbe, c.name) + for c in TriggerDeliveryDBE.__table__.columns + if not ( + c.name in ("id", "created_at", "updated_at", "deleted_at") + and getattr(delivery_dbe, c.name) is None + ) + } + + stmt = insert(TriggerDeliveryDBE).values(**values) + stmt = stmt.on_conflict_do_update( + index_elements=["project_id", "subscription_id", "event_id"], + set_={ + "status": stmt.excluded.status, + "data": stmt.excluded.data, + "updated_at": datetime.now(timezone.utc), + "updated_by_id": stmt.excluded.created_by_id, + }, + ) + await session.execute(stmt) + await session.commit() + + refreshed_stmt = select(TriggerDeliveryDBE).where( + TriggerDeliveryDBE.project_id == project_id, + TriggerDeliveryDBE.subscription_id == delivery.subscription_id, + TriggerDeliveryDBE.event_id == delivery.event_id, + ) + delivery_dbe = (await session.execute(refreshed_stmt)).scalar_one() + + return map_delivery_dbe_to_dto( + delivery_dbe=delivery_dbe, + ) + + async def fetch_delivery( + self, + *, + project_id: UUID, + # + delivery_id: UUID, + ) -> Optional[TriggerDelivery]: + async with self.engine.session() as session: + stmt = select(TriggerDeliveryDBE).where( + TriggerDeliveryDBE.project_id == project_id, + TriggerDeliveryDBE.id == delivery_id, + ) + + result = await session.execute(stmt) + + delivery_dbe = result.scalar_one_or_none() + + if not delivery_dbe: + return None + + return map_delivery_dbe_to_dto( + delivery_dbe=delivery_dbe, + ) + + async def query_deliveries( + self, + *, + project_id: UUID, + # + delivery: Optional[TriggerDeliveryQuery] = None, + # + windowing: Optional[Windowing] = None, + ) -> List[TriggerDelivery]: + async with self.engine.session() as session: + stmt = select(TriggerDeliveryDBE).filter( + TriggerDeliveryDBE.project_id == project_id, + ) + + if delivery: + if delivery.status is not None and delivery.status.code is not None: + stmt = stmt.filter( + TriggerDeliveryDBE.status["code"].astext + == str(delivery.status.code), + ) + + if delivery.subscription_id is not None: + stmt = stmt.filter( + TriggerDeliveryDBE.subscription_id == delivery.subscription_id, + ) + + if delivery.event_id is not None: + stmt = stmt.filter( + TriggerDeliveryDBE.event_id == delivery.event_id, + ) + + if windowing: + stmt = apply_windowing( + stmt=stmt, + DBE=TriggerDeliveryDBE, + attribute="created_at", + order="descending", + windowing=windowing, + ) + + result = await session.execute(stmt) + + return [ + map_delivery_dbe_to_dto(delivery_dbe=dbe) + for dbe in result.scalars().all() + ] + + async def dedup_seen( + self, + *, + project_id: UUID, + subscription_id: UUID, + event_id: str, + ) -> bool: + async with self.engine.session() as session: + stmt = ( + select(TriggerDeliveryDBE.id) + .where( + TriggerDeliveryDBE.project_id == project_id, + TriggerDeliveryDBE.subscription_id == subscription_id, + TriggerDeliveryDBE.event_id == event_id, + ) + .limit(1) + ) + + result = await session.execute(stmt) + + return result.scalar_one_or_none() is not None diff --git a/api/oss/src/dbs/postgres/triggers/dbas.py b/api/oss/src/dbs/postgres/triggers/dbas.py new file mode 100644 index 0000000000..2f2e7b199b --- /dev/null +++ b/api/oss/src/dbs/postgres/triggers/dbas.py @@ -0,0 +1,53 @@ +from sqlalchemy import Column, String +from sqlalchemy.dialects.postgresql import UUID + +from oss.src.dbs.postgres.shared.dbas import ( + DataDBA, + FlagsDBA, + HeaderDBA, + IdentifierDBA, + LifecycleDBA, + MetaDBA, + ProjectScopeDBA, + StatusDBA, + TagsDBA, +) + + +class TriggerSubscriptionDBA( + ProjectScopeDBA, + LifecycleDBA, + IdentifierDBA, + HeaderDBA, + DataDBA, + FlagsDBA, + TagsDBA, + MetaDBA, +): + __abstract__ = True + + connection_id = Column( + UUID(as_uuid=True), + nullable=False, + ) + + +class TriggerDeliveryDBA( + ProjectScopeDBA, + LifecycleDBA, + IdentifierDBA, + StatusDBA, + DataDBA, +): + __abstract__ = True + + subscription_id = Column( + UUID(as_uuid=True), + nullable=False, + ) + + # I4: provider metadata.id — an arbitrary provider string, unique per subscription. + event_id = Column( + String, + nullable=False, + ) diff --git a/api/oss/src/dbs/postgres/triggers/dbes.py b/api/oss/src/dbs/postgres/triggers/dbes.py new file mode 100644 index 0000000000..9caf012350 --- /dev/null +++ b/api/oss/src/dbs/postgres/triggers/dbes.py @@ -0,0 +1,75 @@ +from sqlalchemy import ForeignKeyConstraint, Index, PrimaryKeyConstraint + +from oss.src.dbs.postgres.shared.base import Base +from oss.src.dbs.postgres.triggers.dbas import ( + TriggerDeliveryDBA, + TriggerSubscriptionDBA, +) + + +class TriggerSubscriptionDBE(Base, TriggerSubscriptionDBA): + __tablename__ = "trigger_subscriptions" + + __table_args__ = ( + ForeignKeyConstraint( + ["project_id"], + ["projects.id"], + ondelete="CASCADE", + ), + ForeignKeyConstraint( + ["project_id", "connection_id"], + ["gateway_connections.project_id", "gateway_connections.id"], + ondelete="CASCADE", + ), + PrimaryKeyConstraint("project_id", "id"), + Index( + "ix_trigger_subscriptions_project_id_created_at", + "project_id", + "created_at", + ), + Index( + "ix_trigger_subscriptions_project_id_deleted_at", + "project_id", + "deleted_at", + ), + Index( + "ix_trigger_subscriptions_connection_id", + "project_id", + "connection_id", + ), + ) + + +class TriggerDeliveryDBE(Base, TriggerDeliveryDBA): + __tablename__ = "trigger_deliveries" + + __table_args__ = ( + ForeignKeyConstraint( + ["project_id"], + ["projects.id"], + ondelete="CASCADE", + ), + ForeignKeyConstraint( + ["project_id", "subscription_id"], + ["trigger_subscriptions.project_id", "trigger_subscriptions.id"], + ondelete="CASCADE", + ), + PrimaryKeyConstraint("project_id", "id"), + Index( + "ix_trigger_deliveries_project_id_created_at", + "project_id", + "created_at", + ), + Index( + "ix_trigger_deliveries_subscription_id_created_at", + "subscription_id", + "created_at", + ), + Index( + "ix_trigger_deliveries_subscription_id_event_id", + "project_id", + "subscription_id", + "event_id", + unique=True, + ), + ) diff --git a/api/oss/src/dbs/postgres/triggers/mappings.py b/api/oss/src/dbs/postgres/triggers/mappings.py new file mode 100644 index 0000000000..97b1eaed92 --- /dev/null +++ b/api/oss/src/dbs/postgres/triggers/mappings.py @@ -0,0 +1,179 @@ +from uuid import UUID + +from oss.src.core.shared.dtos import Status +from oss.src.core.triggers.dtos import ( + TriggerDelivery, + TriggerDeliveryCreate, + TriggerDeliveryData, + TriggerSubscription, + TriggerSubscriptionCreate, + TriggerSubscriptionData, + TriggerSubscriptionEdit, +) + +from oss.src.dbs.postgres.triggers.dbes import ( + TriggerDeliveryDBE, + TriggerSubscriptionDBE, +) + + +# --- Subscription ----------------------------------------------------------- # + +_SUBSCRIPTION_FLAGS = ("enabled", "valid") + + +def _flags_to_dbe(*, enabled: bool, valid: bool) -> dict: + return {"enabled": enabled, "valid": valid} + + +def map_subscription_dto_to_dbe_create( + *, + project_id: UUID, + user_id: UUID, + # + subscription: TriggerSubscriptionCreate, + # + ti_id: str, +) -> TriggerSubscriptionDBE: + data = subscription.data.model_copy(update={"ti_id": ti_id}) + + return TriggerSubscriptionDBE( + project_id=project_id, + # + created_by_id=user_id, + # + connection_id=subscription.connection_id, + # + name=subscription.name, + description=subscription.description, + tags=subscription.tags, + meta=subscription.meta, + # + flags=_flags_to_dbe(enabled=True, valid=True), + # + data=data.model_dump(mode="json", exclude_none=True), + ) + + +def map_subscription_dbe_to_dto( + *, + subscription_dbe: TriggerSubscriptionDBE, +) -> TriggerSubscription: + flags = subscription_dbe.flags or {} + + return TriggerSubscription( + id=subscription_dbe.id, + # + created_at=subscription_dbe.created_at, + updated_at=subscription_dbe.updated_at, + deleted_at=subscription_dbe.deleted_at, + created_by_id=subscription_dbe.created_by_id, + updated_by_id=subscription_dbe.updated_by_id, + deleted_by_id=subscription_dbe.deleted_by_id, + # + connection_id=subscription_dbe.connection_id, + # + name=subscription_dbe.name, + description=subscription_dbe.description, + # + tags=subscription_dbe.tags, + meta=subscription_dbe.meta, + # + data=TriggerSubscriptionData.model_validate(subscription_dbe.data), + # + enabled=bool(flags.get("enabled", True)), + valid=bool(flags.get("valid", True)), + ) + + +def map_subscription_dto_to_dbe_edit( + *, + subscription_dbe: TriggerSubscriptionDBE, + # + user_id: UUID, + # + subscription: TriggerSubscriptionEdit, +) -> None: + subscription_dbe.updated_by_id = user_id + + subscription_dbe.connection_id = subscription.connection_id + + subscription_dbe.name = subscription.name + subscription_dbe.description = subscription.description + + subscription_dbe.tags = subscription.tags + subscription_dbe.meta = subscription.meta + + # Preserve the provider ti_id even if the client omitted it on the full-PUT. + existing_ti_id = (subscription_dbe.data or {}).get("ti_id") + data = subscription.data + if data.ti_id is None and existing_ti_id is not None: + data = data.model_copy(update={"ti_id": existing_ti_id}) + + subscription_dbe.data = data.model_dump(mode="json", exclude_none=True) + + subscription_dbe.flags = _flags_to_dbe( + enabled=subscription.enabled, + valid=subscription.valid, + ) + + +# --- Delivery --------------------------------------------------------------- # + + +def map_delivery_dto_to_dbe_create( + *, + project_id: UUID, + user_id: UUID | None, + # + delivery: TriggerDeliveryCreate, +) -> TriggerDeliveryDBE: + dbe_kwargs = dict( + project_id=project_id, + # + created_by_id=user_id, + # + status=delivery.status.model_dump(mode="json", exclude_none=True) + if delivery.status + else None, + # + data=delivery.data.model_dump(mode="json", exclude_none=True) + if delivery.data + else None, + # + subscription_id=delivery.subscription_id, + # + event_id=delivery.event_id, + ) + if delivery.id is not None: + dbe_kwargs["id"] = delivery.id + + return TriggerDeliveryDBE(**dbe_kwargs) + + +def map_delivery_dbe_to_dto( + *, + delivery_dbe: TriggerDeliveryDBE, +) -> TriggerDelivery: + return TriggerDelivery( + id=delivery_dbe.id, + # + created_at=delivery_dbe.created_at, + updated_at=delivery_dbe.updated_at, + deleted_at=delivery_dbe.deleted_at, + created_by_id=delivery_dbe.created_by_id, + updated_by_id=delivery_dbe.updated_by_id, + deleted_by_id=delivery_dbe.deleted_by_id, + # + status=Status.model_validate(delivery_dbe.status) + if delivery_dbe.status + else Status(), + # + data=TriggerDeliveryData.model_validate(delivery_dbe.data) + if delivery_dbe.data + else None, + # + subscription_id=delivery_dbe.subscription_id, + # + event_id=delivery_dbe.event_id, + ) diff --git a/api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py b/api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py new file mode 100644 index 0000000000..cd519cc3f2 --- /dev/null +++ b/api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py @@ -0,0 +1,155 @@ +"""Acceptance tests for /triggers/subscriptions/* and /triggers/deliveries/*. + +The read/query surfaces are DB-only — a fresh project returns well-shaped empty +lists and 404s with no Composio credentials, which also proves the +trigger_subscriptions / trigger_deliveries tables landed (migration ran). + +Creating a subscription mints a provider-side trigger instance (ti_*) on a +shared gateway connection, so the full create -> list -> disable -> delete +roundtrip (and the C7 invariant — deleting a subscription leaves the connection +intact) is gated on COMPOSIO_API_KEY being present in the runner's environment. + +Requires a running API. +""" + +import os +from uuid import uuid4 + +import pytest + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +# --------------------------------------------------------------------------- +# DB-only: reads, queries, 404s (no Composio needed) +# --------------------------------------------------------------------------- + + +class TestTriggerSubscriptionsReads: + def test_list_subscriptions_returns_200_empty(self, authed_api): + body = authed_api("GET", "/triggers/subscriptions/").json() + assert "count" in body + assert "subscriptions" in body + assert isinstance(body["subscriptions"], list) + assert body["count"] == len(body["subscriptions"]) + + def test_query_subscriptions_returns_200(self, authed_api): + response = authed_api("POST", "/triggers/subscriptions/query", json={}) + assert response.status_code == 200 + body = response.json() + assert body["count"] == len(body["subscriptions"]) + + def test_fetch_unknown_subscription_returns_404(self, authed_api): + response = authed_api("GET", f"/triggers/subscriptions/{uuid4()}") + assert response.status_code == 404 + + def test_delete_unknown_subscription_returns_404(self, authed_api): + response = authed_api("DELETE", f"/triggers/subscriptions/{uuid4()}") + assert response.status_code == 404 + + def test_refresh_unknown_subscription_returns_404(self, authed_api): + response = authed_api("POST", f"/triggers/subscriptions/{uuid4()}/refresh") + assert response.status_code == 404 + + def test_revoke_unknown_subscription_returns_404(self, authed_api): + response = authed_api("POST", f"/triggers/subscriptions/{uuid4()}/revoke") + assert response.status_code == 404 + + +class TestTriggerDeliveriesReads: + def test_list_deliveries_returns_200_empty(self, authed_api): + body = authed_api("GET", "/triggers/deliveries").json() + assert "count" in body + assert "deliveries" in body + assert isinstance(body["deliveries"], list) + assert body["count"] == len(body["deliveries"]) + + def test_query_deliveries_returns_200(self, authed_api): + response = authed_api("POST", "/triggers/deliveries/query", json={}) + assert response.status_code == 200 + body = response.json() + assert body["count"] == len(body["deliveries"]) + + def test_fetch_unknown_delivery_returns_404(self, authed_api): + response = authed_api("GET", f"/triggers/deliveries/{uuid4()}") + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Full lifecycle (needs Composio) — create on a shared connection bound to a +# workflow, list/disable/delete it, and prove the connection survives (C7). +# --------------------------------------------------------------------------- + + +@_requires_composio +class TestTriggerSubscriptionsLifecycle: + def _create_connection(self, authed_api): + slug = f"acc-{uuid4().hex[:8]}" + create = authed_api( + "POST", + "/tools/connections/", + json={ + "connection": { + "slug": slug, + "provider_key": "composio", + "integration_key": "github", + "data": {"auth_scheme": "oauth"}, + } + }, + ) + assert create.status_code == 200, create.text + return create.json()["connection"]["id"] + + def test_create_list_disable_delete_keeps_connection(self, authed_api): + connection_id = self._create_connection(authed_api) + + # CREATE — binds the event to a workflow reference on the shared connection + create = authed_api( + "POST", + "/triggers/subscriptions/", + json={ + "subscription": { + "name": f"sub-{uuid4().hex[:8]}", + "connection_id": connection_id, + "data": { + "event_key": "GITHUB_STAR_ADDED_EVENT", + "trigger_config": {}, + "inputs_fields": {"repo": "$.event.data.repository"}, + "references": {"workflow": {"slug": "triage"}}, + }, + } + }, + ) + assert create.status_code == 200, create.text + sub = create.json()["subscription"] + subscription_id = sub["id"] + assert sub["connection_id"] == connection_id + assert sub["data"]["ti_id"] is not None + assert sub["enabled"] is True + + # LIST + listing = authed_api("GET", "/triggers/subscriptions/").json() + assert any(s["id"] == subscription_id for s in listing["subscriptions"]) + + # DISABLE (revoke the subscription, not the connection) + revoke = authed_api("POST", f"/triggers/subscriptions/{subscription_id}/revoke") + assert revoke.status_code == 200, revoke.text + assert revoke.json()["subscription"]["enabled"] is False + + # DELETE + delete = authed_api("DELETE", f"/triggers/subscriptions/{subscription_id}") + assert delete.status_code == 204 + + fetch = authed_api("GET", f"/triggers/subscriptions/{subscription_id}") + assert fetch.status_code == 404 + + # C7: deleting the subscription must NOT delete/revoke the connection. + conn = authed_api("GET", f"/tools/connections/{connection_id}") + assert conn.status_code == 200, conn.text + + authed_api("DELETE", f"/tools/connections/{connection_id}")