Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions src/serving/api/webhook_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,34 @@ def ensure_webhook_delivery_queue_table(conn: duckdb.DuckDBPyConnection) -> None
survive a process restart. ``body`` stores the canonical payload so a
delivery can be replayed without re-reading ``pipeline_events``.
"""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_delivery_queue (
webhook_id VARCHAR NOT NULL,
event_id VARCHAR NOT NULL,
tenant VARCHAR,
event_type VARCHAR,
body VARCHAR,
status VARCHAR NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMP,
last_status_code INTEGER,
last_error VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (webhook_id, event_id)
# Serialize the lazy DDL behind the shared catalog lock, exactly like the
# three #123-locked ``ensure_*`` siblings: the dispatcher creates this table
# on the shared serving connection from the event loop while an offloaded
# read handler runs its own ``ensure_*`` on a worker thread, and concurrent
# CREATE on a cold DuckDB catalog conflicts across *different* tables too.
# Omitting the lock here left the cross-table "Catalog write-write conflict"
# the #123 fix set out to remove still reachable on a cold restart.
# (audit_30 D2/A2 follow-up residual)
with catalog_ddl_lock:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_delivery_queue (
webhook_id VARCHAR NOT NULL,
event_id VARCHAR NOT NULL,
tenant VARCHAR,
event_type VARCHAR,
body VARCHAR,
status VARCHAR NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMP,
last_status_code INTEGER,
last_error VARCHAR,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (webhook_id, event_id)
)
"""
)
"""
)


class WebhookDispatcher:
Expand Down
17 changes: 16 additions & 1 deletion src/serving/masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,22 @@ def _lineage_source_columns(self, output_name: str, sql: str) -> frozenset[str]:
continue
seen.add(id(node))
if not node.downstream and node.name:
sources.add(node.name.split(".")[-1])
bare = node.name.split(".")[-1]
if bare == "*":
# Lineage terminates at an unexpanded ``SELECT *`` (no schema
# to expand it), so this output column could carry *any*
# column of that table — including PII. The #123 deep|shallow
# union only catches a star one level *above* the rename (the
# shallow scan still names the column); a star *below* an inner
# rename leaves the rename invisible to both — the lineage leaf
# is the bare ``*``, the shallow scan sees only the outer alias.
# A literal ``*`` source would never match a rule field, so it
# fails open. Treat it as unresolved and fail closed instead.
# (audit_30 D2 follow-up: SELECT*-blinded inner-rename bypass of
# the #123 lineage fix — e.g.
# ``SELECT c FROM (SELECT email AS c FROM (SELECT * FROM users))``)
return _UNRESOLVED_SOURCES
sources.add(bare)
stack.extend(node.downstream)
return frozenset(sources)

Expand Down
16 changes: 13 additions & 3 deletions tests/unit/test_catalog_ddl_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@
from src.db_concurrency import catalog_ddl_lock
from src.processing.event_replayer import ensure_dead_letter_table
from src.serving.api.alerts.history import ensure_alert_history_table
from src.serving.api.webhook_dispatcher import ensure_webhook_deliveries_table
from src.serving.api.webhook_dispatcher import (
ensure_webhook_deliveries_table,
ensure_webhook_delivery_queue_table,
)

Ensurer = Callable[[duckdb.DuckDBPyConnection], None]

# ``ensure_webhook_delivery_queue_table`` is the dispatcher's own lazy CREATE
# (run on the shared serving connection from the event loop). It was left out of
# the #123 lock and so still raced the offloaded read-handler DDL across tables;
# include it here so both the same-table and cross-table hammers cover it.
# (audit_30 D2/A2 follow-up residual)
_ENSURERS: list[Ensurer] = [
ensure_dead_letter_table,
ensure_alert_history_table,
ensure_webhook_deliveries_table,
ensure_webhook_delivery_queue_table,
]


Expand Down Expand Up @@ -73,8 +82,9 @@ def test_ensure_tables_concurrency_safe_across_tables_cold_db() -> None:


def test_catalog_ddl_lock_is_a_single_shared_instance() -> None:
# All three helpers must guard DDL with the *same* lock instance, or the
# cross-table conflict resurfaces. Pins against a future per-module lock.
# Every ensure_* helper (across all three modules) must guard DDL with the
# *same* lock instance, or the cross-table conflict resurfaces. Pins against a
# future per-module lock.
assert (
event_replayer_module.catalog_ddl_lock
is history_module.catalog_ddl_lock
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/test_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,33 @@ def test_mask_query_results_masks_pii_renamed_through_subquery(tmp_path: Path):
assert rows == [{out_col: "a***@example.com"}], sql


def test_mask_query_results_masks_pii_renamed_above_inner_select_star(tmp_path: Path):
# A SELECT * *below* an inner rename defeats both #123 lineage paths: lineage
# walks past the renamed `email` node to the bare `*` leaf and returned a
# plain frozenset({'*'}) — not the unresolved sentinel (that only fires on a
# lineage *exception*) — so `email` was never in the source set and the
# column failed open as cleartext; the shallow scan sees only the outer alias.
# A `*` leaf means the column could carry any source column (incl. PII), so it
# must fail closed. (audit_30 D2 follow-up: SELECT*-blinded inner-rename bypass
# of #123 — distinct from the subquery/CTE renames above, which keep a
# resolvable lineage leaf.)
masker = PiiMasker(_user_email_config(tmp_path))

for sql in (
"SELECT c FROM (SELECT email AS c FROM (SELECT * FROM users_enriched) z) t",
"WITH z AS (SELECT * FROM users_enriched), y AS (SELECT email AS c FROM z) SELECT c FROM y",
):
rows, masked = masker.mask_query_results(
sql,
[{"c": "alice@example.com"}],
tenant="acme",
table_to_entity={"users_enriched": "user"},
)

assert masked is True, sql
assert rows == [{"c": "a***@example.com"}], sql


def test_mask_query_results_masks_select_star_by_name(tmp_path: Path):
# SELECT * has no resolvable projection lineage; masking falls back to
# matching rule fields against the (canonical) output column names.
Expand Down