Skip to content

[feat] Gateway triggers: Composio event ingress, subscriptions, and UI#4749

Open
jp-agenta wants to merge 5 commits into
mainfrom
gateway-triggers-all
Open

[feat] Gateway triggers: Composio event ingress, subscriptions, and UI#4749
jp-agenta wants to merge 5 commits into
mainfrom
gateway-triggers-all

Conversation

@jp-agenta

Copy link
Copy Markdown
Member

Context

The inbound dual of webhooks. Webhooks turn Agenta events into outbound HTTP calls; this turns inbound external provider events (via Composio) into Agenta workflow runs. Nothing in the platform previously turned an external event into a workflow invocation.

This PR collapses the former WP0–WP6 stack (+ design docs) into a single change.

What this adds

  • Shared connections domaincore/gateway/connections + dbs/postgres/gateway/connections, a routerless domain backed by the renamed gateway_connections table, reused by both tools and triggers (OAuth initiate/callback/refresh/revoke).
  • Triggers domain — event catalog (Composio), subscriptions, and deliveries (core/triggers, dbs/postgres/triggers, apis/fastapi/triggers).
  • Ingress + dispatch — a global POST /triggers/composio/events endpoint: HMAC-SHA256 signature verification, 202 ack-fast, enqueue to a TaskIQ worker that resolves the subscription, dedups on event_id, maps inputs, and invokes the bound workflow, recording one delivery row.
  • Web UI — triggers catalog browse + shared-connection management, and subscriptions/deliveries management (web/oss + @agenta packages).
  • Hostingworker-triggers mounted in all compose stacks.
  • Tests — unit (resolver, HMAC signature verification, dispatcher branches) and acceptance (catalog, subscriptions, ingress); Composio-dependent acceptance tests gate on the server's reported state, not a local env var.
  • Docs — full design set under docs/designs/gateway-triggers/.

Notes

  • ProviderNotFoundError maps to 404 at the router boundary; connection_data and catalog list fields use Field(default_factory=...).
  • Composio-disabled is the no-adapter case → empty catalog; the acceptance test skips when the API reports Composio enabled.

What to QA

  • With Composio configured: browse the catalog, create a subscription bound to a workflow, fire a provider event, confirm a delivery row + workflow run.
  • Unknown provider key → 404. Forged webhook signature → 401.

🤖 Generated with Claude Code

Inbound dual of webhooks: turn external provider events into Agenta workflow runs. Adds a shared routerless connections domain (core/gateway/connections), a triggers domain (event catalog, subscriptions, deliveries), a global Composio ingress endpoint with HMAC verification + async dispatch worker, and the web UI for catalog browse and subscription/delivery management. Includes design docs and unit/acceptance tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@dosubot dosubot Bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Jun 19, 2026
@vercel

vercel Bot commented Jun 19, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
agenta-documentation Ready Ready Preview, Comment Jun 19, 2026 6:14pm

Request Review

@coderabbitai

coderabbitai Bot commented Jun 19, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 526e8dbc-48d9-4e09-9718-05f37c359fc2

📥 Commits

Reviewing files that changed from the base of the PR and between ce43b26 and c5c5206.

📒 Files selected for processing (4)
  • api/oss/src/core/triggers/exceptions.py
  • api/oss/src/core/triggers/service.py
  • api/oss/src/dbs/postgres/triggers/dao.py
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerSubscriptionDrawer.tsx
🚧 Files skipped from review as they are similar to previous changes (3)
  • api/oss/src/core/triggers/exceptions.py
  • api/oss/src/dbs/postgres/triggers/dao.py
  • api/oss/src/core/triggers/service.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added the Gateway Triggers subsystem for inbound provider events, including trigger connections, catalog browsing, subscriptions, and delivery audit/history.
    • Added a Triggers settings page with drawers to manage subscriptions and view deliveries.
    • Introduced a background worker and Redis-based dispatch flow for verified event ingestion and workflow invocation.
    • Enabled Triggers permissions (view/edit/run) with new EE-gated contracts and endpoints.
  • Documentation

    • Added/expanded Gateway Triggers design and implementation documentation (architecture, mapping, and execution plan).

Walkthrough

This PR adds shared gateway connections, trigger catalogs, subscriptions, deliveries, ingress, dispatch, worker/runtime wiring, trigger settings UI, webhook naming updates, and expanded GitButler workflow docs.

Changes

Gateway triggers and shared connections

Layer / File(s) Summary
Shared connections core
api/oss/src/core/gateway/connections/*, api/oss/src/core/gateway/catalog/*, api/oss/src/core/tools/*, api/oss/src/dbs/postgres/gateway/connections/*, api/oss/databases/postgres/migrations/...002...
Connection persistence, catalog access, and provider auth move into a shared gateway connections domain and tools delegate to it.
Triggers service, router, and persistence
api/oss/src/core/triggers/*, api/oss/src/dbs/postgres/triggers/*, api/oss/databases/postgres/migrations/...003..., api/oss/src/apis/fastapi/triggers/*, api/entrypoints/routers.py, api/oss/src/tasks/.../triggers/*, api/entrypoints/worker_triggers.py, api/entrypoints/dispatcher_composio.py, api/oss/src/middlewares/auth.py, api/oss/src/utils/env.py, sdks/python/agenta/sdk/utils/resolvers.py, api/oss/src/core/webhooks/delivery.py, api/oss/tests/pytest/...
Trigger permissions, provider adapters, subscriptions, deliveries, ingress, dispatch, worker startup, and supporting tests/runtime wiring are added.
Trigger settings UI
web/packages/agenta-entities/src/gatewayTrigger/*, web/packages/agenta-entity-ui/src/gatewayTrigger/*, web/oss/src/components/pages/settings/Triggers/*, web/oss/src/components/Sidebar/SettingsSidebar.tsx, web/oss/src/pages/.../settings/index.tsx, web/packages/agenta-entities/tests/unit/gatewayTriggerApi.test.ts
Frontend entities, hooks, drawers, and settings pages add trigger catalog browsing, shared connection views, subscription editing, and delivery history.
Tool renames and consumer rewiring
web/packages/agenta-entities/src/gatewayTool/*, web/packages/agenta-entity-ui/src/gatewayTool/*, web/oss/src/components/DrillInView/OSSdrillInUIProvider.tsx, web/oss/src/components/pages/settings/Tools/*
Gateway-tool exports are renamed to tool* identifiers and the OSS consumers switch to the renamed hooks, atoms, and API wrappers.
Webhook state and UI rename
web/oss/src/services/webhooks/types.ts, web/oss/src/state/webhooks/*, web/oss/src/components/Webhooks/*, web/oss/src/styles/globals.css, web/_reference/agenta-sdk/src/types.ts
Webhook-specific types, atoms, cache keys, schema names, and components replace automation-oriented names.
GitButler workflow docs
AGENTS.md
GitButler instructions add detailed stack, lane, stash, push, and PR-base guidance.
Gateway triggers design set
docs/designs/gateway-triggers/*
Design, plan, work-package, and status documents describe the shared-connections and triggers rollout.
Ancillary tests and config
api/oss/tests/pytest/unit/services/test_db_manager.py, api/pytest.ini
A workspace-selection test changes its expected role behavior, and pytest ignores one importlib metadata deprecation warning.

Sequence Diagram(s)

sequenceDiagram
  participant Composio
  participant TriggersRouter
  participant TriggersWorker
  participant TriggersDispatcher
  participant WorkflowsService

  Composio->>TriggersRouter: POST signed trigger event
  TriggersRouter->>TriggersRouter: verify signature and parse metadata
  TriggersRouter->>TriggersWorker: enqueue trigger_id, event_id, event
  TriggersWorker->>TriggersDispatcher: dispatch(...)
  TriggersDispatcher->>WorkflowsService: invoke_workflow(...)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • Agenta-AI/agenta#4338: This PR also updates OSS consumers to use the renamed gateway-tool entity exports and hooks.
  • Agenta-AI/agenta#4748: This PR also changes the get_default_workspace_id unit test expectation in api/oss/tests/pytest/unit/services/test_db_manager.py.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 20.26% which is insufficient. The required threshold is 60.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly and specifically describes the main change: implementing gateway triggers with Composio event ingress, subscriptions, and UI components.
Description check ✅ Passed The PR description is comprehensive and directly related to the changeset. It explains the context (inbound dual of webhooks), what is being added (connections domain, triggers domain, ingress/dispatch, UI, hosting, tests, docs), and provides QA guidance.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch gateway-triggers-all

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

github-actions Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Railway Preview Environment

Preview URL https://gateway-production-ef02.up.railway.app/w
Image tag pr-4749-6041a41
Status Failed
Railway logs Open logs
Logs View workflow run
Updated at 2026-06-19T18:21:24.286Z

get_default_workspace_id no longer prefers owner-role (multi-org: an invitee owns their own empty personal workspace). Assert oldest membership wins regardless of role.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…g in tests

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py (1)

182-227: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guarantee connection cleanup even on test failure.

If any assertion fails before Line 226, the connection cleanup is skipped, which can leak external/provider state and make later acceptance runs flaky. Move cleanup to a finally block and assert the delete response.

Suggested fix
 def test_create_list_disable_delete_keeps_connection(self, triggers_api):
     connection_id = self._create_connection(triggers_api)
-
-    create = triggers_api(
-        "POST",
-        "/triggers/subscriptions/",
-        json={
-            "subscription": {
-                "name": f"sub-{uuid4().hex[:8]}",
-                "connection_id": connection_id,
-                "data": {
-                    "event_key": "GITHUB_STAR_ADDED_EVENT",
-                    "trigger_config": {},
-                    "inputs_fields": {"repo": "$.event.data.repository"},
-                    "references": {"workflow": {"slug": "triage"}},
-                },
-            }
-        },
-    )
-    assert create.status_code == 200, create.text
-    sub = create.json()["subscription"]
-    subscription_id = sub["id"]
-    assert sub["connection_id"] == connection_id
-    assert sub["data"]["ti_id"] is not None
-
-    listing = triggers_api("GET", "/triggers/subscriptions/").json()
-    assert any(s["id"] == subscription_id for s in listing["subscriptions"])
-
-    revoke = triggers_api(
-        "POST", f"/triggers/subscriptions/{subscription_id}/revoke"
-    )
-    assert revoke.status_code == 200, revoke.text
-    assert revoke.json()["subscription"]["enabled"] is False
-
-    delete = triggers_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
-    assert delete.status_code == 204
-
-    fetch = triggers_api("GET", f"/triggers/subscriptions/{subscription_id}")
-    assert fetch.status_code == 404
-
-    # C7: deleting the subscription must NOT delete/revoke the connection.
-    conn = triggers_api("GET", f"/tools/connections/{connection_id}")
-    assert conn.status_code == 200, conn.text
-
-    triggers_api("DELETE", f"/tools/connections/{connection_id}")
+    try:
+        create = triggers_api(
+            "POST",
+            "/triggers/subscriptions/",
+            json={
+                "subscription": {
+                    "name": f"sub-{uuid4().hex[:8]}",
+                    "connection_id": connection_id,
+                    "data": {
+                        "event_key": "GITHUB_STAR_ADDED_EVENT",
+                        "trigger_config": {},
+                        "inputs_fields": {"repo": "$.event.data.repository"},
+                        "references": {"workflow": {"slug": "triage"}},
+                    },
+                }
+            },
+        )
+        assert create.status_code == 200, create.text
+        sub = create.json()["subscription"]
+        subscription_id = sub["id"]
+        assert sub["connection_id"] == connection_id
+        assert sub["data"]["ti_id"] is not None
+
+        listing = triggers_api("GET", "/triggers/subscriptions/").json()
+        assert any(s["id"] == subscription_id for s in listing["subscriptions"])
+
+        revoke = triggers_api("POST", f"/triggers/subscriptions/{subscription_id}/revoke")
+        assert revoke.status_code == 200, revoke.text
+        assert revoke.json()["subscription"]["enabled"] is False
+
+        delete = triggers_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
+        assert delete.status_code == 204
+
+        fetch = triggers_api("GET", f"/triggers/subscriptions/{subscription_id}")
+        assert fetch.status_code == 404
+
+        conn = triggers_api("GET", f"/tools/connections/{connection_id}")
+        assert conn.status_code == 200, conn.text
+    finally:
+        cleanup = triggers_api("DELETE", f"/tools/connections/{connection_id}")
+        assert cleanup.status_code == 204, cleanup.text
api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py (1)

108-156: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make lifecycle cleanup unconditional.

Cleanup at Line 155 only runs on the happy path. A mid-test failure leaves shared connection state behind, which can pollute subsequent acceptance runs. Move deletion to finally and assert cleanup success.

Suggested fix
 def test_create_list_disable_delete_keeps_connection(self, authed_api):
     connection_id = self._create_connection(authed_api)
-
-    # CREATE — binds the event to a workflow reference on the shared connection
-    create = authed_api(
-        "POST",
-        "/triggers/subscriptions/",
-        json={
-            "subscription": {
-                "name": f"sub-{uuid4().hex[:8]}",
-                "connection_id": connection_id,
-                "data": {
-                    "event_key": "GITHUB_STAR_ADDED_EVENT",
-                    "trigger_config": {},
-                    "inputs_fields": {"repo": "$.event.data.repository"},
-                    "references": {"workflow": {"slug": "triage"}},
-                },
-            }
-        },
-    )
-    assert create.status_code == 200, create.text
-    sub = create.json()["subscription"]
-    subscription_id = sub["id"]
-    assert sub["connection_id"] == connection_id
-    assert sub["data"]["ti_id"] is not None
-    assert sub["enabled"] is True
-
-    # LIST
-    listing = authed_api("GET", "/triggers/subscriptions/").json()
-    assert any(s["id"] == subscription_id for s in listing["subscriptions"])
-
-    # DISABLE (revoke the subscription, not the connection)
-    revoke = authed_api("POST", f"/triggers/subscriptions/{subscription_id}/revoke")
-    assert revoke.status_code == 200, revoke.text
-    assert revoke.json()["subscription"]["enabled"] is False
-
-    # DELETE
-    delete = authed_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
-    assert delete.status_code == 204
-
-    fetch = authed_api("GET", f"/triggers/subscriptions/{subscription_id}")
-    assert fetch.status_code == 404
-
-    # C7: deleting the subscription must NOT delete/revoke the connection.
-    conn = authed_api("GET", f"/tools/connections/{connection_id}")
-    assert conn.status_code == 200, conn.text
-
-    authed_api("DELETE", f"/tools/connections/{connection_id}")
+    try:
+        create = authed_api(
+            "POST",
+            "/triggers/subscriptions/",
+            json={
+                "subscription": {
+                    "name": f"sub-{uuid4().hex[:8]}",
+                    "connection_id": connection_id,
+                    "data": {
+                        "event_key": "GITHUB_STAR_ADDED_EVENT",
+                        "trigger_config": {},
+                        "inputs_fields": {"repo": "$.event.data.repository"},
+                        "references": {"workflow": {"slug": "triage"}},
+                    },
+                }
+            },
+        )
+        assert create.status_code == 200, create.text
+        sub = create.json()["subscription"]
+        subscription_id = sub["id"]
+        assert sub["connection_id"] == connection_id
+        assert sub["data"]["ti_id"] is not None
+        assert sub["enabled"] is True
+
+        listing = authed_api("GET", "/triggers/subscriptions/").json()
+        assert any(s["id"] == subscription_id for s in listing["subscriptions"])
+
+        revoke = authed_api("POST", f"/triggers/subscriptions/{subscription_id}/revoke")
+        assert revoke.status_code == 200, revoke.text
+        assert revoke.json()["subscription"]["enabled"] is False
+
+        delete = authed_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
+        assert delete.status_code == 204
+
+        fetch = authed_api("GET", f"/triggers/subscriptions/{subscription_id}")
+        assert fetch.status_code == 404
+
+        conn = authed_api("GET", f"/tools/connections/{connection_id}")
+        assert conn.status_code == 200, conn.text
+    finally:
+        cleanup = authed_api("DELETE", f"/tools/connections/{connection_id}")
+        assert cleanup.status_code == 204, cleanup.text
docs/designs/gateway-triggers/proposal.md (1)

285-289: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove the empty quoted line.

The blank > line before the next heading trips MD028 and will keep docs lint failing.

As per the markdownlint hint, this blockquote needs to stay contiguous.

🛠️ Proposed fix
 > **Consequence — cross-domain revoke.** Because `ca_*` is shared, revoking it affects
 > both tools actions and trigger subscriptions on it. Lean: revoke-for-everyone + show
 > usage; deleting a subscription must not revoke the connection. Connect once, used
 > everywhere — the inverse of the connect-twice cost that rejected option B carried.
->
 
 ### The workflow dispatch seam

Source: Linters/SAST tools

🟡 Minor comments (16)
web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerSubscriptions.ts-54-58 (1)

54-58: ⚠️ Potential issue | 🟡 Minor

Gate loading state for empty connectionId to match the pattern in useTriggerDeliveries.

Line 57 exposes loading while the query is disabled. When connectionId is empty, the query is disabled (enabled: !!connectionId at line 42) but isLoading still reflects query.isPending. Apply the same gating pattern used in useTriggerDeliveries to return false when the query is disabled:

Suggested fix
     return {
         subscriptions,
         count: query.data?.count ?? 0,
-        isLoading: query.isPending,
+        isLoading: connectionId ? query.isPending : false,
         error: query.error,
     }
 }
web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerConnections.ts-60-64 (1)

60-64: ⚠️ Potential issue | 🟡 Minor

Gate loading state when integrationKey is empty.

Line 63 can surface a loading state even when the query is intentionally disabled (line 48). Mirror the useTriggerDeliveries hook pattern and gate isLoading by the input key.

Suggested fix
 export const useTriggerIntegrationConnections = (integrationKey: string) => {
     const query = useAtomValue(triggerIntegrationConnectionsAtomFamily(integrationKey))

     const connections = useMemo<TriggerConnection[]>(
         () => query.data?.connections ?? [],
         [query.data?.connections],
     )

     return {
         connections,
         count: query.data?.count ?? 0,
-        isLoading: query.isPending,
+        isLoading: integrationKey ? query.isPending : false,
         error: query.error,
     }
 }
web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerEvent.ts-32-35 (1)

32-35: ⚠️ Potential issue | 🟡 Minor

Make isLoading conditional on both required keys.

Line 34 should be gated by integrationKey and eventKey so consumers don't get a loading signal when the query is disabled due to missing prerequisites.

Suggested fix
     return {
         event: query.data?.event ?? null,
-        isLoading: query.isPending,
+        isLoading: integrationKey && eventKey ? query.isPending : false,
         error: query.error,
     }
 }
web/oss/src/components/pages/settings/Triggers/components/GatewaySubscriptionsSection.tsx-248-248 (1)

248-248: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Avoid non-unique rowKey fallback.

Line 248 can return "", which can produce duplicate keys and unstable row identity when multiple records miss these fields.

Suggested fix
-                    rowKey={(record) => record.id ?? record.slug ?? record.data?.event_key ?? ""}
+                    rowKey={(record) =>
+                        record.id ??
+                        record.slug ??
+                        `${record.connection_id ?? "unknown"}:${record.data?.event_key ?? "unknown"}:${record.created_at ?? "unknown"}`
+                    }
web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerSubscriptionDrawer.tsx-97-100 (1)

97-100: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Edit prefill uses raw workflow revision ID as UI label

Line 99 sets workflowLabel to the revision ID, so edit mode can display an opaque ID instead of the workflow display name/version format used elsewhere.

As per coding guidelines, display workflow names via workflowMolecule.selectors.artifactName(entityId) and use the prescribed label rules for workflow/variant naming.

Source: Coding guidelines

web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerDeliveriesDrawer.tsx-51-55 (1)

51-55: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Status tag color ignores the status-code fallback

Line 51 already falls back to status.code, but Line 54 colors using status.type only. Code-only statuses can display the wrong semantic color.

Suggested fix
-const type = record.status?.type ?? record.status?.code
+const type = record.status?.type ?? record.status?.code
 return (
     <Tooltip title={record.status?.message ?? undefined}>
-        <Tag color={statusColor(record.status?.type)}>{type ?? "unknown"}</Tag>
+        <Tag color={statusColor(type)}>{type ?? "unknown"}</Tag>
     </Tooltip>
 )
web/oss/src/components/pages/settings/Triggers/components/GatewayTriggersSection.tsx-56-57 (1)

56-57: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Name column can render blank for valid rows

Line 56 falls back to slug, but if both name and slug are absent the cell is empty. Add integration_key as final fallback so every row remains identifiable.

Suggested fix
-<Typography.Text>{record.name || record.slug}</Typography.Text>
+<Typography.Text>{record.name || record.slug || record.integration_key}</Typography.Text>
web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerSubscriptionDrawer.tsx-138-146 (1)

138-146: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject non-object JSON for inputs_fields

Line 140 accepts any valid JSON (including arrays/primitives), but inputs_fields is expected to be an object map. This can produce contract failures downstream.

Suggested fix
 let inputsFields: Record<string, unknown> = {}
 try {
-    inputsFields = inputsText.trim() ? JSON.parse(inputsText) : {}
+    const parsed = inputsText.trim() ? JSON.parse(inputsText) : {}
+    if (parsed === null || Array.isArray(parsed) || typeof parsed !== "object") {
+        throw new Error("Inputs mapping must be a JSON object")
+    }
+    inputsFields = parsed as Record<string, unknown>
     setInputsError(null)
 } catch {
     setInputsError("Invalid JSON")
     message.error("inputs mapping is not valid JSON")
     return
 }
api/oss/src/core/gateway/connections/dtos.py-112-118 (1)

112-118: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use enum typing for ConnectionRequest.auth_scheme to keep validation consistent.

ConnectionCreateData.auth_scheme is enum-validated, but ConnectionRequest.auth_scheme is a plain str, which lets invalid schemes bypass early validation and fail later in adapter calls.

🔧 Proposed fix
 class ConnectionRequest(BaseModel):
@@
-    auth_scheme: Optional[str] = None
+    auth_scheme: Optional[ConnectionAuthScheme] = None
api/oss/src/apis/fastapi/triggers/router.py-376-377 (1)

376-377: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate limit at the API boundary.

Line 376 allows invalid values (e.g., 0 or negative), which can propagate upstream and surface as adapter failures instead of a clean client validation error.

Suggested fix
-        limit: Optional[int] = Query(default=None),
+        limit: Optional[int] = Query(default=None, ge=1, le=1000),
docs/designs/gateway-triggers/wp/WP4-specs.md-20-25 (1)

20-25: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update WP4 ACK semantics from 200 to 202.

Line 22 and Line 54 document 200 for no-op/unset-secret, but this PR’s ingress contract is 202 Accepted. Keeping 200 here creates avoidable contract drift.

Suggested doc fix
-- HMAC-SHA256 verify over `{id}.{ts}.{body}` with `COMPOSIO_WEBHOOK_SECRET`; 401 bad sig;
-  200 no-op when secret unset; add `COMPOSIO_WEBHOOK_SECRET` to `env`.
+- HMAC-SHA256 verify over `{id}.{ts}.{body}` with `COMPOSIO_WEBHOOK_SECRET`; 401 bad sig;
+  202 no-op when secret unset; add `COMPOSIO_WEBHOOK_SECRET` to `env`.
@@
-- Forged signature → 401; unset secret → 200 no-op.
+- Forged signature → 401; unset secret → 202 no-op.

Also applies to: 54-55

docs/designs/gateway-triggers/wp/WP4-status.md-7-25 (1)

7-25: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Refresh WP4 status to match implemented work.

The document still marks WP4 as NOT STARTED, but this stack includes ingress, dispatch runtime, worker wiring, and tests. This stale status will confuse follow-on planning.

docs/designs/gateway-triggers/wp/WP0-status.md-15-18 (1)

15-18: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Align the connection path spelling.

This status file still uses the flat core/connections / dbs/postgres/connections names, but the spec and implementation cohort use the gateway/ subpaths. Keeping both spellings will send the next lane to the wrong package.

Based on the stack context, this PR already uses core/gateway/connections/ and dbs/postgres/gateway/connections/.

🛠️ Suggested alignment
- [x] `core/connections/` service + DAO interface + `ConnectionsService` + `ConnectionsGatewayInterface`
+ [x] `core/gateway/connections/` service + DAO interface + `ConnectionsService` + `ConnectionsGatewayInterface`

- [x] `dbs/postgres/connections/` DBE + DAO + mappings
+ [x] `dbs/postgres/gateway/connections/` DBE + DAO + mappings

Also applies to: 50-59

docs/designs/gateway-triggers/wp/WL-runbook.md-128-154 (1)

128-154: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Label the markdown example fence.

The unlabeled fence trips MD040; md is sufficient here.

As per the markdownlint hint, the docs PR body example needs a language tag.

🛠️ Proposed fix
-```
+```md

Source: Linters/SAST tools

docs/designs/gateway-triggers/wp/WP1-status.md-41-42 (1)

41-42: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Permission note conflicts with the WP1 frozen contract.

This says no VIEW_TRIGGERS and reuses VIEW_TOOLS, but WP1 specs freeze the opposite rule. Please reconcile this to avoid implementer confusion on authorization behavior.

docs/designs/gateway-triggers/wp/WP2-specs.md-29-33 (1)

29-33: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Frozen resolver return type is too narrow.

The spec locks -> dict, but current trigger dispatch logic already handles scalar/non-dict results from resolve_target_fields. Please widen the contract wording to avoid future mismatches.

🧹 Nitpick comments (9)
web/packages/agenta-entities/src/gatewayTrigger/api/client.ts (1)

5-19: ⚡ Quick win

Trim block comments to “why”-only notes.

These docblocks narrate implementation details (“what”) across many lines; please reduce to terse invariant-level comments only, or rely on naming/extraction.

As per coding guidelines, “Keep AI-generated in-code comments minimal; comment only the non-obvious why, never the what.”

Source: Coding guidelines

web/packages/agenta-entities/src/gatewayTrigger/api/api.ts (1)

1-12: ⚡ Quick win

Reduce explanatory prose comments in this module.

The section/header comments mostly restate behavior; please keep only short “why” notes where needed.

As per coding guidelines, comments should be minimal and explain non-obvious rationale, not implementation narration.

Also applies to: 42-42, 111-111, 133-133, 243-243

Source: Coding guidelines

web/packages/agenta-entities/tests/unit/gatewayTriggerApi.test.ts (1)

1-14: ⚡ Quick win

Shorten the top-of-file comment block.

Please keep only the non-obvious invariant/intent and drop step-by-step prose.

As per coding guidelines, comments should be terse and explain “why,” not narrate “what.”

Source: Coding guidelines

web/packages/agenta-entity-ui/src/gatewayTrigger/index.ts (1)

1-8: ⚡ Quick win

Condense the module docblock to a one-liner (or remove).

Current block explains behavior in detail; this can be inferred from exports and naming.

As per coding guidelines, keep in-code comments minimal and focused on subtle “why” context only.

Source: Coding guidelines

web/packages/agenta-entities/src/gatewayTrigger/core/types.ts (1)

1-15: ⚡ Quick win

Trim non-obvious-comment blocks to terse intent notes only.

These large narrative blocks describe what and migration context; please reduce to minimal “why” comments (or rely on symbol names/module docs) to match repo standards.

As per coding guidelines, “Keep AI-generated in-code comments minimal; comment only the non-obvious why … never the what.”

Also applies to: 90-95, 134-140, 235-239

Source: Coding guidelines

web/packages/agenta-entities/src/gatewayTrigger/hooks/useCatalogEvents.ts (1)

14-16: ⚡ Quick win

Unify catalog-search atom ownership to avoid split state.

eventsSearchAtom here overlaps conceptually with eventSearchAtom in state/atoms.ts; keeping two similarly named atoms for the same concern invites accidental desync across consumers. Prefer exporting one shared atom from a single module and reusing it here.

api/oss/tests/pytest/acceptance/triggers/test_triggers_ingress.py (1)

151-163: ⚡ Quick win

Make the dedup assertion deterministic and cleanup-safe.

Line 151 currently allows a false positive (<= 1) when the async dispatcher hasn’t written any row yet. Poll for completion (bounded) and then assert exactly one delivery; also run cleanup in finally with response checks.

Proposed test hardening
+from time import sleep
@@
-        # The dispatch is async; the dedup guard means at most one delivery row
-        # exists for this (subscription, event_id).
-        deliveries = authed_api(
-            "POST",
-            "/triggers/deliveries/query",
-            json={
-                "delivery": {"subscription_id": subscription_id, "event_id": event_id}
-            },
-        ).json()["deliveries"]
-        assert len(deliveries) <= 1
-
-        authed_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
-        authed_api("DELETE", f"/tools/connections/{connection_id}")
+        try:
+            deliveries = []
+            for _ in range(20):
+                query = authed_api(
+                    "POST",
+                    "/triggers/deliveries/query",
+                    json={
+                        "delivery": {"subscription_id": subscription_id, "event_id": event_id}
+                    },
+                )
+                assert query.status_code == 200, query.text
+                deliveries = query.json()["deliveries"]
+                if deliveries:
+                    break
+                sleep(0.25)
+
+            assert len(deliveries) == 1
+        finally:
+            delete_sub = authed_api("DELETE", f"/triggers/subscriptions/{subscription_id}")
+            assert delete_sub.status_code in (200, 204), delete_sub.text
+            delete_conn = authed_api("DELETE", f"/tools/connections/{connection_id}")
+            assert delete_conn.status_code in (200, 204), delete_conn.text
api/oss/tests/pytest/unit/triggers/test_triggers_dispatcher.py (1)

129-149: ⚡ Quick win

Add coverage for the raised-exception branch in dispatch.

Current tests validate non-200 responses, but not the invoke_workflow exception path that should write a failed delivery and re-raise.

Suggested additional unit test
+import pytest
@@
 async def test_workflow_non_200_writes_failed_delivery():
@@
     assert delivery.status.code == "500"
+
+
+async def test_workflow_exception_writes_failed_delivery_and_reraises():
+    project_id = uuid4()
+    reference = Reference(slug="wf-1")
+    subscription = _make_subscription(references={"workflow": reference})
+    dao = _make_dao(resolved=(project_id, subscription))
+
+    workflows = MagicMock()
+    workflows.invoke_workflow = AsyncMock(side_effect=RuntimeError("boom"))
+    dispatcher = TriggersDispatcher(triggers_dao=dao, workflows_service=workflows)
+
+    with pytest.raises(RuntimeError, match="boom"):
+        await dispatcher.dispatch(trigger_id="ti_1", event_id="e1", event=_EVENT)
+
+    dao.write_delivery.assert_awaited_once()
+    delivery = dao.write_delivery.await_args.kwargs["delivery"]
+    assert delivery.status.code == "500"
api/oss/src/tasks/asyncio/triggers/dispatcher.py (1)

1-9: ⚡ Quick win

Trim narrative comments to “why-only” comments.

Several comments/docstrings here narrate behavior rather than a non-obvious invariant. Please reduce these to terse “why” notes (or rely on naming/extraction).

As per coding guidelines, "Keep AI-generated in-code comments minimal; comment only the non-obvious why ... never the what."

Also applies to: 109-130

Source: Coding guidelines


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: bb4b1c2d-a35f-4d60-ab97-8083ce9d0008

📥 Commits

Reviewing files that changed from the base of the PR and between a97e608 and f606d7a.

📒 Files selected for processing (125)
  • AGENTS.md
  • api/ee/src/core/access/permissions/types.py
  • api/ee/tests/pytest/acceptance/tools/__init__.py
  • api/ee/tests/pytest/acceptance/tools/test_tools_connections.py
  • api/ee/tests/pytest/acceptance/triggers/__init__.py
  • api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py
  • api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • api/entrypoints/routers.py
  • api/entrypoints/worker_triggers.py
  • api/oss/databases/postgres/migrations/core_oss/versions/oss000000002_rename_tool_connections_to_gateway_connections.py
  • api/oss/databases/postgres/migrations/core_oss/versions/oss000000003_add_trigger_subscriptions_and_deliveries.py
  • api/oss/src/apis/fastapi/tools/models.py
  • api/oss/src/apis/fastapi/tools/router.py
  • api/oss/src/apis/fastapi/triggers/__init__.py
  • api/oss/src/apis/fastapi/triggers/models.py
  • api/oss/src/apis/fastapi/triggers/router.py
  • api/oss/src/core/gateway/__init__.py
  • api/oss/src/core/gateway/connections/__init__.py
  • api/oss/src/core/gateway/connections/dtos.py
  • api/oss/src/core/gateway/connections/exceptions.py
  • api/oss/src/core/gateway/connections/interfaces.py
  • api/oss/src/core/gateway/connections/providers/__init__.py
  • api/oss/src/core/gateway/connections/providers/composio/__init__.py
  • api/oss/src/core/gateway/connections/providers/composio/adapter.py
  • api/oss/src/core/gateway/connections/registry.py
  • api/oss/src/core/gateway/connections/service.py
  • api/oss/src/core/gateway/connections/utils.py
  • api/oss/src/core/tools/dtos.py
  • api/oss/src/core/tools/interfaces.py
  • api/oss/src/core/tools/providers/composio/adapter.py
  • api/oss/src/core/tools/service.py
  • api/oss/src/core/triggers/__init__.py
  • api/oss/src/core/triggers/dtos.py
  • api/oss/src/core/triggers/exceptions.py
  • api/oss/src/core/triggers/interfaces.py
  • api/oss/src/core/triggers/providers/__init__.py
  • api/oss/src/core/triggers/providers/composio/__init__.py
  • api/oss/src/core/triggers/providers/composio/adapter.py
  • api/oss/src/core/triggers/providers/composio/catalog.py
  • api/oss/src/core/triggers/registry.py
  • api/oss/src/core/triggers/service.py
  • api/oss/src/core/webhooks/delivery.py
  • api/oss/src/dbs/postgres/gateway/__init__.py
  • api/oss/src/dbs/postgres/gateway/connections/__init__.py
  • api/oss/src/dbs/postgres/gateway/connections/dao.py
  • api/oss/src/dbs/postgres/gateway/connections/dbes.py
  • api/oss/src/dbs/postgres/gateway/connections/mappings.py
  • api/oss/src/dbs/postgres/triggers/__init__.py
  • api/oss/src/dbs/postgres/triggers/dao.py
  • api/oss/src/dbs/postgres/triggers/dbas.py
  • api/oss/src/dbs/postgres/triggers/dbes.py
  • api/oss/src/dbs/postgres/triggers/mappings.py
  • api/oss/src/middlewares/auth.py
  • api/oss/src/tasks/asyncio/triggers/__init__.py
  • api/oss/src/tasks/asyncio/triggers/dispatcher.py
  • api/oss/src/tasks/taskiq/triggers/__init__.py
  • api/oss/src/tasks/taskiq/triggers/worker.py
  • api/oss/src/utils/env.py
  • api/oss/tests/pytest/acceptance/tools/test_tools_connections.py
  • api/oss/tests/pytest/acceptance/triggers/__init__.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_ingress.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • api/oss/tests/pytest/unit/models/test_lifecycle_conventions.py
  • api/oss/tests/pytest/unit/triggers/__init__.py
  • api/oss/tests/pytest/unit/triggers/test_triggers_dispatcher.py
  • api/oss/tests/pytest/unit/triggers/test_triggers_signature.py
  • api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py
  • docs/designs/gateway-triggers/gap.md
  • docs/designs/gateway-triggers/mapping.md
  • docs/designs/gateway-triggers/mimics.md
  • docs/designs/gateway-triggers/plan.md
  • docs/designs/gateway-triggers/proposal.md
  • docs/designs/gateway-triggers/research.md
  • docs/designs/gateway-triggers/wp/WL-runbook.md
  • docs/designs/gateway-triggers/wp/WP0-specs.md
  • docs/designs/gateway-triggers/wp/WP0-status.md
  • docs/designs/gateway-triggers/wp/WP1-specs.md
  • docs/designs/gateway-triggers/wp/WP1-status.md
  • docs/designs/gateway-triggers/wp/WP2-specs.md
  • docs/designs/gateway-triggers/wp/WP2-status.md
  • docs/designs/gateway-triggers/wp/WP3-specs.md
  • docs/designs/gateway-triggers/wp/WP3-status.md
  • docs/designs/gateway-triggers/wp/WP4-specs.md
  • docs/designs/gateway-triggers/wp/WP4-status.md
  • docs/designs/gateway-triggers/wp/WP5-specs.md
  • docs/designs/gateway-triggers/wp/WP5-status.md
  • docs/designs/gateway-triggers/wp/WP6-specs.md
  • docs/designs/gateway-triggers/wp/WP6-status.md
  • hosting/docker-compose/ee/docker-compose.dev.yml
  • hosting/docker-compose/ee/docker-compose.gh.local.yml
  • hosting/docker-compose/ee/docker-compose.gh.yml
  • hosting/docker-compose/oss/docker-compose.dev.yml
  • hosting/docker-compose/oss/docker-compose.gh.local.yml
  • hosting/docker-compose/oss/docker-compose.gh.ssl.yml
  • hosting/docker-compose/oss/docker-compose.gh.yml
  • sdks/python/agenta/sdk/utils/resolvers.py
  • sdks/python/oss/tests/pytest/unit/test_resolvers.py
  • web/oss/src/components/Sidebar/SettingsSidebar.tsx
  • web/oss/src/components/pages/settings/Triggers/Triggers.tsx
  • web/oss/src/components/pages/settings/Triggers/components/GatewaySubscriptionsSection.tsx
  • web/oss/src/components/pages/settings/Triggers/components/GatewayTriggersSection.tsx
  • web/oss/src/pages/w/[workspace_id]/p/[project_id]/settings/index.tsx
  • web/packages/agenta-entities/package.json
  • web/packages/agenta-entities/src/gatewayTrigger/api/api.ts
  • web/packages/agenta-entities/src/gatewayTrigger/api/client.ts
  • web/packages/agenta-entities/src/gatewayTrigger/api/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/core/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/core/types.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useCatalogEvents.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerConnections.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerDeliveries.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerEvent.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerSubscription.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerSubscriptions.ts
  • web/packages/agenta-entities/src/gatewayTrigger/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/state/atoms.ts
  • web/packages/agenta-entities/src/gatewayTrigger/state/index.ts
  • web/packages/agenta-entities/tests/unit/gatewayTriggerApi.test.ts
  • web/packages/agenta-entity-ui/package.json
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerDeliveriesDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerEventsDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerSubscriptionDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/index.ts
💤 Files with no reviewable changes (1)
  • api/oss/src/core/tools/dtos.py

Comment thread api/oss/src/apis/fastapi/triggers/router.py Outdated
Comment on lines +802 to +807
if self.dispatch_task is not None:
await self.dispatch_task.kiq(
trigger_id=str(trigger_id),
event_id=str(event_id),
event=envelope,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bound enqueue latency on the ingress request path.

Line 803 awaits broker enqueue directly with no timeout/error shaping. A slow or stuck broker call can pin request workers and degrade ingress availability.

Suggested fix
+import asyncio
@@
         if self.dispatch_task is not None:
-            await self.dispatch_task.kiq(
-                trigger_id=str(trigger_id),
-                event_id=str(event_id),
-                event=envelope,
-            )
+            try:
+                await asyncio.wait_for(
+                    self.dispatch_task.kiq(
+                        trigger_id=str(trigger_id),
+                        event_id=str(event_id),
+                        event=envelope,
+                    ),
+                    timeout=2.0,
+                )
+            except Exception as e:
+                log.error("Failed to enqueue trigger event: %s", e)
+                raise HTTPException(
+                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+                    detail="Failed to enqueue trigger event",
+                ) from e

Comment on lines +103 to +120
async def get_connection_status(
self,
*,
provider_connection_id: str,
) -> Dict[str, Any]:
"""Poll provider for updated connection status."""
...

@abstractmethod
async def refresh_connection(
self,
*,
provider_connection_id: str,
force: bool = False,
callback_url: Optional[str] = None,
integration_key: Optional[str] = None,
user_id: Optional[str] = None,
) -> Dict[str, Any]: ...

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Replace raw Dict[str, Any] returns with explicit DTOs in gateway interface methods.

Using untyped dicts for get_connection_status and refresh_connection weakens the core contract and makes downstream handling brittle. Define dedicated response DTOs and return those from the interface.

As per coding guidelines, api/oss/src/{core,clients}/**/*.py: “Do NOT use Dict[str, Any] as a return type when you can define a proper model”.

Source: Coding guidelines

Comment on lines +40 to +49
def __init__(
self,
*,
adapter_registry: TriggersGatewayRegistry,
triggers_dao: Optional[TriggersDAOInterface] = None,
connections_service: Optional[ConnectionsService] = None,
):
self.adapter_registry = adapter_registry
self.dao = triggers_dao
self.connections_service = connections_service

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Constructor allows invalid None dependencies and later crashes.

Line 48 and Line 49 store nullable dependencies, but methods call self.dao / self.connections_service unconditionally. Instantiating with defaults produces runtime AttributeError paths.

Proposed fix
-    def __init__(
-        self,
-        *,
-        adapter_registry: TriggersGatewayRegistry,
-        triggers_dao: Optional[TriggersDAOInterface] = None,
-        connections_service: Optional[ConnectionsService] = None,
-    ):
+    def __init__(
+        self,
+        *,
+        adapter_registry: TriggersGatewayRegistry,
+        triggers_dao: TriggersDAOInterface,
+        connections_service: ConnectionsService,
+    ):
         self.adapter_registry = adapter_registry
         self.dao = triggers_dao
         self.connections_service = connections_service

Comment thread api/oss/src/core/triggers/service.py
Comment on lines +222 to +237
async def activate_connection_by_provider_id(
self,
*,
provider_connection_id: str,
project_id: Optional[UUID] = None,
) -> Optional[Connection]:
"""Set is_valid=True and is_active=True for the connection matching the provider ID."""
async with self.engine.session() as session:
stmt = select(self.ConnectionDBE).filter(
self.ConnectionDBE.data["connected_account_id"].astext
== provider_connection_id
)

if project_id is not None:
stmt = stmt.filter(self.ConnectionDBE.project_id == project_id)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Enforce tenant scope for provider-ID lookup and activation.

These methods allow read/update by provider_connection_id without mandatory project_id, which enables cross-project access patterns and weakens tenant isolation.

Suggested fix
 async def activate_connection_by_provider_id(
     self,
     *,
     provider_connection_id: str,
-    project_id: Optional[UUID] = None,
+    project_id: UUID,
 ) -> Optional[Connection]:
@@
-    stmt = select(self.ConnectionDBE).filter(
-        self.ConnectionDBE.data["connected_account_id"].astext
-        == provider_connection_id
-    )
-
-    if project_id is not None:
-        stmt = stmt.filter(self.ConnectionDBE.project_id == project_id)
+    stmt = select(self.ConnectionDBE).filter(
+        self.ConnectionDBE.project_id == project_id,
+        self.ConnectionDBE.data["connected_account_id"].astext
+        == provider_connection_id,
+    )
@@
 async def find_connection_by_provider_id(
     self,
     *,
+    project_id: UUID,
     provider_connection_id: str,
 ) -> Optional[Connection]:
@@
     stmt = (
         select(self.ConnectionDBE)
         .filter(
+            self.ConnectionDBE.project_id == project_id,
             self.ConnectionDBE.data["connected_account_id"].astext
             == provider_connection_id
         )
         .limit(1)
     )

As per coding guidelines, api/oss/src/dbs/postgres/**/dao.py: “Always enforce tenant scope (project_id minimum) in DAO reads and writes.”

Also applies to: 260-274

Source: Coding guidelines

Comment on lines +211 to +248
async def get_subscription_by_trigger_id(
self,
*,
trigger_id: str,
) -> Optional[TriggerSubscription]:
async with self.engine.session() as session:
stmt = (
select(TriggerSubscriptionDBE)
.filter(
TriggerSubscriptionDBE.data["ti_id"].astext == trigger_id,
)
.limit(1)
)

result = await session.execute(stmt)

subscription_dbe = result.scalars().first()

if not subscription_dbe:
return None

return map_subscription_dbe_to_dto(
subscription_dbe=subscription_dbe,
)

async def get_project_and_subscription_by_trigger_id(
self,
*,
trigger_id: str,
) -> Optional[Tuple[UUID, TriggerSubscription]]:
async with self.engine.session() as session:
stmt = (
select(TriggerSubscriptionDBE)
.filter(
TriggerSubscriptionDBE.data["ti_id"].astext == trigger_id,
)
.limit(1)
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Enforce tenant scope for trigger-id subscription lookups.

Line 220 and Line 245 resolve subscriptions by ti_id without a tenant filter. This breaks the DAO-level tenant-scope invariant and can misroute data across projects if ids collide or are replayed.

As per coding guidelines, api/oss/src/dbs/postgres/**/dao.py: “Always enforce tenant scope (project_id minimum) in DAO reads and writes”.

Source: Coding guidelines

Comment on lines +107 to +113
# Preserve the provider ti_id even if the client omitted it on the full-PUT.
existing_ti_id = (subscription_dbe.data or {}).get("ti_id")
data = subscription.data
if data.ti_id is None and existing_ti_id is not None:
data = data.model_copy(update={"ti_id": existing_ti_id})

subscription_dbe.data = data.model_dump(mode="json", exclude_none=True)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prevent client-controlled overwrite of provider ti_id during edit.

On Line 110 and Line 113, a caller can submit data.ti_id and persist it. That allows rebinding a local subscription to an arbitrary provider trigger id, which is a cross-boundary identity integrity risk.

Proposed fix
-    existing_ti_id = (subscription_dbe.data or {}).get("ti_id")
-    data = subscription.data
-    if data.ti_id is None and existing_ti_id is not None:
-        data = data.model_copy(update={"ti_id": existing_ti_id})
+    existing_ti_id = (subscription_dbe.data or {}).get("ti_id")
+    data = subscription.data.model_copy(update={"ti_id": existing_ti_id})
 
     subscription_dbe.data = data.model_dump(mode="json", exclude_none=True)

Comment on lines +90 to +101
already_seen = await self.triggers_dao.dedup_seen(
project_id=project_id,
subscription_id=subscription.id,
event_id=event_id,
)
if already_seen:
log.info(
"[TRIGGERS DISPATCHER] Duplicate event %s for subscription %s — skipping",
event_id,
subscription.id,
)
return

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Persisting failure before re-raise can short-circuit retries.

This flow records a failed delivery and then re-raises. With retry-on-error enabled in the TaskIQ worker, retry attempts can be skipped by the dedup guard, so transient failures may never get a real retry. Pick one policy: either retry first and persist only on terminal attempt, or persist and do not re-raise.

Also applies to: 166-176

Comment on lines +20 to +26
export function projectScopedParams(extra?: Record<string, unknown>) {
const projectId = getDefaultStore().get(projectIdAtom)
return {
params: {
...(projectId ? {project_id: projectId} : {}),
...(extra ?? {}),
},

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prevent extra from overriding project_id.

projectScopedParams() currently lets extra.project_id overwrite the scoped value, which can bypass intended tenant scoping at call sites.

Suggested fix
 export function projectScopedParams(extra?: Record<string, unknown>) {
     const projectId = getDefaultStore().get(projectIdAtom)
     return {
         params: {
-            ...(projectId ? {project_id: projectId} : {}),
             ...(extra ?? {}),
+            ...(projectId ? {project_id: projectId} : {}),
         },
     }
 }

As per coding guidelines, project-scoped/shared-state wiring should preserve expected boundary contracts and avoid fragile config/query patterns.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export function projectScopedParams(extra?: Record<string, unknown>) {
const projectId = getDefaultStore().get(projectIdAtom)
return {
params: {
...(projectId ? {project_id: projectId} : {}),
...(extra ?? {}),
},
export function projectScopedParams(extra?: Record<string, unknown>) {
const projectId = getDefaultStore().get(projectIdAtom)
return {
params: {
...(extra ?? {}),
...(projectId ? {project_id: projectId} : {}),
},
}
}

Source: Coding guidelines

Normalize the inbound provider envelope in the dispatcher into a stable
context (event.attributes + synthetic trigger_id/trigger_type/timestamp/
created_at), parallel to webhooks' event context. Resolve and complete the
bound workflow reference on subscription create/edit (the /deploy pattern) so
a variant id is resolved to a runnable revision. Align the drawer's mapping
suggestions + live preview to the same normalized shape.

Update trigger tests to the new shape and always-verify ingress; gate the
create-roundtrip acceptance tests on an ACTIVE connected account.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
body = r.json()
print(f" ✅ id={body.get('id')}")
print("\n Set this in your env so signature verification passes:")
print(f" COMPOSIO_WEBHOOK_SECRET={body.get('secret')}")
def one(i: int) -> str:
with httpx.Client(timeout=20, base_url=base, headers=headers) as c:
secret = _resolve_secret(c, cache, url, force=False)
print(f" container#{i}: secret={secret[:12]}…")

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
web/packages/agenta-entity-ui/src/gatewayTool/drawers/ConnectDrawer.tsx (1)

80-90: ⚠️ Potential issue | 🔴 Critical

Drawer mutation flow misses trigger cache invalidation.

After createToolConnection, both mutation paths (lines 90 and 119) call invalidateConnections() which only invalidates tool caches. Because connections are shared between tools and triggers (stored in the same gateway_connections rows), the trigger connection queries remain stale. Compare with TriggerConnectDrawer, which correctly invalidates both surfaces.

Suggested fix
 const invalidateConnections = useCallback(() => {
     queryClient.invalidateQueries({queryKey: ["tools", "connections"]})
     queryClient.invalidateQueries({queryKey: ["tools", "catalog"]})
+    queryClient.invalidateQueries({queryKey: ["triggers", "connections"]})
 }, [])
web/oss/src/components/pages/settings/Tools/hooks/useToolsConnections.ts (1)

22-35: ⚠️ Potential issue | 🟠 Major

Invalidate trigger connection cache on tool-side mutations.

The invalidate() callback (lines 22–35) and its use in handleCreate (line 56–58), handleDelete, and handleRefresh only invalidate ["tools", ...] keys. Since tools and triggers share the same gateway_connections rows, trigger screens can display stale connection state after these mutations until an unrelated refetch occurs. Other files in the codebase (useToolConnectionActions.ts, useTriggerConnectionActions.ts) correctly invalidate both surfaces.

Suggested fix
 const invalidate = useCallback(() => {
     queryClient.invalidateQueries({
         queryKey: ["tools", "connections"],
     })
+    queryClient.invalidateQueries({
+        queryKey: ["triggers", "connections"],
+    })
 }, [integrationKey])
web/oss/src/styles/globals.css (1)

384-443: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

webhooks-table CSS rules appear disconnected from the rendered table.

These selectors only apply when the table (or an ancestor) has webhooks-table, but the provided Webhooks.tsx table markup doesn’t set that class. The column-width rules likely won’t take effect.

Suggested fix outside this file
-            <Table
+            <Table
+                className="webhooks-table"
                 columns={columns}
                 dataSource={webhooks ?? []}
                 loading={isLoading}
                 rowKey="id"
api/oss/src/apis/fastapi/triggers/router.py (1)

111-117: ⚠️ Potential issue | 🟠 Major

Support both /composio/events and /composio/events/ to prevent POST redirect failures.

FastAPI's default redirect_slashes=True issues HTTP 307 redirects when requests don't match the defined route's trailing-slash format. For POST webhook ingestion, some providers do not properly resend the request body after a 307 redirect, causing lost events.

Suggested fix
         self.router.add_api_route(
             "/composio/events/",
             self.ingest_composio_event,
             methods=["POST"],
             operation_id="ingest_composio_event",
             response_model=TriggerEventAck,
             status_code=status.HTTP_202_ACCEPTED,
         )
+        self.router.add_api_route(
+            "/composio/events",
+            self.ingest_composio_event,
+            methods=["POST"],
+            include_in_schema=False,
+        )
🧹 Nitpick comments (9)
api/ee/tests/pytest/acceptance/triggers/test_triggers_connections.py (1)

29-33: ⚡ Quick win

Use the shared API env object instead of os.getenv here.

This file introduces direct env access at Line 29; in API code/tests we should read config from the shared env object to keep config handling centralized.

Suggested change
-import os
+from oss.src.utils.env import env
...
-_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY"))
+_COMPOSIO_ENABLED = bool(env.composio.api_key)

As per coding guidelines: api/**/*.py: “Add new API environment variables to api/oss/src/utils/env.py and consume them via the shared env object instead of calling os.getenv(...) directly”.

Source: Coding guidelines

web/oss/src/components/Webhooks/WebhookDrawer.tsx (1)

58-67: ⚡ Quick win

Reuse a shared provider-detection helper instead of duplicating URL heuristics.

This edit path derives provider with inline hostname checks; the same concern is already handled elsewhere via shared helper logic. Consolidating avoids drift between pages.

hosting/docker-compose/ee/docker-compose.gh.local.yml (1)

516-545: 💤 Low value

Consider caching pip packages for faster container restarts.

The composio service runs pip install on every container start, which adds latency on restarts. For a dev-only tunnel service with restart: always, this is acceptable but could be optimized by building a custom image or using a volume-cached pip directory.

api/oss/src/core/triggers/utils.py (1)

45-68: 💤 Low value

Consider returning cached value on Composio failure instead of None.

When force_refresh=False and the cache is empty, if Composio fails (line 57-59), the method returns None. However, if there's a stale/expired cache entry that was evicted, users get no secret. Consider a fallback strategy or ensure callers handle None gracefully.

web/oss/src/services/webhooks/types.ts (1)

22-43: ⚡ Quick win

Split API payload types from draft form-state types.

WebhookFormValues currently models payload-style fields (event_types), while form consumers use draft fields (events, header_list), which is why downstream code needs as any. Define a dedicated draft type and use it in form-preview helpers to restore type safety.

Proposed direction
 interface WebhookFormValuesBase<P extends WebhookProvider = WebhookProvider> {
     provider: P
     name?: string
     event_types?: WebhookEventType[]
 }

+export interface WebhookDraftFields {
+    events?: WebhookEventType[]
+    header_list?: {key: string; value: string}[]
+}
+
 export interface WebhookConfigFormValues extends WebhookFormValuesBase<"webhook"> {
     url?: string
     headers?: Record<string, string>
     auth_mode?: "signature" | "authorization"
     auth_value?: string
 }

 export interface GitHubFormValues extends WebhookFormValuesBase<"github"> {
     github_sub_type?: GitHubDispatchType
     github_repo?: string
     github_pat?: string
     github_workflow?: string
     github_branch?: string
 }

 export type WebhookFormValues = WebhookConfigFormValues | GitHubFormValues
+export type WebhookDraftFormValues =
+    | (WebhookConfigFormValues & WebhookDraftFields)
+    | (GitHubFormValues & WebhookDraftFields)

Then type form-preview paths with WebhookDraftFormValues instead of WebhookFormValues.

web/packages/agenta-entities/src/gatewayTrigger/api/client.ts (1)

30-36: ⚡ Quick win

Trim this block comment to a one-line “why” note (or remove it).

The current comment narrates behavior that is already clear from the function body, which adds maintenance noise.

As per coding guidelines, “Keep AI-generated in-code comments minimal; comment only the non-obvious why … never the what.”

Source: Coding guidelines

api/oss/src/apis/fastapi/triggers/router.py (1)

136-159: 🏗️ Heavy lift

Use POST /query for searchable catalog list endpoints.

The new searchable/filterable catalog lists are exposed via GET query params; the router guideline requires query-style filtering via POST /query payload endpoints.

As per coding guidelines, “api/oss/src/apis/fastapi/**/router.py: Use POST /query for filtering/search with payload support.”

Also applies to: 603-614, 727-736

Source: Coding guidelines

api/entrypoints/dispatcher_composio.py (1)

70-70: 💤 Low value

httpx.Client is never closed.

The forward client is created but never explicitly closed. Since this script runs indefinitely, it's not a functional issue, but for resource hygiene consider using a context manager or calling forward.close() on exit. However, given this is a dev-only script that blocks forever on wait_forever(), this is acceptable as-is.

web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerCatalogDrawer.tsx (1)

426-444: 💤 Low value

Event cards are hoverable but not actionable.

The event Card has hoverable prop but no onClick handler. If clicking an event should do something (e.g., show event details or select it for subscription), add the handler. If cards are purely informational, consider removing hoverable to avoid misleading UX affordance.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 2c857f25-e2d7-4af9-b561-b325285e1c10

📥 Commits

Reviewing files that changed from the base of the PR and between 338939c and ce43b26.

📒 Files selected for processing (116)
  • api/ee/tests/pytest/acceptance/triggers/test_triggers_connections.py
  • api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • api/entrypoints/dispatcher_composio.py
  • api/entrypoints/routers.py
  • api/oss/src/apis/fastapi/tools/router.py
  • api/oss/src/apis/fastapi/triggers/models.py
  • api/oss/src/apis/fastapi/triggers/router.py
  • api/oss/src/core/gateway/catalog/__init__.py
  • api/oss/src/core/gateway/catalog/dtos.py
  • api/oss/src/core/gateway/catalog/interfaces.py
  • api/oss/src/core/gateway/catalog/providers/__init__.py
  • api/oss/src/core/gateway/catalog/providers/composio/__init__.py
  • api/oss/src/core/gateway/catalog/providers/composio/adapter.py
  • api/oss/src/core/gateway/catalog/registry.py
  • api/oss/src/core/gateway/catalog/service.py
  • api/oss/src/core/gateway/connections/providers/composio/adapter.py
  • api/oss/src/core/gateway/providers/__init__.py
  • api/oss/src/core/gateway/providers/composio/__init__.py
  • api/oss/src/core/gateway/providers/composio/errors.py
  • api/oss/src/core/tools/dtos.py
  • api/oss/src/core/tools/providers/composio/adapter.py
  • api/oss/src/core/tools/service.py
  • api/oss/src/core/triggers/dtos.py
  • api/oss/src/core/triggers/interfaces.py
  • api/oss/src/core/triggers/providers/composio/adapter.py
  • api/oss/src/core/triggers/service.py
  • api/oss/src/core/triggers/utils.py
  • api/oss/src/middlewares/auth.py
  • api/oss/src/tasks/asyncio/triggers/dispatcher.py
  • api/oss/src/utils/env.py
  • api/oss/tests/manual/triggers/try_composio_triggers.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_connections.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_ingress.py
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • api/oss/tests/pytest/unit/triggers/test_triggers_dispatcher.py
  • api/oss/tests/pytest/unit/triggers/test_triggers_signature.py
  • hosting/docker-compose/ee/docker-compose.dev.yml
  • hosting/docker-compose/ee/docker-compose.gh.local.yml
  • hosting/docker-compose/ee/docker-compose.gh.yml
  • hosting/docker-compose/oss/docker-compose.dev.yml
  • hosting/docker-compose/oss/docker-compose.gh.local.yml
  • hosting/docker-compose/oss/docker-compose.gh.ssl.yml
  • hosting/docker-compose/oss/docker-compose.gh.yml
  • hosting/docker-compose/run.sh
  • web/_reference/agenta-sdk/src/types.ts
  • web/oss/src/components/DrillInView/OSSdrillInUIProvider.tsx
  • web/oss/src/components/Playground/Components/PlaygroundVariantConfigPrompt/assets/GatewayToolsPanel.tsx
  • web/oss/src/components/Sidebar/SettingsSidebar.tsx
  • web/oss/src/components/Webhooks/Modals/DeleteWebhookModal.tsx
  • web/oss/src/components/Webhooks/Modals/SecretRevealModal.tsx
  • web/oss/src/components/Webhooks/RequestPreview.tsx
  • web/oss/src/components/Webhooks/WebhookDrawer.tsx
  • web/oss/src/components/Webhooks/WebhookFieldRenderer.tsx
  • web/oss/src/components/Webhooks/WebhookLogsTab.tsx
  • web/oss/src/components/Webhooks/assets/constants.ts
  • web/oss/src/components/Webhooks/assets/types.ts
  • web/oss/src/components/Webhooks/utils/buildPreviewRequest.ts
  • web/oss/src/components/Webhooks/utils/buildSubscription.ts
  • web/oss/src/components/Webhooks/utils/handleTestResult.ts
  • web/oss/src/components/Webhooks/widgets/AdvanceConfigWidget.tsx
  • web/oss/src/components/Webhooks/widgets/DispatchAlertWidget.tsx
  • web/oss/src/components/Webhooks/widgets/HeaderListWidget.tsx
  • web/oss/src/components/pages/settings/APIKeys/APIKeys.tsx
  • web/oss/src/components/pages/settings/Tools/components/GatewayToolsSection.tsx
  • web/oss/src/components/pages/settings/Tools/hooks/useIntegrationDetail.ts
  • web/oss/src/components/pages/settings/Tools/hooks/useToolsConnections.ts
  • web/oss/src/components/pages/settings/Tools/hooks/useToolsIntegrations.ts
  • web/oss/src/components/pages/settings/Triggers/components/GatewaySubscriptionsSection.tsx
  • web/oss/src/components/pages/settings/Triggers/components/GatewayTriggersSection.tsx
  • web/oss/src/components/pages/settings/Webhooks/Webhooks.tsx
  • web/oss/src/pages/w/[workspace_id]/p/[project_id]/settings/index.tsx
  • web/oss/src/services/webhooks/api.ts
  • web/oss/src/services/webhooks/types.ts
  • web/oss/src/state/automations/state.ts
  • web/oss/src/state/webhooks/atoms.ts
  • web/oss/src/state/webhooks/state.ts
  • web/oss/src/styles/globals.css
  • web/packages/agenta-entities/src/gatewayTool/api/api.ts
  • web/packages/agenta-entities/src/gatewayTool/api/index.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/index.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolActionDetail.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolCatalogActions.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolCatalogIntegrations.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolConnectionActions.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolConnectionQuery.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolConnectionsQuery.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolIntegrationConnections.ts
  • web/packages/agenta-entities/src/gatewayTool/hooks/useToolIntegrationDetail.ts
  • web/packages/agenta-entities/src/gatewayTool/index.ts
  • web/packages/agenta-entities/src/gatewayTool/state/atoms.ts
  • web/packages/agenta-entities/src/gatewayTool/state/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/api/api.ts
  • web/packages/agenta-entities/src/gatewayTrigger/api/client.ts
  • web/packages/agenta-entities/src/gatewayTrigger/api/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/core/types.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogEvents.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogIntegrations.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerConnectionActions.ts
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerEvent.ts
  • web/packages/agenta-entities/src/gatewayTrigger/index.ts
  • web/packages/agenta-entities/src/gatewayTrigger/state/atoms.ts
  • web/packages/agenta-entities/src/gatewayTrigger/state/index.ts
  • web/packages/agenta-entities/src/index.ts
  • web/packages/agenta-entity-ui/src/gatewayTool/components/SchemaForm.tsx
  • web/packages/agenta-entity-ui/src/gatewayTool/drawers/CatalogDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTool/drawers/ConnectDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTool/drawers/ConnectionManagerDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTool/drawers/ToolExecutionDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerCatalogDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerConnectDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerDeliveriesDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerEventsDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerSubscriptionDrawer.tsx
  • web/packages/agenta-entity-ui/src/gatewayTrigger/index.ts
  • web/tests/tests/fixtures/base.fixture/providerHelpers/index.ts
💤 Files with no reviewable changes (1)
  • web/oss/src/state/automations/state.ts
✅ Files skipped from review due to trivial changes (5)
  • api/oss/src/core/gateway/catalog/providers/composio/init.py
  • web/packages/agenta-entity-ui/src/gatewayTool/components/SchemaForm.tsx
  • web/_reference/agenta-sdk/src/types.ts
  • web/packages/agenta-entities/src/index.ts
  • web/oss/src/components/pages/settings/APIKeys/APIKeys.tsx
🚧 Files skipped from review as they are similar to previous changes (11)
  • api/oss/src/middlewares/auth.py
  • web/packages/agenta-entities/src/gatewayTrigger/state/atoms.ts
  • web/packages/agenta-entity-ui/src/gatewayTrigger/drawers/TriggerDeliveriesDrawer.tsx
  • api/oss/src/core/triggers/interfaces.py
  • web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerEvent.ts
  • web/oss/src/components/pages/settings/Triggers/components/GatewaySubscriptionsSection.tsx
  • api/oss/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • api/entrypoints/routers.py
  • api/ee/tests/pytest/acceptance/triggers/test_triggers_subscriptions.py
  • web/oss/src/components/Sidebar/SettingsSidebar.tsx
  • api/oss/src/tasks/asyncio/triggers/dispatcher.py

Comment on lines +135 to +160
def test_create_revoke_roundtrip(self, connections_api):
slug = f"acc-{uuid4().hex[:8]}"
create = connections_api(
"POST",
"/triggers/connections/",
json={
"connection": {
"slug": slug,
"provider_key": "composio",
"integration_key": "github",
"data": {"auth_scheme": "oauth"},
}
},
)
assert create.status_code == 200, create.text
connection_id = create.json()["connection"]["id"]

revoke = connections_api(
"POST", f"/triggers/connections/{connection_id}/revoke"
)
assert revoke.status_code == 200, revoke.text
assert revoke.json()["connection"]["flags"]["is_valid"] is False

delete = connections_api("DELETE", f"/triggers/connections/{connection_id}")
assert delete.status_code == 204, delete.text

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Always clean up created connections with finally in lifecycle tests.

At Line 137/170 and Line 200, resources are created but deletion only happens on the success path. If an assertion fails before delete, the test leaks provider-backed state and can make later runs flaky.

Suggested pattern
-        create = connections_api(...)
-        assert create.status_code == 200, create.text
-        connection_id = create.json()["connection"]["id"]
-        ...
-        delete = connections_api("DELETE", f"/triggers/connections/{connection_id}")
-        assert delete.status_code == 204, delete.text
+        connection_id = None
+        try:
+            create = connections_api(...)
+            assert create.status_code == 200, create.text
+            connection_id = create.json()["connection"]["id"]
+            ...
+        finally:
+            if connection_id:
+                connections_api("DELETE", f"/triggers/connections/{connection_id}")

Also applies to: 167-227

Comment on lines +22 to +30
async def list_integrations(
self,
*,
search: Optional[str] = None,
sort_by: Optional[str] = None,
limit: Optional[int] = None,
cursor: Optional[str] = None,
) -> Tuple[List[CatalogIntegration], Optional[str], int]:
"""Returns (items, next_cursor, total_items)."""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Replace tuple pagination output with a named DTO contract.

Returning (items, next_cursor, total_items) as a tuple makes the contract positional and brittle across adapters/callers. Use a typed DTO (e.g., CatalogIntegrationsPage) for explicit fields and safer cross-layer evolution.

As per coding guidelines, "Do NOT return tuples like (data, trace_id) from service or client methods; use a named DTO instead."

Source: Coding guidelines

Comment on lines +18 to +20
adapter = self._adapters.get(provider_key)
if not adapter:
raise ProviderNotFoundError(provider_key)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use explicit missing-key detection in registry lookup.

if not adapter can misclassify a valid adapter as missing if it is falsy; check for None explicitly.

Proposed fix
     def get(self, provider_key: str) -> CatalogGatewayInterface:
         adapter = self._adapters.get(provider_key)
-        if not adapter:
+        if adapter is None:
             raise ProviderNotFoundError(provider_key)
         return adapter

Comment on lines +44 to +53
async def list_integrations(
self,
*,
provider_key: str,
#
search: Optional[str] = None,
sort_by: Optional[str] = None,
limit: Optional[int] = None,
cursor: Optional[str] = None,
) -> Tuple[List[CatalogIntegration], Optional[str], int]:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Return a typed DTO from list_integrations instead of a tuple.

Core service methods should expose named DTO contracts; positional tuples are brittle at call sites and violate the service contract guideline.

As per coding guidelines, “Service methods must return typed DTOs … Do NOT return tuples like (data, trace_id) from service or client methods.”

Source: Coding guidelines

Comment on lines +139 to +143
existing = await self._get("/webhook_subscriptions")
items = existing.get("items", []) if isinstance(existing, dict) else []
if items:
return items[0]["secret"]

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle empty/malformed webhook subscription payloads before indexing.

items[0]["secret"] can raise IndexError/KeyError in both the initial read and 409 retry path, and those errors bypass the current httpx.HTTPError wrapper.

Suggested hardening
-            if items:
-                return items[0]["secret"]
+            if items and isinstance(items[0], dict) and items[0].get("secret"):
+                return items[0]["secret"]

@@
-            if resp.status_code == 409:
-                again = await self._get("/webhook_subscriptions")
-                return again["items"][0]["secret"]
+            if resp.status_code == 409:
+                again = await self._get("/webhook_subscriptions")
+                again_items = again.get("items", []) if isinstance(again, dict) else []
+                if again_items and isinstance(again_items[0], dict) and again_items[0].get("secret"):
+                    return again_items[0]["secret"]
+                raise AdapterError(
+                    provider_key="composio",
+                    operation="ensure_webhook_subscription",
+                    detail="Webhook subscription conflict, but no readable secret was returned",
+                )

Also applies to: 149-153

Comment on lines +20 to +24
_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY"))
_requires_composio = pytest.mark.skipif(
not _COMPOSIO_ENABLED,
reason="needs live Composio credentials (COMPOSIO_API_KEY)",
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Use the shared API env object instead of os.getenv in this test module.

Line 20 reads COMPOSIO_API_KEY directly via os.getenv(...); this should come from the shared env object for consistency with API config handling.

As per coding guidelines, "api/**/*.py: Add new API environment variables to api/oss/src/utils/env.py and consume them via the shared env object instead of calling os.getenv(...) directly."

Source: Coding guidelines

Comment on lines +48 to +71
def test_create_revoke_roundtrip(self, authed_api):
slug = f"acc-{uuid4().hex[:8]}"
create = authed_api(
"POST",
"/triggers/connections/",
json={
"connection": {
"slug": slug,
"provider_key": "composio",
"integration_key": "github",
"data": {"auth_scheme": "oauth"},
}
},
)
assert create.status_code == 200, create.text
connection_id = create.json()["connection"]["id"]

revoke = authed_api("POST", f"/triggers/connections/{connection_id}/revoke")
assert revoke.status_code == 200, revoke.text
assert revoke.json()["connection"]["flags"]["is_valid"] is False

delete = authed_api("DELETE", f"/triggers/connections/{connection_id}")
assert delete.status_code == 204, delete.text

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Ensure created live connections are cleaned up even when assertions fail.

Lines 48-71, 78-110, and 111-140 only delete the connection on the happy path. If an assertion fails first, the provider resource is left behind and can make subsequent acceptance runs flaky. Wrap each create/use flow in try/finally and best-effort delete when connection_id is set.

Also applies to: 78-110, 111-140

Comment on lines +112 to +113
@_requires_connected_account
class TestTriggerIngressDedup:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Strengthen dedup test preconditions before signing events.

Line 112 gates only on COMPOSIO_TEST_CONNECTED_ACCOUNT, and Line 161 can return an empty secret. That can cause 401 failures unrelated to dedup behavior and make this acceptance test flaky.

Suggested patch
 `@_requires_connected_account`
+@_requires_composio
 class TestTriggerIngressDedup:
     def test_duplicate_event_id_writes_single_delivery(self, authed_api, unauthed_api):
@@
         timestamp = "1700000000"
         secret = _resolve_webhook_secret()
+        assert secret, "Composio webhook secret is required for signed ingress tests"
         headers = {
             "Content-Type": "application/json",
             "webhook-id": event_id,
             "webhook-timestamp": timestamp,
             "webhook-signature": _sign(secret, event_id, timestamp, body),
         }

Also applies to: 161-167

Comment on lines +549 to +555
image: python:3.13-slim-trixie
# === EXECUTION ============================================ #
command:
- bash
- -c
- "pip install --quiet --root-user-action=ignore composio httpx && python /app/dispatcher_composio.py"
# === STORAGE ============================================== #

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

cat -n hosting/docker-compose/oss/docker-compose.gh.yml | sed -n '540,560p'

Repository: Agenta-AI/agenta

Length of output: 1120


🏁 Script executed:

# Check for profile definitions and tunnel service setup
rg "profiles:|composio|tunnel" hosting/docker-compose/oss/docker-compose.gh.yml -A 3 -B 1

Repository: Agenta-AI/agenta

Length of output: 1740


🏁 Script executed:

# Check the full service definition for the tunnel/composio service
ast-grep --pattern 'composio-tunnel:' hosting/docker-compose/oss/docker-compose.gh.yml || rg "composio-tunnel:" -A 20 hosting/docker-compose/

Repository: Agenta-AI/agenta

Length of output: 42


🏁 Script executed:

# Check if with-tunnel is enabled by default in run.sh
rg "with-tunnel|profiles" hosting/docker-compose/run.sh -B 2 -A 2

Repository: Agenta-AI/agenta

Length of output: 361


🏁 Script executed:

# Check other docker-compose files for similar patterns or prebuilt images
fd "docker-compose" hosting/docker-compose/ -type f | head -10

Repository: Agenta-AI/agenta

Length of output: 231


🏁 Script executed:

# Check if there's a Dockerfile for composio or other services
fd "Dockerfile" hosting/docker-compose/ -type f

Repository: Agenta-AI/agenta

Length of output: 231


🏁 Script executed:

# Fix the fd command - check for docker-compose files
fd "docker-compose" hosting/docker-compose/ -t f | head -10

Repository: Agenta-AI/agenta

Length of output: 469


🏁 Script executed:

# Check for Dockerfiles
fd "Dockerfile" hosting/ -t f

Repository: Agenta-AI/agenta

Length of output: 509


🏁 Script executed:

# Check run.sh to see what WITH_TUNNEL default is
rg "WITH_TUNNEL" hosting/docker-compose/run.sh -B 3 -A 1

Repository: Agenta-AI/agenta

Length of output: 542


🏁 Script executed:

# Check if there's a base image or Dockerfile for the composio service
rg "composio" hosting/docker-compose/ -l

Repository: Agenta-AI/agenta

Length of output: 600


🏁 Script executed:

# Check if EE version has the same composio service
cat hosting/docker-compose/ee/docker-compose.gh.yml | grep -A 15 "composio:"

Repository: Agenta-AI/agenta

Length of output: 808


🏁 Script executed:

# Search for other runtime pip install patterns in docker-compose files
rg "pip install" hosting/docker-compose/ -B 2 -A 1

Repository: Agenta-AI/agenta

Length of output: 3064


🏁 Script executed:

# Check dispatcher_composio.py to understand dependencies
head -30 api/entrypoints/dispatcher_composio.py

Repository: Agenta-AI/agenta

Length of output: 1118


🏁 Script executed:

# Check if there's a requirements file for composio
fd "requirements.*composio" hosting/

Repository: Agenta-AI/agenta

Length of output: 42


Avoid unpinned runtime pip install in the composio service across all docker-compose files.

Installing composio and httpx at startup makes the stack non-deterministic and brittle (network failures, PyPI index issues, or upstream releases can break runs). With the tunnel enabled by default, this impacts every normal run.sh invocation. This pattern exists across all OSS and EE docker-compose files.

Prefer a prebuilt image or pinned versions (e.g., composio==0.X.X httpx==0.Y.Y) to ensure reproducible, deterministic startup.

Comment on lines +64 to +68
useEffect(() => {
if (loadedPages < targetPages && query.hasNextPage && !query.isFetchingNextPage) {
query.fetchNextPage()
}
}, [loadedPages, targetPages, query.hasNextPage, query.isFetchingNextPage, query.fetchNextPage])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cd /repo && find . -name "useTriggerCatalogIntegrations.ts" -o -name "useTriggerCatalogEvents.ts"

Repository: Agenta-AI/agenta

Length of output: 116


🏁 Script executed:

cd /repo && head -100 web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogIntegrations.ts

Repository: Agenta-AI/agenta

Length of output: 116


🏁 Script executed:

cd /repo && head -100 web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogEvents.ts

Repository: Agenta-AI/agenta

Length of output: 116


🏁 Script executed:

find . -name "useTriggerCatalogIntegrations.ts" -o -name "useTriggerCatalogEvents.ts"

Repository: Agenta-AI/agenta

Length of output: 232


🏁 Script executed:

cat web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogIntegrations.ts

Repository: Agenta-AI/agenta

Length of output: 2986


🏁 Script executed:

cat web/packages/agenta-entities/src/gatewayTrigger/hooks/useTriggerCatalogEvents.ts

Repository: Agenta-AI/agenta

Length of output: 3062


🌐 Web query:

TanStack Query useInfiniteQuery isError property API reference

💡 Result:

In TanStack Query, the useInfiniteQuery hook inherits the returned properties of the standard useQuery hook [1][2][3]. The isError property is a boolean that is true if the query has encountered an error [4]. When using useInfiniteQuery, you can access isError to determine if the query is in an error state. Because it shares the same base return type as useQuery, the standard pattern for error handling applies: const { isError, error } = useInfiniteQuery({... }); if (isError) { return

Error: {error.message}
; } Note that while useInfiniteQuery specifically manages multiple pages and provides additional error properties for background operations (such as isFetchNextPageError and isFetchPreviousPageError) [1][2], the base isError property remains the primary indicator for the overall query status. Key properties related to errors in useInfiniteQuery: - isError: boolean, true if the query has an error [4]. - error: The error object, if an error was thrown [4]. - isFetchNextPageError: boolean, true if the query failed while fetching the next page [1][2]. - isFetchPreviousPageError: boolean, true if the query failed while fetching the previous page [1][2]. - isRefetchError: boolean, true if the query failed while refetching [1][2].

Citations:


🏁 Script executed:

rg "query\.isError" web/packages/ --type ts --type tsx -A 2 -B 2

Repository: Agenta-AI/agenta

Length of output: 88


🏁 Script executed:

rg "isError" web/packages/agenta-entities/ --type ts --type tsx -c

Repository: Agenta-AI/agenta

Length of output: 88


🏁 Script executed:

rg "isError" web/packages/agenta-entities/ -A 2 -B 2

Repository: Agenta-AI/agenta

Length of output: 40122


🏁 Script executed:

rg "isFetchNextPageError" web/packages/agenta-entities/ -A 2 -B 2

Repository: Agenta-AI/agenta

Length of output: 42


🏁 Script executed:

rg "isFetchNextPageError|isFetchPreviousPageError" web/packages/agenta-entities/

Repository: Agenta-AI/agenta

Length of output: 42


Guard auto-prefetch on error to avoid repeated failing fetch cycles.

When fetchNextPage() errors, this effect can keep re-triggering while loadedPages < targetPages, causing repeated failing requests. Add query.isError to prevent retries on error state.

Suggested fix
     useEffect(() => {
-        if (loadedPages < targetPages && query.hasNextPage && !query.isFetchingNextPage) {
-            query.fetchNextPage()
+        if (
+            loadedPages >= targetPages ||
+            !query.hasNextPage ||
+            query.isFetchingNextPage ||
+            query.isError
+        ) {
+            return
         }
+        void query.fetchNextPage()
     }, [loadedPages, targetPages, query.hasNextPage, query.isFetchingNextPage, query.fetchNextPage])

Apply the same guard to useTriggerCatalogEvents.

Resolve the bound reference via the canonical
WorkflowsService.retrieve_workflow_revision (handles application/evaluator/
workflow + environment families) and rebuild the completed family with
build_retrieval_info, so invoke_workflow finds the service uri. Raise
TriggerReferenceInvalid when it cannot resolve. Skip soft-deleted
subscriptions in the ti_* resolver. FE: scope the picker to application
workflows and send the reference family by its true kind.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Backend feature size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants