diff --git a/src/serving/api/webhook_dispatcher.py b/src/serving/api/webhook_dispatcher.py index 0e35dcc..5c2dd53 100644 --- a/src/serving/api/webhook_dispatcher.py +++ b/src/serving/api/webhook_dispatcher.py @@ -7,7 +7,7 @@ import os import secrets import uuid -from datetime import UTC, date, datetime +from datetime import UTC, date, datetime, timedelta from decimal import Decimal from pathlib import Path @@ -158,11 +158,46 @@ def get_delivery_logs(conn: duckdb.DuckDBPyConnection, webhook_id: str) -> list[ return [dict(zip(columns, row, strict=False)) for row in cursor.fetchall()] +def ensure_webhook_delivery_queue_table(conn: duckdb.DuckDBPyConnection) -> None: + """Durable per-(webhook, event) delivery state for re-drive. + + Distinct from ``webhook_deliveries`` (an append-only attempt *log*): this is + the *state* table whose ``(webhook_id, event_id)`` primary key dedupes + enqueues and whose ``status`` / ``next_attempt_at`` drive retries that + survive a process restart. ``body`` stores the canonical payload so a + delivery can be replayed without re-reading ``pipeline_events``. + """ + conn.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_delivery_queue ( + webhook_id VARCHAR NOT NULL, + event_id VARCHAR NOT NULL, + tenant VARCHAR, + event_type VARCHAR, + body VARCHAR, + status VARCHAR NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMP, + last_status_code INTEGER, + last_error VARCHAR, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (webhook_id, event_id) + ) + """ + ) + + class WebhookDispatcher: def __init__(self, app: FastAPI, poll_interval_seconds: float = 2.0) -> None: self.app = app self.poll_interval_seconds = poll_interval_seconds self.backoff_seconds = [1.0, 5.0, 25.0] + # Durable re-drive: how many delivery rounds a (webhook, event) gets + # before it is parked as 'dead', and how many due rows one re-drive pass + # processes (bounded so the pass never blocks the loop on a large queue). + self.max_delivery_attempts = 5 + self.redrive_batch_size = 100 self.seen_event_ids: set[str] = set() self._task: asyncio.Task | None = None @@ -187,6 +222,14 @@ async def run(self) -> None: await self.dispatch_new_events() except Exception as exc: logger.warning("webhook_dispatcher_error", error=str(exc)) + # Re-drive durably-queued deliveries that failed earlier (or were + # left pending by a crash/restart). This is the half the in-memory + # seen-set could not provide: a failed delivery is retried instead of + # silently dropped (audit_28_06_26.md #3). + try: + await self.process_delivery_queue() + except Exception as exc: + logger.warning("webhook_redrive_error", error=str(exc)) await asyncio.sleep(self.poll_interval_seconds) def mark_existing_events_seen(self) -> None: @@ -220,16 +263,43 @@ async def dispatch_new_events(self) -> None: tenant = str(event.get("tenant_id") or "default") for webhook in webhooks_by_tenant.get(tenant, []): if _matches_filters(event, webhook.filters): - await self.deliver(webhook, event) + # Record the delivery durably *before* attempting it, then + # attempt inline (low latency for the happy path). A failure + # leaves a 'pending' row that process_delivery_queue re-drives + # — surviving all-retries-failed and a process restart, which + # the in-memory seen-set alone could not (audit #3). + self._enqueue_delivery(webhook, event) + result = await self.deliver(webhook, event) + self._record_delivery_outcome(webhook.id, event_id, result) async def deliver(self, webhook: WebhookRegistration, event: dict) -> dict: + """Deliver one event now (the ``/test`` endpoint and the inline dispatch + path). Computes the canonical body from ``event`` and posts it. + """ + event_type = str(event.get("event_type") or event.get("topic") or "unknown") + event_id = str(event.get("event_id") or "") + return await self._deliver_body( + webhook, body=_event_body(event), event_id=event_id, event_type=event_type + ) + + async def _deliver_body( + self, + webhook: WebhookRegistration, + *, + body: bytes, + event_id: str, + event_type: str, + ) -> dict: + """Post a pre-serialised body to one webhook with the retry burst and + per-attempt logging. Shared by :meth:`deliver` and the durable re-drive + (:meth:`process_delivery_queue`), which replays the stored body verbatim. + """ conn = self.app.state.query_engine._conn ensure_webhook_deliveries_table(conn) delivery_id = str(uuid.uuid4()) - event_type = str(event.get("event_type") or event.get("topic") or "unknown") - event_id = str(event.get("event_id") or delivery_id) - body = _event_body(event) + if not event_id: + event_id = delivery_id headers = { "Content-Type": "application/json", "X-AgentFlow-Event": event_type, @@ -354,6 +424,117 @@ def _fetch_pipeline_events(self, tenant: str | None = None) -> list[dict]: result_columns = [description[0] for description in cursor.description] return [dict(zip(result_columns, row, strict=False)) for row in cursor.fetchall()] + def _enqueue_delivery(self, webhook: WebhookRegistration, event: dict) -> None: + """Durably record a (webhook, event) delivery as ``pending`` (idempotent + on the primary key — a re-scan of the same event never duplicates it).""" + event_id = str(event.get("event_id") or "") + if not event_id: + return + conn = self.app.state.query_engine._conn + ensure_webhook_delivery_queue_table(conn) + now = datetime.now(UTC) + conn.execute( + """ + INSERT INTO webhook_delivery_queue + (webhook_id, event_id, tenant, event_type, body, status, attempts, + next_attempt_at, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?) + ON CONFLICT DO NOTHING + """, + [ + webhook.id, + event_id, + str(event.get("tenant_id") or "default"), + str(event.get("event_type") or event.get("topic") or "unknown"), + _event_body(event).decode("utf-8"), + now, + now, + now, + ], + ) + + def _record_delivery_outcome(self, webhook_id: str, event_id: str, result: dict) -> None: + """Advance a queue row from the outcome of one delivery round: success → + ``delivered``; failure → bump attempts and re-schedule (back to + ``pending`` with a backoff ``next_attempt_at``), or park as ``dead`` once + ``max_delivery_attempts`` is reached.""" + if not event_id: + return + conn = self.app.state.query_engine._conn + now = datetime.now(UTC) + status_code = result.get("status_code") + if result.get("success"): + conn.execute( + "UPDATE webhook_delivery_queue SET status = 'delivered', " + "last_status_code = ?, last_error = NULL, updated_at = ? " + "WHERE webhook_id = ? AND event_id = ?", + [status_code, now, webhook_id, event_id], + ) + return + row = conn.execute( + "SELECT attempts FROM webhook_delivery_queue WHERE webhook_id = ? AND event_id = ?", + [webhook_id, event_id], + ).fetchone() + attempts = (row[0] if row else 0) + 1 + error = result.get("error") + if attempts >= self.max_delivery_attempts: + conn.execute( + "UPDATE webhook_delivery_queue SET status = 'dead', attempts = ?, " + "last_status_code = ?, last_error = ?, next_attempt_at = NULL, updated_at = ? " + "WHERE webhook_id = ? AND event_id = ?", + [attempts, status_code, error, now, webhook_id, event_id], + ) + return + delay = self.backoff_seconds[min(attempts - 1, len(self.backoff_seconds) - 1)] + conn.execute( + "UPDATE webhook_delivery_queue SET status = 'pending', attempts = ?, " + "last_status_code = ?, last_error = ?, next_attempt_at = ?, updated_at = ? " + "WHERE webhook_id = ? AND event_id = ?", + [ + attempts, + status_code, + error, + now + timedelta(seconds=delay), + now, + webhook_id, + event_id, + ], + ) + + async def process_delivery_queue(self) -> None: + """Re-drive due ``pending`` deliveries. A webhook that has since been + removed/deactivated parks its row as ``dead`` rather than retrying + forever. Bounded by ``redrive_batch_size`` so one pass can't stall the + loop on a large backlog.""" + conn = self.app.state.query_engine._conn + ensure_webhook_delivery_queue_table(conn) + path = get_webhook_config_path(self.app) + now = datetime.now(UTC) + due = conn.execute( + "SELECT webhook_id, event_id, tenant, event_type, body " + "FROM webhook_delivery_queue " + "WHERE status = 'pending' AND (next_attempt_at IS NULL OR next_attempt_at <= ?) " + "ORDER BY created_at ASC LIMIT ?", + [now, self.redrive_batch_size], + ).fetchall() + for webhook_id, event_id, tenant, event_type, body in due: + webhook = get_webhook(path, webhook_id, str(tenant or "default")) + if webhook is None: + conn.execute( + "UPDATE webhook_delivery_queue SET status = 'dead', " + "last_error = 'webhook inactive or removed', next_attempt_at = NULL, " + "updated_at = ? WHERE webhook_id = ? AND event_id = ?", + [datetime.now(UTC), webhook_id, event_id], + ) + continue + result = await self._deliver_body( + webhook, + body=(body or "").encode("utf-8"), + event_id=event_id, + event_type=event_type or "unknown", + ) + self._record_delivery_outcome(webhook_id, event_id, result) + def _log_delivery( conn: duckdb.DuckDBPyConnection, diff --git a/tests/integration/test_webhooks.py b/tests/integration/test_webhooks.py index d3be5ee..2db5ca1 100644 --- a/tests/integration/test_webhooks.py +++ b/tests/integration/test_webhooks.py @@ -420,6 +420,47 @@ async def test_webhookless_tenant_events_are_scanned_without_cross_delivery( ] assert delivered == [("http://agent.test/acme", "evt-acme-order")] + @pytest.mark.asyncio + async def test_failed_delivery_is_requeued_and_redriven(self, client, httpx_mock): + """audit_28_06_26.md #3: a delivery that fails every inline attempt is not + silently dropped — it is left pending in the durable queue and re-driven + once the endpoint recovers.""" + _disable_auth(client) + _register( + client, + filters={"event_types": ["order"], "entity_ids": ["ORD-1"], "min_amount": 100}, + ) + _prepare_pipeline_events(client) + dispatcher = client.app.state.webhook_dispatcher + dispatcher.seen_event_ids.clear() + conn = client.app.state.query_engine._conn + webhook_dispatcher.ensure_webhook_delivery_queue_table(conn) + conn.execute("DELETE FROM webhook_delivery_queue") + + # endpoint down: every attempt of the inline burst returns 5xx + httpx_mock.add_response(status_code=500) + httpx_mock.add_response(status_code=500) + httpx_mock.add_response(status_code=500) + await dispatcher.dispatch_new_events() + + row = conn.execute( + "SELECT status FROM webhook_delivery_queue WHERE event_id = 'evt-order-1'" + ).fetchone() + assert row is not None # durably queued, not dropped + assert row[0] == "pending" + + # endpoint recovers; the re-drive pass delivers it + httpx_mock.requests.clear() + conn.execute("UPDATE webhook_delivery_queue SET next_attempt_at = NULL") + await dispatcher.process_delivery_queue() + + assert len(httpx_mock.requests) >= 1 + assert json.loads(httpx_mock.requests[0]["content"].decode())["event_id"] == "evt-order-1" + status = conn.execute( + "SELECT status FROM webhook_delivery_queue WHERE event_id = 'evt-order-1'" + ).fetchone()[0] + assert status == "delivered" + def test_webhook_registrations_survive_api_restart(self, tmp_path: Path): config_path = tmp_path / "webhooks.yaml" previous_path = getattr(app.state, "webhook_config_path", None) diff --git a/tests/unit/test_webhook_dispatcher_unit.py b/tests/unit/test_webhook_dispatcher_unit.py index 934d964..56f2739 100644 --- a/tests/unit/test_webhook_dispatcher_unit.py +++ b/tests/unit/test_webhook_dispatcher_unit.py @@ -9,11 +9,12 @@ from __future__ import annotations +import asyncio import hashlib import hmac import json from collections.abc import Iterator -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta from decimal import Decimal from pathlib import Path from types import SimpleNamespace @@ -238,3 +239,191 @@ def test_delivery_logs_roundtrip() -> None: assert get_delivery_logs(conn, "wh-other") == [] finally: conn.close() + + +# --- durable delivery queue / re-drive (audit_28_06_26.md #3) ----------------- + + +def _event(event_id: str = "e1", tenant: str = "default") -> dict: + return { + "event_id": event_id, + "tenant_id": tenant, + "event_type": "order.created", + "order_id": "ORD-1", + } + + +def _queue_row(conn: duckdb.DuckDBPyConnection, webhook_id: str, event_id: str): + return conn.execute( + "SELECT status, attempts, next_attempt_at FROM webhook_delivery_queue " + "WHERE webhook_id = ? AND event_id = ?", + [webhook_id, event_id], + ).fetchone() + + +def test_enqueue_delivery_is_idempotent_on_webhook_event() -> None: + conn = duckdb.connect(":memory:") + try: + dispatcher = WebhookDispatcher(_stub_app(conn)) + webhook = SimpleNamespace(id="wh-1") + dispatcher._enqueue_delivery(webhook, _event("e1")) + dispatcher._enqueue_delivery(webhook, _event("e1")) # re-scan: no duplicate + assert conn.execute("SELECT count(*) FROM webhook_delivery_queue").fetchone()[0] == 1 + status, attempts, _ = _queue_row(conn, "wh-1", "e1") + assert status == "pending" + assert attempts == 0 + finally: + conn.close() + + +def test_record_outcome_success_marks_delivered() -> None: + conn = duckdb.connect(":memory:") + try: + dispatcher = WebhookDispatcher(_stub_app(conn)) + dispatcher._enqueue_delivery(SimpleNamespace(id="wh-1"), _event("e1")) + dispatcher._record_delivery_outcome("wh-1", "e1", {"success": True, "status_code": 200}) + assert _queue_row(conn, "wh-1", "e1")[0] == "delivered" + finally: + conn.close() + + +def test_record_outcome_failure_reschedules_then_dies_at_max() -> None: + conn = duckdb.connect(":memory:") + try: + dispatcher = WebhookDispatcher(_stub_app(conn)) + dispatcher.max_delivery_attempts = 2 + dispatcher.backoff_seconds = [10.0] + dispatcher._enqueue_delivery(SimpleNamespace(id="wh-1"), _event("e1")) + + dispatcher._record_delivery_outcome("wh-1", "e1", {"success": False, "status_code": 500}) + status, attempts, next_at = _queue_row(conn, "wh-1", "e1") + assert (status, attempts) == ("pending", 1) + assert next_at is not None # scheduled for re-drive + + dispatcher._record_delivery_outcome("wh-1", "e1", {"success": False, "status_code": 500}) + status, attempts, next_at = _queue_row(conn, "wh-1", "e1") + assert (status, attempts) == ("dead", 2) + assert next_at is None # parked, no longer re-driven + finally: + conn.close() + + +def test_process_delivery_queue_redrives_due_pending(tmp_path: Path) -> None: + conn = duckdb.connect(":memory:") + try: + config_path = tmp_path / "webhooks.yaml" + created = create_webhook( + config_path, url="https://example.test/hook", tenant="acme", filters=WebhookFilters() + ) + app = _stub_app(conn) + app.state.webhook_config_path = config_path + dispatcher = WebhookDispatcher(app) + dispatcher._enqueue_delivery(SimpleNamespace(id=created.id), _event("e1", tenant="acme")) + dispatcher._record_delivery_outcome( + created.id, "e1", {"success": False, "status_code": 500} + ) + conn.execute("UPDATE webhook_delivery_queue SET next_attempt_at = NULL") # force due + + attempted: list[str] = [] + + async def _fake_deliver_body(webhook, *, body, event_id, event_type): + attempted.append(event_id) + return {"success": True, "status_code": 200} + + dispatcher._deliver_body = _fake_deliver_body + asyncio.run(dispatcher.process_delivery_queue()) + + assert attempted == ["e1"] + assert _queue_row(conn, created.id, "e1")[0] == "delivered" + finally: + conn.close() + + +def test_process_delivery_queue_parks_dead_when_webhook_removed(tmp_path: Path) -> None: + conn = duckdb.connect(":memory:") + try: + config_path = tmp_path / "webhooks.yaml" # no webhook registered -> lookup returns None + app = _stub_app(conn) + app.state.webhook_config_path = config_path + dispatcher = WebhookDispatcher(app) + dispatcher._enqueue_delivery(SimpleNamespace(id="ghost"), _event("e1", tenant="acme")) + + attempted: list[int] = [] + + async def _fake_deliver_body(*args, **kwargs): + attempted.append(1) + return {"success": True} + + dispatcher._deliver_body = _fake_deliver_body + asyncio.run(dispatcher.process_delivery_queue()) + + assert attempted == [] # a removed webhook is never posted to + assert _queue_row(conn, "ghost", "e1")[0] == "dead" + finally: + conn.close() + + +def test_process_delivery_queue_skips_not_due(tmp_path: Path) -> None: + conn = duckdb.connect(":memory:") + try: + config_path = tmp_path / "webhooks.yaml" + created = create_webhook( + config_path, url="https://example.test/hook", tenant="acme", filters=WebhookFilters() + ) + app = _stub_app(conn) + app.state.webhook_config_path = config_path + dispatcher = WebhookDispatcher(app) + dispatcher._enqueue_delivery(SimpleNamespace(id=created.id), _event("e1", tenant="acme")) + conn.execute( + "UPDATE webhook_delivery_queue SET next_attempt_at = ?", + [datetime.now(UTC) + timedelta(hours=1)], # due in the future + ) + + attempted: list[int] = [] + + async def _fake_deliver_body(*args, **kwargs): + attempted.append(1) + return {"success": True} + + dispatcher._deliver_body = _fake_deliver_body + asyncio.run(dispatcher.process_delivery_queue()) + + assert attempted == [] + assert _queue_row(conn, created.id, "e1")[0] == "pending" + finally: + conn.close() + + +def test_pending_delivery_survives_a_new_dispatcher_instance(tmp_path: Path) -> None: + """The core #3 guarantee: a failed delivery left pending in the durable queue + is re-driven by a *fresh* dispatcher (i.e. after a process restart) — the + in-memory seen-set cannot do this.""" + conn = duckdb.connect(":memory:") + try: + config_path = tmp_path / "webhooks.yaml" + created = create_webhook( + config_path, url="https://example.test/hook", tenant="acme", filters=WebhookFilters() + ) + app = _stub_app(conn) + app.state.webhook_config_path = config_path + + first = WebhookDispatcher(app) + first._enqueue_delivery(SimpleNamespace(id=created.id), _event("e1", tenant="acme")) + first._record_delivery_outcome(created.id, "e1", {"success": False, "status_code": 503}) + conn.execute("UPDATE webhook_delivery_queue SET next_attempt_at = NULL") + del first # simulate process exit; only the durable row remains + + reborn = WebhookDispatcher(app) + attempted: list[str] = [] + + async def _fake_deliver_body(webhook, *, body, event_id, event_type): + attempted.append(event_id) + return {"success": True, "status_code": 200} + + reborn._deliver_body = _fake_deliver_body + asyncio.run(reborn.process_delivery_queue()) + + assert attempted == ["e1"] + assert _queue_row(conn, created.id, "e1")[0] == "delivered" + finally: + conn.close()