Skip to content
18 changes: 16 additions & 2 deletions src/serving/api/alerts/escalation.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ async def dispatch_alert(
continue
if step.webhook_url not in notified_urls:
notified_urls.append(step.webhook_url)
all_delivered = True
for webhook_url in notified_urls or [alert.webhook_url]:
await deliver(
result = await deliver(
dispatcher,
alert,
payload,
Expand All @@ -205,7 +206,20 @@ async def dispatch_alert(
change_pct=evaluation["change_pct"],
webhook_url=webhook_url,
)
triggered += 1
if result.get("success"):
triggered += 1
else:
all_delivered = False
if not all_delivered:
# A resolved notification failed to deliver: do NOT clear fired_at or
# advance to the "resolved" state this tick, so the next evaluation
# tick re-attempts. Otherwise the resolved page is lost for good and
# the receiver keeps treating the incident as open — the same silent
# loss the firing/escalation paths were hardened against.
# (audit_30_06_26.md C1; mirrors audit_28_06_26.md #4)
alert.last_condition_triggered = False
alert.updated_at = now
return alert, True, triggered
alert.state = "resolved"
alert.resolved_at = now
alert.fired_at = None
Expand Down
52 changes: 38 additions & 14 deletions src/serving/api/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@
Awaitable[Response],
]

# Upper bound on the question text stored per analytics row. The analytics
# middleware sees the body before the route's Pydantic validation has run, so
# it applies the same 1000-character limit that /v1/query enforces itself.
_MAX_QUERY_TEXT_CHARS = 1000

# Response statuses for which no analytics row is written. This middleware is
# mounted OUTSIDE AuthMiddleware, so persisting rejected/throttled requests
# would let unauthenticated traffic trigger un-throttled DB writes.
# (audit_30_06_26.md S1)
_UNRECORDED_STATUS_CODES = frozenset((401, 403, 429, 503))


def ensure_analytics_table(db_path: Path | str) -> None:
for attempt in range(10):
Expand Down Expand Up @@ -98,22 +107,35 @@ async def receive() -> Message:
response = await call_next(request)
# failure telemetry is best-effort before re-raising the original error
except Exception: # nosec B110
# Record downstream failures before re-raising them through the client stack.
_schedule_session_write(
request.app.state.auth_manager.db_path,
request_id,
_build_session_record(
request=request,
request_id=request_id,
status_code=500,
duration_ms=(time.perf_counter() - started_at) * 1000,
cache_hit=False,
body=body,
),
)
# Record downstream failures before re-raising — but only for an
# authenticated request, so an unauthenticated error can't drive an
# un-throttled DB write/thread spawn. (audit_30_06_26.md S1)
if getattr(request.state, "tenant_key", None) is not None:
_schedule_session_write(
request.app.state.auth_manager.db_path,
request_id,
_build_session_record(
request=request,
request_id=request_id,
status_code=500,
duration_ms=(time.perf_counter() - started_at) * 1000,
cache_hit=False,
body=body,
),
)
raise

response.headers["X-Request-Id"] = request_id
# Record analytics only for authenticated, non-rejected requests. This
# middleware runs OUTSIDE AuthMiddleware, so without this gate an
# unauthenticated/failed/throttled request would spawn a DB-writing
# thread and persist an attacker-controlled body with neither auth nor
# rate-limiting in front of it — a remote DoS. (audit_30_06_26.md S1)
if (
getattr(request.state, "tenant_key", None) is None
or response.status_code in _UNRECORDED_STATUS_CODES
):
return response
background = response.background
if background is None:
background = BackgroundTasks()
Expand Down Expand Up @@ -520,7 +542,9 @@ def _build_session_record(
payload = {}
question = payload.get("question")
if isinstance(question, str):
query_text = question
# Truncate: analytics runs before the route validates the body,
# so an oversized question would otherwise be persisted verbatim.
query_text = question[:_MAX_QUERY_TEXT_CHARS]

return {
"request_id": request_id,
Expand Down
15 changes: 15 additions & 0 deletions src/serving/api/routers/agent_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,21 @@ async def get_metric(
)
_ensure_metric_allowed(req, metric_name)

# Reject windows the metric doesn't declare. The engine silently maps an
# unknown window to "1 hour" (and active_sessions ignores it entirely), so
# without this the response would echo the *requested* window while
# returning a *different* window's value, and each bogus window string would
# pollute the metric cache. Mirrors the alerts router. (audit_30_06_26.md A1)
available_windows = catalog.metrics[metric_name].available_windows
if window not in available_windows:
raise HTTPException(
status_code=422,
detail=(
f"Unsupported window '{window}' for metric '{metric_name}'. "
f"Available: {available_windows}"
),
)

as_of = _normalize_as_of(as_of)
as_of_text = _as_of_iso_text(as_of)
try:
Expand Down
96 changes: 53 additions & 43 deletions src/serving/api/routers/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool

router = APIRouter(prefix="/v1/lineage", tags=["lineage"])

Expand Down Expand Up @@ -66,48 +67,57 @@ def _quality_score(rows: list[dict], *, default: float | None = None) -> float |


def _fetch_matching_events(request: Request, entity_type: str, entity_id: str) -> list[dict]:
conn = request.app.state.query_engine._conn
columns = {row[1] for row in conn.execute("PRAGMA table_info('pipeline_events')").fetchall()}
if "entity_id" not in columns:
return []

tenant_id = getattr(request.state, "tenant_id", None)
if tenant_id is not None and "tenant_id" not in columns and tenant_id != "default":
return []
time_column = "processed_at" if "processed_at" in columns else "created_at"
select_columns = [
"event_id",
"topic",
f"{time_column} AS processed_at",
(
"COALESCE(tenant_id, 'default') AS tenant_id"
if "tenant_id" in columns
else "'default' AS tenant_id"
),
"event_type" if "event_type" in columns else "NULL AS event_type",
"entity_id",
"latency_ms" if "latency_ms" in columns else "NULL AS latency_ms",
]
where_clauses = ["entity_id = ?"]
params: list[object] = [entity_id]
if "entity_type" in columns:
where_clauses.append("entity_type = ?")
params.append(entity_type)
if tenant_id is not None and "tenant_id" in columns:
where_clauses.append("COALESCE(tenant_id, 'default') = ?")
params.append(str(tenant_id))

cursor = conn.execute(
(
# selected columns come from the schema allowlist
f"SELECT {', '.join(select_columns)} " # nosec B608
"FROM pipeline_events "
f"WHERE {' AND '.join(where_clauses)} ORDER BY {time_column} ASC"
),
params,
)
result_columns = [description[0] for description in cursor.description]
return [dict(zip(result_columns, row, strict=False)) for row in cursor.fetchall()]
# Runs on a worker thread (get_lineage offloads it) so the full-scan can't
# block the event loop and starve every other tenant on the worker. Use a
# dedicated cursor rather than the shared connection object so concurrent
# reads don't collide on it. (audit_30_06_26.md A2)
cursor = request.app.state.query_engine._conn.cursor()
try:
columns = {
row[1] for row in cursor.execute("PRAGMA table_info('pipeline_events')").fetchall()
}
if "entity_id" not in columns:
return []

tenant_id = getattr(request.state, "tenant_id", None)
if tenant_id is not None and "tenant_id" not in columns and tenant_id != "default":
return []
time_column = "processed_at" if "processed_at" in columns else "created_at"
select_columns = [
"event_id",
"topic",
f"{time_column} AS processed_at",
(
"COALESCE(tenant_id, 'default') AS tenant_id"
if "tenant_id" in columns
else "'default' AS tenant_id"
),
"event_type" if "event_type" in columns else "NULL AS event_type",
"entity_id",
"latency_ms" if "latency_ms" in columns else "NULL AS latency_ms",
]
where_clauses = ["entity_id = ?"]
params: list[object] = [entity_id]
if "entity_type" in columns:
where_clauses.append("entity_type = ?")
params.append(entity_type)
if tenant_id is not None and "tenant_id" in columns:
where_clauses.append("COALESCE(tenant_id, 'default') = ?")
params.append(str(tenant_id))

cursor.execute(
(
# selected columns come from the schema allowlist
f"SELECT {', '.join(select_columns)} " # nosec B608
"FROM pipeline_events "
f"WHERE {' AND '.join(where_clauses)} ORDER BY {time_column} ASC"
),
params,
)
result_columns = [description[0] for description in cursor.description]
return [dict(zip(result_columns, row, strict=False)) for row in cursor.fetchall()]
finally:
cursor.close()


@router.get(
Expand Down Expand Up @@ -137,7 +147,7 @@ async def get_lineage(entity_type: str, entity_id: str, request: Request) -> Lin
detail=f"API key '{tenant_key.name}' cannot access entity type '{entity_type}'.",
)

rows = _fetch_matching_events(request, entity_type, entity_id)
rows = await run_in_threadpool(_fetch_matching_events, request, entity_type, entity_id)
if not rows:
raise HTTPException(
status_code=404,
Expand Down
29 changes: 23 additions & 6 deletions src/serving/api/routers/slo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from fastapi import APIRouter, FastAPI, HTTPException, Request
from pydantic import BaseModel, Field
from starlette.concurrency import run_in_threadpool

try:
import yaml
Expand Down Expand Up @@ -60,8 +61,11 @@ def load_slos(path: Path) -> list[SLODefinition]:


def _pipeline_event_columns(request: Request) -> set[str]:
    """Return the set of column names defined on the ``pipeline_events`` table.

    Args:
        request: Incoming request; the query engine connection is read from
            ``request.app.state.query_engine``.

    Returns:
        Column names reported by ``PRAGMA table_info``; empty if the pragma
        yields no rows.
    """
    # Use a short-lived dedicated cursor instead of executing on the shared
    # connection object, and always release it.
    cursor = request.app.state.query_engine._conn.cursor()
    try:
        # Column 1 of each PRAGMA table_info row is the column name.
        return {row[1] for row in cursor.execute("PRAGMA table_info('pipeline_events')").fetchall()}
    finally:
        cursor.close()


def _time_column(columns: set[str]) -> str | None:
Expand Down Expand Up @@ -96,7 +100,10 @@ def _measurement_value(
if time_column is None:
return None

conn = request.app.state.query_engine._conn
# A dedicated cursor (not the shared connection object) keeps concurrent
# /v1/slo requests — offloaded to worker threads by get_slos — from
# colliding on the connection. (audit_30_06_26.md A2)
conn = request.app.state.query_engine._conn.cursor()
window = f"{definition.window_days} days"
tenant_sql, tenant_params = _tenant_filter(columns, _tenant_id(request))

Expand Down Expand Up @@ -195,9 +202,12 @@ def _error_budget_remaining(target: float, current: float) -> float:
return max(0.0, min(1.0, 1.0 - consumed))


@router.get("", response_model=SLOResponse)
async def get_slos(request: Request) -> SLOResponse:
definitions = load_slos(get_slo_config_path(request.app))
def _compute_slo_statuses(request: Request, definitions: list[SLODefinition]) -> list[SLOStatus]:
# Runs on a worker thread (get_slos offloads it) so the per-SLO aggregate
# scans can't block the event loop for every tenant on the worker. The
# helpers each open their own short-lived cursor, so concurrent /v1/slo
# requests on different threads never collide on the shared connection.
# (audit_30_06_26.md A2)
columns = _pipeline_event_columns(request)
time_column = _time_column(columns)
statuses = []
Expand Down Expand Up @@ -232,4 +242,11 @@ async def get_slos(request: Request) -> SLOResponse:
)
)

return statuses


@router.get("", response_model=SLOResponse)
async def get_slos(request: Request) -> SLOResponse:
    """Report the current status of every SLO configured for this app.

    Definitions are loaded from the app's configured SLO path; the status
    computation is handed to ``run_in_threadpool`` and awaited.
    """
    config_path = get_slo_config_path(request.app)
    slo_definitions = load_slos(config_path)
    computed = await run_in_threadpool(_compute_slo_statuses, request, slo_definitions)
    return SLOResponse(slos=computed)
41 changes: 36 additions & 5 deletions src/serving/api/routers/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from collections.abc import AsyncIterator
from datetime import datetime

import duckdb
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from opentelemetry import trace
from starlette.concurrency import run_in_threadpool

router = APIRouter(prefix="/v1/stream", tags=["stream"])
tracer = trace.get_tracer("agentflow.api")
Expand All @@ -19,9 +21,38 @@ async def fetch_recent_events(
entity_id: str | None = None,
limit: int = 10,
) -> list[dict[str, object]]:
"""Fetch recent pipeline events from DuckDB with optional filters."""
conn = request.app.state.query_engine._conn
columns = {row[1] for row in conn.execute("PRAGMA table_info('pipeline_events')").fetchall()}
"""Fetch recent pipeline events from DuckDB with optional filters.

Offloaded to a worker thread: the SSE generator calls this once per second
per open stream, so running the scan inline would block the event loop (and
every other tenant on the worker) for the scan's duration. (audit_30 A2)
"""
return await run_in_threadpool(_fetch_recent_events_sync, request, event_type, entity_id, limit)


def _fetch_recent_events_sync(
    request: Request,
    event_type: str | None,
    entity_id: str | None,
    limit: int,
) -> list[dict[str, object]]:
    """Blocking implementation behind ``fetch_recent_events``.

    Opens a cursor on the query engine connection, delegates the actual query
    to ``_fetch_recent_events_with_cursor`` and closes the cursor afterwards,
    whether or not the query raised.
    """
    # A per-call cursor (rather than the shared connection object) keeps
    # concurrent streams on different worker threads from colliding on it.
    db_cursor = request.app.state.query_engine._conn.cursor()
    try:
        events = _fetch_recent_events_with_cursor(
            db_cursor, request, event_type, entity_id, limit
        )
    finally:
        db_cursor.close()
    return events


def _fetch_recent_events_with_cursor(
cursor: duckdb.DuckDBPyConnection,
request: Request,
event_type: str | None,
entity_id: str | None,
limit: int,
) -> list[dict[str, object]]:
columns = {row[1] for row in cursor.execute("PRAGMA table_info('pipeline_events')").fetchall()}
time_column = "processed_at" if "processed_at" in columns else "created_at"
tenant_id = getattr(request.state, "tenant_id", None)
if tenant_id is not None and "tenant_id" not in columns and tenant_id != "default":
Expand Down Expand Up @@ -80,8 +111,8 @@ async def fetch_recent_events(
sql = f"{sql} ORDER BY {time_column} DESC LIMIT ?"
params.append(limit)

rows = conn.execute(sql, params).fetchall()
result_columns = [description[0] for description in conn.description]
rows = cursor.execute(sql, params).fetchall()
result_columns = [description[0] for description in cursor.description]
return [dict(zip(result_columns, row, strict=False)) for row in rows]


Expand Down
Loading
Loading