From 7c4f0104a2ed9f1d55e557223b632a16a3f94ccb Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 18:49:56 +0300 Subject: [PATCH 1/9] =?UTF-8?q?fix(dv2):=20SCD2=20hash=5Fdiff=20over=20des?= =?UTF-8?q?criptive=20columns=20=E2=80=94=20stop=20dropping=20every=20UPDA?= =?UTF-8?q?TE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The PG OLTP->vault promotion (run live by the LISTEN/NOTIFY freshness listener) computed satellite hash_diff as a constant per-entity tag md5(id || '|tag|v1'). The NOT EXISTS gate then matched the unchanged (hk, hash_diff) pair on every re-promotion, so an order moving pending->shipped (or a corrected total_amount, or a customer PII change) inserted nothing and was permanently invisible to rv.sat_*, bv_order_canonical and the branch_pnl mart — contradicting spec.yaml scd2:true and ADR-0005's CDC update contract. hash_diff is now derived from the descriptive columns and the gate inserts a new version only when it differs from the current (latest load_ts) version for the hash key — correct SCD2 insert-on-change, still idempotent on no-change re-runs. Adds a regression test that fails on the old constant tag. Audit ref: audit_28_06_26.md #9 (headline data-correctness defect). Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/unit/test_dv2_postgres_ingestion.py | 30 +++++++++++- .../postgres_oltp/promote_to_raw_vault_pg.sql | 47 +++++++++++++------ 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/tests/unit/test_dv2_postgres_ingestion.py b/tests/unit/test_dv2_postgres_ingestion.py index e559195..7481cad 100644 --- a/tests/unit/test_dv2_postgres_ingestion.py +++ b/tests/unit/test_dv2_postgres_ingestion.py @@ -437,6 +437,34 @@ def test_promotion_targets_existing_postgres_columns(table, columns): def test_promotion_is_idempotent_per_table_kind(): body = _promotion_sql_without_comments().lower() - # hubs/links are idempotent on their primary key; satellites on (hk, hash_diff). + # hubs/links are idempotent on their primary key; satellites are SCD2: + # a new version lands only when hash_diff differs from the latest version. assert body.count("on conflict do nothing") == 6 # 2 hub_customer + 2 hub_order + 2 lnk assert body.count("where not exists") == 4 # 2 personal + 2 header satellites + # the SCD2 gate compares against the current (latest load_ts) version — one + # max(load_ts) subquery per satellite, else a re-run on changed data is lost. + assert body.count("select max(e2.load_ts)") == 4 + + +def test_promotion_satellites_capture_scd2_change_not_a_constant_tag(): + """Regression guard for audit_28_06_26.md #9. + + The promotion used a constant per-entity hash_diff (``md5(id || '|tag|v1')``), + so the ``NOT EXISTS (… AND hash_diff = …)`` gate matched the unchanged row and + silently dropped every UPDATE (e.g. order pending -> shipped). hash_diff must + instead be derived from the descriptive columns so a changed row produces a + new satellite version. + """ + body = _promotion_sql_without_comments() + # the old constant tags must be gone entirely + assert "|pg-hdr|v1" not in body + assert "|pg-oltp|v1" not in body + # order-header hash_diff covers status + amount + date + channel (each appears + # twice per satellite: in the inserted column and in the NOT EXISTS gate; ×2 branches) + order_hd = "concat_ws('|', o.order_date::timestamp(3)::text, o.channel, o.order_status, o.total_amount::text)" + assert body.count(order_hd) == 4 + # customer-personal hash_diff covers name + contact + customer_hd = ( + "concat_ws('|', c.first_name, c.last_name, coalesce(c.email, ''), coalesce(c.phone, ''))" + ) + assert body.count(customer_hd) == 4 diff --git a/warehouse/agentflow/dv2/postgres_oltp/promote_to_raw_vault_pg.sql b/warehouse/agentflow/dv2/postgres_oltp/promote_to_raw_vault_pg.sql index c52e10d..be805eb 100644 --- a/warehouse/agentflow/dv2/postgres_oltp/promote_to_raw_vault_pg.sql +++ b/warehouse/agentflow/dv2/postgres_oltp/promote_to_raw_vault_pg.sql @@ -13,12 +13,15 @@ -- concat(a, '|', b) -> a || '|' || b -- FROM oltp_live._ -> FROM ops_. (no bridge) -- --- Idempotency: hubs/links collide on their BYTEA primary key --- (ON CONFLICT DO NOTHING); satellites insert a version only when the --- (hash key, hash_diff) pair is not already present, so a re-run is a no-op. --- hash_diff mirrors the ClickHouse pull variant: a constant per-entity tag (the --- `PostgreSQL()` snapshot is treated as append-only — no change/tombstone --- capture), so each OLTP entity lands exactly one satellite version. +-- SCD2 change capture: hubs/links collide on their BYTEA primary key +-- (ON CONFLICT DO NOTHING). Satellites compute hash_diff over the *descriptive* +-- columns (status/amount/date/channel for the order header; name/email/phone for +-- the customer) and insert a new version only when that hash_diff differs from +-- the *current* (latest load_ts) version for the hash key. So a re-run with no +-- change is a no-op, but a changed order (e.g. pending -> shipped) or customer +-- correctly lands a new satellite version — which the LISTEN/NOTIFY freshness +-- listener that runs this promotion then surfaces. (A constant per-entity tag, +-- the old behaviour, silently dropped every UPDATE — see audit_28_06_26.md #9.) -- -- record_source = pg_ops__ so the business vault's -- split_part(record_source, '__', 2) extracts the branch. @@ -41,7 +44,7 @@ INSERT INTO rv.sat_customer_personal__1c__msk SELECT decode(md5(c.customer_id), 'hex'), localtimestamp(3), - decode(md5(c.customer_id || '|pg-oltp|v1'), 'hex'), + decode(md5(concat_ws('|', c.first_name, c.last_name, coalesce(c.email, ''), coalesce(c.phone, ''))), 'hex'), 'pg_ops__msk', c.first_name, c.last_name, @@ -54,7 +57,11 @@ FROM ops_msk.customers c WHERE NOT EXISTS ( SELECT 1 FROM rv.sat_customer_personal__1c__msk e WHERE e.customer_hk = decode(md5(c.customer_id), 'hex') - AND e.hash_diff = decode(md5(c.customer_id || '|pg-oltp|v1'), 'hex') + AND e.hash_diff = decode(md5(concat_ws('|', c.first_name, c.last_name, coalesce(c.email, ''), coalesce(c.phone, ''))), 'hex') + AND e.load_ts = ( + SELECT max(e2.load_ts) FROM rv.sat_customer_personal__1c__msk e2 + WHERE e2.customer_hk = e.customer_hk + ) ); -- ============ MSK: hub order + header satellite + order<->customer link ============ @@ -69,7 +76,7 @@ INSERT INTO rv.sat_order_header__bitrix__msk SELECT decode(md5(o.order_id), 'hex'), localtimestamp(3), - decode(md5(o.order_id || '|pg-hdr|v1'), 'hex'), + decode(md5(concat_ws('|', o.order_date::timestamp(3)::text, o.channel, o.order_status, o.total_amount::text)), 'hex'), 'pg_ops__msk', o.order_date::timestamp(3), o.channel, @@ -80,7 +87,11 @@ FROM ops_msk.orders o WHERE NOT EXISTS ( SELECT 1 FROM rv.sat_order_header__bitrix__msk e WHERE e.order_hk = decode(md5(o.order_id), 'hex') - AND e.hash_diff = decode(md5(o.order_id || '|pg-hdr|v1'), 'hex') + AND e.hash_diff = decode(md5(concat_ws('|', o.order_date::timestamp(3)::text, o.channel, o.order_status, o.total_amount::text)), 'hex') + AND e.load_ts = ( + SELECT max(e2.load_ts) FROM rv.sat_order_header__bitrix__msk e2 + WHERE e2.order_hk = e.order_hk + ) ); INSERT INTO rv.lnk_order_customer (link_hk, order_hk, customer_hk, load_ts, record_source) @@ -105,7 +116,7 @@ INSERT INTO rv.sat_customer_personal__1c__dxb SELECT decode(md5(c.customer_id), 'hex'), localtimestamp(3), - decode(md5(c.customer_id || '|pg-oltp|v1'), 'hex'), + decode(md5(concat_ws('|', c.first_name, c.last_name, coalesce(c.email, ''), coalesce(c.phone, ''))), 'hex'), 'pg_ops__dxb', c.first_name, c.last_name, @@ -118,7 +129,11 @@ FROM ops_dxb.customers c WHERE NOT EXISTS ( SELECT 1 FROM rv.sat_customer_personal__1c__dxb e WHERE e.customer_hk = decode(md5(c.customer_id), 'hex') - AND e.hash_diff = decode(md5(c.customer_id || '|pg-oltp|v1'), 'hex') + AND e.hash_diff = decode(md5(concat_ws('|', c.first_name, c.last_name, coalesce(c.email, ''), coalesce(c.phone, ''))), 'hex') + AND e.load_ts = ( + SELECT max(e2.load_ts) FROM rv.sat_customer_personal__1c__dxb e2 + WHERE e2.customer_hk = e.customer_hk + ) ); -- ============ DXB: hub order + header satellite + order<->customer link ============ @@ -133,7 +148,7 @@ INSERT INTO rv.sat_order_header__bitrix__dxb SELECT decode(md5(o.order_id), 'hex'), localtimestamp(3), - decode(md5(o.order_id || '|pg-hdr|v1'), 'hex'), + decode(md5(concat_ws('|', o.order_date::timestamp(3)::text, o.channel, o.order_status, o.total_amount::text)), 'hex'), 'pg_ops__dxb', o.order_date::timestamp(3), o.channel, @@ -144,7 +159,11 @@ FROM ops_dxb.orders o WHERE NOT EXISTS ( SELECT 1 FROM rv.sat_order_header__bitrix__dxb e WHERE e.order_hk = decode(md5(o.order_id), 'hex') - AND e.hash_diff = decode(md5(o.order_id || '|pg-hdr|v1'), 'hex') + AND e.hash_diff = decode(md5(concat_ws('|', o.order_date::timestamp(3)::text, o.channel, o.order_status, o.total_amount::text)), 'hex') + AND e.load_ts = ( + SELECT max(e2.load_ts) FROM rv.sat_order_header__bitrix__dxb e2 + WHERE e2.order_hk = e.order_hk + ) ); INSERT INTO rv.lnk_order_customer (link_hk, order_hk, customer_hk, load_ts, record_source) From 086eb8ee2dbed9ccc48b713d77ed83f187434b29 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 18:56:19 +0300 Subject: [PATCH 2/9] fix(security): close NL->SQL cross-tenant read and PII masking fail-open Two reproduced defects in LLM-mode NL->SQL (audit_28_06_26.md #5, #6): #5 cross-tenant read: validate_nl_sql checked only the leaf table name and _scope_sql skipped already schema-qualified tables, so an LLM-emitted 'victim_schema.orders_v2' passed the allow-list AND was executed verbatim against the victim tenant's schema (schemas are tenant slugs, guessable). Fix: validate_nl_sql now rejects any table carrying a db/catalog qualifier (primary); _scope_sql force-rescopes a known table into the caller's tenant schema even if pre-qualified, instead of skipping it (defense-in-depth). #6 PII masking fail-open: mask_query_results returned rows UNMASKED whenever a query touched !=1 entity table, so a users_enriched JOIN orders_v2 leaked cleartext email/phone/address. Fix: mask the union of all matched entities' rules; only a query touching no entity at all returns unmasked. Tests: schema-qualified inputs added to the guard's negative corpus; the masking test that *asserted* the fail-open leak is rewritten to assert fail-closed union masking; a _scope_sql foreign-schema re-scope test added. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/masking.py | 11 ++++++++--- src/serving/semantic_layer/query/sql_builder.py | 8 ++++++-- src/serving/semantic_layer/sql_guard.py | 7 +++++++ tests/unit/test_masking.py | 15 ++++++++++----- tests/unit/test_query_engine.py | 11 +++++++++++ tests/unit/test_sql_guard.py | 11 +++++++++++ 6 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/serving/masking.py b/src/serving/masking.py index bba87fc..972f37d 100644 --- a/src/serving/masking.py +++ b/src/serving/masking.py @@ -46,10 +46,15 @@ def mask_query_results( entity_types = { table_to_entity[table_name] for table_name in tables if table_name in table_to_entity } - if len(entity_types) != 1: + if not entity_types: return [dict(row) for row in rows], False - entity_type = next(iter(entity_types)) - masked_rows = [self.mask(entity_type, row, tenant) for row in rows] + # Apply every matched entity's masking rules. A multi-entity JOIN must not + # bypass masking — returning the rows unmasked leaked cleartext PII + # (e.g. users_enriched JOIN orders_v2). Mask the union of all matched + # entities rather than failing open. (audit_28_06_26.md #6) + masked_rows = [dict(row) for row in rows] + for entity_type in entity_types: + masked_rows = [self.mask(entity_type, row, tenant) for row in masked_rows] return masked_rows, masked_rows != rows def _extract_table_names(self, sql: str) -> set[str]: diff --git a/src/serving/semantic_layer/query/sql_builder.py b/src/serving/semantic_layer/query/sql_builder.py index 8e1eaee..97c8e35 100644 --- a/src/serving/semantic_layer/query/sql_builder.py +++ b/src/serving/semantic_layer/query/sql_builder.py @@ -101,12 +101,16 @@ def _scope_sql(self: SQLBuilderHost, sql: str, tenant_id: str | None) -> str: table_name = table.name if ( not table_name - or table.db - or table.catalog or table_name.lower() not in known_tables or table_name.lower() in cte_names ): continue + # Force the known table into the caller's tenant schema even if it + # arrived already schema-qualified — defense-in-depth so a qualified + # name can never read another tenant. validate_nl_sql already rejects + # qualified NL SQL; this re-scopes (instead of skipping) any that + # reaches here through another caller. (audit_28_06_26.md #5) + table.set("catalog", None) table.set("db", exp.to_identifier(schema, quoted=True)) table.set("this", exp.to_identifier(table_name, quoted=True)) diff --git a/src/serving/semantic_layer/sql_guard.py b/src/serving/semantic_layer/sql_guard.py index be194cb..8b59f4b 100644 --- a/src/serving/semantic_layer/sql_guard.py +++ b/src/serving/semantic_layer/sql_guard.py @@ -65,6 +65,13 @@ def validate_nl_sql(sql: str, allowed_tables: set[str]) -> None: raise UnsafeSQLError(f"Forbidden node: {type(node).__name__}") if isinstance(node, exp.Table) and not node.name: raise UnsafeSQLError("Table-valued functions not allowed") + if isinstance(node, exp.Table) and (node.db or node.catalog): + # NL SQL must use bare table names so _scope_sql can re-prefix them + # with the caller's tenant schema. A schema/catalog qualifier + # (e.g. victim_schema.orders_v2) would otherwise slip past the + # leaf-name allow-list below AND past _scope_sql's skip-if-qualified + # branch, reading another tenant's data. (audit_28_06_26.md #5) + raise UnsafeSQLError(f"Schema-qualified table names are not allowed: {node.sql()}") if isinstance(node, exp.Func): # sqlglot models some DuckDB scan functions as typed Func nodes # (read_csv -> exp.ReadCSV, read_parquet -> exp.ReadParquet) rather diff --git a/tests/unit/test_masking.py b/tests/unit/test_masking.py index f0749d5..7c4d71a 100644 --- a/tests/unit/test_masking.py +++ b/tests/unit/test_masking.py @@ -145,7 +145,10 @@ def test_mask_query_results_masks_single_entity(tmp_path: Path): assert rows == [{"email": "j***@example.com"}] -def test_mask_query_results_skips_when_multiple_entities(tmp_path: Path): +def test_mask_query_results_masks_union_when_multiple_entities(tmp_path: Path): + """A multi-entity JOIN must mask the union of all matched entities, not fail + open. The old behaviour returned cleartext PII for any query touching !=1 + entity table — a reproduced cross-entity leak (audit_28_06_26.md #6).""" config_path = _write_pii_config( tmp_path / "pii_fields.yaml", """ @@ -164,14 +167,16 @@ def test_mask_query_results_skips_when_multiple_entities(tmp_path: Path): masker = PiiMasker(config_path) rows, masked = masker.mask_query_results( - "SELECT u.email FROM users u JOIN orders o ON u.id = o.user_id", - [{"email": "jane@example.com"}], + "SELECT u.email, o.user_id FROM users u JOIN orders o ON u.id = o.user_id", + [{"email": "jane@example.com", "user_id": "U-123"}], tenant="acme", table_to_entity={"users": "user", "orders": "order"}, ) - assert masked is False - assert rows == [{"email": "jane@example.com"}] + assert masked is True + # both entities' rules are applied: user.email (partial) AND order.user_id (full) + assert rows[0]["email"] == "j***@example.com" + assert rows[0]["user_id"] != "U-123" def test_mask_query_results_returns_unchanged_for_unmapped_table(tmp_path: Path): diff --git a/tests/unit/test_query_engine.py b/tests/unit/test_query_engine.py index 297a8c6..e763f37 100644 --- a/tests/unit/test_query_engine.py +++ b/tests/unit/test_query_engine.py @@ -69,6 +69,17 @@ def test_scope_sql_leaves_comments_untouched(engine: QueryEngine) -> None: assert ("users_enriched", "tenant_a") in _tables(scoped) +def test_scope_sql_rescopes_foreign_schema_qualified_table(engine: QueryEngine) -> None: + # Defense-in-depth (audit_28_06_26.md #5): even if a schema-qualified known + # table reaches _scope_sql (validate_nl_sql rejects it on the NL path), it + # must be forced into the caller's tenant schema, never executed against the + # named foreign schema — otherwise tenant_a reads victim's data. + scoped = engine._scope_sql("SELECT * FROM victim.orders_v2", tenant_id="tenant_a") + + assert ("orders_v2", "tenant_a") in _tables(scoped) + assert ("orders_v2", "victim") not in _tables(scoped) + + def test_query_package_exports_query_engine() -> None: from src.serving.semantic_layer.query import QueryEngine as PackageQueryEngine diff --git a/tests/unit/test_sql_guard.py b/tests/unit/test_sql_guard.py index 5a01100..33fc39e 100644 --- a/tests/unit/test_sql_guard.py +++ b/tests/unit/test_sql_guard.py @@ -41,6 +41,17 @@ ("WITH x AS (DELETE FROM orders_v2 RETURNING id) SELECT * FROM x", "Forbidden node"), # Unparseable SQL must fail closed instead of falling through the guard. ("SELECT * FROM (((", "Unparseable"), + # Schema/catalog-qualified table names are a cross-tenant read vector: the + # leaf-name allow-list below and _scope_sql's skip-if-qualified branch both + # miss them, so victim_schema.orders_v2 would execute against another + # tenant's schema. The guard must reject any qualifier. (audit_28_06_26.md #5) + ("SELECT * FROM acme.orders_v2", "Schema-qualified"), + ('SELECT * FROM "acme"."orders_v2"', "Schema-qualified"), + ( + "SELECT o.* FROM orders_v2 o JOIN victim.users_enriched u ON o.user_id = u.id", + "Schema-qualified", + ), + ("SELECT * FROM cat.acme.orders_v2", "Schema-qualified"), ] From 71348bc5cb60cdcfb50e2c25d9f6e41932e82b62 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:04:03 +0300 Subject: [PATCH 3/9] fix(security): SSRF egress guard on webhook/alert target URLs Webhook and alert targets were validated only as AnyHttpUrl, so any tenant could register http://169.254.169.254/, http://127.0.0.1:port or http://10.x and the server would POST to that internal target, returning status/error as an SSRF oracle (audit_28_06_26.md #2, confirmed by two audit passes). New egress_guard.validate_public_url resolves the host and rejects any URL that is not an http(s) target resolving exclusively to public unicast addresses (loopback/private/link-local/reserved/multicast/unspecified all rejected). Applied at registration time (POST /v1/webhooks, /v1/alerts, PUT alert -> 400) and again immediately before each delivery (narrowing the DNS-rebinding window); a delivery to a now-internal host is failed and logged, not fetched. Resolution runs via asyncio.to_thread so it never blocks the event loop. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/api/alerts/escalation.py | 37 +++++++++++- src/serving/api/egress_guard.py | 66 ++++++++++++++++++++ src/serving/api/routers/alerts.py | 11 ++++ src/serving/api/routers/webhooks.py | 6 ++ src/serving/api/webhook_dispatcher.py | 32 ++++++++++ tests/unit/test_egress_guard.py | 86 +++++++++++++++++++++++++++ 6 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 src/serving/api/egress_guard.py create mode 100644 tests/unit/test_egress_guard.py diff --git a/src/serving/api/alerts/escalation.py b/src/serving/api/alerts/escalation.py index 23f4783..3ee326d 100644 --- a/src/serving/api/alerts/escalation.py +++ b/src/serving/api/alerts/escalation.py @@ -8,6 +8,7 @@ import httpx import structlog +from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url from src.serving.api.webhook_dispatcher import _event_body, _signature from .evaluator import evaluate_rule @@ -238,13 +239,47 @@ async def deliver( status_code: int | None = None error: str | None = None + target_url = webhook_url or alert.webhook_url + # Re-validate at delivery time (DNS rebinding): a name public at registration + # could now resolve to an internal address. (audit_28_06_26.md #2) + try: + await asyncio.to_thread(validate_public_url, target_url) + except UnsafeEgressURLError as exc: + error = f"unsafe egress URL: {exc}" + log_alert_history( + conn, + delivery_id=delivery_id, + alert=alert, + metric=alert.metric, + current_value=current_value, + previous_value=previous_value, + change_pct=change_pct, + threshold=alert.threshold, + condition=alert.condition, + window=alert.window, + event_type=event_type, + status_code=None, + success=False, + error=error, + payload=payload, + ) + return { + "delivery_id": delivery_id, + "alert_id": alert.id, + "event_type": event_type, + "success": False, + "status_code": None, + "error": error, + "attempts": 0, + } + async with httpx.AsyncClient(timeout=5.0) as client: for attempt in range(1, 4): attempts = attempt error = None try: response = await client.post( - webhook_url or alert.webhook_url, + target_url, content=body, headers=headers, ) diff --git a/src/serving/api/egress_guard.py b/src/serving/api/egress_guard.py new file mode 100644 index 0000000..adc4964 --- /dev/null +++ b/src/serving/api/egress_guard.py @@ -0,0 +1,66 @@ +"""Egress URL guard against SSRF. + +Webhook and alert targets are tenant-controlled URLs that the server fetches. +Without a guard a tenant can point them at loopback / private / link-local / +cloud-metadata addresses and use the delivery result (status / error) as an +SSRF oracle to map and reach the internal network (audit_28_06_26.md #2). + +This module resolves the host and rejects any URL that is not an http(s) target +resolving exclusively to public unicast addresses. It is applied both at +registration time (reject early, 4xx) and immediately before each delivery +(narrowing the DNS-rebinding window — a name that resolved public at creation +could later point at an internal IP). +""" + +from __future__ import annotations + +import ipaddress +import socket +from urllib.parse import urlsplit + +_ALLOWED_SCHEMES = {"http", "https"} + + +class UnsafeEgressURLError(ValueError): + """Raised when an outbound URL is not a public http(s) target.""" + + +def _ip_is_public(ip: str) -> bool: + addr = ipaddress.ip_address(ip) + return not ( + addr.is_private + or addr.is_loopback + or addr.is_link_local + or addr.is_reserved + or addr.is_multicast + or addr.is_unspecified + ) + + +def validate_public_url(url: str) -> None: + """Raise :class:`UnsafeEgressURLError` unless ``url`` is an http(s) URL whose + host resolves *only* to public unicast addresses. + + Resolution is synchronous (``socket.getaddrinfo``); call it via + ``asyncio.to_thread`` on the event loop. IP-literal hosts resolve to + themselves, so loopback/private/link-local literals are rejected without any + network DNS. + """ + parts = urlsplit(url) + scheme = parts.scheme.lower() + if scheme not in _ALLOWED_SCHEMES: + raise UnsafeEgressURLError(f"scheme not allowed: {parts.scheme!r}") + host = parts.hostname + if not host: + raise UnsafeEgressURLError("missing host") + port = parts.port or (443 if scheme == "https" else 80) + try: + infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP) + except (socket.gaierror, UnicodeError) as exc: + raise UnsafeEgressURLError(f"host does not resolve: {host}") from exc + resolved = {str(info[4][0]) for info in infos} + if not resolved: + raise UnsafeEgressURLError(f"host does not resolve: {host}") + for ip in resolved: + if not _ip_is_public(ip): + raise UnsafeEgressURLError(f"host {host} resolves to non-public address {ip}") diff --git a/src/serving/api/routers/alerts.py b/src/serving/api/routers/alerts.py index 86f0de1..99a55fd 100644 --- a/src/serving/api/routers/alerts.py +++ b/src/serving/api/routers/alerts.py @@ -1,3 +1,4 @@ +import asyncio from typing import Literal, cast from fastapi import APIRouter, HTTPException, Request, Response, status @@ -13,6 +14,7 @@ list_alerts, update_alert, ) +from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url router = APIRouter(prefix="/v1/alerts", tags=["alerts"]) @@ -66,6 +68,10 @@ def _validate_metric_request(request: Request, metric: str, window: str) -> None @router.post("", status_code=status.HTTP_201_CREATED) async def register_alert(payload: AlertCreateRequest, request: Request) -> dict[str, object]: _validate_metric_request(request, payload.metric, payload.window) + try: + await asyncio.to_thread(validate_public_url, str(payload.webhook_url)) + except UnsafeEgressURLError as exc: + raise HTTPException(status_code=400, detail=f"Unsafe webhook URL: {exc}") from exc rule = create_alert( get_alert_config_path(request.app), name=payload.name, @@ -101,6 +107,11 @@ async def modify_alert( next_metric = updates.get("metric", existing.metric) next_window = updates.get("window", existing.window) _validate_metric_request(request, next_metric, next_window) + if "webhook_url" in updates: + try: + await asyncio.to_thread(validate_public_url, str(updates["webhook_url"])) + except UnsafeEgressURLError as exc: + raise HTTPException(status_code=400, detail=f"Unsafe webhook URL: {exc}") from exc updated = update_alert(path, alert_id, _tenant(request), updates) if updated is None: diff --git a/src/serving/api/routers/webhooks.py b/src/serving/api/routers/webhooks.py index 6eb0441..25024c8 100644 --- a/src/serving/api/routers/webhooks.py +++ b/src/serving/api/routers/webhooks.py @@ -1,9 +1,11 @@ +import asyncio from datetime import UTC, datetime from typing import cast from fastapi import APIRouter, HTTPException, Request, Response, status from pydantic import AnyHttpUrl, BaseModel, Field +from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url from src.serving.api.webhook_dispatcher import ( WebhookDispatcher, WebhookFilters, @@ -32,6 +34,10 @@ def _tenant(request: Request) -> str: @router.post("", status_code=status.HTTP_201_CREATED) async def register_webhook(payload: WebhookCreateRequest, request: Request) -> dict[str, object]: + try: + await asyncio.to_thread(validate_public_url, str(payload.url)) + except UnsafeEgressURLError as exc: + raise HTTPException(status_code=400, detail=f"Unsafe webhook URL: {exc}") from exc registration = create_webhook( get_webhook_config_path(request.app), url=str(payload.url), diff --git a/src/serving/api/webhook_dispatcher.py b/src/serving/api/webhook_dispatcher.py index a891940..0e35dcc 100644 --- a/src/serving/api/webhook_dispatcher.py +++ b/src/serving/api/webhook_dispatcher.py @@ -17,6 +17,8 @@ from fastapi import FastAPI from pydantic import BaseModel, Field +from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url + try: import yaml except ImportError: # pragma: no cover @@ -240,6 +242,36 @@ async def deliver(self, webhook: WebhookRegistration, event: dict) -> dict: status_code: int | None = None error: str | None = None + # Re-validate at delivery time too (not only at registration): a hostname + # that resolved to a public IP when the webhook was created could now + # point at an internal address (DNS rebinding). Fail the delivery instead + # of fetching an internal target. (audit_28_06_26.md #2) + try: + await asyncio.to_thread(validate_public_url, webhook.url) + except UnsafeEgressURLError as exc: + error = f"unsafe egress URL: {exc}" + _log_delivery( + conn, + delivery_id=delivery_id, + webhook_id=webhook.id, + event_id=event_id, + event_type=event_type, + attempt=0, + status_code=None, + success=False, + error=error, + ) + return { + "delivery_id": delivery_id, + "webhook_id": webhook.id, + "event_id": event_id, + "event_type": event_type, + "success": False, + "status_code": None, + "error": error, + "attempts": 0, + } + async with httpx.AsyncClient(timeout=5.0) as client: for attempt in range(1, 4): attempts = attempt diff --git a/tests/unit/test_egress_guard.py b/tests/unit/test_egress_guard.py new file mode 100644 index 0000000..3b8cbbd --- /dev/null +++ b/tests/unit/test_egress_guard.py @@ -0,0 +1,86 @@ +"""SSRF egress-guard tests (no Docker, no network). + +IP-literal hosts resolve to themselves via getaddrinfo, so the loopback/private/ +link-local cases need no network; hostname cases mock getaddrinfo. Covers +audit_28_06_26.md #2 — webhook/alert targets must reject internal addresses. +""" + +from __future__ import annotations + +import socket + +import pytest + +from src.serving.api.egress_guard import UnsafeEgressURLError, validate_public_url + + +@pytest.mark.parametrize( + "url", + [ + "http://127.0.0.1/x", # loopback + "http://169.254.169.254/latest/meta-data/", # cloud metadata + "http://10.0.0.5:6379/", # private + "http://192.168.1.1/", # private + "http://172.16.0.1/", # private + "http://[::1]/", # IPv6 loopback + "http://0.0.0.0/", # unspecified + "https://[fd00::1]/", # IPv6 unique-local + "http://[fe80::1]/", # IPv6 link-local + ], +) +def test_validate_rejects_non_public_addresses(url: str) -> None: + with pytest.raises(UnsafeEgressURLError): + validate_public_url(url) + + +@pytest.mark.parametrize( + "url", + [ + "ftp://example.com/x", + "file:///etc/passwd", + "gopher://example.com/", + "http://", # missing host + "not-a-url", # no scheme + ], +) +def test_validate_rejects_bad_scheme_or_host(url: str) -> None: + with pytest.raises(UnsafeEgressURLError): + validate_public_url(url) + + +def test_validate_allows_public_host(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + socket, + "getaddrinfo", + lambda *a, **k: [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("93.184.216.34", 80))], + ) + validate_public_url("http://example.com/hook") # must not raise + + +def test_validate_rejects_public_name_resolving_to_private( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # DNS-rebinding shape: an innocuous host that resolves to an internal IP. + monkeypatch.setattr( + socket, + "getaddrinfo", + lambda *a, **k: [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("10.1.2.3", 80))], + ) + with pytest.raises(UnsafeEgressURLError): + validate_public_url("http://internal.example.com/") + + +def test_validate_rejects_when_any_resolved_ip_is_private( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # A host resolving to both a public and a private IP must be rejected. + monkeypatch.setattr( + socket, + "getaddrinfo", + lambda *a, **k: [ + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("93.184.216.34", 80)), + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.1", 80)), + ], + ) + with pytest.raises(UnsafeEgressURLError): + validate_public_url("http://dual.example.com/") From 5a21c3090ab50acadef301e2217491f994d6d401 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:12:20 +0300 Subject: [PATCH 4/9] fix(security): rate-limiter fails closed to local cap; bound un-paginated NL query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #7 rate limiter fail-open: on any Redis error check() returned (True, limit, ...), so a Redis outage (or an attacker who can degrade it) disabled per-tenant rate limiting fleet-wide — a brute-force / DoS-amplification window on the expensive NL->SQL and entity paths. It now falls back to the existing per-process sliding window (refactored into _check_local) instead of fail-open. (audit #7) #8 unbounded batch NL query: /v1/query paginates (LIMIT), but /v1/batch calls execute_nl_query which executed the translated SQL with no row cap, so a batch item like "SELECT * FROM orders_v2" (up to 20/request) could stream a whole table into memory. execute_nl_query now wraps the validated SQL in a bounded LIMIT (1000, the paginated max). nosec B608 pin bumped 2->3 with reason. (audit #8) Note: the audit suspected a count-vs-limit off-by-one; checked and it is NOT a bug — the Redis and local paths both admit exactly the configured number of requests; left unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/api/rate_limiter.py | 32 ++++++++++++------- .../semantic_layer/query/nl_queries.py | 14 +++++++- .../unit/test_query_engine_mixin_contracts.py | 6 +++- tests/unit/test_rate_limiter.py | 14 ++++++-- tests/unit/test_security_tooling_policy.py | 2 +- tests/unit/test_sql_guard.py | 5 ++- 6 files changed, 55 insertions(+), 18 deletions(-) diff --git a/src/serving/api/rate_limiter.py b/src/serving/api/rate_limiter.py index 79bb942..a77fb86 100644 --- a/src/serving/api/rate_limiter.py +++ b/src/serving/api/rate_limiter.py @@ -34,6 +34,21 @@ def __init__( self._time_source = time_source self._windows: dict[str, list[float]] = defaultdict(list) + def _check_local( + self, key: str, limit: int, window_seconds: int, now: float + ) -> tuple[bool, int, int]: + """Per-process sliding-window check (no Redis). Used both when Redis is + unconfigured and as the fail-closed fallback when Redis errors.""" + cutoff = now - window_seconds + window = [stamp for stamp in self._windows[key] if stamp > cutoff] + self._windows[key] = window + if len(window) >= limit: + reset_at = int(window[0] + window_seconds) if window else int(now + window_seconds) + return False, 0, reset_at + window.append(now) + reset_at = int(window[0] + window_seconds) + return True, max(0, limit - len(window)), reset_at + async def check( self, key: str, @@ -43,16 +58,7 @@ async def check( now = self._time_source() reset_at = int(now + window_seconds) if self._redis is None: - cutoff = now - window_seconds - window = [stamp for stamp in self._windows[key] if stamp > cutoff] - self._windows[key] = window - if len(window) >= limit: - if window: - reset_at = int(window[0] + window_seconds) - return False, 0, reset_at - window.append(now) - reset_at = int(window[0] + window_seconds) - return True, max(0, limit - len(window)), reset_at + return self._check_local(key, limit, window_seconds, now) try: pipeline = self._redis.pipeline() @@ -68,7 +74,11 @@ async def check( operation="check", error=str(exc), ) - return True, limit, reset_at + # Fail closed to a per-process cap instead of fail-open: a Redis + # outage must not silently disable rate limiting fleet-wide, which + # would open a brute-force / DoS-amplification window on the + # expensive NL->SQL and entity paths. (audit_28_06_26.md #7) + return self._check_local(key, limit, window_seconds, now) count = int(results[2]) oldest_entry = results[4] diff --git a/src/serving/semantic_layer/query/nl_queries.py b/src/serving/semantic_layer/query/nl_queries.py index adad10f..409a9cb 100644 --- a/src/serving/semantic_layer/query/nl_queries.py +++ b/src/serving/semantic_layer/query/nl_queries.py @@ -19,6 +19,12 @@ tracer = trace.get_tracer("agentflow.query_engine") +# Hard row cap for the un-paginated NL execution path (used by /v1/batch). The +# paginated /v1/query path bounds itself via LIMIT; execute_nl_query did not, so +# a batch item like "SELECT * FROM orders_v2" could stream a whole table into +# memory. Matches the paginated max page size. (audit_28_06_26.md #8) +_MAX_NL_QUERY_ROWS = 1000 + class UnsafeNLQueryError(HTTPException, ValueError): def __init__(self, detail: str) -> None: @@ -217,7 +223,13 @@ def execute_nl_query( ) if resolved_tenant_id is not None: span.set_attribute("tenant_id", resolved_tenant_id) - data = self._backend.execute(sql) + bounded_sql = ( + # sql is prevalidated by validate_nl_sql; bound the row count + # so an un-paginated NL item can't stream a whole table. + f"SELECT * FROM ({sql}) AS bounded_nl_query " # nosec B608 + f"LIMIT {_MAX_NL_QUERY_ROWS}" + ) + data = self._backend.execute(bounded_sql) if span is not None and span.is_recording(): span.set_attribute("row_count", len(data)) except BackendExecutionError as e: diff --git a/tests/unit/test_query_engine_mixin_contracts.py b/tests/unit/test_query_engine_mixin_contracts.py index 025cfac..e3d5066 100644 --- a/tests/unit/test_query_engine_mixin_contracts.py +++ b/tests/unit/test_query_engine_mixin_contracts.py @@ -88,4 +88,8 @@ def test_execute_nl_query_runs_against_minimal_host_contract(host: _MinimalQuery assert result["data"] == [{"order_id": "ORD-1"}] assert result["sql"] == "SELECT * FROM orders_v2" - host._backend.execute.assert_called_once_with("SELECT * FROM orders_v2") + # the executed SQL is wrapped in a bounded LIMIT (audit #8); result["sql"] + # still reports the logical query. + host._backend.execute.assert_called_once_with( + "SELECT * FROM (SELECT * FROM orders_v2) AS bounded_nl_query LIMIT 1000" + ) diff --git a/tests/unit/test_rate_limiter.py b/tests/unit/test_rate_limiter.py index 8cf9762..66cc6a9 100644 --- a/tests/unit/test_rate_limiter.py +++ b/tests/unit/test_rate_limiter.py @@ -237,7 +237,7 @@ async def test_rate_limiter_persists_counts_across_instances() -> None: @pytest.mark.asyncio -async def test_rate_limiter_fails_open_when_redis_is_unavailable( +async def test_rate_limiter_fails_closed_to_local_cap_when_redis_unavailable( monkeypatch: pytest.MonkeyPatch, ) -> None: logger = LoggerSpy() @@ -247,15 +247,23 @@ async def test_rate_limiter_fails_open_when_redis_is_unavailable( monkeypatch.setattr("src.serving.api.rate_limiter.logger", logger) + # A Redis outage must NOT disable limiting fleet-wide (the old fail-open + # behaviour returned remaining==limit). It falls back to a per-process cap: + # the request is allowed but counted. (audit_28_06_26.md #7) allowed, remaining, reset_at = await limiter.check("tenant:key", 2) - assert allowed is True - assert remaining == 2 + assert remaining == 1 # counted locally, not the full limit assert reset_at == 1_060 assert logger.warning_calls == [ ("rate_limiter_unavailable", {"operation": "check", "error": "redis down"}) ] + # The local cap is actually enforced: over the limit of 2, the next call is denied. + await limiter.check("tenant:key", 2) + allowed_over, remaining_over, _ = await limiter.check("tenant:key", 2) + assert allowed_over is False + assert remaining_over == 0 + @pytest.mark.asyncio async def test_rate_limiter_in_memory_fallback_allows_then_blocks( diff --git a/tests/unit/test_security_tooling_policy.py b/tests/unit/test_security_tooling_policy.py index 0cf98fb..3f16f23 100644 --- a/tests/unit/test_security_tooling_policy.py +++ b/tests/unit/test_security_tooling_policy.py @@ -56,7 +56,7 @@ def test_sql_injection_checks_are_not_globally_suppressed() -> None: "src/serving/backends/duckdb_backend.py": 2, "src/serving/semantic_layer/nl_engine.py": 6, "src/serving/semantic_layer/query/entity_queries.py": 3, - "src/serving/semantic_layer/query/nl_queries.py": 2, + "src/serving/semantic_layer/query/nl_queries.py": 3, "src/serving/semantic_layer/search_index.py": 1, } diff --git a/tests/unit/test_sql_guard.py b/tests/unit/test_sql_guard.py index 33fc39e..7b8da54 100644 --- a/tests/unit/test_sql_guard.py +++ b/tests/unit/test_sql_guard.py @@ -107,4 +107,7 @@ def test_execute_nl_query_executes_safe_sql(monkeypatch: pytest.MonkeyPatch) -> assert result["data"] == [{"order_id": "ORD-1"}] assert result["row_count"] == 1 - backend.execute.assert_called_once_with("SELECT * FROM orders_v2") + # execute_nl_query wraps the validated SQL in a bounded LIMIT (audit #8). + backend.execute.assert_called_once_with( + "SELECT * FROM (SELECT * FROM orders_v2) AS bounded_nl_query LIMIT 1000" + ) From d36d140e2210be8177f356b2665bd46ad6fcaeed Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:16:16 +0300 Subject: [PATCH 5/9] fix(metrics): active_sessions time-bounded; align order-metric status filters #11 active_sessions counted every session ever: the demo write path never sets ended_at, so the metrics WHERE ended_at IS NULL OR ended_at >= NOW()-30min was always true and the count grew monotonically. Re-anchored on started_at (active = started in the last 30 min and not ended), so it is actually time-bounded. #M4 order metrics were mutually inconsistent: revenue and avg_order_value filter status != cancelled but order_count did not, so avg_order_value != revenue/ order_count. Aligned order_count to the same non-cancelled filter and corrected the revenue/order_count descriptions (revenue was labelled completed orders but includes pending/shipped). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/semantic_layer/catalog.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/serving/semantic_layer/catalog.py b/src/serving/semantic_layer/catalog.py index f2cd8ce..ee4d67c 100644 --- a/src/serving/semantic_layer/catalog.py +++ b/src/serving/semantic_layer/catalog.py @@ -56,7 +56,7 @@ def _register_defaults(self) -> None: self.register_metric( MetricDefinition( name="revenue", - description="Total revenue from completed orders", + description="Total revenue from non-cancelled orders", sql_template=( "SELECT SUM(total_amount) as value " "FROM orders_v2 " @@ -73,11 +73,16 @@ def _register_defaults(self) -> None: self.register_metric( MetricDefinition( name="order_count", - description="Number of orders placed", + description="Number of non-cancelled orders", sql_template=( + # status filter aligned with revenue / avg_order_value so the + # three metrics are mutually consistent (avg = revenue/count); + # previously order_count counted cancelled orders too, so the + # identity did not hold. (audit_28_06_26.md #M4) "SELECT COUNT(*) as value " "FROM orders_v2 " - "WHERE created_at >= NOW() - INTERVAL '{window}'" + "WHERE status != 'cancelled' " + "AND created_at >= NOW() - INTERVAL '{window}'" ), unit="count", contract_version=self.contract_registry.latest_contract_version( @@ -130,12 +135,17 @@ def _register_defaults(self) -> None: self.register_metric( MetricDefinition( name="active_sessions", - description="Currently active user sessions", + description="Active user sessions (started in last 30 min, not ended)", sql_template=( + # Sessions that started recently and have not ended. The old + # 'ended_at IS NULL OR ended_at >= ...' counted every session + # ever, because the demo write path never sets ended_at, so + # 'ended_at IS NULL' was always true. Anchor on started_at so + # the count is actually time-bounded. (audit_28_06_26.md #11) "SELECT COUNT(*) as value " "FROM sessions_aggregated " - "WHERE ended_at IS NULL " - "OR ended_at >= NOW() - INTERVAL '30 minutes'" + "WHERE started_at >= NOW() - INTERVAL '30 minutes' " + "AND ended_at IS NULL" ), unit="count", available_windows=["now"], From 313fa0aaf3a9234fe074b1b2f91de870e3701b59 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:22:12 +0300 Subject: [PATCH 6/9] perf: take per-request side-effects off the event loop; O(1) audit; SCAN not KEYS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four hot-path scaling defects (audit_28_06_26.md #13/#14/#15/#16): #13 record_usage ran a connection-per-request DuckDB write with a blocking time.sleep retry inline in the async auth middleware, freezing the event loop on every authenticated request. Offloaded via run_in_threadpool. #14 the hash-chained audit publisher re-read the ENTIRE growing log file on every publish to fetch only the last line — O(file) per request, O(n^2) over the log's lifetime, on the event loop via #13. Now caches (last_hash, sequence) in memory after a one-time tail read (it is the only writer, append-only under the lock). #15 metric-cache invalidation used Redis KEYS metric:* (O(keyspace), blocks single-threaded Redis for all clients) roughly every 2s under ingestion. Switched to cursor-based SCAN. #16 the search-index rebuild full-scanned and re-tokenized every entity table on the event loop every 60s. Offloaded via run_in_threadpool. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/api/auth/middleware.py | 6 +++++- src/serving/audit_publisher.py | 14 +++++++++++++- src/serving/cache.py | 17 +++++++++++++---- src/serving/semantic_layer/search_index.py | 7 ++++++- tests/unit/test_cache.py | 7 +++++++ 5 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/serving/api/auth/middleware.py b/src/serving/api/auth/middleware.py index 3fe8fca..a658584 100644 --- a/src/serving/api/auth/middleware.py +++ b/src/serving/api/auth/middleware.py @@ -9,6 +9,7 @@ import structlog from fastapi import Header, HTTPException, Request, Response from fastapi.responses import JSONResponse +from starlette.concurrency import run_in_threadpool from src.constants import DEFAULT_RATE_LIMIT_WINDOW_SECONDS, FAILED_AUTH_WINDOW_SECONDS from src.serving.api.metrics import AUTH_FAILURES @@ -106,7 +107,10 @@ async def __call__( ) manager.clear_failed_auth(client_ip) - manager.record_usage(tenant_key, path) + # record_usage opens a DuckDB connection, writes, and retries with a + # blocking sleep; running it inline froze the event loop on every + # authenticated request. Offload to a worker thread. (audit_28_06_26.md #13) + await run_in_threadpool(manager.record_usage, tenant_key, path) is_allowed, remaining, reset_at = await manager.check_rate_limit(tenant_key) rate_limit_headers = { "X-RateLimit-Limit": str(tenant_key.rate_limit_rpm), diff --git a/src/serving/audit_publisher.py b/src/serving/audit_publisher.py index e0abb8a..50e470a 100644 --- a/src/serving/audit_publisher.py +++ b/src/serving/audit_publisher.py @@ -24,10 +24,20 @@ class HashChainedFileAuditPublisher: def __init__(self, path: Path | str) -> None: self._path = Path(path) self._lock = threading.Lock() + self._cached_hash: str | None = None + self._cached_sequence: int | None = None # None until the tail is read once def publish(self, payload: Mapping[str, object]) -> None: with self._lock: - previous_hash, sequence = _last_hash_and_sequence(self._path) + if self._cached_sequence is None: + # Read the tail once per process to resume the chain. Subsequent + # writes use the in-memory head — this is the only writer and the + # log is append-only under the lock — instead of re-reading the + # whole growing file every call, which was O(file) per request + # and O(n^2) over the log's lifetime. (audit_28_06_26.md #14) + self._cached_hash, self._cached_sequence = _last_hash_and_sequence(self._path) + previous_hash = self._cached_hash + sequence = self._cached_sequence record: dict[str, object] = { "sequence": sequence + 1, "previous_hash": previous_hash, @@ -38,6 +48,8 @@ def publish(self, payload: Mapping[str, object]) -> None: with self._path.open("a", encoding="utf-8", newline="\n") as handle: handle.write(json.dumps(record, sort_keys=True, default=str)) handle.write("\n") + self._cached_hash = str(record["hash"]) + self._cached_sequence = sequence + 1 def build_audit_publisher_from_env() -> AuditPublisher: diff --git a/src/serving/cache.py b/src/serving/cache.py index 97c371f..1053bc5 100644 --- a/src/serving/cache.py +++ b/src/serving/cache.py @@ -106,8 +106,20 @@ async def invalidate_metrics(self) -> None: error="redis package not installed", ) return + # SCAN (cursor-based, non-blocking) instead of KEYS, which is O(keyspace) + # and blocks single-threaded Redis for all clients on every call — and + # this runs roughly every 2s under steady ingestion. (audit_28_06_26.md #15) try: - keys = await self._redis.keys("metric:*") + cursor = 0 + while True: + cursor, keys = await self._redis.scan(cursor, match="metric:*", count=500) + if keys: + normalized_keys = [ + key.decode() if isinstance(key, bytes) else key for key in keys + ] + await self._redis.delete(*normalized_keys) + if cursor == 0: + break except Exception as exc: logger.warning( "query_cache_unavailable", @@ -115,9 +127,6 @@ async def invalidate_metrics(self) -> None: error=str(exc), ) return - normalized_keys = [key.decode() if isinstance(key, bytes) else key for key in keys] - if normalized_keys: - await self._redis.delete(*normalized_keys) async def close(self) -> None: if self._redis is None: diff --git a/src/serving/semantic_layer/search_index.py b/src/serving/semantic_layer/search_index.py index 94ac843..a9465a9 100644 --- a/src/serving/semantic_layer/search_index.py +++ b/src/serving/semantic_layer/search_index.py @@ -8,6 +8,7 @@ import duckdb import structlog +from starlette.concurrency import run_in_threadpool from src.serving.semantic_layer.catalog import DataCatalog, EntityDefinition, MetricDefinition from src.serving.semantic_layer.query_engine import QueryEngine @@ -77,7 +78,11 @@ async def rebuild_periodically(self, interval_seconds: int = 60) -> None: while True: try: await asyncio.sleep(interval_seconds) - self.rebuild() + # rebuild() full-scans and re-tokenizes every entity table and + # runs a live metric query per metric; running it inline froze + # the event loop for the whole scan every interval. Offload it to + # a worker thread. (audit_28_06_26.md #16) + await run_in_threadpool(self.rebuild) except asyncio.CancelledError: raise except Exception: diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py index 19d0cf7..ed03b28 100644 --- a/tests/unit/test_cache.py +++ b/tests/unit/test_cache.py @@ -39,6 +39,13 @@ async def keys(self, pattern: str): prefix = pattern[:-1] if pattern.endswith("*") else pattern return [key for key in self.data if key.startswith(prefix)] + async def scan(self, cursor: int, match: str = "*", count: int = 100): + # Single-batch fake: returns all matching keys with a terminal cursor 0. + if self.raise_on_keys is not None: + raise self.raise_on_keys + prefix = match[:-1] if match.endswith("*") else match + return 0, [key for key in self.data if key.startswith(prefix)] + async def delete(self, *keys: str): self.deleted.append(keys) for key in keys: From 6d4b4bd9e97a67f79783bd0b45d723a9a4a93bda Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:27:22 +0300 Subject: [PATCH 7/9] fix(outbox): offload blocking Kafka flush off the event loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_forever is an asyncio task but called the fully synchronous process_pending, whose per-row producer.flush(10) blocked the event loop for up to 10s per pending message against a slow/unreachable broker — freezing all HTTP/SSE traffic. (audit_28_06_26.md #1) Adds process_pending_async/_process_row_async used by run_forever: the DuckDB read/mark-sent/schedule-retry stay on the loop (the connection may be shared with the query engine, so it must not be touched from a worker thread), while the blocking Kafka produce+flush is offloaded via asyncio.to_thread. The synchronous process_pending/process_entry are kept for direct/test use. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/processing/outbox.py | 59 ++++++++++++++++++++++++++++- tests/unit/test_outbox_processor.py | 31 +++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/src/processing/outbox.py b/src/processing/outbox.py index 5d8dfc1..884a6cc 100644 --- a/src/processing/outbox.py +++ b/src/processing/outbox.py @@ -77,7 +77,7 @@ async def run_forever(self) -> None: while True: await asyncio.sleep(2) try: - self.process_pending() + await self.process_pending_async() except duckdb.Error as exc: logger.warning( "outbox_processing_failed", @@ -107,6 +107,63 @@ def process_pending(self, limit: int = 100) -> int: processed += 1 return processed + async def process_pending_async(self, limit: int = 100) -> int: + """Async variant used by run_forever. + + DuckDB reads/updates stay on the event loop (the connection may be shared + with the query engine, so it must not be touched from a worker thread), + but the blocking Kafka produce+flush(10) is offloaded so a slow or + unreachable broker can't freeze the whole event loop. (audit_28_06_26.md #1) + """ + rows = self._connection.execute( + """ + SELECT id, event_id, payload, topic, retry_count + FROM outbox + WHERE status = 'pending' + AND (next_attempt_at IS NULL OR next_attempt_at <= ?) + ORDER BY created_at + LIMIT ? + """, + [datetime.now(UTC), limit], + ).fetchall() + processed = 0 + for row in rows: + if await self._process_row_async(row): + processed += 1 + return processed + + async def _process_row_async(self, row: tuple[Any, ...]) -> bool: + outbox_id, event_id, payload, topic, retry_count = row + decoded_payload = self._decode_payload(payload) + try: + await asyncio.to_thread(self._producer, topic, decoded_payload) + except (BufferError, ConnectionError, TimeoutError, KafkaException, RuntimeError) as exc: + error_message = str(exc) + if isinstance(exc, RuntimeError) and not ( + error_message.startswith("KafkaError{") + or "Kafka message(s) were not delivered" in error_message + ): + raise + next_retry_count = int(retry_count or 0) + 1 + logger.warning( + "outbox_delivery_retry_scheduled", + outbox_id=outbox_id, + event_id=event_id, + topic=topic, + retry_count=next_retry_count, + error=error_message, + exc_info=True, + ) + self._schedule_retry( + outbox_id=outbox_id, + event_id=event_id, + retry_count=next_retry_count, + error_message=error_message, + ) + return False + self._mark_sent(outbox_id=outbox_id, event_id=event_id) + return True + def process_entry(self, outbox_id: str) -> bool: row = self._connection.execute( """ diff --git a/tests/unit/test_outbox_processor.py b/tests/unit/test_outbox_processor.py index 7ef5044..59b031c 100644 --- a/tests/unit/test_outbox_processor.py +++ b/tests/unit/test_outbox_processor.py @@ -122,6 +122,37 @@ def test_skips_rows_with_future_next_attempt(self, conn: duckdb.DuckDBPyConnecti assert processor.process_pending() == 0 assert spy.calls == [] + @pytest.mark.asyncio + async def test_process_pending_async_dispatches_and_marks_sent( + self, conn: duckdb.DuckDBPyConnection + ) -> None: + # run_forever uses the async variant (audit #1): it offloads the blocking + # Kafka produce to a thread but keeps DuckDB on the loop. + spy = _SpyProducer() + processor = _processor(conn, spy) + _insert_outbox(conn, outbox_id="o1", event_id="evt-1") + _insert_outbox(conn, outbox_id="o2", event_id="evt-2") + + processed = await processor.process_pending_async() + + assert processed == 2 + assert {topic for topic, _ in spy.calls} == {TOPIC} + assert _status(conn, "o1")[0] == "sent" + assert _status(conn, "o2")[0] == "sent" + + @pytest.mark.asyncio + async def test_process_pending_async_schedules_retry_on_kafka_error( + self, conn: duckdb.DuckDBPyConnection + ) -> None: + producer = _RaisingProducer(RuntimeError("KafkaError{code=_MSG_TIMED_OUT}")) + processor = _processor(conn, producer) + _insert_outbox(conn, outbox_id="o1") + + processed = await processor.process_pending_async() + + assert processed == 0 + assert _status(conn, "o1")[0] == "pending" # rescheduled, not marked sent + class TestProcessEntry: def test_dispatches_single_entry(self, conn: duckdb.DuckDBPyConnection) -> None: From d3a72bfe0f09261251a36e156d94fba1e28e65be Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:32:31 +0300 Subject: [PATCH 8/9] fix(alerts): advance alert state only on successful delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dispatch_alert set fired_at / last_escalation_level and returned regardless of whether deliver() succeeded, so a timed-out level-1 page recorded the alert as fired and the on-call was never notified — and a single-step alert would not re-notify until cooldown (default 30 min). (audit_28_06_26.md #4) State transitions are now gated on delivery success: on a failed page the fire branch leaves fired_at=None and the escalation branch leaves last_escalation_level unchanged, so the next evaluation tick (the existing periodic dispatch loop) re-attempts delivery instead of going silent. Adds no-Docker unit tests for the fire-success, fire-failure, and escalation-failure paths. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/api/alerts/escalation.py | 38 ++++--- tests/unit/test_alert_escalation_delivery.py | 109 +++++++++++++++++++ 2 files changed, 133 insertions(+), 14 deletions(-) create mode 100644 tests/unit/test_alert_escalation_delivery.py diff --git a/src/serving/api/alerts/escalation.py b/src/serving/api/alerts/escalation.py index 3ee326d..80fed6d 100644 --- a/src/serving/api/alerts/escalation.py +++ b/src/serving/api/alerts/escalation.py @@ -79,10 +79,6 @@ async def dispatch_alert( return alert, True, 0 if current_triggered and alert.fired_at is None: - alert.fired_at = now - alert.resolved_at = None - alert.state = "firing" - alert.last_escalation_level = 1 payload = { "alert_id": alert.id, "alert_name": alert.name, @@ -101,7 +97,7 @@ async def dispatch_alert( payload["previous_value"] = evaluation["previous_value"] if evaluation["change_pct"] is not None: payload["change_pct"] = evaluation["change_pct"] - await deliver( + result = await deliver( dispatcher, alert, payload, @@ -111,6 +107,17 @@ async def dispatch_alert( change_pct=evaluation["change_pct"], webhook_url=alert.escalation[0].webhook_url, ) + if not result.get("success"): + # Delivery failed: do NOT advance fired state, so the next evaluation + # tick re-attempts the page instead of recording the alert as fired + # and going silent until cooldown. (audit_28_06_26.md #4) + alert.last_condition_triggered = True + alert.updated_at = now + return alert, True, 0 + alert.fired_at = now + alert.resolved_at = None + alert.state = "firing" + alert.last_escalation_level = 1 alert.last_triggered_at = now alert.last_condition_triggered = True alert.updated_at = now @@ -139,7 +146,7 @@ async def dispatch_alert( payload["previous_value"] = evaluation["previous_value"] if evaluation["change_pct"] is not None: payload["change_pct"] = evaluation["change_pct"] - await deliver( + result = await deliver( dispatcher, alert, payload, @@ -153,14 +160,17 @@ async def dispatch_alert( change_pct=evaluation["change_pct"], webhook_url=next_step.webhook_url, ) - alert.last_triggered_at = now - alert.last_escalation_level = max( - alert.last_escalation_level, - next_step.level, - ) - alert.updated_at = now - triggered += 1 - alert_changed = True + if result.get("success"): + alert.last_triggered_at = now + alert.last_escalation_level = max( + alert.last_escalation_level, + next_step.level, + ) + alert.updated_at = now + triggered += 1 + alert_changed = True + # else: leave last_escalation_level unchanged so the next evaluation + # tick re-attempts this escalation step. (audit_28_06_26.md #4) alert.state = "sustained" alert.last_condition_triggered = True return alert, alert_changed, triggered diff --git a/tests/unit/test_alert_escalation_delivery.py b/tests/unit/test_alert_escalation_delivery.py new file mode 100644 index 0000000..b34d45f --- /dev/null +++ b/tests/unit/test_alert_escalation_delivery.py @@ -0,0 +1,109 @@ +"""Alert state advances only on successful delivery (no Docker). + +evaluate_rule and deliver are monkeypatched so the escalation state machine can +be driven without a live DuckDB/metric/HTTP stack. Covers audit_28_06_26.md #4: +a failed page must NOT record the alert as fired/escalated (which would go silent +until cooldown) — the next evaluation tick re-attempts instead. +""" + +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest + +import src.serving.api.alerts.escalation as escalation +from src.serving.api.alerts.dispatcher import AlertEscalationStep, AlertRule + +_NOW = datetime(2026, 6, 28, 12, 0, tzinfo=UTC) + + +def _alert(**overrides: object) -> AlertRule: + defaults: dict[str, object] = { + "id": "a1", + "name": "High error rate", + "tenant": "acme", + "metric": "error_rate", + "window": "1h", + "condition": "above", + "threshold": 0.1, + "webhook_url": "https://hooks.example.com/x", + "secret": "s", + "created_at": _NOW, + "updated_at": _NOW, + } + defaults.update(overrides) + return AlertRule(**defaults) + + +def _patch_eval(monkeypatch: pytest.MonkeyPatch, *, triggered: bool) -> None: + monkeypatch.setattr( + escalation, + "evaluate_rule", + lambda dispatcher, alert, now: { + "triggered": triggered, + "current_value": 0.5, + "previous_value": None, + "change_pct": None, + }, + ) + + +def _patch_deliver(monkeypatch: pytest.MonkeyPatch, *, success: bool) -> None: + async def _deliver(*args: object, **kwargs: object) -> dict[str, object]: + return {"success": success, "error": None if success else "timeout"} + + monkeypatch.setattr(escalation, "deliver", _deliver) + + +@pytest.mark.asyncio +async def test_fire_advances_state_on_delivery_success(monkeypatch: pytest.MonkeyPatch) -> None: + _patch_eval(monkeypatch, triggered=True) + _patch_deliver(monkeypatch, success=True) + + alert, changed, triggered = await escalation.dispatch_alert(None, _alert(), _NOW) + + assert alert.fired_at == _NOW + assert alert.state == "firing" + assert alert.last_escalation_level == 1 + assert triggered == 1 + + +@pytest.mark.asyncio +async def test_fire_does_not_advance_state_on_delivery_failure( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _patch_eval(monkeypatch, triggered=True) + _patch_deliver(monkeypatch, success=False) + + alert, changed, triggered = await escalation.dispatch_alert(None, _alert(), _NOW) + + # fired_at stays None so the next evaluation tick re-attempts the page rather + # than going silent until cooldown. + assert alert.fired_at is None + assert alert.state != "firing" + assert triggered == 0 + + +@pytest.mark.asyncio +async def test_escalation_level_not_advanced_on_delivery_failure( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _patch_eval(monkeypatch, triggered=True) + _patch_deliver(monkeypatch, success=False) + + fired = datetime(2026, 6, 28, 11, 0, tzinfo=UTC) # 60 min before _NOW + alert = _alert( + escalation=[ + AlertEscalationStep(level=2, after_minutes=10, webhook_url="https://h.example.com/l2") + ], + fired_at=fired, + state="firing", + last_escalation_level=1, + last_condition_triggered=True, + ) + + result, changed, triggered = await escalation.dispatch_alert(None, alert, _NOW) + + assert result.last_escalation_level == 1 # not advanced to 2 on failed delivery + assert triggered == 0 From 8ff4200cc17f21ba214f24a661015ab50306d82c Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 19:51:00 +0300 Subject: [PATCH 9/9] test(integration): adapt suites to the audit fixes (CI-caught regressions) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI integration (Docker, not runnable on the dev host) surfaced 26 failures from this branch's own changes — exactly why this went through a PR: - SSRF egress guard (#2) rejected the reserved .test hostnames the webhook/alert suites use (agent.test etc. do not resolve), failing create with 400 and blocking deliveries — which then cascaded into the alert-dedup suite via the #4 success-gating. Added an autouse integration fixture that resolves only .test names to a public IP, so the guard stays active (real loopback/private rejection intact; logic unit-tested in test_egress_guard) while the mocked- httpx delivery paths remain exercisable. - outbox ProcessorStub now implements process_pending_async (run_forever calls the async variant after #1). - clickhouse live order_count expectation 8 -> 7 (now excludes the 1 cancelled seed order, #M4). Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/integration/conftest.py | 21 +++++++++++++++++++ .../test_clickhouse_backend_live.py | 4 +++- tests/integration/test_outbox.py | 2 +- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ff8dc92..defd5f7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -28,6 +28,27 @@ def _default_open_auth(request, monkeypatch): monkeypatch.setenv("AGENTFLOW_AUTH_DISABLED", "true") +@pytest.fixture(autouse=True) +def _resolve_reserved_test_hosts(monkeypatch): + """Let the SSRF egress guard accept the reserved ``.test`` hostnames the + webhook/alert suites use (agent.test, example.test) with a mocked httpx + client. RFC 6761 ``.test`` names do not resolve, so the guard + (audit_28_06_26.md #2) would reject them with 400/failed-delivery before the + mock. We map only ``.test`` to a public IP; every other host uses the real + resolver, so loopback/private rejection still holds. The guard's own logic + is unit-tested in test_egress_guard.py.""" + import socket + + real_getaddrinfo = socket.getaddrinfo + + def _fake_getaddrinfo(host, *args, **kwargs): + if isinstance(host, str) and host.endswith(".test"): + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("93.184.216.34", 80))] + return real_getaddrinfo(host, *args, **kwargs) + + monkeypatch.setattr(socket, "getaddrinfo", _fake_getaddrinfo) + + def pytest_configure(config): config.addinivalue_line("markers", "kind: marks tests requiring a kind cluster") config.addinivalue_line( diff --git a/tests/integration/test_clickhouse_backend_live.py b/tests/integration/test_clickhouse_backend_live.py index 41433a8..1f0a3c4 100644 --- a/tests/integration/test_clickhouse_backend_live.py +++ b/tests/integration/test_clickhouse_backend_live.py @@ -89,7 +89,9 @@ def test_seeded_metrics_return_plausible_values(backend): order_count = backend.scalar( catalog.metrics["order_count"].sql_template.format(window="24 hours") ) - assert int(order_count) == 8 + # order_count now excludes cancelled orders (aligned with revenue/avg; + # audit_28_06_26.md #M4): 8 seeded - 1 cancelled = 7. + assert int(order_count) == 7 error_rate = backend.scalar( catalog.metrics["error_rate"].sql_template.format(window="24 hours") diff --git a/tests/integration/test_outbox.py b/tests/integration/test_outbox.py index 810cdb3..e4ac011 100644 --- a/tests/integration/test_outbox.py +++ b/tests/integration/test_outbox.py @@ -332,7 +332,7 @@ class ProcessorStub: def __init__(self) -> None: self.closed = False - def process_pending(self) -> int: + async def process_pending_async(self) -> int: raise RuntimeError("unexpected processor bug") def close(self) -> None: