From fddbacd3733618113d57a55051f5a9d82330f55e Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 11:09:23 -0700 Subject: [PATCH 01/11] Reduce aggressive pushdown --- .../construction/build_v3/cte.py | 22 +- .../construction/build_v3/measures.py | 30 +- .../build_v3/measures_sql_test.py | 345 ++++++++++++++++++ 3 files changed, 391 insertions(+), 6 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/cte.py b/datajunction-server/datajunction_server/construction/build_v3/cte.py index ef56fe959..cfd33cda8 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/cte.py +++ b/datajunction-server/datajunction_server/construction/build_v3/cte.py @@ -1967,6 +1967,7 @@ def collect_node_ctes( needed_columns_by_node: Optional[dict[str, set[str]]] = None, injected_filters: Optional[dict[str, ast.Expression]] = None, pushdown: Optional[PushdownFilters] = None, + parent_node: Optional[Node] = None, ) -> tuple[list[tuple[str, ast.Query]], list[str], dict[str, set[str]]]: """ Collect CTEs for all non-source nodes, recursively expanding table references. @@ -2114,7 +2115,26 @@ def collect_refs(node: Node, visited: set[str]) -> None: if injected_filters and node.name in injected_filters: _inject_filter_into_where(query_ast, injected_filters[node.name]) - if pushdown: + # Pushdown is restricted to the parent CTE (the metric's + # direct source) and to dimension CTEs. Pushing into + # upstream transform CTEs is unsafe: a transform may apply + # aggregations (e.g. ``MIN(utc_date) AS first_play_date`` + # then renamed back to ``utc_date``) so a column with the + # same name at a downstream CTE has different semantics + # than at the upstream CTE. Pushing a predicate past the + # aggregation barrier silently changes which rows feed the + # aggregation, and therefore changes the result. DJ + # doesn't currently detect when an upstream transform's + # dim-link column is post-aggregation vs passthrough — so + # we conservatively skip cross-CTE pushdown into + # transforms. Matches v2 behavior. ``parent_node = None`` + # preserves the unrestricted pre-v2-parity behavior for + # callers that haven't opted in yet. + if pushdown is not None and ( + parent_node is None + or node is parent_node + or node.type == NodeType.DIMENSION + ): injections, consumed = _resolve_pushdown_filters_for_cte( node, query_ast, diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index c2fd8df64..a7c193c28 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -1434,6 +1434,7 @@ def build_select_ast( ) if all_filters else None, + parent_node=parent_node, ) # Surface all CTE-consumed filters so the metrics layer's outer WHERE # can skip re-applying them on top of the aggregation CTEs. @@ -1929,12 +1930,31 @@ def build_grain_group_sql( component_aliases[component.name] = component_alias continue - # Skip LIMITED aggregability components with no aggregation - # These are represented by grain columns instead. - # grain_alias was set by _make_component: plain column → column name, - # complex expression → component.name. + # LIMITED aggregability with no aggregation (e.g. COUNT DISTINCT). + # These are represented by grain columns rather than aggregated + # expressions. grain_alias was set by _make_component: plain column + # → bare column name; complex expression (CASE, IF, …) → + # component.name. + # + # When grain_alias != component.name (plain-column case), the bare + # column is already in the projection as a grain column, but + # external consumers (e.g. XP/ABlaze) reference the component by + # its hash-suffixed name (the metric's published + # ``derived_expression``). Emit an additional projection that + # aliases the bare column to the component name so the + # component-name reference resolves against the measures table. + # Internal metric SQL still uses ``component_aliases`` to bind to + # the bare column. if component.rule.type == Aggregability.LIMITED and not component.aggregation: - component_aliases[component.name] = component.grain_alias or component.name + grain_alias = component.grain_alias or component.name + component_aliases[component.name] = grain_alias + if grain_alias != component.name: + component_expressions.append( + (component.name, make_column_ref(grain_alias)), + ) + component_metadata.append( + (component.name, component, metric_node), + ) continue # Always use component.name for consistency - no special case for single-component diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 8d267bdea..cea110d57 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9806,3 +9806,348 @@ async def test_alias_substitution_skips_subquery_alias_collision( """, normalize_aliases=True, ) + + @pytest.mark.asyncio + async def test_count_distinct_plain_column_component_in_output( + self, + client_with_build_v3, + ): + """A ``COUNT(DISTINCT account_id)`` metric's derived + expression references the component by its hash-suffixed + name (e.g. ``account_id_distinct_``), but for + plain-column DISTINCT components measures.py used to + register the bare column as the SQL alias and SKIP emitting + a hash-named projection — leaving downstream consumers + (XP/ABlaze) unable to resolve the column referenced in the + metric's published ``derived_expression``. + + The fix emits an additional `` AS + `` projection so the component-name + reference resolves against the measures table. This test + pins both the SQL shape AND verifies that every column + referenced in the metric's derived_expression actually + appears as a column in the measures SQL output. + """ + client = client_with_build_v3 + + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_distinct_events", + "columns": [ + {"name": "account_id", "type": "int"}, + {"name": "event_date", "type": "int"}, + {"name": "value", "type": "double"}, + ], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "distinct_events", + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.distinct_events_xform", + "query": ( + "SELECT account_id, event_date, value FROM v3.src_distinct_events" + ), + "mode": "published", + "primary_key": ["account_id", "event_date"], + }, + ) + assert resp.status_code == 201, resp.json() + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.distinct_account_count", + "query": ( + "SELECT COUNT(DISTINCT account_id) FROM v3.distinct_events_xform" + ), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + + response = await client.get( + "/sql/measures/v3/", + params={ + "metrics": ["v3.distinct_account_count"], + "dimensions": ["v3.distinct_events_xform.event_date"], + }, + ) + assert response.status_code == 200, response.json() + body = response.json() + gg = get_first_grain_group(body) + sql = gg["sql"] + + # Find the component name (hash-suffixed) emitted for the + # ``COUNT(DISTINCT account_id)`` decomposition. It should + # be present both as a registered component and as a SQL + # column in the projection. + comp_names = [c["name"] for c in gg["components"]] + distinct_comp = next( + (n for n in comp_names if n.startswith("account_id_distinct_")), + None, + ) + assert distinct_comp is not None, ( + f"Expected an ``account_id_distinct_`` component " + f"in the metric decomposition; got {comp_names}" + ) + + assert_sql_equal( + sql, + f""" + WITH v3_distinct_events_xform AS ( + SELECT account_id, event_date, value + FROM default.v3.distinct_events + ) + SELECT t1.event_date, + t1.account_id, + t1.account_id AS {distinct_comp} + FROM v3_distinct_events_xform t1 + GROUP BY t1.event_date, t1.account_id + """, + normalize_aliases=True, + ) + + # Every column referenced in the metric's derived + # expression / combiner must exist as a column in the + # measures SQL output, otherwise downstream (XP/ABlaze) + # can't evaluate the expression against the materialized + # measures table. + combiner = next( + f["combiner"] + for f in body["metric_formulas"] + if f["name"] == "v3.distinct_account_count" + ) + output_col_names = {c["name"] for c in gg["columns"]} + # Extract bare identifiers from the combiner — anything + # that looks like a column reference. Drop SQL keywords. + import re + + SQL_KW = { + "COUNT", + "DISTINCT", + "SUM", + "AVG", + "MIN", + "MAX", + "CASE", + "WHEN", + "THEN", + "ELSE", + "END", + "IS", + "NOT", + "NULL", + "AND", + "OR", + "AS", + "TRUE", + "FALSE", + } + referenced = { + tok + for tok in re.findall(r"\b[A-Za-z_][A-Za-z0-9_]*\b", combiner) + if tok.upper() not in SQL_KW + } + # Each non-keyword identifier in the combiner must resolve + # against the measures SQL output columns. + missing = referenced - output_col_names + assert not missing, ( + f"Derived expression references columns missing from " + f"the measures SQL output.\n" + f" combiner: {combiner}\n" + f" referenced: {sorted(referenced)}\n" + f" output cols: {sorted(output_col_names)}\n" + f" missing: {sorted(missing)}" + ) + + @pytest.mark.asyncio + async def test_filter_does_not_push_past_upstream_aggregation_barrier( + self, + client_with_build_v3, + ): + """Filter pushdown must not cross an upstream transform's + aggregation barrier (e.g. ``MIN(utc_date) AS firstplay_date`` + renamed back to ``utc_date``) — the column has the same + name at both layers but different semantics, and pushing + a predicate past the MIN changes which rows feed the + aggregation, silently inflating the result. + + Real regression from a first-play metric where v3 pushed + ``utc_date BETWEEN ...`` into the upstream + ``profile_game_engagement`` CTE, turning + "lifetime-first-play in window" into "any in-window play" + — roughly doubling the metric. + + Cross-CTE pushdown into upstream transform CTEs is now + gated on ``node is parent_node or node.type == + DIMENSION``. This test pins that the upstream transform's + WHERE only carries its authored predicate (no injected + date filter), while the parent CTE's WHERE correctly + carries the date filter. + """ + client = client_with_build_v3 + + # Upstream transform with an aggregation that renames the + # filter column post-MIN. Real-world shape: a per-account + # "first event date" derived from a per-account daily + # event source. + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_daily_events_agg", + "columns": [ + {"name": "account_id", "type": "int"}, + {"name": "title_id", "type": "int"}, + {"name": "utc_date", "type": "int"}, + ], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "daily_events_agg", + }, + ) + assert resp.status_code in (200, 201), resp.json() + # Tiny date-dim that both the upstream transform and the + # parent transform link their ``utc_date`` column to. This + # is what makes the bug reproducible: BOTH transforms have + # a dim_link to the same date dim, so the per-CTE + # pushdown loop independently routes the filter to both. + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_agg_barrier_date_dim", + "columns": [{"name": "dateint", "type": "int"}], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "agg_barrier_date", + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/dimension/", + json={ + "name": "v3.agg_barrier_date_dim", + "query": "SELECT dateint FROM v3.src_agg_barrier_date_dim", + "mode": "published", + "primary_key": ["dateint"], + }, + ) + assert resp.status_code == 201, resp.json() + + # Upstream transform with the aggregation barrier: + # MIN(utc_date) renamed back to utc_date. Linked to the + # date dim via its raw ``utc_date`` column — this link + # is the trap; pre-fix DJ would route the date filter here + # past the MIN. + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.agg_barrier_firstplay", + "query": ( + "SELECT account_id, title_id, " + " MIN(utc_date) AS utc_date " + "FROM v3.src_daily_events_agg " + "GROUP BY account_id, title_id" + ), + "mode": "published", + "primary_key": ["account_id", "title_id"], + }, + ) + assert resp.status_code == 201, resp.json() + resp = await client.post( + "/nodes/v3.agg_barrier_firstplay/link", + json={ + "dimension_node": "v3.agg_barrier_date_dim", + "join_type": "inner", + "join_on": ( + "v3.agg_barrier_firstplay.utc_date " + "= v3.agg_barrier_date_dim.dateint" + ), + }, + ) + assert resp.status_code in (200, 201), resp.json() + # Parent transform: reads from the upstream and exposes + # ``utc_date`` at the post-MIN semantics. Filter must + # land at THIS CTE's WHERE, not in the upstream's WHERE. + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.agg_barrier_parent", + "query": ( + "SELECT account_id, title_id, utc_date, " + " 1 AS firstplay_count " + "FROM v3.agg_barrier_firstplay" + ), + "mode": "published", + "primary_key": ["account_id", "title_id"], + }, + ) + assert resp.status_code == 201, resp.json() + resp = await client.post( + "/nodes/v3.agg_barrier_parent/link", + json={ + "dimension_node": "v3.agg_barrier_date_dim", + "join_type": "inner", + "join_on": ( + "v3.agg_barrier_parent.utc_date = v3.agg_barrier_date_dim.dateint" + ), + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.agg_barrier_total", + "query": ("SELECT SUM(firstplay_count) FROM v3.agg_barrier_parent"), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + + response = await client.get( + "/sql/measures/v3/", + params={ + "metrics": ["v3.agg_barrier_total"], + "dimensions": ["v3.agg_barrier_parent.account_id"], + "filters": [ + "v3.agg_barrier_date_dim.dateint BETWEEN 20260101 AND 20260131", + ], + }, + ) + assert response.status_code == 200, response.json() + sql = get_first_grain_group(response.json())["sql"] + + # Filter lands at the parent CTE's WHERE (post-MIN + # semantics). Upstream transform's WHERE remains empty — + # NOT injected with a date-range predicate that would + # change the MIN's input set. + assert_sql_equal( + sql, + """ + WITH v3_agg_barrier_date_dim AS ( + SELECT dateint + FROM default.v3.agg_barrier_date + WHERE dateint BETWEEN 20260101 AND 20260131 + ), + v3_agg_barrier_firstplay AS ( + SELECT account_id, title_id, MIN(utc_date) AS utc_date + FROM default.v3.daily_events_agg + GROUP BY account_id, title_id + ), + v3_agg_barrier_parent AS ( + SELECT account_id, title_id, utc_date, 1 AS firstplay_count + FROM v3_agg_barrier_firstplay + WHERE utc_date BETWEEN 20260101 AND 20260131 + ) + SELECT t1.account_id, SUM(t1.firstplay_count) firstplay_count_sum_HASH + FROM v3_agg_barrier_parent t1 + GROUP BY t1.account_id + """, + normalize_aliases=True, + ) From 42948bb39640ae84f033d2ef87c01068432aae41 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 11:34:55 -0700 Subject: [PATCH 02/11] Fix --- .../construction/build_v3/cte.py | 178 +++++++++++++++--- .../build_v3/measures_sql_test.py | 3 +- 2 files changed, 149 insertions(+), 32 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/cte.py b/datajunction-server/datajunction_server/construction/build_v3/cte.py index cfd33cda8..97fce6b29 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/cte.py +++ b/datajunction-server/datajunction_server/construction/build_v3/cte.py @@ -4,6 +4,7 @@ from __future__ import annotations +import functools from copy import deepcopy from typing import Optional, cast @@ -1961,6 +1962,111 @@ def _build_select_projection_map( return result +@functools.lru_cache(maxsize=1) +def _aggregate_function_names() -> frozenset[str]: + """Names (upper-cased) of every aggregate function known to DJ. + + Derived from DJ's own function registry so we stay in sync with + new aggregates added there, rather than maintaining a parallel + hand-rolled list. + """ + from datajunction_server.sql.functions import function_registry + + return frozenset( + name + for name, cls in function_registry.items() + if getattr(cls, "is_aggregation", False) + ) + + +def _node_aggregates_on_column( + node: "Node", + bare_cols: set[str], + ctx: "BuildContext", +) -> bool: + """Return True iff ``node``'s body contains an aggregate function + call whose arguments reference any column name in ``bare_cols``. + + Used to detect aggregation barriers along the dep chain so that + filters on a column that is aggregated upstream are not pushed + past the aggregation (which would change result semantics). + """ + if not node.current or not node.current.query: + return False + try: + qast = ctx.get_parsed_query(node) + except Exception: # pragma: no cover + return False + for func in qast.find_all(ast.Function): + try: + fname = func.name.identifier(quotes=False).upper() + except Exception: # pragma: no cover + continue + if fname not in _aggregate_function_names(): + continue + for col in func.find_all(ast.Column): + if col.name and col.name.name in bare_cols: + return True + return False + + +def _safe_pushdown_nodes_for_filter( + parent_node: "Node", + filter_str: str, + filter_column_aliases: dict[str, str], + ctx: "BuildContext", +) -> set[str]: + """Return the set of upstream node names where pushing a given + filter is semantically safe. + + BFS down the dep DAG from ``parent_node``. At each CTE check + whether the node aggregates on the filter's bare column — if it + does, the CTE is an aggregation barrier and neither it nor any + of its dependencies are safe targets for this filter. Otherwise + the CTE is safe and its deps are enqueued. + """ + try: + filter_ast = parse_filter(filter_str) + except Exception: # pragma: no cover + return {parent_node.name} + bare_cols: set[str] = set() + for col in filter_ast.find_all(ast.Column): + full = get_column_full_name(col) + if full and full in filter_column_aliases: # pragma: no branch + bare_cols.add(filter_column_aliases[full]) + if not bare_cols: # pragma: no cover + return {parent_node.name} + + safe: set[str] = set() + visited: set[str] = set() + queue: list["Node"] = [parent_node] + + while queue: + node = queue.pop(0) + if node.name in visited: + continue + visited.add(node.name) + if _node_aggregates_on_column(node, bare_cols, ctx): + # Aggregation barrier — neither this node nor its deps + # are safe through this path. + continue + safe.add(node.name) + if not node.current or not node.current.query: + continue + try: + qast = ctx.get_parsed_query(node) + except Exception: # pragma: no cover + continue + for ref in get_table_references_from_ast(qast): + ref_node = ctx.nodes.get(ref) + if ref_node is None or ref_node.type == NodeType.SOURCE: + continue + if ref_node.name not in visited: + queue.append(ref_node) + + return safe + + def collect_node_ctes( ctx: BuildContext, nodes_to_include: list[Node], @@ -2115,37 +2221,47 @@ def collect_refs(node: Node, visited: set[str]) -> None: if injected_filters and node.name in injected_filters: _inject_filter_into_where(query_ast, injected_filters[node.name]) - # Pushdown is restricted to the parent CTE (the metric's - # direct source) and to dimension CTEs. Pushing into - # upstream transform CTEs is unsafe: a transform may apply - # aggregations (e.g. ``MIN(utc_date) AS first_play_date`` - # then renamed back to ``utc_date``) so a column with the - # same name at a downstream CTE has different semantics - # than at the upstream CTE. Pushing a predicate past the - # aggregation barrier silently changes which rows feed the - # aggregation, and therefore changes the result. DJ - # doesn't currently detect when an upstream transform's - # dim-link column is post-aggregation vs passthrough — so - # we conservatively skip cross-CTE pushdown into - # transforms. Matches v2 behavior. ``parent_node = None`` - # preserves the unrestricted pre-v2-parity behavior for - # callers that haven't opted in yet. - if pushdown is not None and ( - parent_node is None - or node is parent_node - or node.type == NodeType.DIMENSION - ): - injections, consumed = _resolve_pushdown_filters_for_cte( - node, - query_ast, - pushdown.filters, - pushdown.column_aliases, - ctx, - ) - for target_select, filter_ast in injections: - inject_filter_into_select(target_select, filter_ast) - if consumed: - consumed_by_node[node.name] = consumed + # Pushdown into upstream transform CTEs is gated on a + # per-filter aggregation-barrier check. A filter is only + # pushed into node N if there's a passthrough projection + # chain for the filter's column from ``parent_node`` down + # to N — equivalently, no intermediate CTE on the path + # rewrites the column via aggregation (e.g. + # ``MIN(utc_date) AS firstplay_date`` renamed back to + # ``utc_date``). Pushing past such a barrier silently + # changes which rows feed the aggregation. Parent itself + # and dimension CTEs are always allowed (no barrier + # concern at parent's own scope; dim CTEs are simple + # projections of their backing source). + if pushdown is not None: + if parent_node is None: + safe_filters_list = list(pushdown.filters) + elif node is parent_node or node.type == NodeType.DIMENSION: + safe_filters_list = list(pushdown.filters) + else: + safe_filters_list = [ + f + for f in pushdown.filters + if node.name + in _safe_pushdown_nodes_for_filter( + parent_node, + f, + pushdown.column_aliases, + ctx, + ) + ] + if safe_filters_list: + injections, consumed = _resolve_pushdown_filters_for_cte( + node, + query_ast, + safe_filters_list, + pushdown.column_aliases, + ctx, + ) + for target_select, filter_ast in injections: + inject_filter_into_select(target_select, filter_ast) + if consumed: + consumed_by_node[node.name] = consumed ctes.append((cte_name, query_ast)) diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index cea110d57..66eabb99e 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9881,12 +9881,13 @@ async def test_count_distinct_plain_column_component_in_output( body = response.json() gg = get_first_grain_group(body) sql = gg["sql"] + raw_gg = body["grain_groups"][0] # Find the component name (hash-suffixed) emitted for the # ``COUNT(DISTINCT account_id)`` decomposition. It should # be present both as a registered component and as a SQL # column in the projection. - comp_names = [c["name"] for c in gg["components"]] + comp_names = [c["name"] for c in raw_gg["components"]] distinct_comp = next( (n for n in comp_names if n.startswith("account_id_distinct_")), None, From f29600e6ae33e27e6903cb569cce70a443a6a98a Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 11:56:04 -0700 Subject: [PATCH 03/11] Fix --- .../build_v3/measures_sql_test.py | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 66eabb99e..3d98285e2 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9883,30 +9883,34 @@ async def test_count_distinct_plain_column_component_in_output( sql = gg["sql"] raw_gg = body["grain_groups"][0] - # Find the component name (hash-suffixed) emitted for the - # ``COUNT(DISTINCT account_id)`` decomposition. It should - # be present both as a registered component and as a SQL - # column in the projection. - comp_names = [c["name"] for c in raw_gg["components"]] - distinct_comp = next( - (n for n in comp_names if n.startswith("account_id_distinct_")), + # The fix emits an extra `` AS account_id_distinct_`` + # projection so the hash-suffixed component reference (used by + # downstream consumers like XP/ABlaze) resolves against the + # measures table. Locate the hash-suffixed column in the + # response columns metadata to use in the SQL assertion. + distinct_col = next( + ( + c["name"] + for c in raw_gg["columns"] + if c["name"].startswith("account_id_distinct_") + ), None, ) - assert distinct_comp is not None, ( - f"Expected an ``account_id_distinct_`` component " - f"in the metric decomposition; got {comp_names}" + assert distinct_col is not None, ( + f"Expected an ``account_id_distinct_`` output column " + f"in the measures SQL; got cols={[c['name'] for c in raw_gg['columns']]}" ) assert_sql_equal( sql, f""" WITH v3_distinct_events_xform AS ( - SELECT account_id, event_date, value + SELECT account_id, event_date FROM default.v3.distinct_events ) SELECT t1.event_date, t1.account_id, - t1.account_id AS {distinct_comp} + t1.account_id {distinct_col} FROM v3_distinct_events_xform t1 GROUP BY t1.event_date, t1.account_id """, @@ -10131,18 +10135,13 @@ async def test_filter_does_not_push_past_upstream_aggregation_barrier( assert_sql_equal( sql, """ - WITH v3_agg_barrier_date_dim AS ( - SELECT dateint - FROM default.v3.agg_barrier_date - WHERE dateint BETWEEN 20260101 AND 20260131 - ), - v3_agg_barrier_firstplay AS ( + WITH v3_agg_barrier_firstplay AS ( SELECT account_id, title_id, MIN(utc_date) AS utc_date FROM default.v3.daily_events_agg GROUP BY account_id, title_id ), v3_agg_barrier_parent AS ( - SELECT account_id, title_id, utc_date, 1 AS firstplay_count + SELECT account_id, utc_date, 1 AS firstplay_count FROM v3_agg_barrier_firstplay WHERE utc_date BETWEEN 20260101 AND 20260131 ) From 4a24d0bacf91841f3fdb0ffdea84268f638cb347 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 13:32:07 -0700 Subject: [PATCH 04/11] Fix grain alias + derived expr bug --- .../construction/build_v3/measures.py | 29 +--- .../datajunction_server/sql/decompose.py | 8 +- .../build_v3/measures_sql_test.py | 163 ------------------ 3 files changed, 11 insertions(+), 189 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index a7c193c28..c992a8a3d 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -1930,31 +1930,12 @@ def build_grain_group_sql( component_aliases[component.name] = component_alias continue - # LIMITED aggregability with no aggregation (e.g. COUNT DISTINCT). - # These are represented by grain columns rather than aggregated - # expressions. grain_alias was set by _make_component: plain column - # → bare column name; complex expression (CASE, IF, …) → - # component.name. - # - # When grain_alias != component.name (plain-column case), the bare - # column is already in the projection as a grain column, but - # external consumers (e.g. XP/ABlaze) reference the component by - # its hash-suffixed name (the metric's published - # ``derived_expression``). Emit an additional projection that - # aliases the bare column to the component name so the - # component-name reference resolves against the measures table. - # Internal metric SQL still uses ``component_aliases`` to bind to - # the bare column. + # Skip LIMITED aggregability components with no aggregation + # These are represented by grain columns instead. + # grain_alias was set by _make_component: plain column → column name, + # complex expression → component.name. if component.rule.type == Aggregability.LIMITED and not component.aggregation: - grain_alias = component.grain_alias or component.name - component_aliases[component.name] = grain_alias - if grain_alias != component.name: - component_expressions.append( - (component.name, make_column_ref(grain_alias)), - ) - component_metadata.append( - (component.name, component, metric_node), - ) + component_aliases[component.name] = component.grain_alias or component.name continue # Always use component.name for consistency - no special case for single-component diff --git a/datajunction-server/datajunction_server/sql/decompose.py b/datajunction-server/datajunction_server/sql/decompose.py index 2a47e15e0..2ee9eddfa 100644 --- a/datajunction-server/datajunction_server/sql/decompose.py +++ b/datajunction-server/datajunction_server/sql/decompose.py @@ -1120,10 +1120,14 @@ def _decompose( if is_distinct: # DISTINCT aggregations can't be pre-aggregated, so keep original function - # Just replace column references with component names + # and reference the column under the same alias measures.py will + # project it as: ``grain_alias`` for plain-column DISTINCT + # (the bare column name), or the component name for complex + # expressions (where grain_alias falls back to component name). + distinct_ref = components[0].grain_alias or components[0].name combiner_ast: ast.Expression = ast.Function( func.name, - args=[ast.Column(ast.Name(components[0].name))], + args=[ast.Column(ast.Name(distinct_ref))], quantifier=ast.SetQuantifier.Distinct, ) else: diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 3d98285e2..8ca38d846 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9807,169 +9807,6 @@ async def test_alias_substitution_skips_subquery_alias_collision( normalize_aliases=True, ) - @pytest.mark.asyncio - async def test_count_distinct_plain_column_component_in_output( - self, - client_with_build_v3, - ): - """A ``COUNT(DISTINCT account_id)`` metric's derived - expression references the component by its hash-suffixed - name (e.g. ``account_id_distinct_``), but for - plain-column DISTINCT components measures.py used to - register the bare column as the SQL alias and SKIP emitting - a hash-named projection — leaving downstream consumers - (XP/ABlaze) unable to resolve the column referenced in the - metric's published ``derived_expression``. - - The fix emits an additional `` AS - `` projection so the component-name - reference resolves against the measures table. This test - pins both the SQL shape AND verifies that every column - referenced in the metric's derived_expression actually - appears as a column in the measures SQL output. - """ - client = client_with_build_v3 - - resp = await client.post( - "/nodes/source/", - json={ - "name": "v3.src_distinct_events", - "columns": [ - {"name": "account_id", "type": "int"}, - {"name": "event_date", "type": "int"}, - {"name": "value", "type": "double"}, - ], - "mode": "published", - "catalog": "default", - "schema_": "v3", - "table": "distinct_events", - }, - ) - assert resp.status_code in (200, 201), resp.json() - resp = await client.post( - "/nodes/transform/", - json={ - "name": "v3.distinct_events_xform", - "query": ( - "SELECT account_id, event_date, value FROM v3.src_distinct_events" - ), - "mode": "published", - "primary_key": ["account_id", "event_date"], - }, - ) - assert resp.status_code == 201, resp.json() - resp = await client.post( - "/nodes/metric/", - json={ - "name": "v3.distinct_account_count", - "query": ( - "SELECT COUNT(DISTINCT account_id) FROM v3.distinct_events_xform" - ), - "mode": "published", - }, - ) - assert resp.status_code == 201, resp.json() - - response = await client.get( - "/sql/measures/v3/", - params={ - "metrics": ["v3.distinct_account_count"], - "dimensions": ["v3.distinct_events_xform.event_date"], - }, - ) - assert response.status_code == 200, response.json() - body = response.json() - gg = get_first_grain_group(body) - sql = gg["sql"] - raw_gg = body["grain_groups"][0] - - # The fix emits an extra `` AS account_id_distinct_`` - # projection so the hash-suffixed component reference (used by - # downstream consumers like XP/ABlaze) resolves against the - # measures table. Locate the hash-suffixed column in the - # response columns metadata to use in the SQL assertion. - distinct_col = next( - ( - c["name"] - for c in raw_gg["columns"] - if c["name"].startswith("account_id_distinct_") - ), - None, - ) - assert distinct_col is not None, ( - f"Expected an ``account_id_distinct_`` output column " - f"in the measures SQL; got cols={[c['name'] for c in raw_gg['columns']]}" - ) - - assert_sql_equal( - sql, - f""" - WITH v3_distinct_events_xform AS ( - SELECT account_id, event_date - FROM default.v3.distinct_events - ) - SELECT t1.event_date, - t1.account_id, - t1.account_id {distinct_col} - FROM v3_distinct_events_xform t1 - GROUP BY t1.event_date, t1.account_id - """, - normalize_aliases=True, - ) - - # Every column referenced in the metric's derived - # expression / combiner must exist as a column in the - # measures SQL output, otherwise downstream (XP/ABlaze) - # can't evaluate the expression against the materialized - # measures table. - combiner = next( - f["combiner"] - for f in body["metric_formulas"] - if f["name"] == "v3.distinct_account_count" - ) - output_col_names = {c["name"] for c in gg["columns"]} - # Extract bare identifiers from the combiner — anything - # that looks like a column reference. Drop SQL keywords. - import re - - SQL_KW = { - "COUNT", - "DISTINCT", - "SUM", - "AVG", - "MIN", - "MAX", - "CASE", - "WHEN", - "THEN", - "ELSE", - "END", - "IS", - "NOT", - "NULL", - "AND", - "OR", - "AS", - "TRUE", - "FALSE", - } - referenced = { - tok - for tok in re.findall(r"\b[A-Za-z_][A-Za-z0-9_]*\b", combiner) - if tok.upper() not in SQL_KW - } - # Each non-keyword identifier in the combiner must resolve - # against the measures SQL output columns. - missing = referenced - output_col_names - assert not missing, ( - f"Derived expression references columns missing from " - f"the measures SQL output.\n" - f" combiner: {combiner}\n" - f" referenced: {sorted(referenced)}\n" - f" output cols: {sorted(output_col_names)}\n" - f" missing: {sorted(missing)}" - ) - @pytest.mark.asyncio async def test_filter_does_not_push_past_upstream_aggregation_barrier( self, From 4a52f737a507cd1ea8c6a268d60112ae71a04882 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 13:54:36 -0700 Subject: [PATCH 05/11] Fix --- .../construction/build_v3/cte.py | 158 ++------------- .../construction/build_v3/measures.py | 1 - .../build_v3/measures_sql_test.py | 182 ------------------ 3 files changed, 11 insertions(+), 330 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/cte.py b/datajunction-server/datajunction_server/construction/build_v3/cte.py index 97fce6b29..c80d977f8 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/cte.py +++ b/datajunction-server/datajunction_server/construction/build_v3/cte.py @@ -4,7 +4,6 @@ from __future__ import annotations -import functools from copy import deepcopy from typing import Optional, cast @@ -1962,118 +1961,12 @@ def _build_select_projection_map( return result -@functools.lru_cache(maxsize=1) -def _aggregate_function_names() -> frozenset[str]: - """Names (upper-cased) of every aggregate function known to DJ. - - Derived from DJ's own function registry so we stay in sync with - new aggregates added there, rather than maintaining a parallel - hand-rolled list. - """ - from datajunction_server.sql.functions import function_registry - - return frozenset( - name - for name, cls in function_registry.items() - if getattr(cls, "is_aggregation", False) - ) - - -def _node_aggregates_on_column( - node: "Node", - bare_cols: set[str], - ctx: "BuildContext", -) -> bool: - """Return True iff ``node``'s body contains an aggregate function - call whose arguments reference any column name in ``bare_cols``. - - Used to detect aggregation barriers along the dep chain so that - filters on a column that is aggregated upstream are not pushed - past the aggregation (which would change result semantics). - """ - if not node.current or not node.current.query: - return False - try: - qast = ctx.get_parsed_query(node) - except Exception: # pragma: no cover - return False - for func in qast.find_all(ast.Function): - try: - fname = func.name.identifier(quotes=False).upper() - except Exception: # pragma: no cover - continue - if fname not in _aggregate_function_names(): - continue - for col in func.find_all(ast.Column): - if col.name and col.name.name in bare_cols: - return True - return False - - -def _safe_pushdown_nodes_for_filter( - parent_node: "Node", - filter_str: str, - filter_column_aliases: dict[str, str], - ctx: "BuildContext", -) -> set[str]: - """Return the set of upstream node names where pushing a given - filter is semantically safe. - - BFS down the dep DAG from ``parent_node``. At each CTE check - whether the node aggregates on the filter's bare column — if it - does, the CTE is an aggregation barrier and neither it nor any - of its dependencies are safe targets for this filter. Otherwise - the CTE is safe and its deps are enqueued. - """ - try: - filter_ast = parse_filter(filter_str) - except Exception: # pragma: no cover - return {parent_node.name} - bare_cols: set[str] = set() - for col in filter_ast.find_all(ast.Column): - full = get_column_full_name(col) - if full and full in filter_column_aliases: # pragma: no branch - bare_cols.add(filter_column_aliases[full]) - if not bare_cols: # pragma: no cover - return {parent_node.name} - - safe: set[str] = set() - visited: set[str] = set() - queue: list["Node"] = [parent_node] - - while queue: - node = queue.pop(0) - if node.name in visited: - continue - visited.add(node.name) - if _node_aggregates_on_column(node, bare_cols, ctx): - # Aggregation barrier — neither this node nor its deps - # are safe through this path. - continue - safe.add(node.name) - if not node.current or not node.current.query: - continue - try: - qast = ctx.get_parsed_query(node) - except Exception: # pragma: no cover - continue - for ref in get_table_references_from_ast(qast): - ref_node = ctx.nodes.get(ref) - if ref_node is None or ref_node.type == NodeType.SOURCE: - continue - if ref_node.name not in visited: - queue.append(ref_node) - - return safe - - def collect_node_ctes( ctx: BuildContext, nodes_to_include: list[Node], needed_columns_by_node: Optional[dict[str, set[str]]] = None, injected_filters: Optional[dict[str, ast.Expression]] = None, pushdown: Optional[PushdownFilters] = None, - parent_node: Optional[Node] = None, ) -> tuple[list[tuple[str, ast.Query]], list[str], dict[str, set[str]]]: """ Collect CTEs for all non-source nodes, recursively expanding table references. @@ -2221,47 +2114,18 @@ def collect_refs(node: Node, visited: set[str]) -> None: if injected_filters and node.name in injected_filters: _inject_filter_into_where(query_ast, injected_filters[node.name]) - # Pushdown into upstream transform CTEs is gated on a - # per-filter aggregation-barrier check. A filter is only - # pushed into node N if there's a passthrough projection - # chain for the filter's column from ``parent_node`` down - # to N — equivalently, no intermediate CTE on the path - # rewrites the column via aggregation (e.g. - # ``MIN(utc_date) AS firstplay_date`` renamed back to - # ``utc_date``). Pushing past such a barrier silently - # changes which rows feed the aggregation. Parent itself - # and dimension CTEs are always allowed (no barrier - # concern at parent's own scope; dim CTEs are simple - # projections of their backing source). if pushdown is not None: - if parent_node is None: - safe_filters_list = list(pushdown.filters) - elif node is parent_node or node.type == NodeType.DIMENSION: - safe_filters_list = list(pushdown.filters) - else: - safe_filters_list = [ - f - for f in pushdown.filters - if node.name - in _safe_pushdown_nodes_for_filter( - parent_node, - f, - pushdown.column_aliases, - ctx, - ) - ] - if safe_filters_list: - injections, consumed = _resolve_pushdown_filters_for_cte( - node, - query_ast, - safe_filters_list, - pushdown.column_aliases, - ctx, - ) - for target_select, filter_ast in injections: - inject_filter_into_select(target_select, filter_ast) - if consumed: - consumed_by_node[node.name] = consumed + injections, consumed = _resolve_pushdown_filters_for_cte( + node, + query_ast, + list(pushdown.filters), + pushdown.column_aliases, + ctx, + ) + for target_select, filter_ast in injections: + inject_filter_into_select(target_select, filter_ast) + if consumed: + consumed_by_node[node.name] = consumed ctes.append((cte_name, query_ast)) diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index c992a8a3d..c2fd8df64 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -1434,7 +1434,6 @@ def build_select_ast( ) if all_filters else None, - parent_node=parent_node, ) # Surface all CTE-consumed filters so the metrics layer's outer WHERE # can skip re-applying them on top of the aggregation CTEs. diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 8ca38d846..8d267bdea 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9806,185 +9806,3 @@ async def test_alias_substitution_skips_subquery_alias_collision( """, normalize_aliases=True, ) - - @pytest.mark.asyncio - async def test_filter_does_not_push_past_upstream_aggregation_barrier( - self, - client_with_build_v3, - ): - """Filter pushdown must not cross an upstream transform's - aggregation barrier (e.g. ``MIN(utc_date) AS firstplay_date`` - renamed back to ``utc_date``) — the column has the same - name at both layers but different semantics, and pushing - a predicate past the MIN changes which rows feed the - aggregation, silently inflating the result. - - Real regression from a first-play metric where v3 pushed - ``utc_date BETWEEN ...`` into the upstream - ``profile_game_engagement`` CTE, turning - "lifetime-first-play in window" into "any in-window play" - — roughly doubling the metric. - - Cross-CTE pushdown into upstream transform CTEs is now - gated on ``node is parent_node or node.type == - DIMENSION``. This test pins that the upstream transform's - WHERE only carries its authored predicate (no injected - date filter), while the parent CTE's WHERE correctly - carries the date filter. - """ - client = client_with_build_v3 - - # Upstream transform with an aggregation that renames the - # filter column post-MIN. Real-world shape: a per-account - # "first event date" derived from a per-account daily - # event source. - resp = await client.post( - "/nodes/source/", - json={ - "name": "v3.src_daily_events_agg", - "columns": [ - {"name": "account_id", "type": "int"}, - {"name": "title_id", "type": "int"}, - {"name": "utc_date", "type": "int"}, - ], - "mode": "published", - "catalog": "default", - "schema_": "v3", - "table": "daily_events_agg", - }, - ) - assert resp.status_code in (200, 201), resp.json() - # Tiny date-dim that both the upstream transform and the - # parent transform link their ``utc_date`` column to. This - # is what makes the bug reproducible: BOTH transforms have - # a dim_link to the same date dim, so the per-CTE - # pushdown loop independently routes the filter to both. - resp = await client.post( - "/nodes/source/", - json={ - "name": "v3.src_agg_barrier_date_dim", - "columns": [{"name": "dateint", "type": "int"}], - "mode": "published", - "catalog": "default", - "schema_": "v3", - "table": "agg_barrier_date", - }, - ) - assert resp.status_code in (200, 201), resp.json() - resp = await client.post( - "/nodes/dimension/", - json={ - "name": "v3.agg_barrier_date_dim", - "query": "SELECT dateint FROM v3.src_agg_barrier_date_dim", - "mode": "published", - "primary_key": ["dateint"], - }, - ) - assert resp.status_code == 201, resp.json() - - # Upstream transform with the aggregation barrier: - # MIN(utc_date) renamed back to utc_date. Linked to the - # date dim via its raw ``utc_date`` column — this link - # is the trap; pre-fix DJ would route the date filter here - # past the MIN. - resp = await client.post( - "/nodes/transform/", - json={ - "name": "v3.agg_barrier_firstplay", - "query": ( - "SELECT account_id, title_id, " - " MIN(utc_date) AS utc_date " - "FROM v3.src_daily_events_agg " - "GROUP BY account_id, title_id" - ), - "mode": "published", - "primary_key": ["account_id", "title_id"], - }, - ) - assert resp.status_code == 201, resp.json() - resp = await client.post( - "/nodes/v3.agg_barrier_firstplay/link", - json={ - "dimension_node": "v3.agg_barrier_date_dim", - "join_type": "inner", - "join_on": ( - "v3.agg_barrier_firstplay.utc_date " - "= v3.agg_barrier_date_dim.dateint" - ), - }, - ) - assert resp.status_code in (200, 201), resp.json() - # Parent transform: reads from the upstream and exposes - # ``utc_date`` at the post-MIN semantics. Filter must - # land at THIS CTE's WHERE, not in the upstream's WHERE. - resp = await client.post( - "/nodes/transform/", - json={ - "name": "v3.agg_barrier_parent", - "query": ( - "SELECT account_id, title_id, utc_date, " - " 1 AS firstplay_count " - "FROM v3.agg_barrier_firstplay" - ), - "mode": "published", - "primary_key": ["account_id", "title_id"], - }, - ) - assert resp.status_code == 201, resp.json() - resp = await client.post( - "/nodes/v3.agg_barrier_parent/link", - json={ - "dimension_node": "v3.agg_barrier_date_dim", - "join_type": "inner", - "join_on": ( - "v3.agg_barrier_parent.utc_date = v3.agg_barrier_date_dim.dateint" - ), - }, - ) - assert resp.status_code in (200, 201), resp.json() - resp = await client.post( - "/nodes/metric/", - json={ - "name": "v3.agg_barrier_total", - "query": ("SELECT SUM(firstplay_count) FROM v3.agg_barrier_parent"), - "mode": "published", - }, - ) - assert resp.status_code == 201, resp.json() - - response = await client.get( - "/sql/measures/v3/", - params={ - "metrics": ["v3.agg_barrier_total"], - "dimensions": ["v3.agg_barrier_parent.account_id"], - "filters": [ - "v3.agg_barrier_date_dim.dateint BETWEEN 20260101 AND 20260131", - ], - }, - ) - assert response.status_code == 200, response.json() - sql = get_first_grain_group(response.json())["sql"] - - # Filter lands at the parent CTE's WHERE (post-MIN - # semantics). Upstream transform's WHERE remains empty — - # NOT injected with a date-range predicate that would - # change the MIN's input set. - assert_sql_equal( - sql, - """ - WITH v3_agg_barrier_firstplay AS ( - SELECT account_id, title_id, MIN(utc_date) AS utc_date - FROM default.v3.daily_events_agg - GROUP BY account_id, title_id - ), - v3_agg_barrier_parent AS ( - SELECT account_id, utc_date, 1 AS firstplay_count - FROM v3_agg_barrier_firstplay - WHERE utc_date BETWEEN 20260101 AND 20260131 - ) - SELECT t1.account_id, SUM(t1.firstplay_count) firstplay_count_sum_HASH - FROM v3_agg_barrier_parent t1 - GROUP BY t1.account_id - """, - normalize_aliases=True, - ) From f70ab2c922286bb7346f5e6c38c003725b697f4b Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 14:17:08 -0700 Subject: [PATCH 06/11] Fix limited agg metrics --- .../construction/build_v3/cte.py | 4 +- .../construction/build_v3/measures.py | 65 ++++- .../datajunction_server/sql/decompose.py | 8 +- .../build_v3/measures_sql_test.py | 262 ++++++++++++++++++ 4 files changed, 321 insertions(+), 18 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/cte.py b/datajunction-server/datajunction_server/construction/build_v3/cte.py index c80d977f8..ef56fe959 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/cte.py +++ b/datajunction-server/datajunction_server/construction/build_v3/cte.py @@ -2114,11 +2114,11 @@ def collect_refs(node: Node, visited: set[str]) -> None: if injected_filters and node.name in injected_filters: _inject_filter_into_where(query_ast, injected_filters[node.name]) - if pushdown is not None: + if pushdown: injections, consumed = _resolve_pushdown_filters_for_cte( node, query_ast, - list(pushdown.filters), + pushdown.filters, pushdown.column_aliases, ctx, ) diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index c2fd8df64..b2180d899 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -164,6 +164,48 @@ def _parse_type_string(type_str: str | None) -> ct.ColumnType | None: return _TYPE_STRING_MAP.get(normalized) +def register_limited_component( + component: MetricComponent, + metric_node: Node, + component_aliases: dict[str, str], + component_expressions: list[tuple[str, ast.Expression]], + component_metadata: list[tuple[str, MetricComponent, Node]], +) -> None: + """Register a LIMITED-aggregability component (e.g. COUNT DISTINCT) + in the measures-SQL projection. + + The component's grain_alias (set by ``_make_component`` in + ``sql/decompose.py``) is the bare column for plain-column DISTINCT + and the hashed component name for complex expressions. + + When the two differ (plain-column case), the bare column is already + in the projection as a grain column, but external consumers + (XP/ABlaze) reference the component by its hash-suffixed name — + that's what the metric's persisted ``derived_expression`` carries, + and v2 pre-agg materialization writes the column under the hashed + name too. This helper emits an additional `` AS + `` projection so the hashed reference resolves + against the live measures SQL output, and routes + ``component_aliases`` at the hashed name so downstream combiner + rewriters (metrics.py, cube_matcher) stay consistent. + + Both the merged-grain-group path and the per-grain-group path in + ``build_grain_group_sql`` MUST go through this helper so the + contract stays in one place. + """ + grain_alias = component.grain_alias or component.name + if grain_alias != component.name: + component_aliases[component.name] = component.name + component_expressions.append( + (component.name, make_column_ref(grain_alias)), + ) + component_metadata.append( + (component.name, component, metric_node), + ) + else: + component_aliases[component.name] = grain_alias + + def infer_component_type( component: MetricComponent, metric_type: str, @@ -1910,11 +1952,12 @@ def build_grain_group_sql( Aggregability.FULL, ) if orig_agg == Aggregability.LIMITED: - # LIMITED: grain column is already in GROUP BY, no output needed. - # grain_alias was set by _make_component: plain column → column name, - # complex expression → component.name. - component_aliases[component.name] = ( - component.grain_alias or component.name + register_limited_component( + component, + metric_node, + component_aliases, + component_expressions, + component_metadata, ) continue else: @@ -1929,12 +1972,14 @@ def build_grain_group_sql( component_aliases[component.name] = component_alias continue - # Skip LIMITED aggregability components with no aggregation - # These are represented by grain columns instead. - # grain_alias was set by _make_component: plain column → column name, - # complex expression → component.name. if component.rule.type == Aggregability.LIMITED and not component.aggregation: - component_aliases[component.name] = component.grain_alias or component.name + register_limited_component( + component, + metric_node, + component_aliases, + component_expressions, + component_metadata, + ) continue # Always use component.name for consistency - no special case for single-component diff --git a/datajunction-server/datajunction_server/sql/decompose.py b/datajunction-server/datajunction_server/sql/decompose.py index 2ee9eddfa..2a47e15e0 100644 --- a/datajunction-server/datajunction_server/sql/decompose.py +++ b/datajunction-server/datajunction_server/sql/decompose.py @@ -1120,14 +1120,10 @@ def _decompose( if is_distinct: # DISTINCT aggregations can't be pre-aggregated, so keep original function - # and reference the column under the same alias measures.py will - # project it as: ``grain_alias`` for plain-column DISTINCT - # (the bare column name), or the component name for complex - # expressions (where grain_alias falls back to component name). - distinct_ref = components[0].grain_alias or components[0].name + # Just replace column references with component names combiner_ast: ast.Expression = ast.Function( func.name, - args=[ast.Column(ast.Name(distinct_ref))], + args=[ast.Column(ast.Name(components[0].name))], quantifier=ast.SetQuantifier.Distinct, ) else: diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 8d267bdea..b5413222a 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9806,3 +9806,265 @@ async def test_alias_substitution_skips_subquery_alias_collision( """, normalize_aliases=True, ) + + @pytest.mark.asyncio + async def test_count_distinct_plain_column_projects_hashed_alias( + self, + client_with_build_v3, + ): + """``COUNT(DISTINCT plain_col)`` measures-SQL output must + project the column under BOTH the bare grain name AND the + hash-suffixed component identity (`` AS _distinct_``). + + The metric's persisted ``derived_expression``, the v2 pre-agg + materialization column, and the cube column all reference the + hashed name as the canonical identity. Without the extra + projection, downstream consumers (XP/ABlaze) reading the live + measures SQL output can't resolve the column the + ``derived_expression`` references. The combiner in the + ``/sql/measures/v3`` response must also reference the hashed + name for consistency with the persisted expression. + """ + client = client_with_build_v3 + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_distinct_plain", + "columns": [ + {"name": "account_id", "type": "int"}, + {"name": "event_date", "type": "int"}, + ], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "distinct_plain_events", + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.distinct_plain_xform", + "query": ("SELECT account_id, event_date FROM v3.src_distinct_plain"), + "mode": "published", + "primary_key": ["account_id", "event_date"], + }, + ) + assert resp.status_code == 201, resp.json() + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.distinct_plain_count", + "query": ( + "SELECT COUNT(DISTINCT account_id) FROM v3.distinct_plain_xform" + ), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + + response = await client.get( + "/sql/measures/v3/", + params={ + "metrics": ["v3.distinct_plain_count"], + "dimensions": ["v3.distinct_plain_xform.event_date"], + }, + ) + assert response.status_code == 200, response.json() + body = response.json() + gg = body["grain_groups"][0] + + # Locate the hash-suffixed identity column emitted by the fix. + hashed_col = next( + ( + c["name"] + for c in gg["columns"] + if c["name"].startswith("account_id_distinct_") + ), + None, + ) + assert hashed_col is not None, ( + f"Expected an ``account_id_distinct_`` output column; " + f"got {[c['name'] for c in gg['columns']]}" + ) + + # SQL projects BOTH ``t1.account_id`` (grain/dim usage) AND + # ``t1.account_id AS account_id_distinct_`` (identity). + assert_sql_equal( + gg["sql"], + f""" + WITH v3_distinct_plain_xform AS ( + SELECT account_id, event_date + FROM default.v3.distinct_plain_events + ) + SELECT t1.event_date, + t1.account_id, + t1.account_id {hashed_col} + FROM v3_distinct_plain_xform t1 + GROUP BY t1.event_date, t1.account_id + """, + normalize_aliases=True, + ) + + # Combiner in the response and the metric node's persisted + # ``derived_expression`` both reference the hashed identity + # exactly — full equality so any drift (extra qualification, + # different name, accidental rewrite) breaks the test. + # Live SQL upper-cases the function name; the persisted + # ``derived_expression`` keeps the lowercase form from the + # original metric query. SQL is case-insensitive for + # function names so this is fine semantically. + assert body["metric_formulas"][0]["combiner"] == ( + f"COUNT( DISTINCT {hashed_col})" + ) + metric_resp = await client.get("/metrics/v3.distinct_plain_count") + assert metric_resp.status_code == 200 + assert metric_resp.json()["derived_expression"] == ( + f"count( DISTINCT {hashed_col})" + ) + + @pytest.mark.asyncio + async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( + self, + client_with_build_v3, + ): + """Same contract as the single-metric case must hold in the + ``is_merged=True`` grain-group path (multiple metrics with + mixed aggregabilities combined into one measures query). + + Regression: the two LIMITED-component branches in + ``build_grain_group_sql`` were independent — fixing only the + non-merged path left the merged path emitting a bare-column + projection + bare-column combiner, breaking consistency for + cross-fact / multi-metric requests. Both branches now route + through ``register_limited_component``; this test pins both + at once. + """ + client = client_with_build_v3 + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_merged_distinct", + "columns": [ + {"name": "account_id", "type": "int"}, + {"name": "event_date", "type": "int"}, + {"name": "is_active", "type": "boolean"}, + {"name": "amount", "type": "double"}, + ], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "merged_distinct_events", + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.merged_distinct_xform", + "query": ( + "SELECT account_id, event_date, is_active, amount " + "FROM v3.src_merged_distinct" + ), + "mode": "published", + "primary_key": ["account_id", "event_date"], + }, + ) + assert resp.status_code == 201, resp.json() + # Plain-column DISTINCT (LIMITED, grain_alias=bare). + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.merged_distinct_accounts", + "query": ( + "SELECT COUNT(DISTINCT account_id) FROM v3.merged_distinct_xform" + ), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + # Complex DISTINCT (LIMITED, grain_alias==component.name). + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.merged_distinct_active_accounts", + "query": ( + "SELECT COUNT(DISTINCT CASE WHEN is_active " + "THEN account_id ELSE NULL END) " + "FROM v3.merged_distinct_xform" + ), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + # FULL aggregability metric — forces the grain group to merge + # across aggregability levels (drives is_merged=True). + resp = await client.post( + "/nodes/metric/", + json={ + "name": "v3.merged_distinct_total_amount", + "query": "SELECT SUM(amount) FROM v3.merged_distinct_xform", + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + + response = await client.get( + "/sql/measures/v3/", + params={ + "metrics": [ + "v3.merged_distinct_accounts", + "v3.merged_distinct_active_accounts", + "v3.merged_distinct_total_amount", + ], + "dimensions": ["v3.merged_distinct_xform.event_date"], + }, + ) + assert response.status_code == 200, response.json() + body = response.json() + + # The merged grain group is the LIMITED one (FULL gets merged + # in at the LIMITED level). Locate the hashed identity + # columns we'll substitute into the expected SQL & combiners. + gg = next( + g + for g in body["grain_groups"] + if "v3.merged_distinct_accounts" in g["metrics"] + ) + cols = [c["name"] for c in gg["columns"]] + plain_hashed = next(n for n in cols if n.startswith("account_id_distinct_")) + complex_hashed = next( + n + for n in cols + if n.startswith("account_id_distinct_case_when_") + or (n.startswith("account_id_distinct_") and n != plain_hashed) + ) + amount_sum_hashed = next(n for n in cols if n.startswith("amount_sum_")) + + assert_sql_equal( + gg["sql"], + f""" + WITH v3_merged_distinct_xform AS ( + SELECT account_id, event_date, is_active, amount + FROM default.v3.merged_distinct_events + ) + SELECT t1.event_date, + t1.account_id, + CASE WHEN t1.is_active THEN t1.account_id ELSE NULL END {complex_hashed}, + SUM(t1.amount) {amount_sum_hashed}, + t1.account_id {plain_hashed} + FROM v3_merged_distinct_xform t1 + GROUP BY t1.event_date, t1.account_id, + CASE WHEN t1.is_active THEN t1.account_id ELSE NULL END + """, + normalize_aliases=True, + ) + + combiners_by_metric = { + f["name"]: f["combiner"] for f in body["metric_formulas"] + } + assert combiners_by_metric == { + "v3.merged_distinct_accounts": f"count( DISTINCT {plain_hashed})", + "v3.merged_distinct_active_accounts": f"count( DISTINCT {complex_hashed})", + "v3.merged_distinct_total_amount": f"SUM({amount_sum_hashed})", + } From fd77b8fbebdccb208f645563646501c5aee3d041 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 15:25:06 -0700 Subject: [PATCH 07/11] Fix --- .../build_v3/measures_sql_test.py | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index b5413222a..85235515b 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9908,20 +9908,15 @@ async def test_count_distinct_plain_column_projects_hashed_alias( # Combiner in the response and the metric node's persisted # ``derived_expression`` both reference the hashed identity - # exactly — full equality so any drift (extra qualification, - # different name, accidental rewrite) breaks the test. - # Live SQL upper-cases the function name; the persisted - # ``derived_expression`` keeps the lowercase form from the - # original metric query. SQL is case-insensitive for - # function names so this is fine semantically. - assert body["metric_formulas"][0]["combiner"] == ( - f"COUNT( DISTINCT {hashed_col})" - ) + # exactly. Both currently upper-case the function name; the + # contract is *equality between the two* — if either side + # drifts (different qualification, different case-folding, + # accidental rewrite), this test fails. + expected_combiner = f"COUNT( DISTINCT {hashed_col})" + assert body["metric_formulas"][0]["combiner"] == expected_combiner metric_resp = await client.get("/metrics/v3.distinct_plain_count") assert metric_resp.status_code == 200 - assert metric_resp.json()["derived_expression"] == ( - f"count( DISTINCT {hashed_col})" - ) + assert metric_resp.json()["derived_expression"] == expected_combiner @pytest.mark.asyncio async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( @@ -10032,12 +10027,11 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( if "v3.merged_distinct_accounts" in g["metrics"] ) cols = [c["name"] for c in gg["columns"]] - plain_hashed = next(n for n in cols if n.startswith("account_id_distinct_")) + plain_hashed = next( + n for n in cols if n.startswith("account_id_distinct_") + ) complex_hashed = next( - n - for n in cols - if n.startswith("account_id_distinct_case_when_") - or (n.startswith("account_id_distinct_") and n != plain_hashed) + n for n in cols if n.startswith("is_active_account_id_distinct_") ) amount_sum_hashed = next(n for n in cols if n.startswith("amount_sum_")) @@ -10049,13 +10043,12 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( FROM default.v3.merged_distinct_events ) SELECT t1.event_date, - t1.account_id, CASE WHEN t1.is_active THEN t1.account_id ELSE NULL END {complex_hashed}, + t1.account_id, SUM(t1.amount) {amount_sum_hashed}, t1.account_id {plain_hashed} FROM v3_merged_distinct_xform t1 - GROUP BY t1.event_date, t1.account_id, - CASE WHEN t1.is_active THEN t1.account_id ELSE NULL END + GROUP BY t1.event_date, {complex_hashed}, t1.account_id """, normalize_aliases=True, ) @@ -10064,7 +10057,7 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( f["name"]: f["combiner"] for f in body["metric_formulas"] } assert combiners_by_metric == { - "v3.merged_distinct_accounts": f"count( DISTINCT {plain_hashed})", - "v3.merged_distinct_active_accounts": f"count( DISTINCT {complex_hashed})", + "v3.merged_distinct_accounts": f"COUNT( DISTINCT {plain_hashed})", + "v3.merged_distinct_active_accounts": f"COUNT( DISTINCT {complex_hashed})", "v3.merged_distinct_total_amount": f"SUM({amount_sum_hashed})", } From 01a64e5693c8764e2657c41461e6e6a9e6e6b308 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 18:56:11 -0700 Subject: [PATCH 08/11] Fix --- .../construction/build_v3/combiners.py | 7 +- .../construction/build_v3/cube_matcher.py | 14 +- .../construction/build_v3/measures.py | 62 +++-- datajunction-server/tests/api/cubes_test.py | 9 + .../build_v3/measures_sql_test.py | 223 +++++++++++++++--- .../construction/build_v3/metrics_sql_test.py | 132 ++++++----- 6 files changed, 334 insertions(+), 113 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/combiners.py b/datajunction-server/datajunction_server/construction/build_v3/combiners.py index 8aa092ea9..033c92478 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/combiners.py +++ b/datajunction-server/datajunction_server/construction/build_v3/combiners.py @@ -846,12 +846,17 @@ def _build_grain_group_from_preagg_table( if col.semantic_type in ("metric_component", "measure", "metric"): col_ref = ast.Column(name=ast.Name(col.name)) - # Find the component to get the merge function + # Find the component to get the merge function. The third + # clause matches legacy pre-agg / grain-group shapes where + # the LIMITED-DISTINCT column lived under the bare grain + # name (before the ` AS ` projection landed) — + # keeps re-aggregation correct against stale materializations. merge_func = None for comp in original_gg.components: if ( # pragma: no branch comp.name == col.name or original_gg.component_aliases.get(comp.name) == col.name + or comp.grain_alias == col.name ): merge_func = comp.merge break diff --git a/datajunction-server/datajunction_server/construction/build_v3/cube_matcher.py b/datajunction-server/datajunction_server/construction/build_v3/cube_matcher.py index 59c351fd4..880d64ffc 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/cube_matcher.py +++ b/datajunction-server/datajunction_server/construction/build_v3/cube_matcher.py @@ -486,7 +486,19 @@ def build_synthetic_grain_group( for comp in decomposed.components: if comp.name not in component_aliases: # pragma: no branch - cube_col_name = mat_col_lookup.get(comp.name, comp.name) + # Legacy compat: cubes materialized before LIMITED-DISTINCT + # gained its ` AS ` projection had the column + # stored under the bare grain name (e.g. ``order_id``) + # rather than the hashed identity (``order_id_distinct_``). + # Try the hashed name first (current shape), then fall back + # to the grain_alias (legacy shape) before defaulting to + # ``comp.name``. Keeps stale cube configs queryable until + # they're re-materialized. + cube_col_name = mat_col_lookup.get(comp.name) + if cube_col_name is None and comp.grain_alias: + cube_col_name = mat_col_lookup.get(comp.grain_alias) + if cube_col_name is None: + cube_col_name = comp.name component_aliases[comp.name] = cube_col_name all_components.append(comp) diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index b2180d899..4db45aead 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -179,20 +179,38 @@ def register_limited_component( and the hashed component name for complex expressions. When the two differ (plain-column case), the bare column is already - in the projection as a grain column, but external consumers - (XP/ABlaze) reference the component by its hash-suffixed name — - that's what the metric's persisted ``derived_expression`` carries, - and v2 pre-agg materialization writes the column under the hashed - name too. This helper emits an additional `` AS - `` projection so the hashed reference resolves - against the live measures SQL output, and routes - ``component_aliases`` at the hashed name so downstream combiner - rewriters (metrics.py, cube_matcher) stay consistent. + in the projection as a grain column, but the component's + hash-suffixed name is the canonical identity — it's what the + metric's persisted ``derived_expression`` carries, and v2 pre-agg + materialization writes the column under the hashed name. This + helper emits an additional `` AS `` + projection so the hashed reference resolves against the live + measures SQL output, and routes ``component_aliases`` at the + hashed name so downstream combiner rewriters (metrics.py, + cube_matcher) stay consistent. Both the merged-grain-group path and the per-grain-group path in ``build_grain_group_sql`` MUST go through this helper so the contract stays in one place. """ + # LIMITED-no-aggregation components currently always have + # ``merge=None`` (DISTINCT can't be pre-aggregated, see + # decompose.py:1199). Downstream re-aggregation paths + # (combiners.py:_build_grain_group_from_preagg_table) rely on + # ``merge_func is None`` to skip wrapping the projection in a + # merge function — if a future LIMITED variant ever sets a non- + # None merge, that path would silently wrap the merge over the + # bare grain column, producing wrong SQL like + # ``HLL_UNION_AGG(account_id)``. Fail loud here so the gap is + # discovered at the source rather than as a wrong-result bug. + assert component.merge is None, ( + f"LIMITED-aggregability component {component.name!r} has " + f"merge={component.merge!r}; the downstream pre-agg " + f"combiner path assumes LIMITED.merge is None to avoid " + f"wrapping a re-aggregation over a non-component column. " + f"Update both _build_grain_group_from_preagg_table and " + f"this helper if you're adding a LIMITED-with-merge variant." + ) grain_alias = component.grain_alias or component.name if grain_alias != component.name: component_aliases[component.name] = component.name @@ -1789,10 +1807,14 @@ def build_grain_group_from_preagg( f"Component {component.name} not found in pre-agg {preagg.id}", ) - # Always use the measure column name (component hash) as the output alias - # This ensures consistency with the non-preagg path - output_alias = measure_col - + # Always alias the output as ``component.name`` (the hashed + # identity) so the CTE's output column name matches the + # metric's persisted ``derived_expression`` regardless of how + # the pre-agg physically stored it. Legacy pre-aggs + # materialized before the LIMITED-DISTINCT change carry the + # column under the bare grain name; we read that bare column + # and alias it back to the hashed identity here. + output_alias = component.name component_aliases[component.name] = output_alias col_ref = ast.Column(name=ast.Name(measure_col)) @@ -1807,11 +1829,17 @@ def build_grain_group_from_preagg( aliased = ast.Alias(child=agg_expr, alias=ast.Name(output_alias)) select_items.append(aliased) else: - # No merge - output grain column directly, add to GROUP BY - select_items.append(col_ref) + # No merge - output grain column directly, add to GROUP BY. + # Alias to ``component.name`` so the CTE output name is + # the hashed identity regardless of the pre-agg's stored + # column name. + if measure_col != output_alias: + select_items.append( + ast.Alias(child=col_ref, alias=ast.Name(output_alias)), + ) + else: + select_items.append(col_ref) grain_col_names.append(measure_col) - output_alias = measure_col - component_aliases[component.name] = output_alias # Get type from pre-agg columns col_type = preagg.get_column_type(measure_col, default="double") diff --git a/datajunction-server/tests/api/cubes_test.py b/datajunction-server/tests/api/cubes_test.py index e5a0d0941..8c4c65f14 100644 --- a/datajunction-server/tests/api/cubes_test.py +++ b/datajunction-server/tests/api/cubes_test.py @@ -4964,6 +4964,7 @@ async def test_materialize_cube_returns_metric_combiners( order_id, SUM(line_total_sum_e1f61696) line_total_sum_e1f61696, hll_union_agg(customer_id_hll_23002251) customer_id_hll_23002251, + order_id_distinct_f93d50ab, week_order FROM analytics.preaggs.v3_revenue_orders_by_week GROUP BY category, order_id, week_order @@ -5003,6 +5004,14 @@ async def test_materialize_cube_returns_metric_combiners( "semantic_type": "metric_component", "type": "binary", }, + { + "column": None, + "name": "order_id_distinct_f93d50ab", + "node": None, + "semantic_entity": "v3.order_count:order_id_distinct_f93d50ab", + "semantic_type": "metric_component", + "type": "bigint", + }, { "column": None, "name": "week_order", diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 85235515b..10494ce9e 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -1249,7 +1249,7 @@ async def test_three_metrics_same_parent(self, client_with_build_v3): ] # Columns: dimension grain column 3 raw metric columns - assert len(gg["columns"]) == 4 + assert len(gg["columns"]) == 5 assert gg["columns"][0] == { "name": "status", "type": "string", @@ -1272,7 +1272,7 @@ async def test_three_metrics_same_parent(self, client_with_build_v3): FROM default.v3.orders o JOIN default.v3.order_items oi ON o.order_id = oi.order_id ) - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, SUM(t1.quantity) quantity_sum_06b64d2e + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, SUM(t1.quantity) quantity_sum_06b64d2e, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id """, @@ -1299,7 +1299,7 @@ async def test_three_metrics_same_parent(self, client_with_build_v3): # total_revenue component assert components[1] == { - "name": "order_id", + "name": "order_id_distinct_f93d50ab", "expression": "order_id", "aggregation": None, "merge": None, @@ -1647,6 +1647,12 @@ async def test_cross_fact_metrics_different_aggregabilities( "semantic_entity": "v3.page_views_enriched.session_id", "semantic_type": "dimension", }, + { + "name": "session_id_distinct_e7c26e39", + "type": "bigint", + "semantic_entity": "v3.session_count:session_id_distinct_e7c26e39", + "semantic_type": "metric_component", + }, ] assert_sql_equal( gg_sessions["sql"], @@ -1660,7 +1666,7 @@ async def test_cross_fact_metrics_different_aggregabilities( SELECT product_id, category FROM default.v3.products ) - SELECT t2.category, t1.session_id + SELECT t2.category, t1.session_id, t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.session_id @@ -1721,6 +1727,12 @@ async def test_cross_fact_derived_metric(self, client_with_build_v3): "semantic_entity": "v3.order_details.order_id", "semantic_type": "dimension", }, + { + "name": "order_id_distinct_f93d50ab", + "type": "bigint", + "semantic_entity": "v3.order_count:order_id_distinct_f93d50ab", + "semantic_type": "metric_component", + }, ] # Joins order_details -> product for category dimension assert_sql_equal( @@ -1736,7 +1748,7 @@ async def test_cross_fact_derived_metric(self, client_with_build_v3): SELECT product_id, category FROM default.v3.products ) - SELECT t2.category, t1.order_id + SELECT t2.category, t1.order_id, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id @@ -1762,6 +1774,12 @@ async def test_cross_fact_derived_metric(self, client_with_build_v3): "semantic_entity": "v3.page_views_enriched.customer_id", "semantic_type": "dimension", }, + { + "name": "customer_id_distinct_dd4be7a5", + "type": "bigint", + "semantic_entity": "v3.visitor_count:customer_id_distinct_dd4be7a5", + "semantic_type": "metric_component", + }, ] # Joins page_views_enriched -> product for category dimension assert_sql_equal( @@ -1776,7 +1794,7 @@ async def test_cross_fact_derived_metric(self, client_with_build_v3): SELECT product_id, category FROM default.v3.products ) - SELECT t2.category, t1.customer_id + SELECT t2.category, t1.customer_id, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.customer_id @@ -1856,7 +1874,7 @@ async def test_cross_fact_multiple_derived_metrics(self, client_with_build_v3): SELECT product_id, category FROM default.v3.products ) - SELECT t2.category, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t2.category, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id @@ -1882,7 +1900,7 @@ async def test_cross_fact_multiple_derived_metrics(self, client_with_build_v3): SELECT product_id, category FROM default.v3.products ) - SELECT t2.category, t1.customer_id, COUNT(t1.view_id) view_id_count_f41e2db4 + SELECT t2.category, t1.customer_id, COUNT(t1.view_id) view_id_count_f41e2db4, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.customer_id @@ -2323,7 +2341,8 @@ async def test_count_distinct_grain_col_not_duplicated_when_also_a_dimension( gg = response.json()["grain_groups"][0] # order_id must appear exactly once in the SELECT (not duplicated as both - # dimension column and grain column) + # dimension column and grain column). The hashed identity projection + # for the LIMITED component is still emitted. assert_sql_equal( gg["sql"], """ @@ -2332,7 +2351,7 @@ async def test_count_distinct_grain_col_not_duplicated_when_also_a_dimension( FROM default.v3.orders o JOIN default.v3.order_items oi ON o.order_id = oi.order_id ) - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id """, @@ -2358,6 +2377,12 @@ async def test_count_distinct_grain_col_not_duplicated_when_also_a_dimension( "semantic_entity": "v3.order_details.order_id", "semantic_type": "dimension", }, + { + "name": "order_id_distinct_f93d50ab", + "type": "bigint", + "semantic_entity": "v3.order_count:order_id_distinct_f93d50ab", + "semantic_type": "metric_component", + }, ] @@ -3173,7 +3198,8 @@ async def test_temporal_filters_applied_when_cube_exists( t1.order_date date_id, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, - SUM(t1.quantity) quantity_sum_06b64d2e + SUM(t1.quantity) quantity_sum_06b64d2e, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.order_date, t1.order_id """, @@ -4156,7 +4182,8 @@ async def test_nested_derived_metric_decomposes_to_base_components( SELECT t1.status, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id """, @@ -4214,7 +4241,8 @@ async def test_nested_derived_window_metric_decomposes_to_base_components( t2.category, t3.week, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id @@ -4280,7 +4308,8 @@ async def test_nested_derived_cross_fact_decomposes_to_base_components( SELECT t2.category, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id @@ -4305,7 +4334,8 @@ async def test_nested_derived_cross_fact_decomposes_to_base_components( ) SELECT t2.category, t1.session_id, - COUNT(t1.view_id) view_id_count_f41e2db4 + COUNT(t1.view_id) view_id_count_f41e2db4, + t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.session_id """, @@ -6402,7 +6432,7 @@ async def test_nested_subquery_drops_fk_column(self, client_with_build_v3): ) AS wrap GROUP BY account_id, event_type ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_ccca78c0 FROM v3_events_nested t1 GROUP BY t1.event_type, t1.account_id """, @@ -6461,7 +6491,7 @@ async def test_self_join_filters_both_aliases(self, client_with_build_v3): WHERE a1.audit_date >= 20260101 AND a2.audit_date >= 20260101 ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_efcd5b4d FROM v3_events_self_join t1 GROUP BY t1.event_type, t1.account_id """, @@ -6528,7 +6558,7 @@ async def test_source_inside_inner_with_cte( SELECT account_id, event_type FROM v3_events_with_cte__inner_cte inner_cte ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_4062d29e FROM v3_events_with_cte t1 GROUP BY t1.event_type, t1.account_id """, @@ -6615,7 +6645,7 @@ async def test_source_inside_where_exists_subquery( AND log.audit_date >= 20260101 ) ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_7a788b39 FROM v3_events_exists t1 GROUP BY t1.event_type, t1.account_id """, @@ -6702,7 +6732,7 @@ async def test_set_op_with_nested_subquery_in_non_leading_arm( WHERE deep.audit_date >= 20260101 ) AS wrap ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_49f58cc7 FROM v3_events_union_nested t1 GROUP BY t1.event_type, t1.account_id """, @@ -6770,7 +6800,7 @@ async def test_multiple_filters_same_source_different_scopes( AND inner_src.audit_date <= 20260131 ) AS wrap ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_233abbf9 FROM v3_events_two_filters t1 GROUP BY t1.event_type, t1.account_id """, @@ -6835,7 +6865,7 @@ async def test_source_in_from_and_in_where_exists(self, client_with_build_v3): ) AND a.audit_date >= 20260101 ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_3d483180 FROM v3_events_from_exists t1 GROUP BY t1.event_type, t1.account_id """, @@ -6887,7 +6917,7 @@ async def test_source_without_alias(self, client_with_build_v3): FROM default.v3.audit_log_noalias WHERE src_audit_log_noalias.audit_date >= 20260101 ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_a0b1fd19 FROM v3_events_no_alias t1 GROUP BY t1.event_type, t1.account_id """, @@ -6965,7 +6995,7 @@ async def test_source_in_having_subquery(self, client_with_build_v3): WHERE log.audit_date >= 20260101 ) ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_4e8747df FROM v3_events_having t1 GROUP BY t1.event_type, t1.account_id """, @@ -7040,7 +7070,7 @@ async def test_source_in_select_list_scalar_subquery( WHERE log.audit_date >= 20260101) AS max_id FROM default.v3.acct_scalar AS acc ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_946a2255 FROM v3_events_scalar t1 GROUP BY t1.event_type, t1.account_id """, @@ -7181,7 +7211,7 @@ async def test_two_upstreams_linked_to_same_dim(self, client_with_build_v3): WHERE x.audit_date >= 20260101 AND y.audit_date >= 20260101 ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_ec521d7c FROM v3_events_two_upstreams t1 GROUP BY t1.event_type, t1.account_id """, @@ -7286,7 +7316,7 @@ async def test_transform_link_target(self, client_with_build_v3): SELECT account_id, event_type FROM v3_events_with_date ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_274a187b FROM v3_events_wrapping t1 GROUP BY t1.event_type, t1.account_id """, @@ -7385,7 +7415,7 @@ async def test_transform_link_plus_source_link_double_filter( v3_events_double_link_wrap AS ( SELECT account_id, event_type FROM v3_events_double_link ) - SELECT t1.event_type, t1.account_id + SELECT t1.event_type, t1.account_id, t1.account_id account_id_distinct_b9c0262f FROM v3_events_double_link_wrap t1 GROUP BY t1.event_type, t1.account_id """, @@ -9819,9 +9849,9 @@ async def test_count_distinct_plain_column_projects_hashed_alias( The metric's persisted ``derived_expression``, the v2 pre-agg materialization column, and the cube column all reference the hashed name as the canonical identity. Without the extra - projection, downstream consumers (XP/ABlaze) reading the live - measures SQL output can't resolve the column the - ``derived_expression`` references. The combiner in the + projection, downstream consumers reading the live measures SQL + output can't resolve the column the ``derived_expression`` + references. The combiner in the ``/sql/measures/v3`` response must also reference the hashed name for consistency with the persisted expression. """ @@ -10027,13 +10057,28 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( if "v3.merged_distinct_accounts" in g["metrics"] ) cols = [c["name"] for c in gg["columns"]] - plain_hashed = next( - n for n in cols if n.startswith("account_id_distinct_") - ) - complex_hashed = next( + # Pin uniqueness of each hashed identity so a future + # decompose-side naming change (e.g., a uniformity refactor + # that drops a column-name prefix) breaks the test rather + # than silently passing with both `next()` calls returning + # the same column. + plain_matches = [ + n + for n in cols + if n.startswith("account_id_distinct_") + and not n.startswith("is_active_account_id_distinct_") + ] + complex_matches = [ n for n in cols if n.startswith("is_active_account_id_distinct_") - ) - amount_sum_hashed = next(n for n in cols if n.startswith("amount_sum_")) + ] + amount_matches = [n for n in cols if n.startswith("amount_sum_")] + assert len(plain_matches) == 1, plain_matches + assert len(complex_matches) == 1, complex_matches + assert len(amount_matches) == 1, amount_matches + plain_hashed = plain_matches[0] + complex_hashed = complex_matches[0] + amount_sum_hashed = amount_matches[0] + assert plain_hashed != complex_hashed assert_sql_equal( gg["sql"], @@ -10061,3 +10106,107 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( "v3.merged_distinct_active_accounts": f"COUNT( DISTINCT {complex_hashed})", "v3.merged_distinct_total_amount": f"SUM({amount_sum_hashed})", } + + @pytest.mark.asyncio + async def test_count_distinct_dedup_across_shared_components( + self, + client_with_build_v3, + ): + """When two metrics share the SAME LIMITED component (same + ``component.name``), ``register_limited_component`` must be + invoked exactly once — emitting a single `` AS `` + projection. Duplicate emissions would produce SQL with two + identically-named columns, which Spark/Druid reject. + + The dedup is currently held by ``seen_components`` in + ``build_grain_group_sql``; this test pins that contract end- + to-end. Two metrics with the *same* ``COUNT(DISTINCT + account_id)`` body share the same component hash, so the + helper sees the component twice but only emits once. + """ + client = client_with_build_v3 + resp = await client.post( + "/nodes/source/", + json={ + "name": "v3.src_dedup_distinct", + "columns": [ + {"name": "account_id", "type": "int"}, + {"name": "event_date", "type": "int"}, + ], + "mode": "published", + "catalog": "default", + "schema_": "v3", + "table": "dedup_distinct_events", + }, + ) + assert resp.status_code in (200, 201), resp.json() + resp = await client.post( + "/nodes/transform/", + json={ + "name": "v3.dedup_distinct_xform", + "query": "SELECT account_id, event_date FROM v3.src_dedup_distinct", + "mode": "published", + "primary_key": ["account_id", "event_date"], + }, + ) + assert resp.status_code == 201, resp.json() + # Two metrics with identical decomposition — both produce a + # COUNT(DISTINCT account_id) component with the same hash. + for name in ("v3.dedup_unique_accounts", "v3.dedup_distinct_accounts"): + resp = await client.post( + "/nodes/metric/", + json={ + "name": name, + "query": ( + "SELECT COUNT(DISTINCT account_id) FROM v3.dedup_distinct_xform" + ), + "mode": "published", + }, + ) + assert resp.status_code == 201, resp.json() + + response = await client.get( + "/sql/measures/v3/", + params={ + "metrics": [ + "v3.dedup_unique_accounts", + "v3.dedup_distinct_accounts", + ], + "dimensions": ["v3.dedup_distinct_xform.event_date"], + }, + ) + assert response.status_code == 200, response.json() + body = response.json() + gg = body["grain_groups"][0] + cols = [c["name"] for c in gg["columns"]] + + # The hashed identity column appears exactly once even though + # two metrics reference it. + hashed_matches = [n for n in cols if n.startswith("account_id_distinct_")] + assert len(hashed_matches) == 1, hashed_matches + hashed_col = hashed_matches[0] + + assert_sql_equal( + gg["sql"], + f""" + WITH v3_dedup_distinct_xform AS ( + SELECT account_id, event_date + FROM default.v3.dedup_distinct_events + ) + SELECT t1.event_date, + t1.account_id, + t1.account_id {hashed_col} + FROM v3_dedup_distinct_xform t1 + GROUP BY t1.event_date, t1.account_id + """, + normalize_aliases=True, + ) + + # Both metric formulas reference the same hashed column. + combiners_by_metric = { + f["name"]: f["combiner"] for f in body["metric_formulas"] + } + assert combiners_by_metric == { + "v3.dedup_unique_accounts": f"COUNT( DISTINCT {hashed_col})", + "v3.dedup_distinct_accounts": f"COUNT( DISTINCT {hashed_col})", + } diff --git a/datajunction-server/tests/construction/build_v3/metrics_sql_test.py b/datajunction-server/tests/construction/build_v3/metrics_sql_test.py index dfae474a6..b74d0b91a 100644 --- a/datajunction-server/tests/construction/build_v3/metrics_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/metrics_sql_test.py @@ -242,7 +242,9 @@ async def test_derived_metric_ratio(self, client_with_build_v3): FROM default.v3.page_views ), order_details_0 AS ( - SELECT t2.category, t1.order_id + SELECT t2.category, + t1.order_id, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id @@ -255,7 +257,9 @@ async def test_derived_metric_ratio(self, client_with_build_v3): GROUP BY category ), page_views_enriched_0 AS ( - SELECT t2.category, t1.customer_id + SELECT t2.category, + t1.customer_id, + t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.customer_id @@ -326,13 +330,13 @@ async def test_multiple_derived_metrics_same_fact(self, client_with_build_v3): JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, SUM(t1.quantity) quantity_sum_06b64d2e + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, SUM(t1.quantity) quantity_sum_06b64d2e, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) AS avg_order_value, - SUM(order_details_0.quantity_sum_06b64d2e) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) AS avg_items_per_order + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value, + SUM(order_details_0.quantity_sum_06b64d2e) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_items_per_order FROM order_details_0 GROUP BY order_details_0.status """, @@ -412,7 +416,8 @@ async def test_derived_metrics_cross_fact(self, client_with_build_v3): SELECT t2.category, t3.name name_customer, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_customer t3 ON t1.customer_id = t3.customer_id GROUP BY t2.category, t3.name, t1.order_id @@ -421,7 +426,8 @@ async def test_derived_metrics_cross_fact(self, client_with_build_v3): SELECT t2.category, t3.name name_customer, t1.customer_id, - COUNT(t1.view_id) view_id_count_f41e2db4 + COUNT(t1.view_id) view_id_count_f41e2db4, + t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_customer t3 ON t1.customer_id = t3.customer_id GROUP BY t2.category, t3.name, t1.customer_id @@ -429,8 +435,8 @@ async def test_derived_metrics_cross_fact(self, client_with_build_v3): SELECT COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.name_customer, page_views_enriched_0.name_customer) AS name_customer, - CAST(COUNT( DISTINCT order_details_0.order_id) AS DOUBLE) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id), 0) AS conversion_rate, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id), 0) AS revenue_per_visitor, + CAST(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS DOUBLE) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS conversion_rate, + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS revenue_per_visitor, SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(SUM(page_views_enriched_0.view_id_count_f41e2db4), 0) AS revenue_per_page_view FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.category = page_views_enriched_0.category AND order_details_0.name_customer = page_views_enriched_0.name_customer GROUP BY 1, 2 @@ -511,13 +517,14 @@ async def test_pages_per_session_same_fact_derived(self, client_with_build_v3): page_views_enriched_0 AS ( SELECT t2.category, t1.session_id, - COUNT(t1.view_id) view_id_count_f41e2db4 + COUNT(t1.view_id) view_id_count_f41e2db4, + t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.session_id ) SELECT page_views_enriched_0.category AS category, - SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id), 0) AS pages_per_session + SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39), 0) AS pages_per_session FROM page_views_enriched_0 GROUP BY page_views_enriched_0.category """, @@ -1419,7 +1426,8 @@ async def test_period_over_period_metrics(self, client_with_build_v3): t3.month, t3.week, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id @@ -1430,7 +1438,7 @@ async def test_period_over_period_metrics(self, client_with_build_v3): order_details_0.category AS category, order_details_0.month AS month, order_details_0.week AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue FROM order_details_0 GROUP BY order_details_0.category, order_details_0.month, order_details_0.week @@ -1439,7 +1447,7 @@ async def test_period_over_period_metrics(self, client_with_build_v3): SELECT order_details_0.category AS category, order_details_0.week AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue FROM order_details_0 GROUP BY order_details_0.category, order_details_0.week @@ -1619,7 +1627,7 @@ async def test_cross_fact_full_plus_limited_fan_out( GROUP BY t2.category ), page_views_enriched_0 AS ( - SELECT t2.category, t1.customer_id + SELECT t2.category, t1.customer_id, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.customer_id @@ -1683,13 +1691,14 @@ async def test_derived_metric_with_base_metric_in_same_query( SELECT t1.status, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) AS avg_order_value, + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue FROM order_details_0 GROUP BY order_details_0.status @@ -2143,12 +2152,12 @@ async def test_nested_derived_metric_simple(self, client_with_build_v3): JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) / 50.0 * 100 AS aov_growth_index + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) / 50.0 * 100 AS aov_growth_index FROM order_details_0 GROUP BY order_details_0.status """, @@ -2208,7 +2217,7 @@ async def test_nested_derived_metric_with_window_function( FROM default.v3.products ), order_details_0 AS ( - SELECT t2.category, t3.week, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t2.category, t3.week, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id @@ -2218,9 +2227,9 @@ async def test_nested_derived_metric_with_window_function( SELECT order_details_0.category AS category, order_details_0.week AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) AS avg_order_value + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value FROM order_details_0 GROUP BY order_details_0.category, order_details_0.week ) @@ -2300,20 +2309,20 @@ async def test_nested_derived_metric_cross_fact(self, client_with_build_v3): FROM default.v3.page_views ), order_details_0 AS ( - SELECT t2.category, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t2.category, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id ), page_views_enriched_0 AS ( - SELECT t2.category, t1.session_id, COUNT(t1.view_id) view_id_count_f41e2db4 + SELECT t2.category, t1.session_id, COUNT(t1.view_id) view_id_count_f41e2db4, t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.session_id ) SELECT COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) - / NULLIF(SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT(DISTINCT page_views_enriched_0.session_id), 0), 0) AS efficiency_ratio + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) + / NULLIF(SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT(DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39), 0), 0) AS efficiency_ratio FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.category = page_views_enriched_0.category GROUP BY 1 @@ -2683,7 +2692,8 @@ async def test_wow_with_count_distinct_at_daily_grain(self, client_with_build_v3 t2.category, t3.week, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id @@ -2694,7 +2704,7 @@ async def test_wow_with_count_distinct_at_daily_grain(self, client_with_build_v3 order_details_0.date_id_order AS date_id_order, order_details_0.category AS category, order_details_0.week AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue FROM order_details_0 GROUP BY order_details_0.date_id_order, order_details_0.category, order_details_0.week @@ -2703,7 +2713,7 @@ async def test_wow_with_count_distinct_at_daily_grain(self, client_with_build_v3 SELECT order_details_0.category AS category, order_details_0.week AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue FROM order_details_0 GROUP BY order_details_0.category, order_details_0.week @@ -2880,14 +2890,14 @@ async def test_cross_fact_wow_conversion_rate(self, client_with_build_v3): FROM default.v3.page_views ), order_details_0 AS ( - SELECT t2.category, t3.week, t1.order_id + SELECT t2.category, t3.week, t1.order_id, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id GROUP BY t2.category, t3.week, t1.order_id ), page_views_enriched_0 AS ( - SELECT t2.category, t3.week, t1.customer_id + SELECT t2.category, t3.week, t1.customer_id, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.page_date = t3.date_id @@ -2897,9 +2907,9 @@ async def test_cross_fact_wow_conversion_rate(self, client_with_build_v3): SELECT COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.week, page_views_enriched_0.week) AS week, - COUNT(DISTINCT order_details_0.order_id) AS order_count, - COUNT(DISTINCT page_views_enriched_0.customer_id) AS visitor_count, - CAST(COUNT(DISTINCT order_details_0.order_id) AS DOUBLE) / NULLIF(COUNT(DISTINCT page_views_enriched_0.customer_id), 0) AS conversion_rate + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, + COUNT(DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5) AS visitor_count, + CAST(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS DOUBLE) / NULLIF(COUNT(DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS conversion_rate FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.category = page_views_enriched_0.category AND order_details_0.week = page_views_enriched_0.week GROUP BY 1, 2 @@ -3021,7 +3031,8 @@ async def test_cross_fact_window_metric_with_finer_grain( SELECT COALESCE(t1.order_date, t3.date_id) AS date_id, t2.category, t3.week, - t1.order_id + t1.order_id, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id GROUP BY COALESCE(t1.order_date, t3.date_id), t2.category, t3.week, t1.order_id @@ -3030,7 +3041,8 @@ async def test_cross_fact_window_metric_with_finer_grain( SELECT COALESCE(t1.page_date, t3.date_id) AS date_id, t2.category, t3.week, - t1.customer_id + t1.customer_id, + t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.page_date = t3.date_id GROUP BY COALESCE(t1.page_date, t3.date_id), t2.category, t3.week, t1.customer_id @@ -3039,9 +3051,9 @@ async def test_cross_fact_window_metric_with_finer_grain( SELECT COALESCE(order_details_0.date_id, page_views_enriched_0.date_id) AS date_id, COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.week, page_views_enriched_0.week) AS week, - COUNT( DISTINCT order_details_0.order_id) AS order_count, - COUNT( DISTINCT page_views_enriched_0.customer_id) AS visitor_count, - CAST(COUNT( DISTINCT order_details_0.order_id) AS DOUBLE) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id), 0) AS conversion_rate + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, + COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5) AS visitor_count, + CAST(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS DOUBLE) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS conversion_rate FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.date_id = page_views_enriched_0.date_id AND order_details_0.category = page_views_enriched_0.category AND order_details_0.week = page_views_enriched_0.week GROUP BY 1, 2, 3 ) @@ -3130,7 +3142,8 @@ async def test_cross_fact_window_on_derived_metric(self, client_with_build_v3): t2.category, t3.week, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id GROUP BY COALESCE(t1.order_date, t3.date_id), t2.category, t3.week, t1.order_id @@ -3140,7 +3153,8 @@ async def test_cross_fact_window_on_derived_metric(self, client_with_build_v3): t2.category, t3.week, t1.session_id, - COUNT(t1.view_id) view_id_count_f41e2db4 + COUNT(t1.view_id) view_id_count_f41e2db4, + t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.page_date = t3.date_id GROUP BY COALESCE(t1.page_date, t3.date_id), t2.category, t3.week, t1.session_id @@ -3149,13 +3163,13 @@ async def test_cross_fact_window_on_derived_metric(self, client_with_build_v3): SELECT COALESCE(order_details_0.date_id, page_views_enriched_0.date_id) AS date_id, COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.week, page_views_enriched_0.week) AS week, - COUNT( DISTINCT order_details_0.order_id) AS order_count, + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, SUM(page_views_enriched_0.view_id_count_f41e2db4) AS page_view_count, - COUNT( DISTINCT page_views_enriched_0.session_id) AS session_count, + COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39) AS session_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) AS avg_order_value, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) / NULLIF(SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id), 0), 0) AS efficiency_ratio, - SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id), 0) AS pages_per_session + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value, + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) / NULLIF(SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39), 0), 0) AS efficiency_ratio, + SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39), 0) AS pages_per_session FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.date_id = page_views_enriched_0.date_id AND order_details_0.category = page_views_enriched_0.category AND order_details_0.week = page_views_enriched_0.week GROUP BY 1, 2, 3 ) @@ -3244,7 +3258,8 @@ async def test_cross_fact_window_on_base_metrics(self, client_with_build_v3): SELECT COALESCE(t1.order_date, t3.date_id) AS date_id, t2.category, t3.week, - t1.order_id + t1.order_id, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.order_date = t3.date_id GROUP BY COALESCE(t1.order_date, t3.date_id), t2.category, t3.week, t1.order_id @@ -3253,7 +3268,8 @@ async def test_cross_fact_window_on_base_metrics(self, client_with_build_v3): SELECT COALESCE(t1.page_date, t3.date_id) AS date_id, t2.category, t3.week, - t1.customer_id + t1.customer_id, + t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.page_date = t3.date_id GROUP BY COALESCE(t1.page_date, t3.date_id), t2.category, t3.week, t1.customer_id @@ -3262,8 +3278,8 @@ async def test_cross_fact_window_on_base_metrics(self, client_with_build_v3): SELECT COALESCE(order_details_0.date_id, page_views_enriched_0.date_id) AS date_id, COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.week, page_views_enriched_0.week) AS week, - COUNT( DISTINCT order_details_0.order_id) AS order_count, - COUNT( DISTINCT page_views_enriched_0.customer_id) AS visitor_count + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count, + COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5) AS visitor_count FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.date_id = page_views_enriched_0.date_id AND order_details_0.category = page_views_enriched_0.category AND order_details_0.week = page_views_enriched_0.week GROUP BY 1, 2, 3 ), @@ -3383,7 +3399,8 @@ async def test_multi_fact_window_metrics_same_grain(self, client_with_build_v3): SELECT t2.category, t3.week, t1.session_id, - COUNT(t1.view_id) view_id_count_f41e2db4 + COUNT(t1.view_id) view_id_count_f41e2db4, + t1.session_id session_id_distinct_e7c26e39 FROM v3_page_views_enriched t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id LEFT OUTER JOIN v3_date t3 ON t1.page_date = t3.date_id GROUP BY t2.category, t3.week, t1.session_id @@ -3392,9 +3409,9 @@ async def test_multi_fact_window_metrics_same_grain(self, client_with_build_v3): SELECT COALESCE(order_details_0.category, page_views_enriched_0.category) AS category, COALESCE(order_details_0.week, page_views_enriched_0.week) AS week, SUM(page_views_enriched_0.view_id_count_f41e2db4) AS page_view_count, - COUNT( DISTINCT page_views_enriched_0.session_id) AS session_count, + COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39) AS session_count, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id), 0) AS pages_per_session + SUM(page_views_enriched_0.view_id_count_f41e2db4) / NULLIF(COUNT( DISTINCT page_views_enriched_0.session_id_distinct_e7c26e39), 0) AS pages_per_session FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.category = page_views_enriched_0.category AND order_details_0.week = page_views_enriched_0.week GROUP BY 1, 2 ) @@ -3605,13 +3622,14 @@ async def test_order_by_multiple_columns(self, client_with_build_v3): order_details_0 AS ( SELECT t1.status, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - COUNT(DISTINCT order_details_0.order_id) AS order_count + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.status ORDER BY status ASC, total_revenue DESC @@ -5103,13 +5121,13 @@ async def test_full_and_limited_metrics_same_fact_computed_at_limited_grain( JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - COUNT(DISTINCT order_details_0.order_id) AS order_count + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.status """, From eb4bf2235228485a7a88dd0615eda2cc4aadd4f5 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 19:02:06 -0700 Subject: [PATCH 09/11] Fix --- .../construction/build_v3/measures.py | 51 +++++-------------- .../build_v3/measures_sql_test.py | 48 +++++------------ 2 files changed, 28 insertions(+), 71 deletions(-) diff --git a/datajunction-server/datajunction_server/construction/build_v3/measures.py b/datajunction-server/datajunction_server/construction/build_v3/measures.py index 4db45aead..3108066c6 100644 --- a/datajunction-server/datajunction_server/construction/build_v3/measures.py +++ b/datajunction-server/datajunction_server/construction/build_v3/measures.py @@ -171,45 +171,22 @@ def register_limited_component( component_expressions: list[tuple[str, ast.Expression]], component_metadata: list[tuple[str, MetricComponent, Node]], ) -> None: - """Register a LIMITED-aggregability component (e.g. COUNT DISTINCT) - in the measures-SQL projection. - - The component's grain_alias (set by ``_make_component`` in - ``sql/decompose.py``) is the bare column for plain-column DISTINCT - and the hashed component name for complex expressions. - - When the two differ (plain-column case), the bare column is already - in the projection as a grain column, but the component's - hash-suffixed name is the canonical identity — it's what the - metric's persisted ``derived_expression`` carries, and v2 pre-agg - materialization writes the column under the hashed name. This - helper emits an additional `` AS `` - projection so the hashed reference resolves against the live - measures SQL output, and routes ``component_aliases`` at the - hashed name so downstream combiner rewriters (metrics.py, - cube_matcher) stay consistent. - - Both the merged-grain-group path and the per-grain-group path in - ``build_grain_group_sql`` MUST go through this helper so the - contract stays in one place. + """Register a LIMITED component (e.g. ``COUNT(DISTINCT)``). + + For plain-column DISTINCT, ``grain_alias`` is the bare column and + differs from ``component.name`` (the hashed identity). Emit an + extra `` AS `` projection so the hashed name + is addressable alongside the bare grain column, and route + ``component_aliases`` at the hashed name so combiner rewriters + stay consistent. Both call sites in ``build_grain_group_sql`` must + use this helper to keep the contract in one place. """ - # LIMITED-no-aggregation components currently always have - # ``merge=None`` (DISTINCT can't be pre-aggregated, see - # decompose.py:1199). Downstream re-aggregation paths - # (combiners.py:_build_grain_group_from_preagg_table) rely on - # ``merge_func is None`` to skip wrapping the projection in a - # merge function — if a future LIMITED variant ever sets a non- - # None merge, that path would silently wrap the merge over the - # bare grain column, producing wrong SQL like - # ``HLL_UNION_AGG(account_id)``. Fail loud here so the gap is - # discovered at the source rather than as a wrong-result bug. + # The preagg-read path assumes LIMITED.merge is None to skip + # re-aggregation; a future LIMITED+merge variant would wrap the + # merge over the bare grain column and produce wrong SQL. assert component.merge is None, ( - f"LIMITED-aggregability component {component.name!r} has " - f"merge={component.merge!r}; the downstream pre-agg " - f"combiner path assumes LIMITED.merge is None to avoid " - f"wrapping a re-aggregation over a non-component column. " - f"Update both _build_grain_group_from_preagg_table and " - f"this helper if you're adding a LIMITED-with-merge variant." + f"LIMITED component {component.name!r} has merge={component.merge!r}; " + f"update the preagg-read path in combiners.py before adding this." ) grain_alias = component.grain_alias or component.name if grain_alias != component.name: diff --git a/datajunction-server/tests/construction/build_v3/measures_sql_test.py b/datajunction-server/tests/construction/build_v3/measures_sql_test.py index 10494ce9e..2a931a7e1 100644 --- a/datajunction-server/tests/construction/build_v3/measures_sql_test.py +++ b/datajunction-server/tests/construction/build_v3/measures_sql_test.py @@ -9842,18 +9842,11 @@ async def test_count_distinct_plain_column_projects_hashed_alias( self, client_with_build_v3, ): - """``COUNT(DISTINCT plain_col)`` measures-SQL output must - project the column under BOTH the bare grain name AND the - hash-suffixed component identity (`` AS _distinct_``). - - The metric's persisted ``derived_expression``, the v2 pre-agg - materialization column, and the cube column all reference the - hashed name as the canonical identity. Without the extra - projection, downstream consumers reading the live measures SQL - output can't resolve the column the ``derived_expression`` - references. The combiner in the - ``/sql/measures/v3`` response must also reference the hashed - name for consistency with the persisted expression. + """For ``COUNT(DISTINCT plain_col)``, the measures SQL must + project the column under BOTH the bare grain name and the + hashed identity, AND the combiner and persisted + ``derived_expression`` must reference the hashed identity — + otherwise consumers can't resolve the column. """ client = client_with_build_v3 resp = await client.post( @@ -9953,17 +9946,11 @@ async def test_count_distinct_in_merged_grain_group_projects_hashed_alias( self, client_with_build_v3, ): - """Same contract as the single-metric case must hold in the - ``is_merged=True`` grain-group path (multiple metrics with - mixed aggregabilities combined into one measures query). - - Regression: the two LIMITED-component branches in - ``build_grain_group_sql`` were independent — fixing only the - non-merged path left the merged path emitting a bare-column - projection + bare-column combiner, breaking consistency for - cross-fact / multi-metric requests. Both branches now route - through ``register_limited_component``; this test pins both - at once. + """Same contract in the ``is_merged=True`` path (mixed- + aggregability metrics in one query). Pins that both LIMITED- + component branches in ``build_grain_group_sql`` route through + ``register_limited_component`` — the merged branch used to + skip the hashed projection independently. """ client = client_with_build_v3 resp = await client.post( @@ -10112,17 +10099,10 @@ async def test_count_distinct_dedup_across_shared_components( self, client_with_build_v3, ): - """When two metrics share the SAME LIMITED component (same - ``component.name``), ``register_limited_component`` must be - invoked exactly once — emitting a single `` AS `` - projection. Duplicate emissions would produce SQL with two - identically-named columns, which Spark/Druid reject. - - The dedup is currently held by ``seen_components`` in - ``build_grain_group_sql``; this test pins that contract end- - to-end. Two metrics with the *same* ``COUNT(DISTINCT - account_id)`` body share the same component hash, so the - helper sees the component twice but only emits once. + """Two metrics that decompose to the same LIMITED component + must produce a single `` AS `` projection, not + two — duplicate column names break SQL execution. Dedup is + held by ``seen_components`` in ``build_grain_group_sql``. """ client = client_with_build_v3 resp = await client.post( From dcc4d0181d2722b7c7d10c4d931d96f5598c2eba Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 21:33:06 -0700 Subject: [PATCH 10/11] Fix tests --- .../tests/construction/build_v3/djsql_test.py | 4 +-- .../build_v3/having_clause_test.py | 35 +++++++++++-------- .../build_v3/preagg_substitution_test.py | 26 +++++++------- .../build_v3/set_operations_test.py | 10 +++--- .../build_v3/test_self_join_sql_generation.py | 8 +++-- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/datajunction-server/tests/construction/build_v3/djsql_test.py b/datajunction-server/tests/construction/build_v3/djsql_test.py index 5c1094afb..78a188ccb 100644 --- a/datajunction-server/tests/construction/build_v3/djsql_test.py +++ b/datajunction-server/tests/construction/build_v3/djsql_test.py @@ -144,12 +144,12 @@ async def test_derived_metric(self, client_with_build_v3): JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) AS avg_order_value + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value FROM order_details_0 GROUP BY order_details_0.status """, diff --git a/datajunction-server/tests/construction/build_v3/having_clause_test.py b/datajunction-server/tests/construction/build_v3/having_clause_test.py index 8bde5601f..27647eaac 100644 --- a/datajunction-server/tests/construction/build_v3/having_clause_test.py +++ b/datajunction-server/tests/construction/build_v3/having_clause_test.py @@ -99,17 +99,18 @@ async def test_metric_filter_with_less_than( order_details_0 AS ( SELECT t2.category, - t1.order_id + t1.order_id, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id ) SELECT order_details_0.category AS category, - COUNT( DISTINCT order_details_0.order_id) AS order_count + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.category - HAVING COUNT( DISTINCT order_details_0.order_id) < 100 + HAVING COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) < 100 """, ) @@ -194,17 +195,18 @@ async def test_metric_filter_with_equality( order_details_0 AS ( SELECT t2.category, - t1.order_id + t1.order_id, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id ) SELECT order_details_0.category AS category, - COUNT( DISTINCT order_details_0.order_id) AS order_count + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.category - HAVING COUNT(DISTINCT order_details_0.order_id) = 50 + HAVING COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) = 50 """, ) @@ -257,7 +259,8 @@ async def test_multiple_metric_filters_and( SELECT t2.category, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id @@ -265,12 +268,12 @@ async def test_multiple_metric_filters_and( SELECT order_details_0.category AS category, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - COUNT( DISTINCT order_details_0.order_id) AS order_count + COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.category HAVING SUM(order_details_0.line_total_sum_e1f61696) > 5000 - AND COUNT( DISTINCT order_details_0.order_id) > 20 + AND COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) > 20 """, ) @@ -436,7 +439,8 @@ async def test_complex_mixed_filters( t2.category, t1.status, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id WHERE t2.category IN ('Electronics', 'Clothing') @@ -446,11 +450,11 @@ async def test_complex_mixed_filters( order_details_0.category AS category, order_details_0.status AS status, SUM(order_details_0.line_total_sum_e1f61696) AS total_revenue, - COUNT(DISTINCT order_details_0.order_id) AS order_count + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 WHERE order_details_0.category IN ('Electronics', 'Clothing') AND order_details_0.status = 'completed' GROUP BY order_details_0.category, order_details_0.status - HAVING SUM(order_details_0.line_total_sum_e1f61696) > 10000 AND COUNT( DISTINCT order_details_0.order_id) >= 50 + HAVING SUM(order_details_0.line_total_sum_e1f61696) > 10000 AND COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab) >= 50 """, ) @@ -501,16 +505,17 @@ async def test_derived_metric_filter( SELECT t2.category, t1.order_id, - SUM(t1.line_total) line_total_sum_e1f61696 + SUM(t1.line_total) line_total_sum_e1f61696, + t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 LEFT OUTER JOIN v3_product t2 ON t1.product_id = t2.product_id GROUP BY t2.category, t1.order_id ) SELECT order_details_0.category AS category, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) AS avg_order_value + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value FROM order_details_0 GROUP BY order_details_0.category - HAVING SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id), 0) > 100 + HAVING SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) > 100 """, ) diff --git a/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py b/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py index 1849dcf8a..a6874885e 100644 --- a/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py +++ b/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py @@ -145,7 +145,7 @@ async def test_limited_metric_no_preagg(self, client_with_build_v3): FROM default.v3.orders o JOIN default.v3.order_items oi ON o.order_id = oi.order_id ) - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id """, @@ -173,12 +173,12 @@ async def test_limited_metric_no_preagg(self, client_with_build_v3): JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - COUNT(DISTINCT order_details_0.order_id) AS order_count + COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab) AS order_count FROM order_details_0 GROUP BY order_details_0.status """, @@ -218,7 +218,7 @@ async def test_derived_metric_no_preagg(self, client_with_build_v3): FROM default.v3.orders o JOIN default.v3.order_items oi ON o.order_id = oi.order_id ) - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id """, @@ -246,12 +246,12 @@ async def test_derived_metric_no_preagg(self, client_with_build_v3): JOIN default.v3.order_items oi ON o.order_id = oi.order_id ), order_details_0 AS ( - SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696 + SELECT t1.status, t1.order_id, SUM(t1.line_total) line_total_sum_e1f61696, t1.order_id order_id_distinct_f93d50ab FROM v3_order_details t1 GROUP BY t1.status, t1.order_id ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) AS avg_order_value + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value FROM order_details_0 GROUP BY order_details_0.status """, @@ -386,12 +386,12 @@ async def test_derived_metric_with_preagg(self, client_with_build_v3): WITH order_details_0 AS ( SELECT status, SUM(line_total_sum_e1f61696) line_total_sum_e1f61696, - order_id + order_id_distinct_f93d50ab FROM warehouse.preaggs.v3_order_metrics - GROUP BY status, order_id + GROUP BY status, order_id_distinct_f93d50ab ) SELECT order_details_0.status AS status, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id), 0) AS avg_order_value + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT(DISTINCT order_details_0.order_id_distinct_f93d50ab), 0) AS avg_order_value FROM order_details_0 GROUP BY order_details_0.status """, @@ -574,13 +574,13 @@ async def test_cross_fact_with_partial_preagg(self, client_with_build_v3): GROUP BY customer_id ), page_views_enriched_0 AS ( - SELECT t1.customer_id + SELECT t1.customer_id, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 GROUP BY t1.customer_id ) SELECT COALESCE(order_details_0.customer_id, page_views_enriched_0.customer_id) AS customer_id, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id), 0) AS revenue_per_visitor + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS revenue_per_visitor FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.customer_id = page_views_enriched_0.customer_id GROUP BY 1 """, @@ -655,12 +655,12 @@ async def test_cross_fact_with_full_preagg_coverage(self, client_with_build_v3): GROUP BY customer_id ), page_views_enriched_0 AS ( - SELECT t1.customer_id + SELECT t1.customer_id, t1.customer_id customer_id_distinct_dd4be7a5 FROM v3_page_views_enriched t1 GROUP BY t1.customer_id ) SELECT COALESCE(order_details_0.customer_id, page_views_enriched_0.customer_id) AS customer_id, - SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id), 0) AS revenue_per_visitor + SUM(order_details_0.line_total_sum_e1f61696) / NULLIF(COUNT( DISTINCT page_views_enriched_0.customer_id_distinct_dd4be7a5), 0) AS revenue_per_visitor FROM order_details_0 FULL OUTER JOIN page_views_enriched_0 ON order_details_0.customer_id = page_views_enriched_0.customer_id GROUP BY 1 """, diff --git a/datajunction-server/tests/construction/build_v3/set_operations_test.py b/datajunction-server/tests/construction/build_v3/set_operations_test.py index 5097e3480..66a601ed9 100644 --- a/datajunction-server/tests/construction/build_v3/set_operations_test.py +++ b/datajunction-server/tests/construction/build_v3/set_operations_test.py @@ -89,12 +89,12 @@ async def test_union_all_transform_metric_generates_sql( WHERE status = 'shipped' ), orders_unified_0 AS ( - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_a853fcda FROM v3_orders_unified t1 GROUP BY t1.status, t1.order_id ) SELECT orders_unified_0.status AS status, - COUNT(DISTINCT orders_unified_0.order_id) AS unified_order_count + COUNT(DISTINCT orders_unified_0.order_id_distinct_a853fcda) AS unified_order_count FROM orders_unified_0 GROUP BY orders_unified_0.status """, @@ -138,12 +138,12 @@ async def test_union_transform_filter_pushed_into_primary_arm_only( WHERE status = 'shipped' ), orders_unified_0 AS ( - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_a853fcda FROM v3_orders_unified t1 GROUP BY t1.status, t1.order_id ) SELECT orders_unified_0.status AS status, - COUNT(DISTINCT orders_unified_0.order_id) AS unified_order_count + COUNT(DISTINCT orders_unified_0.order_id_distinct_a853fcda) AS unified_order_count FROM orders_unified_0 WHERE orders_unified_0.status = 'completed' GROUP BY orders_unified_0.status @@ -276,7 +276,7 @@ async def test_filter_only_dim_link_on_one_arm_source_only( FROM default.v3.orders_us WHERE src_orders_us.order_date >= 20260101 ) - SELECT t1.status, t1.order_id + SELECT t1.status, t1.order_id, t1.order_id order_id_distinct_3d2cb4c8 FROM v3_orders_global t1 GROUP BY t1.status, t1.order_id """, diff --git a/datajunction-server/tests/construction/build_v3/test_self_join_sql_generation.py b/datajunction-server/tests/construction/build_v3/test_self_join_sql_generation.py index d86e1cb73..9ece9f096 100644 --- a/datajunction-server/tests/construction/build_v3/test_self_join_sql_generation.py +++ b/datajunction-server/tests/construction/build_v3/test_self_join_sql_generation.py @@ -92,7 +92,8 @@ async def test_direct_self_join_employee_manager( ) SELECT t2.employee_name employee_name_manager, - t1.employee_id + t1.employee_id, + t1.employee_id employee_id_distinct_d13d9843 FROM default_employee t1 LEFT OUTER JOIN default_employee t2 ON t1.manager_employee_id = t2.employee_id GROUP BY @@ -125,7 +126,8 @@ async def test_direct_self_join_employee_manager( employee_0 AS ( SELECT t2.employee_name employee_name_manager, - t1.employee_id + t1.employee_id, + t1.employee_id employee_id_distinct_d13d9843 FROM default_employee t1 LEFT OUTER JOIN default_employee t2 ON t1.manager_employee_id = t2.employee_id GROUP BY @@ -133,7 +135,7 @@ async def test_direct_self_join_employee_manager( ) SELECT employee_0.employee_name_manager AS employee_name_manager, - COUNT(DISTINCT employee_0.employee_id) AS employee_count + COUNT(DISTINCT employee_0.employee_id_distinct_d13d9843) AS employee_count FROM employee_0 GROUP BY employee_0.employee_name_manager From 3ab19cb6277526a9693a02d15256ec22fdafd91b Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 24 May 2026 22:27:00 -0700 Subject: [PATCH 11/11] Add test for legacy pre-agg coverage --- .../build_v3/preagg_substitution_test.py | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py b/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py index a6874885e..77e91dc25 100644 --- a/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py +++ b/datajunction-server/tests/construction/build_v3/preagg_substitution_test.py @@ -913,3 +913,94 @@ def test_deduplicates_repeated_components(self): ) # Only one component should appear in output despite two in input assert len(result.components) == 1 + + def test_aliases_legacy_bare_measure_col_to_hashed_component_name(self): + """build_grain_group_from_preagg aliases the pre-agg's measure + column to ``component.name`` when they differ. + + Legacy v2 pre-aggs (materialized before plain-column + COUNT(DISTINCT) projected the hashed identity) carry the + column under the bare grain name (e.g. ``account_id``) while + the component identity is hashed (``account_id_distinct_``). + ``component_aliases`` and the CTE's output column must be the + hashed name so the persisted ``derived_expression`` resolves — + the pre-agg path emits `` AS ``. + """ + from datajunction_server.database.availabilitystate import AvailabilityState + + node = self._make_node() + # Plain-column COUNT(DISTINCT account_id) component: name is + # hashed identity, expression is the bare column, LIMITED with + # no aggregation, and merge is None. + component = MetricComponent( + name="account_id_distinct_abc123", + expression="account_id", + aggregation=None, + merge=None, + rule=AggregationRule(type=Aggregability.LIMITED), + grain_alias="account_id", + ) + grain_group = GrainGroup( + parent_node=node, + aggregability=Aggregability.LIMITED, + grain_columns=[], + components=[(node, component)], + ) + + # Legacy pre-agg stores the measure under the BARE grain name. + # expr_hash matches the component so the measure is found. + expr_hash = compute_expression_hash("account_id") + measure = PreAggMeasure( + name="account_id", # legacy bare-named column + expression="account_id", + aggregation=None, + merge=None, + rule=AggregationRule(type=Aggregability.LIMITED), + expr_hash=expr_hash, + ) + avail = AvailabilityState( + catalog="wh", + schema_="preaggs", + table="legacy_tbl", + valid_through_ts=99999, + ) + preagg = PreAggregation( + node_revision_id=1, + grain_columns=[], + measures=[measure], + sql="SELECT 1", + grain_group_hash="abc", + preagg_hash="def", + availability=avail, + ) + + result = build_grain_group_from_preagg( + self._make_ctx(), + grain_group, + preagg, + resolved_dimensions=[], + components_per_metric={}, + ) + # component_aliases routes the hashed identity at component.name + # (matching the live-build path) so downstream combiner + # rewriters reference the hashed column consistently. + assert result.component_aliases == { + "account_id_distinct_abc123": "account_id_distinct_abc123", + } + # The CTE output exposes ``account_id_distinct_abc123`` even + # though the pre-agg physically stores ``account_id``. + column_names = {c.name for c in result.columns} + assert "account_id_distinct_abc123" in column_names + assert "account_id" not in column_names + # The generated SQL selects the bare physical ``account_id`` + # column from the legacy pre-agg and aliases it to the hashed + # identity. ``assert_sql_equal`` ignores whitespace and the + # optional ``AS`` keyword in alias syntax. + assert_sql_equal( + str(result.query), + """ + SELECT account_id account_id_distinct_abc123 + FROM wh.preaggs.legacy_tbl + GROUP BY account_id + """, + )