diff --git a/src/serving/api/webhook_dispatcher.py b/src/serving/api/webhook_dispatcher.py index 2b6e027..e7d3ac6 100644 --- a/src/serving/api/webhook_dispatcher.py +++ b/src/serving/api/webhook_dispatcher.py @@ -172,25 +172,34 @@ def ensure_webhook_delivery_queue_table(conn: duckdb.DuckDBPyConnection) -> None survive a process restart. ``body`` stores the canonical payload so a delivery can be replayed without re-reading ``pipeline_events``. """ - conn.execute( - """ - CREATE TABLE IF NOT EXISTS webhook_delivery_queue ( - webhook_id VARCHAR NOT NULL, - event_id VARCHAR NOT NULL, - tenant VARCHAR, - event_type VARCHAR, - body VARCHAR, - status VARCHAR NOT NULL DEFAULT 'pending', - attempts INTEGER NOT NULL DEFAULT 0, - next_attempt_at TIMESTAMP, - last_status_code INTEGER, - last_error VARCHAR, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (webhook_id, event_id) + # Serialize the lazy DDL behind the shared catalog lock, exactly like the + # three #123-locked ``ensure_*`` siblings: the dispatcher creates this table + # on the shared serving connection from the event loop while an offloaded + # read handler runs its own ``ensure_*`` on a worker thread, and concurrent + # CREATE on a cold DuckDB catalog conflicts across *different* tables too. + # Omitting the lock here left the cross-table "Catalog write-write conflict" + # the #123 fix set out to remove still reachable on a cold restart. + # (audit_30 D2/A2 follow-up residual) + with catalog_ddl_lock: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_delivery_queue ( + webhook_id VARCHAR NOT NULL, + event_id VARCHAR NOT NULL, + tenant VARCHAR, + event_type VARCHAR, + body VARCHAR, + status VARCHAR NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMP, + last_status_code INTEGER, + last_error VARCHAR, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (webhook_id, event_id) + ) + """ ) - """ - ) class WebhookDispatcher: diff --git a/src/serving/masking.py b/src/serving/masking.py index 8ca89c5..9258b94 100644 --- a/src/serving/masking.py +++ b/src/serving/masking.py @@ -178,7 +178,22 @@ def _lineage_source_columns(self, output_name: str, sql: str) -> frozenset[str]: continue seen.add(id(node)) if not node.downstream and node.name: - sources.add(node.name.split(".")[-1]) + bare = node.name.split(".")[-1] + if bare == "*": + # Lineage terminates at an unexpanded ``SELECT *`` (no schema + # to expand it), so this output column could carry *any* + # column of that table — including PII. The #123 deep|shallow + # union only catches a star one level *above* the rename (the + # shallow scan still names the column); a star *below* an inner + # rename leaves the rename invisible to both — the lineage leaf + # is the bare ``*``, the shallow scan sees only the outer alias. + # A literal ``*`` source would never match a rule field, so it + # fails open. Treat it as unresolved and fail closed instead. + # (audit_30 D2 follow-up: SELECT*-blinded inner-rename bypass of + # the #123 lineage fix — e.g. + # ``SELECT c FROM (SELECT email AS c FROM (SELECT * FROM users))``) + return _UNRESOLVED_SOURCES + sources.add(bare) stack.extend(node.downstream) return frozenset(sources) diff --git a/tests/unit/test_catalog_ddl_concurrency.py b/tests/unit/test_catalog_ddl_concurrency.py index df49705..856bdc8 100644 --- a/tests/unit/test_catalog_ddl_concurrency.py +++ b/tests/unit/test_catalog_ddl_concurrency.py @@ -12,14 +12,23 @@ from src.db_concurrency import catalog_ddl_lock from src.processing.event_replayer import ensure_dead_letter_table from src.serving.api.alerts.history import ensure_alert_history_table -from src.serving.api.webhook_dispatcher import ensure_webhook_deliveries_table +from src.serving.api.webhook_dispatcher import ( + ensure_webhook_deliveries_table, + ensure_webhook_delivery_queue_table, +) Ensurer = Callable[[duckdb.DuckDBPyConnection], None] +# ``ensure_webhook_delivery_queue_table`` is the dispatcher's own lazy CREATE +# (run on the shared serving connection from the event loop). It was left out of +# the #123 lock and so still raced the offloaded read-handler DDL across tables; +# include it here so both the same-table and cross-table hammers cover it. +# (audit_30 D2/A2 follow-up residual) _ENSURERS: list[Ensurer] = [ ensure_dead_letter_table, ensure_alert_history_table, ensure_webhook_deliveries_table, + ensure_webhook_delivery_queue_table, ] @@ -73,8 +82,9 @@ def test_ensure_tables_concurrency_safe_across_tables_cold_db() -> None: def test_catalog_ddl_lock_is_a_single_shared_instance() -> None: - # All three helpers must guard DDL with the *same* lock instance, or the - # cross-table conflict resurfaces. Pins against a future per-module lock. + # Every ensure_* helper (across all three modules) must guard DDL with the + # *same* lock instance, or the cross-table conflict resurfaces. Pins against a + # future per-module lock. assert ( event_replayer_module.catalog_ddl_lock is history_module.catalog_ddl_lock diff --git a/tests/unit/test_masking.py b/tests/unit/test_masking.py index e3e3549..76148c6 100644 --- a/tests/unit/test_masking.py +++ b/tests/unit/test_masking.py @@ -223,6 +223,33 @@ def test_mask_query_results_masks_pii_renamed_through_subquery(tmp_path: Path): assert rows == [{out_col: "a***@example.com"}], sql +def test_mask_query_results_masks_pii_renamed_above_inner_select_star(tmp_path: Path): + # A SELECT * *below* an inner rename defeats both #123 lineage paths: lineage + # walks past the renamed `email` node to the bare `*` leaf and returned a + # plain frozenset({'*'}) — not the unresolved sentinel (that only fires on a + # lineage *exception*) — so `email` was never in the source set and the + # column failed open as cleartext; the shallow scan sees only the outer alias. + # A `*` leaf means the column could carry any source column (incl. PII), so it + # must fail closed. (audit_30 D2 follow-up: SELECT*-blinded inner-rename bypass + # of #123 — distinct from the subquery/CTE renames above, which keep a + # resolvable lineage leaf.) + masker = PiiMasker(_user_email_config(tmp_path)) + + for sql in ( + "SELECT c FROM (SELECT email AS c FROM (SELECT * FROM users_enriched) z) t", + "WITH z AS (SELECT * FROM users_enriched), y AS (SELECT email AS c FROM z) SELECT c FROM y", + ): + rows, masked = masker.mask_query_results( + sql, + [{"c": "alice@example.com"}], + tenant="acme", + table_to_entity={"users_enriched": "user"}, + ) + + assert masked is True, sql + assert rows == [{"c": "a***@example.com"}], sql + + def test_mask_query_results_masks_select_star_by_name(tmp_path: Path): # SELECT * has no resolvable projection lineage; masking falls back to # matching rule fields against the (canonical) output column names.