From 912e562c2847b5ebe3df7c41d529a9bf8abba160 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Tue, 30 Jun 2026 19:10:48 +0300 Subject: [PATCH 1/3] fix(security): reject WITH RECURSIVE CTE shadowing a tenant table (D1 bypass) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The D1 fix (f153b23) re-scopes a physical table reference shadowed by a non-recursive CTE of the same name, but a recursive CTE *can* self-reference, so sqlglot keeps its name in its own body scope: the physical anchor reference (the first UNION branch, which cannot self-reference) is mis-classified as a CTE reference and never re-scoped — it stays bound to the shared `main` schema and leaks every tenant's rows. _scope_sql is the sole tenant-isolation mechanism (one DuckDB, schema-per-tenant), so this is a full cross-tenant read from a single valid SELECT. There is no safe re-scoping of a recursive anchor (genuinely ambiguous with the recursion) and no legitimate query names a recursive CTE after a physical table, so fail closed at both layers: validate_nl_sql rejects the shape (the NL/LLM gate) and _scope_sql raises for any other caller (defense-in-depth). Non-recursive shadows keep the safe re-scope path. Regression tests fail on old code (validate accepts; _scope_sql leaks), pass on new. Verified e2e: the recursive attack through execute_nl_query now returns 403; the legitimate tenant_a query still sees only its rows. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../semantic_layer/query/sql_builder.py | 21 +++++++++++++++++++ src/serving/semantic_layer/sql_guard.py | 20 ++++++++++++++++++ tests/unit/test_query_engine.py | 18 ++++++++++++++++ tests/unit/test_sql_guard.py | 15 +++++++++++++ 4 files changed, 74 insertions(+) diff --git a/src/serving/semantic_layer/query/sql_builder.py b/src/serving/semantic_layer/query/sql_builder.py index 904228d..5755c17 100644 --- a/src/serving/semantic_layer/query/sql_builder.py +++ b/src/serving/semantic_layer/query/sql_builder.py @@ -80,6 +80,27 @@ def _scope_sql(self: SQLBuilderHost, sql: str, tenant_id: str | None) -> str: known_tables.add("pipeline_events") parsed = sqlglot.parse_one(sql, dialect="duckdb") + # A recursive CTE *can* self-reference, so sqlglot keeps its name in its + # own body scope and the cte_sources skip below mis-classifies the + # physical *anchor* reference (the first UNION branch, which cannot + # self-reference) as a CTE reference — it is never re-scoped, stays bound + # to the shared `main` schema and leaks every tenant's rows. There is no + # safe re-scoping of a recursive anchor (genuinely ambiguous with the + # recursion) and no legitimate query names a recursive CTE after a + # physical table, so fail closed. validate_nl_sql rejects this on the NL + # path; this guards any other caller. (audit_30 D1 follow-up: WITH + # RECURSIVE bypass of f153b23) + recursive_shadow = sorted( + { + cte.alias_or_name.lower() + for with_node in parsed.find_all(exp.With) + if with_node.args.get("recursive") + for cte in with_node.expressions + if cte.alias_or_name and cte.alias_or_name.lower() in known_tables + } + ) + if recursive_shadow: + raise ValueError(f"Recursive CTE shadows tenant-scoped table(s): {recursive_shadow}") # Classify every table reference by scope so a CTE whose name collides # with a real table — e.g. 
`WITH orders_v2 AS (SELECT * FROM orders_v2) # SELECT * FROM orders_v2` — cannot hide the *physical* inner reference diff --git a/src/serving/semantic_layer/sql_guard.py b/src/serving/semantic_layer/sql_guard.py index 8b59f4b..944c982 100644 --- a/src/serving/semantic_layer/sql_guard.py +++ b/src/serving/semantic_layer/sql_guard.py @@ -89,6 +89,26 @@ def validate_nl_sql(sql: str, allowed_tables: set[str]) -> None: cte.alias_or_name.lower() for cte in statement.find_all(exp.CTE) if cte.alias_or_name } normalized_allowed_tables = {table.lower() for table in allowed_tables} + # A recursive CTE whose name shadows a real (allowed) table is a cross-tenant + # read vector. A recursive CTE *can* self-reference, so its name lives in its + # own body scope; the leaf-name allow-list excludes the CTE name, AND + # _scope_sql's cte_sources skip mis-classifies the physical *anchor* + # reference (the first UNION branch, which cannot self-reference) as a CTE + # reference — so it stays bare, binds to the shared `main` schema, and leaks + # every tenant's rows. Non-recursive shadows are safely re-scoped, but the + # recursive anchor cannot be, so reject the shape outright. 
(audit_30 D1 + # follow-up: WITH RECURSIVE bypass of f153b23) + recursive_shadows = { + cte.alias_or_name.lower() + for with_node in statement.find_all(exp.With) + if with_node.args.get("recursive") + for cte in with_node.expressions + if cte.alias_or_name and cte.alias_or_name.lower() in normalized_allowed_tables + } + if recursive_shadows: + raise UnsafeSQLError( + f"Recursive CTE shadows physical table(s): {sorted(recursive_shadows)}" + ) unknown_tables = { table.name.lower() for table in statement.find_all(exp.Table) diff --git a/tests/unit/test_query_engine.py b/tests/unit/test_query_engine.py index ff2af55..ba1b50a 100644 --- a/tests/unit/test_query_engine.py +++ b/tests/unit/test_query_engine.py @@ -68,6 +68,24 @@ def test_scope_sql_qualifies_physical_table_shadowed_by_cte_name(engine: QueryEn assert all(db != "main" for _, db in tables) +def test_scope_sql_rejects_recursive_cte_shadowing_physical_table(engine: QueryEngine) -> None: + # A recursive CTE *can* self-reference, so sqlglot keeps its name in its own + # body scope and the cte_sources skip mis-classifies the physical *anchor* + # reference (the first UNION branch, which cannot self-reference) as a CTE + # reference — it is never re-scoped, stays bound to the shared `main` schema, + # and leaks every tenant's rows (the WITH RECURSIVE bypass of the D1 fix). + # There is no safe re-scoping of a recursive anchor and no legitimate query + # names a recursive CTE after a physical table, so fail closed. 
+ # (audit_30 D1 follow-up) + with pytest.raises(ValueError, match="[Rr]ecursive CTE shadows"): + engine._scope_sql( + "WITH RECURSIVE orders_v2 AS " + "(SELECT * FROM orders_v2 UNION SELECT * FROM orders_v2) " + "SELECT * FROM orders_v2", + tenant_id="tenant_a", + ) + + def test_scope_sql_qualifies_tables_after_subquery(engine: QueryEngine) -> None: scoped = engine._scope_sql( "SELECT * FROM (SELECT * FROM orders_v2) AS recent, users_enriched", diff --git a/tests/unit/test_sql_guard.py b/tests/unit/test_sql_guard.py index 7b8da54..f687117 100644 --- a/tests/unit/test_sql_guard.py +++ b/tests/unit/test_sql_guard.py @@ -52,6 +52,21 @@ "Schema-qualified", ), ("SELECT * FROM cat.acme.orders_v2", "Schema-qualified"), + # A recursive CTE whose name shadows a real (allowed) table bypasses the + # leaf-name allow-list (the CTE name is excluded from the unknown-tables + # check) AND _scope_sql's cte_sources skip: a recursive CTE *can* self- + # reference, so sqlglot puts the name in its own body scope, and the physical + # anchor reference (which cannot self-reference) is mis-classified as a CTE + # reference and never re-scoped — it stays bound to the shared `main` schema + # and leaks every tenant's rows. Non-recursive shadows are safely re-scoped + # (test_query_engine), but the recursive anchor cannot be, so reject the + # shape outright. 
(audit_30 D1 follow-up: WITH RECURSIVE bypass) + ( + "WITH RECURSIVE orders_v2 AS " + "(SELECT * FROM orders_v2 UNION SELECT * FROM orders_v2) " + "SELECT * FROM orders_v2", + "Recursive CTE shadows", + ), ] From 80e31e948854b1951f834471903275caf7f5f2b5 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Tue, 30 Jun 2026 19:18:50 +0300 Subject: [PATCH 2/3] fix(security): mask PII renamed through a subquery/CTE (D2 lineage bypass) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The D2 fix (faa6c77) mapped each output column to the source names in the *outermost* projection only, so a PII column renamed at an inner level — `SELECT contact FROM (SELECT email AS contact FROM users_enriched) t` (and the CTE/double-rename variants) — never matched the `email` rule and was returned as cleartext with no X-PII-Masked signal. Resolve true projection lineage with sqlglot.lineage, tracing each output column to its ultimate source columns across subqueries, CTEs and union branches. Union the deep lineage sources with the shallow columns named directly in the projection: lineage is blind through an inner `SELECT *` (no schema to expand it) where the shallow scan still catches a direct `email AS contact`, and the shallow scan misses the inner rename lineage catches — either alone leaks one shape, the union closes both. A lineage failure falls back to a sentinel source set that matches every rule field (fail closed), so an unresolvable column is masked, never leaked. Regression test covers subquery, CTE and double-rename renames; the full masking + property + router suites stay green. SELECT * still falls back to name-matching (its outputs are the source names verbatim). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/masking.py | 85 +++++++++++++++++++++++++++++++++----- tests/unit/test_masking.py | 30 ++++++++++++++ 2 files changed, 105 insertions(+), 10 deletions(-) diff --git a/src/serving/masking.py b/src/serving/masking.py index 3027e03..8ca89c5 100644 --- a/src/serving/masking.py +++ b/src/serving/masking.py @@ -5,6 +5,7 @@ import sqlglot from sqlglot import exp +from sqlglot.lineage import lineage try: import yaml @@ -12,6 +13,23 @@ yaml = None # type: ignore[assignment] +class _UnresolvedSources(frozenset[str]): + """Sentinel column-source set whose membership test is always ``True``. + + Used when projection lineage can't be resolved for an output column (an + ambiguous reference, or an exotic shape sqlglot's lineage rejects). Treating + every masking rule field as a possible source masks the column **fail + closed** rather than letting a renamed/derived PII value slip through as + cleartext. (audit_30 D2 follow-up) + """ + + def __contains__(self, item: object) -> bool: + return True + + +_UNRESOLVED_SOURCES: frozenset[str] = _UnresolvedSources() + + class PiiMasker: def __init__(self, config_path: str | Path = "config/pii_fields.yaml"): if yaml is None: # pragma: no cover @@ -25,7 +43,7 @@ def mask( data: dict, tenant: str, *, - source_columns: dict[str, set[str]] | None = None, + source_columns: dict[str, frozenset[str]] | None = None, ) -> dict: masking = self._config.get("masking", {}) if tenant in masking.get("pii_exempt_tenants", []): @@ -46,7 +64,7 @@ def _output_columns_for_field( self, field: str, data: dict, - source_columns: dict[str, set[str]] | None, + source_columns: dict[str, frozenset[str]] | None, ) -> set[str]: """Which result columns to mask for a rule's source ``field``. 
@@ -93,12 +111,19 @@ def mask_query_results( ] return masked_rows, masked_rows != rows - def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None: + def _projection_source_columns(self, sql: str) -> dict[str, frozenset[str]] | None: """Map each output column to the source column names that feed it. - Returns ``None`` when the projection can't be resolved precisely — a - ``SELECT *`` (whose outputs are the source names verbatim) or unparseable - SQL — so masking falls back to matching rule fields against output names. + Resolves *true* projection lineage (through subqueries, CTEs and unions) + so a PII column renamed at any nesting depth is masked by what it is + built from, not by its output name. The shallow one-level resolver this + replaces saw only the outermost projection, so an inner rename — + ``SELECT contact FROM (SELECT email AS contact FROM users_enriched)`` — + returned cleartext. (audit_30 D2 follow-up: subquery/CTE-alias bypass) + + Returns ``None`` for a ``SELECT *`` (whose outputs are the source names + verbatim, so name-matching is correct), unparseable SQL or a statement with no SELECT, so masking + falls back to matching rule fields against output names. 
""" try: parsed = sqlglot.parse_one(sql, read="duckdb") @@ -107,16 +132,56 @@ def _projection_source_columns(self, sql: str) -> dict[str, set[str]] | None: select = parsed.find(exp.Select) if select is None: return None - mapping: dict[str, set[str]] = {} + if any( + isinstance(projection, exp.Star) or projection.find(exp.Star) is not None + for projection in select.expressions + ): + return None + mapping: dict[str, frozenset[str]] = {} for projection in select.expressions: - if isinstance(projection, exp.Star) or projection.find(exp.Star) is not None: - return None output_name = projection.alias_or_name if not output_name: continue - mapping[output_name] = {col.name for col in projection.find_all(exp.Column) if col.name} + # Union the *deep* lineage sources (which trace a renamed PII column + # through subqueries/CTEs) with the *shallow* columns named directly + # in this projection. Lineage is blind through an inner ``SELECT *`` + # (no schema to expand it), where the shallow scan still catches a + # direct ``email AS contact``; lineage catches the inner rename the + # shallow scan misses. Either alone leaks one shape — the union + # closes both. + deep = self._lineage_source_columns(output_name, sql) + if isinstance(deep, _UnresolvedSources): + mapping[output_name] = deep + continue + shallow = frozenset(col.name for col in projection.find_all(exp.Column) if col.name) + mapping[output_name] = deep | shallow return mapping + def _lineage_source_columns(self, output_name: str, sql: str) -> frozenset[str]: + """The ultimate source column names feeding ``output_name``. + + Walks the lineage graph to its leaves (across subqueries, CTEs and union + branches) and returns the bare column names. Fails closed — a sentinel + that matches every rule field — when lineage can't be resolved, so an + unresolved column is masked rather than leaked. 
+ """ + try: + root = lineage(output_name, sql, dialect="duckdb") + except Exception: # noqa: BLE001 - any lineage failure must fail closed + return _UNRESOLVED_SOURCES + sources: set[str] = set() + stack = [root] + seen: set[int] = set() + while stack: + node = stack.pop() + if id(node) in seen: + continue + seen.add(id(node)) + if not node.downstream and node.name: + sources.add(node.name.split(".")[-1]) + stack.extend(node.downstream) + return frozenset(sources) + def _extract_table_names(self, sql: str) -> set[str]: try: parsed = sqlglot.parse_one(sql, read="duckdb") diff --git a/tests/unit/test_masking.py b/tests/unit/test_masking.py index f163e87..e3e3549 100644 --- a/tests/unit/test_masking.py +++ b/tests/unit/test_masking.py @@ -193,6 +193,36 @@ def test_mask_query_results_masks_derived_pii_column(tmp_path: Path): assert rows == [{"e": "a***@example.com"}] +def test_mask_query_results_masks_pii_renamed_through_subquery(tmp_path: Path): + # An inner rename hides the PII source name from the outer projection, so the + # one-level lineage resolver saw only the renamed output column and returned + # cleartext (the D2 fix's subquery/CTE-alias bypass). True lineage traces the + # output column back to `email` through the subquery/CTE. 
(audit_30 D2 follow-up) + masker = PiiMasker(_user_email_config(tmp_path)) + + for sql, out_col in ( + ("SELECT contact FROM (SELECT email AS contact FROM users_enriched) t", "contact"), + ( + "WITH t AS (SELECT email AS contact FROM users_enriched) SELECT contact FROM t", + "contact", + ), + ( + "SELECT outer_c FROM (SELECT inner_c AS outer_c " + "FROM (SELECT email AS inner_c FROM users_enriched) a) b", + "outer_c", + ), + ): + rows, masked = masker.mask_query_results( + sql, + [{out_col: "alice@example.com"}], + tenant="acme", + table_to_entity={"users_enriched": "user"}, + ) + + assert masked is True, sql + assert rows == [{out_col: "a***@example.com"}], sql + + def test_mask_query_results_masks_select_star_by_name(tmp_path: Path): # SELECT * has no resolvable projection lineage; masking falls back to # matching rule fields against the (canonical) output column names. From 601c2637df7d8f7524dcfef62d92b66f2e2b7d16 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Tue, 30 Jun 2026 19:28:43 +0300 Subject: [PATCH 3/3] fix(api): serialize lazy table DDL to stop cold-DB concurrent-create 500s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The #120 read-handler offload moved deadletter/webhook-log/alert-history reads onto worker threads (run_in_threadpool) with a dedicated cursor, but each still calls its ensure_*_table() — a lazy CREATE TABLE IF NOT EXISTS / ALTER ... ADD COLUMN IF NOT EXISTS — on that cursor. Pre-#120 these ran serialized on the event loop; the offload let them run truly in parallel, and concurrent catalog DDL on one DuckDB raises "Catalog write-write conflict" (across different tables too, not just the same one — the catalog is a single versioned structure). The serving store defaults to :memory:, cold on every restart, so a concurrent burst of cold reads returned HTTP 500s until one request won the CREATE race. 
Serialize every ensure_*_table behind one shared process-wide lock (src/db_concurrency.catalog_ddl_lock): the first thread creates, the rest see a warm no-op (warm DDL doesn't conflict). The lock is held only for the brief CREATE/ALTER, never around queries and never nested. Also close the freshly-opened read cursor in deadletter._read_cursor if the DDL raises, before the handler's own try/finally takes over (no per-failure leak). Regression tests fire 32 threads at one ensure_* and 12 each across all three behind a Barrier: reliably raise without the lock (verified 16-30 conflicts), green with it. A shared-instance test pins the single lock so a future per-table lock can't reintroduce the cross-table race. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/db_concurrency.py | 24 +++++++ src/processing/event_replayer.py | 42 ++++++----- src/serving/api/alerts/history.py | 50 +++++++------ src/serving/api/routers/deadletter.py | 8 ++- src/serving/api/webhook_dispatcher.py | 33 +++++---- tests/unit/test_catalog_ddl_concurrency.py | 83 ++++++++++++++++++++++ 6 files changed, 185 insertions(+), 55 deletions(-) create mode 100644 src/db_concurrency.py create mode 100644 tests/unit/test_catalog_ddl_concurrency.py diff --git a/src/db_concurrency.py b/src/db_concurrency.py new file mode 100644 index 0000000..e30fd3d --- /dev/null +++ b/src/db_concurrency.py @@ -0,0 +1,24 @@ +"""Process-wide serialization for DuckDB catalog DDL. + +DuckDB raises a ``Catalog write-write conflict`` when two threads run catalog +DDL (CREATE/ALTER) concurrently — even on *different* tables — because the +catalog is a single versioned structure. The serving API offloads its read +handlers onto worker threads (``run_in_threadpool``), and each lazily ensures +its backing table (``CREATE TABLE IF NOT EXISTS`` / ``ALTER ... 
ADD COLUMN IF +NOT EXISTS``) on a fresh cursor; on a cold DB (the default serving store is +``:memory:``, cold on every restart) a concurrent burst raced and surfaced +HTTP 500s. Serialize every such lazy table-creation behind this one lock so the +first thread creates and the rest see a warm no-op. (audit_30 A2 follow-up: +the #120 read-handler offload race) +""" + +from __future__ import annotations + +import threading + +# A single process-wide lock guarding all lazy DuckDB catalog DDL. Held only for +# the brief CREATE/ALTER IF NOT EXISTS (a near-instant no-op once warm), never +# around query execution and never nested, so it cannot deadlock. A cross-table +# conflict (not just same-table) is real in DuckDB, so the lock is shared across +# every ensure_*_table helper rather than one lock per table. +catalog_ddl_lock = threading.Lock() diff --git a/src/processing/event_replayer.py b/src/processing/event_replayer.py index fc7a6af..b701374 100644 --- a/src/processing/event_replayer.py +++ b/src/processing/event_replayer.py @@ -12,6 +12,7 @@ import structlog from opentelemetry import trace +from src.db_concurrency import catalog_ddl_lock from src.processing.outbox import OutboxProcessor, ensure_outbox_table from src.processing.tracing import inject_trace_to_kafka_headers, telemetry_disabled from src.quality.validators.schema_validator import validate_event @@ -40,25 +41,30 @@ class ReplayResult: def ensure_dead_letter_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS dead_letter_events ( - event_id TEXT PRIMARY KEY, - tenant_id TEXT DEFAULT 'default', - event_type TEXT, - payload JSON, - failure_reason TEXT, - failure_detail TEXT, - received_at TIMESTAMP, - retry_count INTEGER DEFAULT 0, - last_retried_at TIMESTAMP, - status TEXT DEFAULT 'failed' + # Serialize the lazy DDL: the offloaded read handlers call this on worker + # threads, and concurrent CREATE/ALTER on a cold DuckDB catalog conflicts. 
+ # (audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS dead_letter_events ( + event_id TEXT PRIMARY KEY, + tenant_id TEXT DEFAULT 'default', + event_type TEXT, + payload JSON, + failure_reason TEXT, + failure_detail TEXT, + received_at TIMESTAMP, + retry_count INTEGER DEFAULT 0, + last_retried_at TIMESTAMP, + status TEXT DEFAULT 'failed' + ) + """ + ) + conn.execute( + "ALTER TABLE dead_letter_events " + "ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'" ) - """ - ) - conn.execute( - "ALTER TABLE dead_letter_events ADD COLUMN IF NOT EXISTS tenant_id TEXT DEFAULT 'default'" - ) class EventReplayer: diff --git a/src/serving/api/alerts/history.py b/src/serving/api/alerts/history.py index ae50cbf..6c34eb9 100644 --- a/src/serving/api/alerts/history.py +++ b/src/serving/api/alerts/history.py @@ -6,34 +6,40 @@ import duckdb +from src.db_concurrency import catalog_ddl_lock + if TYPE_CHECKING: from .dispatcher import AlertRule def ensure_alert_history_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS alert_history ( - delivery_id VARCHAR, - alert_id VARCHAR, - alert_name VARCHAR, - metric VARCHAR, - current_value DOUBLE, - previous_value DOUBLE, - change_pct DOUBLE, - threshold DOUBLE, - condition VARCHAR, - metric_window VARCHAR, - tenant VARCHAR, - event_type VARCHAR, - status_code INTEGER, - success BOOLEAN, - error TEXT, - payload JSON, - triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + # Serialize the lazy DDL: the offloaded read handler calls this on a worker + # thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across + # tables too). 
(audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS alert_history ( + delivery_id VARCHAR, + alert_id VARCHAR, + alert_name VARCHAR, + metric VARCHAR, + current_value DOUBLE, + previous_value DOUBLE, + change_pct DOUBLE, + threshold DOUBLE, + condition VARCHAR, + metric_window VARCHAR, + tenant VARCHAR, + event_type VARCHAR, + status_code INTEGER, + success BOOLEAN, + error TEXT, + payload JSON, + triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ ) - """ - ) def get_alert_history(conn: duckdb.DuckDBPyConnection, alert_id: str) -> list[dict]: diff --git a/src/serving/api/routers/deadletter.py b/src/serving/api/routers/deadletter.py index 4d4aec5..9d742ab 100644 --- a/src/serving/api/routers/deadletter.py +++ b/src/serving/api/routers/deadletter.py @@ -77,7 +77,13 @@ def _read_cursor(request: Request) -> duckdb.DuckDBPyConnection: different worker threads from colliding on the connection. (audit_30_06_26.md A2) """ cursor = cast(duckdb.DuckDBPyConnection, request.app.state.query_engine._conn).cursor() - ensure_dead_letter_table(cursor) + try: + ensure_dead_letter_table(cursor) + except Exception: + # Don't leak the freshly-opened cursor if the lazy DDL fails before the + # handler's own try/finally takes over. 
+ cursor.close() + raise return cursor diff --git a/src/serving/api/webhook_dispatcher.py b/src/serving/api/webhook_dispatcher.py index 08ccacd..2b6e027 100644 --- a/src/serving/api/webhook_dispatcher.py +++ b/src/serving/api/webhook_dispatcher.py @@ -17,6 +17,7 @@ from fastapi import FastAPI from pydantic import BaseModel, Field +from src.db_concurrency import catalog_ddl_lock from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url try: @@ -124,21 +125,25 @@ def deactivate_webhook(path: Path, webhook_id: str, tenant: str) -> bool: def ensure_webhook_deliveries_table(conn: duckdb.DuckDBPyConnection) -> None: - conn.execute( - """ - CREATE TABLE IF NOT EXISTS webhook_deliveries ( - delivery_id VARCHAR, - webhook_id VARCHAR, - event_id VARCHAR, - event_type VARCHAR, - attempt INTEGER, - status_code INTEGER, - success BOOLEAN, - error TEXT, - delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + # Serialize the lazy DDL: the offloaded read handler calls this on a worker + # thread, and concurrent CREATE on a cold DuckDB catalog conflicts (across + # tables too). 
(audit_30 A2 follow-up: #120 offload race) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_deliveries ( + delivery_id VARCHAR, + webhook_id VARCHAR, + event_id VARCHAR, + event_type VARCHAR, + attempt INTEGER, + status_code INTEGER, + success BOOLEAN, + error TEXT, + delivered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ ) - """ - ) def get_delivery_logs(conn: duckdb.DuckDBPyConnection, webhook_id: str) -> list[dict]: diff --git a/tests/unit/test_catalog_ddl_concurrency.py b/tests/unit/test_catalog_ddl_concurrency.py new file mode 100644 index 0000000..df49705 --- /dev/null +++ b/tests/unit/test_catalog_ddl_concurrency.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import threading +from collections.abc import Callable + +import duckdb +import pytest + +import src.processing.event_replayer as event_replayer_module +import src.serving.api.alerts.history as history_module +import src.serving.api.webhook_dispatcher as webhook_module +from src.db_concurrency import catalog_ddl_lock +from src.processing.event_replayer import ensure_dead_letter_table +from src.serving.api.alerts.history import ensure_alert_history_table +from src.serving.api.webhook_dispatcher import ensure_webhook_deliveries_table + +Ensurer = Callable[[duckdb.DuckDBPyConnection], None] + +_ENSURERS: list[Ensurer] = [ + ensure_dead_letter_table, + ensure_alert_history_table, + ensure_webhook_deliveries_table, +] + + +def _hammer(conn: duckdb.DuckDBPyConnection, jobs: list[Ensurer]) -> list[Exception]: + """Run each ensure_* on a fresh cursor on ``conn``, all firing the DDL at once. + + A ``Barrier`` releases every thread into ``ensure_*`` simultaneously so the + cold-catalog write-write conflict is provoked under maximum contention: with the + lock removed this reliably raises on most threads; with it, every thread + serializes to a warm no-op and the list stays empty. 
+ """ + errors: list[Exception] = [] + barrier = threading.Barrier(len(jobs)) + + def worker(ensure: Ensurer) -> None: + cursor = conn.cursor() + try: + barrier.wait() + ensure(cursor) + except Exception as exc: # noqa: BLE001 - capture the catalog conflict, if any + errors.append(exc) + finally: + cursor.close() + + threads = [threading.Thread(target=worker, args=(job,)) for job in jobs] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + return errors + + +@pytest.mark.parametrize("ensure", _ENSURERS) +def test_ensure_table_concurrency_safe_same_table_cold_db(ensure: Ensurer) -> None: + # The #120 offload calls ensure_*_table on worker threads; on a cold DuckDB + # a concurrent burst raced on the catalog -> "Catalog write-write conflict" + # -> HTTP 500 (the serving DB default is :memory:, cold on every restart). + # The shared catalog DDL lock serializes creation. (audit_30 A2 follow-up) + conn = duckdb.connect(":memory:") + errors = _hammer(conn, [ensure] * 32) + assert errors == [], f"{ensure.__name__}: {errors[:2]}" + + +def test_ensure_tables_concurrency_safe_across_tables_cold_db() -> None: + # DuckDB raises a catalog write-write conflict even across *different* tables, + # so every ensure_*_table helper must share one lock (not a lock per table). + # 12 interleaved calls per table on a cold DB. + conn = duckdb.connect(":memory:") + errors = _hammer(conn, _ENSURERS * 12) + assert errors == [], errors[:3] + + +def test_catalog_ddl_lock_is_a_single_shared_instance() -> None: + # All three helpers must guard DDL with the *same* lock instance, or the + # cross-table conflict resurfaces. Pins against a future per-module lock. + assert ( + event_replayer_module.catalog_ddl_lock + is history_module.catalog_ddl_lock + is webhook_module.catalog_ddl_lock + is catalog_ddl_lock + )