Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/db_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Process-wide serialization for DuckDB catalog DDL.

DuckDB raises a ``Catalog write-write conflict`` when two threads run catalog
DDL (CREATE/ALTER) concurrently — even on *different* tables — because the
catalog is a single versioned structure. The serving API offloads its read
handlers onto worker threads (``run_in_threadpool``), and each lazily ensures
its backing table (``CREATE TABLE IF NOT EXISTS`` / ``ALTER ... ADD COLUMN IF
NOT EXISTS``) on a fresh cursor; on a cold DB (the default serving store is
``:memory:``, cold on every restart) a concurrent burst raced and surfaced
HTTP 500s. Serialize every such lazy table-creation behind this one lock so the
first thread creates and the rest see a warm no-op. (audit_30 A2 follow-up:
the #120 read-handler offload race)
"""

from __future__ import annotations

import threading

# A single process-wide lock guarding all lazy DuckDB catalog DDL. Held only for
# the brief CREATE/ALTER IF NOT EXISTS (a near-instant no-op once warm), never
# around query execution and never nested, so it cannot deadlock. A cross-table
# conflict (not just same-table) is real in DuckDB, so the lock is shared across
# every ensure_*_table helper rather than one lock per table.
catalog_ddl_lock = threading.Lock()
42 changes: 24 additions & 18 deletions src/processing/event_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import structlog
from opentelemetry import trace

from src.db_concurrency import catalog_ddl_lock
from src.processing.outbox import OutboxProcessor, ensure_outbox_table
from src.processing.tracing import inject_trace_to_kafka_headers, telemetry_disabled
from src.quality.validators.schema_validator import validate_event
Expand Down Expand Up @@ -40,25 +41,30 @@ class ReplayResult:


def ensure_dead_letter_table(conn: duckdb.DuckDBPyConnection) -> None:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS dead_letter_events (
event_id TEXT PRIMARY KEY,
tenant_id TEXT DEFAULT 'default',
event_type TEXT,
payload JSON,
failure_reason TEXT,
failure_detail TEXT,
received_at TIMESTAMP,
retry_count INTEGER DEFAULT 0,
last_retried_at TIMESTAMP,
status TEXT DEFAULT 'failed'
# Serialize the lazy DDL: the offloaded read handlers call this on worker
# threads, and concurrent CREATE/ALTER on a cold DuckDB catalog conflicts.
# (audit_30 A2 follow-up: #120 offload race)
with catalog_ddl_lock:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS dead_letter_events (
event_id TEXT PRIMARY KEY,
tenant_id TEXT DEFAULT 'default',
event_type TEXT,
payload JSON,
failure_reason TEXT,
failure_detail TEXT,
received_at TIMESTAMP,
retry_count INTEGER DEFAULT 0,
last_retried_at TIMESTAMP,
status TEXT DEFAULT 'failed'
)
"""
)
conn.execute(
"ALTER TABLE dead_letter_events "
"ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'"
)
"""
)
conn.execute(
"ALTER TABLE dead_letter_events ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'"
)


class EventReplayer:
Expand Down
50 changes: 28 additions & 22 deletions src/serving/api/alerts/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,40 @@

import duckdb

from src.db_concurrency import catalog_ddl_lock

if TYPE_CHECKING:
from .dispatcher import AlertRule


def ensure_alert_history_table(conn: duckdb.DuckDBPyConnection) -> None:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS alert_history (
delivery_id VARCHAR,
alert_id VARCHAR,
alert_name VARCHAR,
metric VARCHAR,
current_value DOUBLE,
previous_value DOUBLE,
change_pct DOUBLE,
threshold DOUBLE,
condition VARCHAR,
metric_window VARCHAR,
tenant VARCHAR,
event_type VARCHAR,
status_code INTEGER,
success BOOLEAN,
error TEXT,
payload JSON,
triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
# Serialize the lazy DDL: the offloaded read handler calls this on a worker
# thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across
# tables too). (audit_30 A2 follow-up: #120 offload race)
with catalog_ddl_lock:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS alert_history (
delivery_id VARCHAR,
alert_id VARCHAR,
alert_name VARCHAR,
metric VARCHAR,
current_value DOUBLE,
previous_value DOUBLE,
change_pct DOUBLE,
threshold DOUBLE,
condition VARCHAR,
metric_window VARCHAR,
tenant VARCHAR,
event_type VARCHAR,
status_code INTEGER,
success BOOLEAN,
error TEXT,
payload JSON,
triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
"""
)


def get_alert_history(conn: duckdb.DuckDBPyConnection, alert_id: str) -> list[dict]:
Expand Down
8 changes: 7 additions & 1 deletion src/serving/api/routers/deadletter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ def _read_cursor(request: Request) -> duckdb.DuckDBPyConnection:
different worker threads from colliding on the connection. (audit_30_06_26.md A2)
"""
cursor = cast(duckdb.DuckDBPyConnection, request.app.state.query_engine._conn).cursor()
ensure_dead_letter_table(cursor)
try:
ensure_dead_letter_table(cursor)
except Exception:
# Don't leak the freshly-opened cursor if the lazy DDL fails before the
# handler's own try/finally takes over.
cursor.close()
raise
return cursor


Expand Down
33 changes: 19 additions & 14 deletions src/serving/api/webhook_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from fastapi import FastAPI
from pydantic import BaseModel, Field

from src.db_concurrency import catalog_ddl_lock
from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url

try:
Expand Down Expand Up @@ -124,21 +125,25 @@ def deactivate_webhook(path: Path, webhook_id: str, tenant: str) -> bool:


def ensure_webhook_deliveries_table(conn: duckdb.DuckDBPyConnection) -> None:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_deliveries (
delivery_id VARCHAR,
webhook_id VARCHAR,
event_id VARCHAR,
event_type VARCHAR,
attempt INTEGER,
status_code INTEGER,
success BOOLEAN,
error TEXT,
delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
# Serialize the lazy DDL: the offloaded read handler calls this on a worker
# thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across
# tables too). (audit_30 A2 follow-up: #120 offload race)
with catalog_ddl_lock:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_deliveries (
delivery_id VARCHAR,
webhook_id VARCHAR,
event_id VARCHAR,
event_type VARCHAR,
attempt INTEGER,
status_code INTEGER,
success BOOLEAN,
error TEXT,
delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
"""
)


def get_delivery_logs(conn: duckdb.DuckDBPyConnection, webhook_id: str) -> list[dict]:
Expand Down
85 changes: 75 additions & 10 deletions src/serving/masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,31 @@

import sqlglot
from sqlglot import exp
from sqlglot.lineage import lineage

try:
import yaml
except ImportError: # pragma: no cover
yaml = None # type: ignore[assignment]


class _UnresolvedSources(frozenset[str]):
"""Sentinel column-source set whose membership test is always ``True``.

Used when projection lineage can't be resolved for an output column (an
ambiguous reference, or an exotic shape sqlglot's lineage rejects). Treating
every masking rule field as a possible source masks the column **fail
closed** rather than letting a renamed/derived PII value slip through as
cleartext. (audit_30 D2 follow-up)
"""

def __contains__(self, item: object) -> bool:
return True


_UNRESOLVED_SOURCES: frozenset[str] = _UnresolvedSources()


class PiiMasker:
def __init__(self, config_path: str | Path = "config/pii_fields.yaml"):
if yaml is None: # pragma: no cover
Expand All @@ -25,7 +43,7 @@ def mask(
data: dict,
tenant: str,
*,
source_columns: dict[str, set[str]] | None = None,
source_columns: dict[str, frozenset[str]] | None = None,
) -> dict:
masking = self._config.get("masking", {})
if tenant in masking.get("pii_exempt_tenants", []):
Expand All @@ -46,7 +64,7 @@ def _output_columns_for_field(
self,
field: str,
data: dict,
source_columns: dict[str, set[str]] | None,
source_columns: dict[str, frozenset[str]] | None,
) -> set[str]:
"""Which result columns to mask for a rule's source ``field``.

Expand Down Expand Up @@ -93,12 +111,19 @@ def mask_query_results(
]
return masked_rows, masked_rows != rows

def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None:
def _projection_source_columns(self, sql: str) -> dict[str, frozenset[str]] | None:
"""Map each output column to the source column names that feed it.

Returns ``None`` when the projection can't be resolved precisely — a
``SELECT *`` (whose outputs are the source names verbatim) or unparseable
SQL — so masking falls back to matching rule fields against output names.
Resolves *true* projection lineage (through subqueries, CTEs and unions)
so a PII column renamed at any nesting depth is masked by what it is
built from, not by its output name. The shallow one-level resolver this
replaces saw only the outermost projection, so an inner rename —
``SELECT contact FROM (SELECT email AS contact FROM users_enriched)`` —
returned cleartext. (audit_30 D2 follow-up: subquery/CTE-alias bypass)

Returns ``None`` for a ``SELECT *`` (whose outputs are the source names
verbatim, so name-matching is correct) or unparseable SQL, so masking
falls back to matching rule fields against output names.
"""
try:
parsed = sqlglot.parse_one(sql, read="duckdb")
Expand All @@ -107,16 +132,56 @@ def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None:
select = parsed.find(exp.Select)
if select is None:
return None
mapping: dict[str, set[str]] = {}
if any(
isinstance(projection, exp.Star) or projection.find(exp.Star) is not None
for projection in select.expressions
):
return None
mapping: dict[str, frozenset[str]] = {}
for projection in select.expressions:
if isinstance(projection, exp.Star) or projection.find(exp.Star) is not None:
return None
output_name = projection.alias_or_name
if not output_name:
continue
mapping[output_name] = {col.name for col in projection.find_all(exp.Column) if col.name}
# Union the *deep* lineage sources (which trace a renamed PII column
# through subqueries/CTEs) with the *shallow* columns named directly
# in this projection. Lineage is blind through an inner ``SELECT *``
# (no schema to expand it), where the shallow scan still catches a
# direct ``email AS contact``; lineage catches the inner rename the
# shallow scan misses. Either alone leaks one shape — the union
# closes both.
deep = self._lineage_source_columns(output_name, sql)
if isinstance(deep, _UnresolvedSources):
mapping[output_name] = deep
continue
shallow = frozenset(col.name for col in projection.find_all(exp.Column) if col.name)
mapping[output_name] = deep | shallow
return mapping

def _lineage_source_columns(self, output_name: str, sql: str) -> frozenset[str]:
"""The ultimate source column names feeding ``output_name``.

Walks the lineage graph to its leaves (across subqueries, CTEs and union
branches) and returns the bare column names. Fails closed — a sentinel
that matches every rule field — when lineage can't be resolved, so an
unresolved column is masked rather than leaked.
"""
try:
root = lineage(output_name, sql, dialect="duckdb")
except Exception: # noqa: BLE001 - any lineage failure must fail closed
return _UNRESOLVED_SOURCES
sources: set[str] = set()
stack = [root]
seen: set[int] = set()
while stack:
node = stack.pop()
if id(node) in seen:
continue
seen.add(id(node))
if not node.downstream and node.name:
sources.add(node.name.split(".")[-1])
stack.extend(node.downstream)
return frozenset(sources)

def _extract_table_names(self, sql: str) -> set[str]:
try:
parsed = sqlglot.parse_one(sql, read="duckdb")
Expand Down
21 changes: 21 additions & 0 deletions src/serving/semantic_layer/query/sql_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ def _scope_sql(self: SQLBuilderHost, sql: str, tenant_id: str | None) -> str:
known_tables.add("pipeline_events")

parsed = sqlglot.parse_one(sql, dialect="duckdb")
# A recursive CTE *can* self-reference, so sqlglot keeps its name in its
# own body scope and the cte_sources skip below mis-classifies the
# physical *anchor* reference (the first UNION branch, which cannot
# self-reference) as a CTE reference — it is never re-scoped, stays bound
# to the shared `main` schema and leaks every tenant's rows. There is no
# safe re-scoping of a recursive anchor (genuinely ambiguous with the
# recursion) and no legitimate query names a recursive CTE after a
# physical table, so fail closed. validate_nl_sql rejects this on the NL
# path; this guards any other caller. (audit_30 D1 follow-up: WITH
# RECURSIVE bypass of f153b23)
recursive_shadow = sorted(
{
cte.alias_or_name.lower()
for with_node in parsed.find_all(exp.With)
if with_node.args.get("recursive")
for cte in with_node.expressions
if cte.alias_or_name and cte.alias_or_name.lower() in known_tables
}
)
if recursive_shadow:
raise ValueError(f"Recursive CTE shadows tenant-scoped table(s): {recursive_shadow}")
# Classify every table reference by scope so a CTE whose name collides
# with a real table — e.g. `WITH orders_v2 AS (SELECT * FROM orders_v2)
# SELECT * FROM orders_v2` — cannot hide the *physical* inner reference
Expand Down
20 changes: 20 additions & 0 deletions src/serving/semantic_layer/sql_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ def validate_nl_sql(sql: str, allowed_tables: set[str]) -> None:
cte.alias_or_name.lower() for cte in statement.find_all(exp.CTE) if cte.alias_or_name
}
normalized_allowed_tables = {table.lower() for table in allowed_tables}
# A recursive CTE whose name shadows a real (allowed) table is a cross-tenant
# read vector. A recursive CTE *can* self-reference, so its name lives in its
# own body scope; the leaf-name allow-list excludes the CTE name, AND
# _scope_sql's cte_sources skip mis-classifies the physical *anchor*
# reference (the first UNION branch, which cannot self-reference) as a CTE
# reference — so it stays bare, binds to the shared `main` schema, and leaks
# every tenant's rows. Non-recursive shadows are safely re-scoped, but the
# recursive anchor cannot be, so reject the shape outright. (audit_30 D1
# follow-up: WITH RECURSIVE bypass of f153b23)
recursive_shadows = {
cte.alias_or_name.lower()
for with_node in statement.find_all(exp.With)
if with_node.args.get("recursive")
for cte in with_node.expressions
if cte.alias_or_name and cte.alias_or_name.lower() in normalized_allowed_tables
}
if recursive_shadows:
raise UnsafeSQLError(
f"Recursive CTE shadows physical table(s): {sorted(recursive_shadows)}"
)
unknown_tables = {
table.name.lower()
for table in statement.find_all(exp.Table)
Expand Down
Loading