diff --git a/src/db_concurrency.py b/src/db_concurrency.py new file mode 100644 index 0000000..e30fd3d --- /dev/null +++ b/src/db_concurrency.py @@ -0,0 +1,24 @@ +"""Process-wide serialization for DuckDB catalog DDL. + +DuckDB raises a ``Catalog write-write conflict`` when two threads run catalog +DDL (CREATE/ALTER) concurrently — even on *different* tables — because the +catalog is a single versioned structure. The serving API offloads its read +handlers onto worker threads (``run_in_threadpool``), and each lazily ensures +its backing table (``CREATE TABLE IF NOT EXISTS`` / ``ALTER ... ADD COLUMN IF +NOT EXISTS``) on a fresh cursor; on a cold DB (the default serving store is +``:memory:``, cold on every restart) a concurrent burst raced and surfaced +HTTP 500s. Serialize every such lazy table-creation behind this one lock so the +first thread creates and the rest see a warm no-op. (audit_30 A2 follow-up: +the #120 read-handler offload race) +""" + +from __future__ import annotations + +import threading + +# A single process-wide lock guarding all lazy DuckDB catalog DDL. Held only for +# the brief CREATE/ALTER IF NOT EXISTS (a near-instant no-op once warm), never +# around query execution and never nested, so it cannot deadlock. A cross-table +# conflict (not just same-table) is real in DuckDB, so the lock is shared across +# every ensure_*_table helper rather than one lock per table. +catalog_ddl_lock = threading.Lock() diff --git a/src/processing/event_replayer.py b/src/processing/event_replayer.py index fc7a6af..b701374 100644 --- a/src/processing/event_replayer.py +++ b/src/processing/event_replayer.py @@ -12,6 +12,7 @@ import structlog from opentelemetry import trace +from src.db_concurrency import catalog_ddl_lock from src.processing.outbox import OutboxProcessor, ensure_outbox_table from src.processing.tracing import inject_trace_to_kafka_headers, telemetry_disabled from src.quality.validators.schema_validator import validate_event @@ -40,25 +41,30 @@ class ReplayResult: def ensure_dead_letter_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS dead_letter_events ( - event_id TEXT PRIMARY KEY, - tenant_id TEXT DEFAULT 'default', - event_type TEXT, - payload JSON, - failure_reason TEXT, - failure_detail TEXT, - received_at TIMESTAMP, - retry_count INTEGER DEFAULT 0, - last_retried_at TIMESTAMP, - status TEXT DEFAULT 'failed' + # Serialize the lazy DDL: the offloaded read handlers call this on worker + # threads, and concurrent CREATE/ALTER on a cold DuckDB catalog conflicts. + # (audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS dead_letter_events ( + event_id TEXT PRIMARY KEY, + tenant_id TEXT DEFAULT 'default', + event_type TEXT, + payload JSON, + failure_reason TEXT, + failure_detail TEXT, + received_at TIMESTAMP, + retry_count INTEGER DEFAULT 0, + last_retried_at TIMESTAMP, + status TEXT DEFAULT 'failed' + ) + """ + ) + conn.execute( + "ALTER TABLE dead_letter_events " + "ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'" ) - """ - ) - conn.execute( - "ALTER TABLE dead_letter_events ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'" - ) class EventReplayer: diff --git a/src/serving/api/alerts/history.py b/src/serving/api/alerts/history.py index ae50cbf..6c34eb9 100644 --- a/src/serving/api/alerts/history.py +++ b/src/serving/api/alerts/history.py @@ -6,34 +6,40 @@ import duckdb +from src.db_concurrency import catalog_ddl_lock + if TYPE_CHECKING: from .dispatcher import AlertRule def ensure_alert_history_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS alert_history ( - delivery_id VARCHAR, - alert_id VARCHAR, - alert_name VARCHAR, - metric VARCHAR, - current_value DOUBLE, - previous_value DOUBLE, - change_pct DOUBLE, - threshold DOUBLE, - condition VARCHAR, - metric_window VARCHAR, - tenant VARCHAR, - event_type VARCHAR, - status_code INTEGER, - success BOOLEAN, - error TEXT, - payload JSON, - triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + # Serialize the lazy DDL: the offloaded read handler calls this on a worker + # thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across + # tables too). (audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS alert_history ( + delivery_id VARCHAR, + alert_id VARCHAR, + alert_name VARCHAR, + metric VARCHAR, + current_value DOUBLE, + previous_value DOUBLE, + change_pct DOUBLE, + threshold DOUBLE, + condition VARCHAR, + metric_window VARCHAR, + tenant VARCHAR, + event_type VARCHAR, + status_code INTEGER, + success BOOLEAN, + error TEXT, + payload JSON, + triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ ) - """ - ) def get_alert_history(conn: duckdb.DuckDBPyConnection, alert_id: str) -> list[dict]: diff --git a/src/serving/api/routers/deadletter.py b/src/serving/api/routers/deadletter.py index 4d4aec5..9d742ab 100644 --- a/src/serving/api/routers/deadletter.py +++ b/src/serving/api/routers/deadletter.py @@ -77,7 +77,13 @@ def _read_cursor(request: Request) -> duckdb.DuckDBPyConnection: different worker threads from colliding on the connection. (audit_30_06_26.md A2) """ cursor = cast(duckdb.DuckDBPyConnection, request.app.state.query_engine._conn).cursor() - ensure_dead_letter_table(cursor) + try: + ensure_dead_letter_table(cursor) + except Exception: + # Don't leak the freshly-opened cursor if the lazy DDL fails before the + # handler's own try/finally takes over. + cursor.close() + raise return cursor diff --git a/src/serving/api/webhook_dispatcher.py b/src/serving/api/webhook_dispatcher.py index 08ccacd..2b6e027 100644 --- a/src/serving/api/webhook_dispatcher.py +++ b/src/serving/api/webhook_dispatcher.py @@ -17,6 +17,7 @@ from fastapi import FastAPI from pydantic import BaseModel, Field +from src.db_concurrency import catalog_ddl_lock from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url try: @@ -124,21 +125,25 @@ def deactivate_webhook(path: Path, webhook_id: str, tenant: str) -> bool: def ensure_webhook_deliveries_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS webhook_deliveries ( - delivery_id VARCHAR, - webhook_id VARCHAR, - event_id VARCHAR, - event_type VARCHAR, - attempt INTEGER, - status_code INTEGER, - success BOOLEAN, - error TEXT, - delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + # Serialize the lazy DDL: the offloaded read handler calls this on a worker + # thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across + # tables too). (audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_deliveries ( + delivery_id VARCHAR, + webhook_id VARCHAR, + event_id VARCHAR, + event_type VARCHAR, + attempt INTEGER, + status_code INTEGER, + success BOOLEAN, + error TEXT, + delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ ) - """ - ) def get_delivery_logs(conn: duckdb.DuckDBPyConnection, webhook_id: str) -> list[dict]: diff --git a/src/serving/masking.py b/src/serving/masking.py index 3027e03..8ca89c5 100644 --- a/src/serving/masking.py +++ b/src/serving/masking.py @@ -5,6 +5,7 @@ import sqlglot from sqlglot import exp +from sqlglot.lineage import lineage try: import yaml @@ -12,6 +13,23 @@ yaml = None # type: ignore[assignment] +class _UnresolvedSources(frozenset[str]): + """Sentinel column-source set whose membership test is always ``True``. + + Used when projection lineage can't be resolved for an output column (an + ambiguous reference, or an exotic shape sqlglot's lineage rejects). Treating + every masking rule field as a possible source masks the column **fail + closed** rather than letting a renamed/derived PII value slip through as + cleartext. (audit_30 D2 follow-up) + """ + + def __contains__(self, item: object) -> bool: + return True + + +_UNRESOLVED_SOURCES: frozenset[str] = _UnresolvedSources() + + class PiiMasker: def __init__(self, config_path: str | Path = "config/pii_fields.yaml"): if yaml is None: # pragma: no cover @@ -25,7 +43,7 @@ def mask( data: dict, tenant: str, *, - source_columns: dict[str, set[str]] | None = None, + source_columns: dict[str, frozenset[str]] | None = None, ) -> dict: masking = self._config.get("masking", {}) if tenant in masking.get("pii_exempt_tenants", []): @@ -46,7 +64,7 @@ def _output_columns_for_field( self, field: str, data: dict, - source_columns: dict[str, set[str]] | None, + source_columns: dict[str, frozenset[str]] | None, ) -> set[str]: """Which result columns to mask for a rule's source ``field``. @@ -93,12 +111,19 @@ def mask_query_results( ] return masked_rows, masked_rows != rows - def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None: + def _projection_source_columns(self, sql: str) -> dict[str, frozenset[str]] | None: """Map each output column to the source column names that feed it. - Returns ``None`` when the projection can't be resolved precisely — a - ``SELECT *`` (whose outputs are the source names verbatim) or unparseable - SQL — so masking falls back to matching rule fields against output names. + Resolves *true* projection lineage (through subqueries, CTEs and unions) + so a PII column renamed at any nesting depth is masked by what it is + built from, not by its output name. The shallow one-level resolver this + replaces saw only the outermost projection, so an inner rename — + ``SELECT contact FROM (SELECT email AS contact FROM users_enriched)`` — + returned cleartext. (audit_30 D2 follow-up: subquery/CTE-alias bypass) + + Returns ``None`` for a ``SELECT *`` (whose outputs are the source names + verbatim, so name-matching is correct) or unparseable SQL, so masking + falls back to matching rule fields against output names. """ try: parsed = sqlglot.parse_one(sql, read="duckdb") @@ -107,16 +132,56 @@ def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None: select = parsed.find(exp.Select) if select is None: return None - mapping: dict[str, set[str]] = {} + if any( + isinstance(projection, exp.Star) or projection.find(exp.Star) is not None + for projection in select.expressions + ): + return None + mapping: dict[str, frozenset[str]] = {} for projection in select.expressions: - if isinstance(projection, exp.Star) or projection.find(exp.Star) is not None: - return None output_name = projection.alias_or_name if not output_name: continue - mapping[output_name] = {col.name for col in projection.find_all(exp.Column) if col.name} + # Union the *deep* lineage sources (which trace a renamed PII column + # through subqueries/CTEs) with the *shallow* columns named directly + # in this projection. Lineage is blind through an inner ``SELECT *`` + # (no schema to expand it), where the shallow scan still catches a + # direct ``email AS contact``; lineage catches the inner rename the + # shallow scan misses. Either alone leaks one shape — the union + # closes both. + deep = self._lineage_source_columns(output_name, sql) + if isinstance(deep, _UnresolvedSources): + mapping[output_name] = deep + continue + shallow = frozenset(col.name for col in projection.find_all(exp.Column) if col.name) + mapping[output_name] = deep | shallow return mapping + def _lineage_source_columns(self, output_name: str, sql: str) -> frozenset[str]: + """The ultimate source column names feeding ``output_name``. + + Walks the lineage graph to its leaves (across subqueries, CTEs and union + branches) and returns the bare column names. Fails closed — a sentinel + that matches every rule field — when lineage can't be resolved, so an + unresolved column is masked rather than leaked. + """ + try: + root = lineage(output_name, sql, dialect="duckdb") + except Exception: # noqa: BLE001 - any lineage failure must fail closed + return _UNRESOLVED_SOURCES + sources: set[str] = set() + stack = [root] + seen: set[int] = set() + while stack: + node = stack.pop() + if id(node) in seen: + continue + seen.add(id(node)) + if not node.downstream and node.name: + sources.add(node.name.split(".")[-1]) + stack.extend(node.downstream) + return frozenset(sources) + def _extract_table_names(self, sql: str) -> set[str]: try: parsed = sqlglot.parse_one(sql, read="duckdb") diff --git a/src/serving/semantic_layer/query/sql_builder.py b/src/serving/semantic_layer/query/sql_builder.py index 904228d..5755c17 100644 --- a/src/serving/semantic_layer/query/sql_builder.py +++ b/src/serving/semantic_layer/query/sql_builder.py @@ -80,6 +80,27 @@ def _scope_sql(self: SQLBuilderHost, sql: str, tenant_id: str | None) -> str: known_tables.add("pipeline_events") parsed = sqlglot.parse_one(sql, dialect="duckdb") + # A recursive CTE *can* self-reference, so sqlglot keeps its name in its + # own body scope and the cte_sources skip below mis-classifies the + # physical *anchor* reference (the first UNION branch, which cannot + # self-reference) as a CTE reference — it is never re-scoped, stays bound + # to the shared `main` schema and leaks every tenant's rows. There is no + # safe re-scoping of a recursive anchor (genuinely ambiguous with the + # recursion) and no legitimate query names a recursive CTE after a + # physical table, so fail closed. validate_nl_sql rejects this on the NL + # path; this guards any other caller. (audit_30 D1 follow-up: WITH + # RECURSIVE bypass of f153b23) + recursive_shadow = sorted( + { + cte.alias_or_name.lower() + for with_node in parsed.find_all(exp.With) + if with_node.args.get("recursive") + for cte in with_node.expressions + if cte.alias_or_name and cte.alias_or_name.lower() in known_tables + } + ) + if recursive_shadow: + raise ValueError(f"Recursive CTE shadows tenant-scoped table(s): {recursive_shadow}") # Classify every table reference by scope so a CTE whose name collides # with a real table — e.g. `WITH orders_v2 AS (SELECT * FROM orders_v2) # SELECT * FROM orders_v2` — cannot hide the *physical* inner reference diff --git a/src/serving/semantic_layer/sql_guard.py b/src/serving/semantic_layer/sql_guard.py index 8b59f4b..944c982 100644 --- a/src/serving/semantic_layer/sql_guard.py +++ b/src/serving/semantic_layer/sql_guard.py @@ -89,6 +89,26 @@ def validate_nl_sql(sql: str, allowed_tables: set[str]) -> None: cte.alias_or_name.lower() for cte in statement.find_all(exp.CTE) if cte.alias_or_name } normalized_allowed_tables = {table.lower() for table in allowed_tables} + # A recursive CTE whose name shadows a real (allowed) table is a cross-tenant + # read vector. A recursive CTE *can* self-reference, so its name lives in its + # own body scope; the leaf-name allow-list excludes the CTE name, AND + # _scope_sql's cte_sources skip mis-classifies the physical *anchor* + # reference (the first UNION branch, which cannot self-reference) as a CTE + # reference — so it stays bare, binds to the shared `main` schema, and leaks + # every tenant's rows. Non-recursive shadows are safely re-scoped, but the + # recursive anchor cannot be, so reject the shape outright. (audit_30 D1 + # follow-up: WITH RECURSIVE bypass of f153b23) + recursive_shadows = { + cte.alias_or_name.lower() + for with_node in statement.find_all(exp.With) + if with_node.args.get("recursive") + for cte in with_node.expressions + if cte.alias_or_name and cte.alias_or_name.lower() in normalized_allowed_tables + } + if recursive_shadows: + raise UnsafeSQLError( + f"Recursive CTE shadows physical table(s): {sorted(recursive_shadows)}" + ) unknown_tables = { table.name.lower() for table in statement.find_all(exp.Table) diff --git a/tests/unit/test_catalog_ddl_concurrency.py b/tests/unit/test_catalog_ddl_concurrency.py new file mode 100644 index 0000000..df49705 --- /dev/null +++ b/tests/unit/test_catalog_ddl_concurrency.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import threading +from collections.abc import Callable + +import duckdb +import pytest + +import src.processing.event_replayer as event_replayer_module +import src.serving.api.alerts.history as history_module +import src.serving.api.webhook_dispatcher as webhook_module +from src.db_concurrency import catalog_ddl_lock +from src.processing.event_replayer import ensure_dead_letter_table +from src.serving.api.alerts.history import ensure_alert_history_table +from src.serving.api.webhook_dispatcher import ensure_webhook_deliveries_table + +Ensurer = Callable[[duckdb.DuckDBPyConnection], None] + +_ENSURERS: list[Ensurer] = [ + ensure_dead_letter_table, + ensure_alert_history_table, + ensure_webhook_deliveries_table, +] + + +def _hammer(conn: duckdb.DuckDBPyConnection, jobs: list[Ensurer]) -> list[Exception]: + """Run each ensure_* on a fresh cursor on ``conn``, all firing the DDL at once. + + A ``Barrier`` releases every thread into ``ensure_*`` simultaneously so the + cold-catalog write-write conflict is provoked deterministically: with the + lock removed this reliably raises on most threads; with it, every thread + serializes to a warm no-op and the list stays empty. + """ + errors: list[Exception] = [] + barrier = threading.Barrier(len(jobs)) + + def worker(ensure: Ensurer) -> None: + cursor = conn.cursor() + try: + barrier.wait() + ensure(cursor) + except Exception as exc: # noqa: BLE001 - capture the catalog conflict, if any + errors.append(exc) + finally: + cursor.close() + + threads = [threading.Thread(target=worker, args=(job,)) for job in jobs] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + return errors + + +@pytest.mark.parametrize("ensure", _ENSURERS) +def test_ensure_table_concurrency_safe_same_table_cold_db(ensure: Ensurer) -> None: + # The #120 offload calls ensure_*_table on worker threads; on a cold DuckDB + # a concurrent burst raced on the catalog -> "Catalog write-write conflict" + # -> HTTP 500 (the serving DB default is :memory:, cold on every restart). + # The shared catalog DDL lock serializes creation. (audit_30 A2 follow-up) + conn = duckdb.connect(":memory:") + errors = _hammer(conn, [ensure] * 32) + assert errors == [], f"{ensure.__name__}: {errors[:2]}" + + +def test_ensure_tables_concurrency_safe_across_tables_cold_db() -> None: + # DuckDB raises a catalog write-write conflict even across *different* tables, + # so every ensure_*_table helper must share one lock (not a lock per table). + # 12 interleaved calls per table on a cold DB. + conn = duckdb.connect(":memory:") + errors = _hammer(conn, _ENSURERS * 12) + assert errors == [], errors[:3] + + +def test_catalog_ddl_lock_is_a_single_shared_instance() -> None: + # All three helpers must guard DDL with the *same* lock instance, or the + # cross-table conflict resurfaces. Pins against a future per-module lock. + assert ( + event_replayer_module.catalog_ddl_lock + is history_module.catalog_ddl_lock + is webhook_module.catalog_ddl_lock + is catalog_ddl_lock + ) diff --git a/tests/unit/test_masking.py b/tests/unit/test_masking.py index f163e87..e3e3549 100644 --- a/tests/unit/test_masking.py +++ b/tests/unit/test_masking.py @@ -193,6 +193,36 @@ def test_mask_query_results_masks_derived_pii_column(tmp_path: Path): assert rows == [{"e": "a***@example.com"}] +def test_mask_query_results_masks_pii_renamed_through_subquery(tmp_path: Path): + # An inner rename hides the PII source name from the outer projection, so the + # one-level lineage resolver saw only the renamed output column and returned + # cleartext (the D2 fix's subquery/CTE-alias bypass). True lineage traces the + # output column back to `email` through the subquery/CTE. (audit_30 D2 follow-up) + masker = PiiMasker(_user_email_config(tmp_path)) + + for sql, out_col in ( + ("SELECT contact FROM (SELECT email AS contact FROM users_enriched) t", "contact"), + ( + "WITH t AS (SELECT email AS contact FROM users_enriched) SELECT contact FROM t", + "contact", + ), + ( + "SELECT outer_c FROM (SELECT inner_c AS outer_c " + "FROM (SELECT email AS inner_c FROM users_enriched) a) b", + "outer_c", + ), + ): + rows, masked = masker.mask_query_results( + sql, + [{out_col: "alice@example.com"}], + tenant="acme", + table_to_entity={"users_enriched": "user"}, + ) + + assert masked is True, sql + assert rows == [{out_col: "a***@example.com"}], sql + + def test_mask_query_results_masks_select_star_by_name(tmp_path: Path): # SELECT * has no resolvable projection lineage; masking falls back to # matching rule fields against the (canonical) output column names. diff --git a/tests/unit/test_query_engine.py b/tests/unit/test_query_engine.py index ff2af55..ba1b50a 100644 --- a/tests/unit/test_query_engine.py +++ b/tests/unit/test_query_engine.py @@ -68,6 +68,24 @@ def test_scope_sql_qualifies_physical_table_shadowed_by_cte_name(engine: QueryEn assert all(db != "main" for _, db in tables) +def test_scope_sql_rejects_recursive_cte_shadowing_physical_table(engine: QueryEngine) -> None: + # A recursive CTE *can* self-reference, so sqlglot keeps its name in its own + # body scope and the cte_sources skip mis-classifies the physical *anchor* + # reference (the first UNION branch, which cannot self-reference) as a CTE + # reference — it is never re-scoped, stays bound to the shared `main` schema, + # and leaks every tenant's rows (the WITH RECURSIVE bypass of the D1 fix). + # There is no safe re-scoping of a recursive anchor and no legitimate query + # names a recursive CTE after a physical table, so fail closed. + # (audit_30 D1 follow-up) + with pytest.raises(ValueError, match="[Rr]ecursive CTE shadows"): + engine._scope_sql( + "WITH RECURSIVE orders_v2 AS " + "(SELECT * FROM orders_v2 UNION SELECT * FROM orders_v2) " + "SELECT * FROM orders_v2", + tenant_id="tenant_a", + ) + + def test_scope_sql_qualifies_tables_after_subquery(engine: QueryEngine) -> None: scoped = engine._scope_sql( "SELECT * FROM (SELECT * FROM orders_v2) AS recent, users_enriched", diff --git a/tests/unit/test_sql_guard.py b/tests/unit/test_sql_guard.py index 7b8da54..f687117 100644 --- a/tests/unit/test_sql_guard.py +++ b/tests/unit/test_sql_guard.py @@ -52,6 +52,21 @@ "Schema-qualified", ), ("SELECT * FROM cat.acme.orders_v2", "Schema-qualified"), + # A recursive CTE whose name shadows a real (allowed) table bypasses the + # leaf-name allow-list (the CTE name is excluded from the unknown-tables + # check) AND _scope_sql's cte_sources skip: a recursive CTE *can* self- + # reference, so sqlglot puts the name in its own body scope, and the physical + # anchor reference (which cannot self-reference) is mis-classified as a CTE + # reference and never re-scoped — it stays bound to the shared `main` schema + # and leaks every tenant's rows. Non-recursive shadows are safely re-scoped + # (test_query_engine), but the recursive anchor cannot be, so reject the + # shape outright. (audit_30 D1 follow-up: WITH RECURSIVE bypass) + ( + "WITH RECURSIVE orders_v2 AS " + "(SELECT * FROM orders_v2 UNION SELECT * FROM orders_v2) " + "SELECT * FROM orders_v2", + "Recursive CTE shadows", + ), ]