[feat] Add Composio trigger ingress, async dispatch, and worker#4742
[feat] Add Composio trigger ingress, async dispatch, and worker#4742jp-agenta wants to merge 2 commits into
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Inbound dual of webhooks: a global ingress endpoint POST /triggers/composio/events fast-ACKs and enqueues onto the queues:triggers Redis Stream; a dedicated worker_triggers process consumes it and the TriggersDispatcher invokes the bound workflow. - Ingress endpoint with HMAC-SHA256 signature verification against COMPOSIO_WEBHOOK_SECRET; whitelisted in auth middleware as public. - Async pipeline mirroring webhooks: Redis Streams broker + taskiq worker with retry_on_error and TRIGGER_MAX_RETRIES=5; dedup by event_id for idempotency. - Dispatcher attributes the run to the subscription creator (created_by_id) or null; binds the workflow key-agnostically from the references dict via the /retrieve selector shape. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Cover credential-free pure logic the acceptance suite only exercises behind live Composio creds: HMAC signature verification and the dispatcher branches (unknown trigger, disabled, dedup, missing reference, success, non-200 failure). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
5e9e904 to
40de507
Compare
fcd3cda to
ec3d94e
Compare
|
Superseded by #4749, which consolidates the entire gateway-triggers work (api + web + hosting + docs) into a single PR. |
Context
Trigger subscriptions can be created and stored (the lane below this one), but nothing yet turns an inbound provider event into a workflow run. This is the inbound dual of webhooks: when Composio reports that a watched event fired, we need to receive it, decide which subscription it belongs to, and invoke the bound workflow.
What this adds
A single global ingress endpoint and an async dispatch pipeline that mirrors how outbound webhooks already work.
POST /triggers/composio/eventsreceives every Composio event. It verifies the signature, demuxes by the subscription id carried in the payload, fast-ACKs with202 Accepted, and enqueues onto a newqueues:triggersRedis Stream. It does not invoke the workflow inline. A dedicatedworker_triggersprocess consumes the stream and theTriggersDispatcherdoes the actual invocation, so a slow or failing workflow never blocks Composio's delivery.The async shape matches webhooks exactly:
The taskiq task retries with
TRIGGER_MAX_RETRIES=5(matchingWEBHOOK_MAX_RETRIES), anddedup_seen(event_id)makes redelivery idempotent.Signature verification is HMAC-SHA256 over
{id}.{ts}.{body}againstCOMPOSIO_WEBHOOK_SECRET, compared withhmac.compare_digest. If the secret is unset it is a no-op (local dev), and a bad signature is rejected before any processing. The endpoint is whitelisted as public in the auth middleware, mirroring the existingtools/connections/callbackOAuth route.The dispatcher attributes the triggered run to
subscription.created_by_id(the person who set up the subscription) or null if absent, and binds the workflow key-agnostically by iterating the subscription'sreferencesdict in the/retrieveselector shape.Tests / notes
test_triggers_ingress.pycovers the ingress endpoint.ruff formatandruff checkpass clean onapi/.queues:triggers.