Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/serving/api/routers/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from fastapi import APIRouter, HTTPException, Request, Response, status
from pydantic import AnyHttpUrl, BaseModel, Field
from starlette.concurrency import run_in_threadpool

from src.serving.api.alert_dispatcher import (
create_alert,
Expand Down Expand Up @@ -145,5 +146,16 @@ async def alert_history(alert_id: str, request: Request) -> dict[str, object]:
rule = get_alert(get_alert_config_path(request.app), alert_id, _tenant(request))
if rule is None:
raise HTTPException(status_code=404, detail=f"Alert '{alert_id}' not found.")
history = get_alert_history(request.app.state.query_engine._conn, alert_id)
history = await run_in_threadpool(_read_alert_history, request, alert_id)
return {"history": history}


def _read_alert_history(request: Request, alert_id: str) -> list[dict]:
# The history scan runs on a worker thread (run_in_threadpool); a dedicated
# cursor — not the shared connection — keeps concurrent reads on different
# threads from colliding on the connection. (audit_30_06_26.md A2)
cursor = request.app.state.query_engine._conn.cursor()
try:
return get_alert_history(cursor, alert_id)
finally:
cursor.close()
278 changes: 157 additions & 121 deletions src/serving/api/routers/deadletter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import duckdb
from fastapi import APIRouter, HTTPException, Query, Request
from pydantic import BaseModel, Field
from starlette.concurrency import run_in_threadpool

from src.processing.event_replayer import (
DeadLetterEventNotFoundError,
Expand Down Expand Up @@ -68,6 +69,18 @@ def _conn(request: Request) -> duckdb.DuckDBPyConnection:
return conn


def _read_cursor(request: Request) -> duckdb.DuckDBPyConnection:
"""A dedicated cursor for the GET handlers, used on a worker thread.

The read handlers offload their scans with ``run_in_threadpool``; a separate
cursor — not the shared connection object — keeps concurrent reads on
different worker threads from colliding on the connection. (audit_30_06_26.md A2)
"""
cursor = cast(duckdb.DuckDBPyConnection, request.app.state.query_engine._conn).cursor()
ensure_dead_letter_table(cursor)
return cursor


def _tenant_id(request: Request) -> str:
tenant_key = getattr(request.state, "tenant_key", None)
tenant_id = getattr(request.state, "tenant_id", None)
Expand Down Expand Up @@ -116,41 +129,48 @@ def _require_deadletter_write_access(request: Request, event_id: str) -> None:

@router.get("/stats", response_model=DeadLetterStatsResponse)
async def deadletter_stats(request: Request) -> DeadLetterStatsResponse:
conn = _conn(request)
return await run_in_threadpool(_deadletter_stats, request)


def _deadletter_stats(request: Request) -> DeadLetterStatsResponse:
cursor = _read_cursor(request)
tenant_id = _tenant_id(request)
rows = conn.execute(
"""
SELECT failure_reason, COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
GROUP BY failure_reason
ORDER BY failure_reason
""",
[tenant_id],
).fetchall()
last_24h_row = conn.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND received_at >= NOW() - INTERVAL '24 hours'
""",
[tenant_id],
).fetchone()
trend_rows = conn.execute(
"""
SELECT DATE_TRUNC('hour', received_at) AS hour_bucket, COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND received_at >= NOW() - INTERVAL '24 hours'
GROUP BY hour_bucket
ORDER BY hour_bucket
""",
[tenant_id],
).fetchall()
try:
rows = cursor.execute(
"""
SELECT failure_reason, COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
GROUP BY failure_reason
ORDER BY failure_reason
""",
[tenant_id],
).fetchall()
last_24h_row = cursor.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND received_at >= NOW() - INTERVAL '24 hours'
""",
[tenant_id],
).fetchone()
trend_rows = cursor.execute(
"""
SELECT DATE_TRUNC('hour', received_at) AS hour_bucket, COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND received_at >= NOW() - INTERVAL '24 hours'
GROUP BY hour_bucket
ORDER BY hour_bucket
""",
[tenant_id],
).fetchall()
finally:
cursor.close()
return DeadLetterStatsResponse(
counts={str(reason): int(count) for reason, count in rows if reason is not None},
last_24h=int(last_24h_row[0]) if last_24h_row and last_24h_row[0] is not None else 0,
Expand All @@ -171,75 +191,84 @@ async def list_deadletter_events(
page_size: int = Query(default=50, ge=1, le=100),
reason: str | None = Query(default=None),
) -> DeadLetterListResponse:
conn = _conn(request)
return await run_in_threadpool(_list_deadletter_events, request, page, page_size, reason)


def _list_deadletter_events(
request: Request, page: int, page_size: int, reason: str | None
) -> DeadLetterListResponse:
cursor = _read_cursor(request)
tenant_id = _tenant_id(request)
params: list[object]
if reason is not None:
params = [tenant_id, reason]
total_row = conn.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND failure_reason = ?
""",
params,
).fetchone()
else:
params = [tenant_id]
total_row = conn.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
""",
params,
).fetchone()
total = int(total_row[0]) if total_row and total_row[0] is not None else 0
offset = (page - 1) * page_size
if reason is not None:
rows = conn.execute(
"""
SELECT
event_id,
event_type,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND failure_reason = ?
ORDER BY received_at DESC, event_id ASC
LIMIT ? OFFSET ?
""",
[tenant_id, reason, page_size, offset],
).fetchall()
else:
rows = conn.execute(
"""
SELECT
event_id,
event_type,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
ORDER BY received_at DESC, event_id ASC
LIMIT ? OFFSET ?
""",
[tenant_id, page_size, offset],
).fetchall()
try:
params: list[object]
if reason is not None:
params = [tenant_id, reason]
total_row = cursor.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND failure_reason = ?
""",
params,
).fetchone()
else:
params = [tenant_id]
total_row = cursor.execute(
"""
SELECT COUNT(*)
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
""",
params,
).fetchone()
total = int(total_row[0]) if total_row and total_row[0] is not None else 0
offset = (page - 1) * page_size
if reason is not None:
rows = cursor.execute(
"""
SELECT
event_id,
event_type,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
AND failure_reason = ?
ORDER BY received_at DESC, event_id ASC
LIMIT ? OFFSET ?
""",
[tenant_id, reason, page_size, offset],
).fetchall()
else:
rows = cursor.execute(
"""
SELECT
event_id,
event_type,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE status = 'failed'
AND COALESCE(tenant_id, 'default') = ?
ORDER BY received_at DESC, event_id ASC
LIMIT ? OFFSET ?
""",
[tenant_id, page_size, offset],
).fetchall()
finally:
cursor.close()

return DeadLetterListResponse(
items=[
Expand All @@ -266,25 +295,32 @@ async def list_deadletter_events(

@router.get("/{event_id}", response_model=DeadLetterDetail)
async def get_deadletter_event(event_id: str, request: Request) -> DeadLetterDetail:
conn = _conn(request)
row = conn.execute(
"""
SELECT
event_id,
event_type,
payload,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE event_id = ?
AND COALESCE(tenant_id, 'default') = ?
""",
[event_id, _tenant_id(request)],
).fetchone()
return await run_in_threadpool(_get_deadletter_event, request, event_id)


def _get_deadletter_event(request: Request, event_id: str) -> DeadLetterDetail:
cursor = _read_cursor(request)
try:
row = cursor.execute(
"""
SELECT
event_id,
event_type,
payload,
failure_reason,
failure_detail,
received_at,
retry_count,
last_retried_at,
status
FROM dead_letter_events
WHERE event_id = ?
AND COALESCE(tenant_id, 'default') = ?
""",
[event_id, _tenant_id(request)],
).fetchone()
finally:
cursor.close()
if row is None:
raise HTTPException(status_code=404, detail=f"Dead-letter event '{event_id}' not found.")
return DeadLetterDetail(
Expand Down
14 changes: 13 additions & 1 deletion src/serving/api/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from fastapi import APIRouter, HTTPException, Request, Response, status
from pydantic import AnyHttpUrl, BaseModel, Field
from starlette.concurrency import run_in_threadpool

from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url
from src.serving.api.webhook_dispatcher import (
Expand Down Expand Up @@ -100,5 +101,16 @@ async def webhook_logs(webhook_id: str, request: Request) -> dict[str, object]:
)
if registration is None:
raise HTTPException(status_code=404, detail=f"Webhook '{webhook_id}' not found.")
logs = get_delivery_logs(request.app.state.query_engine._conn, webhook_id)
logs = await run_in_threadpool(_read_delivery_logs, request, webhook_id)
return {"logs": logs}


def _read_delivery_logs(request: Request, webhook_id: str) -> list[dict]:
# The log scan runs on a worker thread (run_in_threadpool); a dedicated
# cursor — not the shared connection — keeps concurrent reads on different
# threads from colliding on the connection. (audit_30_06_26.md A2)
cursor = request.app.state.query_engine._conn.cursor()
try:
return get_delivery_logs(cursor, webhook_id)
finally:
cursor.close()
14 changes: 13 additions & 1 deletion tests/unit/test_alerts_router_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,24 @@ async def test_test_alert_not_found_raises_404(monkeypatch: pytest.MonkeyPatch)
# ── alert_history ────────────────────────────────────────────────


class _CursorConn:
"""A connection stub that yields a closeable cursor — the history read
offloads onto a dedicated cursor (audit_30 A2). get_alert_history is stubbed,
so the cursor itself is never queried."""

def cursor(self) -> _CursorConn:
return self

def close(self) -> None:
pass


async def test_alert_history_returns_records(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(alerts_module, "get_alert", lambda path, alert_id, tenant: _rule())
monkeypatch.setattr(
alerts_module, "get_alert_history", lambda conn, alert_id: [{"fired_at": "2026-06-13"}]
)
result = await alert_history("al-1", _req(query_conn=object()))
result = await alert_history("al-1", _req(query_conn=_CursorConn()))
assert result == {"history": [{"fired_at": "2026-06-13"}]}


Expand Down
Loading
Loading