Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 186 additions & 5 deletions src/serving/api/webhook_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
import secrets
import uuid
from datetime import UTC, date, datetime
from datetime import UTC, date, datetime, timedelta
from decimal import Decimal
from pathlib import Path

Expand Down Expand Up @@ -158,11 +158,46 @@ def get_delivery_logs(conn: duckdb.DuckDBPyConnection, webhook_id: str) -> list[
return [dict(zip(columns, row, strict=False)) for row in cursor.fetchall()]


def ensure_webhook_delivery_queue_table(conn: duckdb.DuckDBPyConnection) -> None:
"""Durable per-(webhook, event) delivery state for re-drive.

Distinct from ``webhook_deliveries`` (an append-only attempt *log*): this is
the *state* table whose ``(webhook_id, event_id)`` primary key dedupes
enqueues and whose ``status`` / ``next_attempt_at`` drive retries that
survive a process restart. ``body`` stores the canonical payload so a
delivery can be replayed without re-reading ``pipeline_events``.
"""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_delivery_queue (
webhook_id VARCHAR NOT NULL,
event_id VARCHAR NOT NULL,
tenant VARCHAR,
event_type VARCHAR,
body VARCHAR,
status VARCHAR NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMP,
last_status_code INTEGER,
last_error VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (webhook_id, event_id)
)
"""
)


class WebhookDispatcher:
def __init__(self, app: FastAPI, poll_interval_seconds: float = 2.0) -> None:
self.app = app
self.poll_interval_seconds = poll_interval_seconds
self.backoff_seconds = [1.0, 5.0, 25.0]
# Durable re-drive: how many delivery rounds a (webhook, event) gets
# before it is parked as 'dead', and how many due rows one re-drive pass
# processes (bounded so the pass never blocks the loop on a large queue).
self.max_delivery_attempts = 5
self.redrive_batch_size = 100
self.seen_event_ids: set[str] = set()
self._task: asyncio.Task | None = None

Expand All @@ -187,6 +222,14 @@ async def run(self) -> None:
await self.dispatch_new_events()
except Exception as exc:
logger.warning("webhook_dispatcher_error", error=str(exc))
# Re-drive durably-queued deliveries that failed earlier (or were
# left pending by a crash/restart). This is the half the in-memory
# seen-set could not provide: a failed delivery is retried instead of
# silently dropped (audit_28_06_26.md #3).
try:
await self.process_delivery_queue()
except Exception as exc:
logger.warning("webhook_redrive_error", error=str(exc))
await asyncio.sleep(self.poll_interval_seconds)

def mark_existing_events_seen(self) -> None:
Expand Down Expand Up @@ -220,16 +263,43 @@ async def dispatch_new_events(self) -> None:
tenant = str(event.get("tenant_id") or "default")
for webhook in webhooks_by_tenant.get(tenant, []):
if _matches_filters(event, webhook.filters):
await self.deliver(webhook, event)
# Record the delivery durably *before* attempting it, then
# attempt inline (low latency for the happy path). A failure
# leaves a 'pending' row that process_delivery_queue re-drives
# — surviving all-retries-failed and a process restart, which
# the in-memory seen-set alone could not (audit #3).
self._enqueue_delivery(webhook, event)
result = await self.deliver(webhook, event)
self._record_delivery_outcome(webhook.id, event_id, result)

async def deliver(self, webhook: WebhookRegistration, event: dict) -> dict:
"""Deliver one event now (the ``/test`` endpoint and the inline dispatch
path). Computes the canonical body from ``event`` and posts it.
"""
event_type = str(event.get("event_type") or event.get("topic") or "unknown")
event_id = str(event.get("event_id") or "")
return await self._deliver_body(
webhook, body=_event_body(event), event_id=event_id, event_type=event_type
)

async def _deliver_body(
self,
webhook: WebhookRegistration,
*,
body: bytes,
event_id: str,
event_type: str,
) -> dict:
"""Post a pre-serialised body to one webhook with the retry burst and
per-attempt logging. Shared by :meth:`deliver` and the durable re-drive
(:meth:`process_delivery_queue`), which replays the stored body verbatim.
"""
conn = self.app.state.query_engine._conn
ensure_webhook_deliveries_table(conn)

delivery_id = str(uuid.uuid4())
event_type = str(event.get("event_type") or event.get("topic") or "unknown")
event_id = str(event.get("event_id") or delivery_id)
body = _event_body(event)
if not event_id:
event_id = delivery_id
headers = {
"Content-Type": "application/json",
"X-AgentFlow-Event": event_type,
Expand Down Expand Up @@ -354,6 +424,117 @@ def _fetch_pipeline_events(self, tenant: str | None = None) -> list[dict]:
result_columns = [description[0] for description in cursor.description]
return [dict(zip(result_columns, row, strict=False)) for row in cursor.fetchall()]

def _enqueue_delivery(self, webhook: WebhookRegistration, event: dict) -> None:
"""Durably record a (webhook, event) delivery as ``pending`` (idempotent
on the primary key — a re-scan of the same event never duplicates it)."""
event_id = str(event.get("event_id") or "")
if not event_id:
return
conn = self.app.state.query_engine._conn
ensure_webhook_delivery_queue_table(conn)
now = datetime.now(UTC)
conn.execute(
"""
INSERT INTO webhook_delivery_queue
(webhook_id, event_id, tenant, event_type, body, status, attempts,
next_attempt_at, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
[
webhook.id,
event_id,
str(event.get("tenant_id") or "default"),
str(event.get("event_type") or event.get("topic") or "unknown"),
_event_body(event).decode("utf-8"),
now,
now,
now,
],
)

def _record_delivery_outcome(self, webhook_id: str, event_id: str, result: dict) -> None:
"""Advance a queue row from the outcome of one delivery round: success →
``delivered``; failure → bump attempts and re-schedule (back to
``pending`` with a backoff ``next_attempt_at``), or park as ``dead`` once
``max_delivery_attempts`` is reached."""
if not event_id:
return
conn = self.app.state.query_engine._conn
now = datetime.now(UTC)
status_code = result.get("status_code")
if result.get("success"):
conn.execute(
"UPDATE webhook_delivery_queue SET status = 'delivered', "
"last_status_code = ?, last_error = NULL, updated_at = ? "
"WHERE webhook_id = ? AND event_id = ?",
[status_code, now, webhook_id, event_id],
)
return
row = conn.execute(
"SELECT attempts FROM webhook_delivery_queue WHERE webhook_id = ? AND event_id = ?",
[webhook_id, event_id],
).fetchone()
attempts = (row[0] if row else 0) + 1
error = result.get("error")
if attempts >= self.max_delivery_attempts:
conn.execute(
"UPDATE webhook_delivery_queue SET status = 'dead', attempts = ?, "
"last_status_code = ?, last_error = ?, next_attempt_at = NULL, updated_at = ? "
"WHERE webhook_id = ? AND event_id = ?",
[attempts, status_code, error, now, webhook_id, event_id],
)
return
delay = self.backoff_seconds[min(attempts - 1, len(self.backoff_seconds) - 1)]
conn.execute(
"UPDATE webhook_delivery_queue SET status = 'pending', attempts = ?, "
"last_status_code = ?, last_error = ?, next_attempt_at = ?, updated_at = ? "
"WHERE webhook_id = ? AND event_id = ?",
[
attempts,
status_code,
error,
now + timedelta(seconds=delay),
now,
webhook_id,
event_id,
],
)

async def process_delivery_queue(self) -> None:
"""Re-drive due ``pending`` deliveries. A webhook that has since been
removed/deactivated parks its row as ``dead`` rather than retrying
forever. Bounded by ``redrive_batch_size`` so one pass can't stall the
loop on a large backlog."""
conn = self.app.state.query_engine._conn
ensure_webhook_delivery_queue_table(conn)
path = get_webhook_config_path(self.app)
now = datetime.now(UTC)
due = conn.execute(
"SELECT webhook_id, event_id, tenant, event_type, body "
"FROM webhook_delivery_queue "
"WHERE status = 'pending' AND (next_attempt_at IS NULL OR next_attempt_at <= ?) "
"ORDER BY created_at ASC LIMIT ?",
[now, self.redrive_batch_size],
).fetchall()
for webhook_id, event_id, tenant, event_type, body in due:
webhook = get_webhook(path, webhook_id, str(tenant or "default"))
if webhook is None:
conn.execute(
"UPDATE webhook_delivery_queue SET status = 'dead', "
"last_error = 'webhook inactive or removed', next_attempt_at = NULL, "
"updated_at = ? WHERE webhook_id = ? AND event_id = ?",
[datetime.now(UTC), webhook_id, event_id],
)
continue
result = await self._deliver_body(
webhook,
body=(body or "").encode("utf-8"),
event_id=event_id,
event_type=event_type or "unknown",
)
self._record_delivery_outcome(webhook_id, event_id, result)


def _log_delivery(
conn: duckdb.DuckDBPyConnection,
Expand Down
41 changes: 41 additions & 0 deletions tests/integration/test_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,47 @@ async def test_webhookless_tenant_events_are_scanned_without_cross_delivery(
]
assert delivered == [("http://agent.test/acme", "evt-acme-order")]

@pytest.mark.asyncio
async def test_failed_delivery_is_requeued_and_redriven(self, client, httpx_mock):
"""audit_28_06_26.md #3: a delivery that fails every inline attempt is not
silently dropped — it is left pending in the durable queue and re-driven
once the endpoint recovers."""
_disable_auth(client)
_register(
client,
filters={"event_types": ["order"], "entity_ids": ["ORD-1"], "min_amount": 100},
)
_prepare_pipeline_events(client)
dispatcher = client.app.state.webhook_dispatcher
dispatcher.seen_event_ids.clear()
conn = client.app.state.query_engine._conn
webhook_dispatcher.ensure_webhook_delivery_queue_table(conn)
conn.execute("DELETE FROM webhook_delivery_queue")

# endpoint down: every attempt of the inline burst returns 5xx
httpx_mock.add_response(status_code=500)
httpx_mock.add_response(status_code=500)
httpx_mock.add_response(status_code=500)
await dispatcher.dispatch_new_events()

row = conn.execute(
"SELECT status FROM webhook_delivery_queue WHERE event_id = 'evt-order-1'"
).fetchone()
assert row is not None # durably queued, not dropped
assert row[0] == "pending"

# endpoint recovers; the re-drive pass delivers it
httpx_mock.requests.clear()
conn.execute("UPDATE webhook_delivery_queue SET next_attempt_at = NULL")
await dispatcher.process_delivery_queue()

assert len(httpx_mock.requests) >= 1
assert json.loads(httpx_mock.requests[0]["content"].decode())["event_id"] == "evt-order-1"
status = conn.execute(
"SELECT status FROM webhook_delivery_queue WHERE event_id = 'evt-order-1'"
).fetchone()[0]
assert status == "delivered"

def test_webhook_registrations_survive_api_restart(self, tmp_path: Path):
config_path = tmp_path / "webhooks.yaml"
previous_path = getattr(app.state, "webhook_config_path", None)
Expand Down
Loading