diff --git a/src/serving/api/routers/alerts.py b/src/serving/api/routers/alerts.py index 99a55fd..c4b7f00 100644 --- a/src/serving/api/routers/alerts.py +++ b/src/serving/api/routers/alerts.py @@ -3,6 +3,7 @@ from fastapi import APIRouter, HTTPException, Request, Response, status from pydantic import AnyHttpUrl, BaseModel, Field +from starlette.concurrency import run_in_threadpool from src.serving.api.alert_dispatcher import ( create_alert, @@ -145,5 +146,16 @@ async def alert_history(alert_id: str, request: Request) -> dict[str, object]: rule = get_alert(get_alert_config_path(request.app), alert_id, _tenant(request)) if rule is None: raise HTTPException(status_code=404, detail=f"Alert '{alert_id}' not found.") - history = get_alert_history(request.app.state.query_engine._conn, alert_id) + history = await run_in_threadpool(_read_alert_history, request, alert_id) return {"history": history} + + +def _read_alert_history(request: Request, alert_id: str) -> list[dict]: + # The history scan runs on a worker thread (run_in_threadpool); a dedicated + # cursor — not the shared connection — keeps concurrent reads on different + # threads from colliding on the connection. (audit_30_06_26.md A2) + cursor = request.app.state.query_engine._conn.cursor() + try: + return get_alert_history(cursor, alert_id) + finally: + cursor.close() diff --git a/src/serving/api/routers/deadletter.py b/src/serving/api/routers/deadletter.py index 1da7bd3..4d4aec5 100644 --- a/src/serving/api/routers/deadletter.py +++ b/src/serving/api/routers/deadletter.py @@ -8,6 +8,7 @@ import duckdb from fastapi import APIRouter, HTTPException, Query, Request from pydantic import BaseModel, Field +from starlette.concurrency import run_in_threadpool from src.processing.event_replayer import ( DeadLetterEventNotFoundError, @@ -68,6 +69,18 @@ def _conn(request: Request) -> duckdb.DuckDBPyConnection: return conn +def _read_cursor(request: Request) -> duckdb.DuckDBPyConnection: + """A dedicated cursor for the GET handlers, used on a worker thread. + + The read handlers offload their scans with ``run_in_threadpool``; a separate + cursor — not the shared connection object — keeps concurrent reads on + different worker threads from colliding on the connection. (audit_30_06_26.md A2) + """ + cursor = cast(duckdb.DuckDBPyConnection, request.app.state.query_engine._conn).cursor() + ensure_dead_letter_table(cursor) + return cursor + + def _tenant_id(request: Request) -> str: tenant_key = getattr(request.state, "tenant_key", None) tenant_id = getattr(request.state, "tenant_id", None) @@ -116,41 +129,48 @@ def _require_deadletter_write_access(request: Request, event_id: str) -> None: @router.get("/stats", response_model=DeadLetterStatsResponse) async def deadletter_stats(request: Request) -> DeadLetterStatsResponse: - conn = _conn(request) + return await run_in_threadpool(_deadletter_stats, request) + + +def _deadletter_stats(request: Request) -> DeadLetterStatsResponse: + cursor = _read_cursor(request) tenant_id = _tenant_id(request) - rows = conn.execute( - """ - SELECT failure_reason, COUNT(*) - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - GROUP BY failure_reason - ORDER BY failure_reason - """, - [tenant_id], - ).fetchall() - last_24h_row = conn.execute( - """ - SELECT COUNT(*) - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - AND received_at >= NOW() - INTERVAL '24 hours' - """, - [tenant_id], - ).fetchone() - trend_rows = conn.execute( - """ - SELECT DATE_TRUNC('hour', received_at) AS hour_bucket, COUNT(*) - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - AND received_at >= NOW() - INTERVAL '24 hours' - GROUP BY hour_bucket - ORDER BY hour_bucket - """, - [tenant_id], - ).fetchall() + try: + rows = cursor.execute( + """ + SELECT failure_reason, COUNT(*) + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + GROUP BY failure_reason + ORDER BY failure_reason + """, + [tenant_id], + ).fetchall() + last_24h_row = cursor.execute( + """ + SELECT COUNT(*) + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + AND received_at >= NOW() - INTERVAL '24 hours' + """, + [tenant_id], + ).fetchone() + trend_rows = cursor.execute( + """ + SELECT DATE_TRUNC('hour', received_at) AS hour_bucket, COUNT(*) + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + AND received_at >= NOW() - INTERVAL '24 hours' + GROUP BY hour_bucket + ORDER BY hour_bucket + """, + [tenant_id], + ).fetchall() + finally: + cursor.close() return DeadLetterStatsResponse( counts={str(reason): int(count) for reason, count in rows if reason is not None}, last_24h=int(last_24h_row[0]) if last_24h_row and last_24h_row[0] is not None else 0, @@ -171,75 +191,84 @@ async def list_deadletter_events( page_size: int = Query(default=50, ge=1, le=100), reason: str | None = Query(default=None), ) -> DeadLetterListResponse: - conn = _conn(request) + return await run_in_threadpool(_list_deadletter_events, request, page, page_size, reason) + + +def _list_deadletter_events( + request: Request, page: int, page_size: int, reason: str | None +) -> DeadLetterListResponse: + cursor = _read_cursor(request) tenant_id = _tenant_id(request) - params: list[object] - if reason is not None: - params = [tenant_id, reason] - total_row = conn.execute( - """ - SELECT COUNT(*) - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - AND failure_reason = ? - """, - params, - ).fetchone() - else: - params = [tenant_id] - total_row = conn.execute( - """ - SELECT COUNT(*) - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - """, - params, - ).fetchone() - total = int(total_row[0]) if total_row and total_row[0] is not None else 0 - offset = (page - 1) * page_size - if reason is not None: - rows = conn.execute( - """ - SELECT - event_id, - event_type, - failure_reason, - failure_detail, - received_at, - retry_count, - last_retried_at, - status - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - AND failure_reason = ? - ORDER BY received_at DESC, event_id ASC - LIMIT ? OFFSET ? - """, - [tenant_id, reason, page_size, offset], - ).fetchall() - else: - rows = conn.execute( - """ - SELECT - event_id, - event_type, - failure_reason, - failure_detail, - received_at, - retry_count, - last_retried_at, - status - FROM dead_letter_events - WHERE status = 'failed' - AND COALESCE(tenant_id, 'default') = ? - ORDER BY received_at DESC, event_id ASC - LIMIT ? OFFSET ? - """, - [tenant_id, page_size, offset], - ).fetchall() + try: + params: list[object] + if reason is not None: + params = [tenant_id, reason] + total_row = cursor.execute( + """ + SELECT COUNT(*) + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + AND failure_reason = ? + """, + params, + ).fetchone() + else: + params = [tenant_id] + total_row = cursor.execute( + """ + SELECT COUNT(*) + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + """, + params, + ).fetchone() + total = int(total_row[0]) if total_row and total_row[0] is not None else 0 + offset = (page - 1) * page_size + if reason is not None: + rows = cursor.execute( + """ + SELECT + event_id, + event_type, + failure_reason, + failure_detail, + received_at, + retry_count, + last_retried_at, + status + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + AND failure_reason = ? + ORDER BY received_at DESC, event_id ASC + LIMIT ? OFFSET ? + """, + [tenant_id, reason, page_size, offset], + ).fetchall() + else: + rows = cursor.execute( + """ + SELECT + event_id, + event_type, + failure_reason, + failure_detail, + received_at, + retry_count, + last_retried_at, + status + FROM dead_letter_events + WHERE status = 'failed' + AND COALESCE(tenant_id, 'default') = ? + ORDER BY received_at DESC, event_id ASC + LIMIT ? OFFSET ? + """, + [tenant_id, page_size, offset], + ).fetchall() + finally: + cursor.close() return DeadLetterListResponse( items=[ @@ -266,25 +295,32 @@ async def list_deadletter_events( @router.get("/{event_id}", response_model=DeadLetterDetail) async def get_deadletter_event(event_id: str, request: Request) -> DeadLetterDetail: - conn = _conn(request) - row = conn.execute( - """ - SELECT - event_id, - event_type, - payload, - failure_reason, - failure_detail, - received_at, - retry_count, - last_retried_at, - status - FROM dead_letter_events - WHERE event_id = ? - AND COALESCE(tenant_id, 'default') = ? - """, - [event_id, _tenant_id(request)], - ).fetchone() + return await run_in_threadpool(_get_deadletter_event, request, event_id) + + +def _get_deadletter_event(request: Request, event_id: str) -> DeadLetterDetail: + cursor = _read_cursor(request) + try: + row = cursor.execute( + """ + SELECT + event_id, + event_type, + payload, + failure_reason, + failure_detail, + received_at, + retry_count, + last_retried_at, + status + FROM dead_letter_events + WHERE event_id = ? + AND COALESCE(tenant_id, 'default') = ? + """, + [event_id, _tenant_id(request)], + ).fetchone() + finally: + cursor.close() if row is None: raise HTTPException(status_code=404, detail=f"Dead-letter event '{event_id}' not found.") return DeadLetterDetail( diff --git a/src/serving/api/routers/webhooks.py b/src/serving/api/routers/webhooks.py index 25024c8..c1b87da 100644 --- a/src/serving/api/routers/webhooks.py +++ b/src/serving/api/routers/webhooks.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, HTTPException, Request, Response, status from pydantic import AnyHttpUrl, BaseModel, Field +from starlette.concurrency import run_in_threadpool from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url from src.serving.api.webhook_dispatcher import ( @@ -100,5 +101,16 @@ async def webhook_logs(webhook_id: str, request: Request) -> dict[str, object]: ) if registration is None: raise HTTPException(status_code=404, detail=f"Webhook '{webhook_id}' not found.") - logs = get_delivery_logs(request.app.state.query_engine._conn, webhook_id) + logs = await run_in_threadpool(_read_delivery_logs, request, webhook_id) return {"logs": logs} + + +def _read_delivery_logs(request: Request, webhook_id: str) -> list[dict]: + # The log scan runs on a worker thread (run_in_threadpool); a dedicated + # cursor — not the shared connection — keeps concurrent reads on different + # threads from colliding on the connection. (audit_30_06_26.md A2) + cursor = request.app.state.query_engine._conn.cursor() + try: + return get_delivery_logs(cursor, webhook_id) + finally: + cursor.close() diff --git a/tests/unit/test_alerts_router_unit.py b/tests/unit/test_alerts_router_unit.py index ec58187..caa19e0 100644 --- a/tests/unit/test_alerts_router_unit.py +++ b/tests/unit/test_alerts_router_unit.py @@ -270,12 +270,24 @@ async def test_test_alert_not_found_raises_404(monkeypatch: pytest.MonkeyPatch) # ── alert_history ──────────────────────────────────────────────── +class _CursorConn: + """A connection stub that yields a closeable cursor — the history read + offloads onto a dedicated cursor (audit_30 A2). get_alert_history is stubbed, + so the cursor itself is never queried.""" + + def cursor(self) -> _CursorConn: + return self + + def close(self) -> None: + pass + + async def test_alert_history_returns_records(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(alerts_module, "get_alert", lambda path, alert_id, tenant: _rule()) monkeypatch.setattr( alerts_module, "get_alert_history", lambda conn, alert_id: [{"fired_at": "2026-06-13"}] ) - result = await alert_history("al-1", _req(query_conn=object())) + result = await alert_history("al-1", _req(query_conn=_CursorConn())) assert result == {"history": [{"fired_at": "2026-06-13"}]} diff --git a/tests/unit/test_blocking_routes_offloaded.py b/tests/unit/test_blocking_routes_offloaded.py index c2b14ec..6af458b 100644 --- a/tests/unit/test_blocking_routes_offloaded.py +++ b/tests/unit/test_blocking_routes_offloaded.py @@ -4,6 +4,10 @@ loop, so concurrent requests serialized and blocked every other tenant on the worker. This drives the lineage route through ASGI with a connection whose scan sleeps, and asserts four concurrent requests overlap instead of serializing. + +The same applies to the deadletter stats/list/detail reads and the alert-history +and webhook-log reads (the A2 follow-up): each ran its scan inline, so they are +pinned the same way below. """ from __future__ import annotations @@ -11,12 +15,16 @@ import asyncio import time from datetime import UTC, datetime +from pathlib import Path from types import SimpleNamespace import httpx import pytest from fastapi import FastAPI +from src.serving.api.routers import alerts as alerts_module +from src.serving.api.routers import webhooks as webhooks_module +from src.serving.api.routers.deadletter import router as deadletter_router from src.serving.api.routers.lineage import router as lineage_router from src.serving.semantic_layer.catalog import DataCatalog @@ -100,3 +108,78 @@ async def test_lineage_does_not_block_event_loop() -> None: assert all(response.status_code == 200 for response in responses) # 4 × 0.3s serialized ≈ 1.2s; offloaded to the threadpool they overlap (~0.3s). assert elapsed < 0.9, f"Event loop blocked: {elapsed:.2f}s (expected < 0.9s)" + + +class _SleepyHistoryConn: + """A fake DuckDB connection whose *data* scans sleep. DDL (CREATE/ALTER) and + PRAGMA statements return immediately; a SELECT sleeps, mimicking the blocking + scan these read handlers ran inline on the event loop. Supports ``execute`` + directly (the pre-fix path) and ``cursor()`` (the offloaded path), so the same + test serializes on the old code and overlaps on the new. ``fetchall`` / + ``fetchone`` return empty results — the test asserts timing, not content. + """ + + def __init__(self, delay_seconds: float) -> None: + self.delay_seconds = delay_seconds + self.description: list[tuple[str]] = [("c",)] + + def cursor(self) -> _SleepyHistoryConn: + return _SleepyHistoryConn(self.delay_seconds) + + def execute(self, sql: str, params: object = None) -> _SleepyHistoryConn: + if sql.lstrip().upper().startswith("SELECT"): + time.sleep(self.delay_seconds) + return self + + def fetchall(self) -> list[tuple]: + return [] + + def fetchone(self) -> tuple | None: + return None + + def close(self) -> None: + pass + + +async def _assert_concurrent_requests_overlap(app: FastAPI, url: str, *, budget: float) -> None: + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client: + started_at = time.perf_counter() + responses = await asyncio.gather(*[client.get(url) for _ in range(4)]) + elapsed = time.perf_counter() - started_at + assert all(response.status_code == 200 for response in responses) + assert elapsed < budget, f"Event loop blocked: {elapsed:.2f}s (expected < {budget}s)" + + +@pytest.mark.asyncio +async def test_deadletter_stats_does_not_block_event_loop() -> None: + app = FastAPI() + app.state.query_engine = SimpleNamespace(_conn=_SleepyHistoryConn(delay_seconds=0.1)) + app.include_router(deadletter_router) + # /stats runs three scans per request: 4 × serialized ≈ 1.2s, offloaded ≈ 0.3s. + await _assert_concurrent_requests_overlap(app, "/v1/deadletter/stats", budget=0.9) + + +@pytest.mark.asyncio +async def test_alert_history_does_not_block_event_loop(monkeypatch: pytest.MonkeyPatch) -> None: + # The rule lookup is a YAML read, not the DuckDB scan under test — stub it so + # the handler reaches the history scan with a non-None rule. + monkeypatch.setattr(alerts_module, "get_alert_config_path", lambda app: Path("unused")) + monkeypatch.setattr(alerts_module, "get_alert", lambda *a, **k: SimpleNamespace(id="A1")) + app = FastAPI() + app.state.query_engine = SimpleNamespace(_conn=_SleepyHistoryConn(delay_seconds=0.3)) + app.include_router(alerts_module.router) + # One scan per request: 4 × serialized ≈ 1.2s, offloaded ≈ 0.3s. + await _assert_concurrent_requests_overlap(app, "/v1/alerts/A1/history", budget=0.9) + + +@pytest.mark.asyncio +async def test_webhook_logs_does_not_block_event_loop(monkeypatch: pytest.MonkeyPatch) -> None: + # The registration lookup is a YAML read, not the DuckDB scan under test. + monkeypatch.setattr(webhooks_module, "get_webhook_config_path", lambda app: Path("unused")) + monkeypatch.setattr(webhooks_module, "get_webhook", lambda *a, **k: SimpleNamespace(id="W1")) + app = FastAPI() + app.state.query_engine = SimpleNamespace(_conn=_SleepyHistoryConn(delay_seconds=0.3)) + app.include_router(webhooks_module.router) + # One scan per request: 4 × serialized ≈ 1.2s, offloaded ≈ 0.3s. + await _assert_concurrent_requests_overlap(app, "/v1/webhooks/W1/logs", budget=0.9)