Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions datajunction-server/datajunction_server/construction/build_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1793,10 +1793,15 @@ def convert_to_cte(
"""
Convert the query to a CTE that can be used by the outer query
"""
# Move all the CTEs used by the inner query to the outer query
# Move all the CTEs used by the inner query to the outer query,
# deduplicating by CTE name to avoid duplicate CTEs
existing_cte_names = {cte.alias_or_name.identifier() for cte in outer_query.ctes}
for cte in inner_query.ctes:
cte.set_parent(outer_query, parent_key="ctes")
outer_query.ctes.extend(inner_query.ctes)
cte_identifier = cte.alias_or_name.identifier()
if cte_identifier not in existing_cte_names: # pragma: no branch
cte.set_parent(outer_query, parent_key="ctes")
outer_query.ctes.append(cte)
existing_cte_names.add(cte_identifier)
inner_query.ctes = []

# Convert the dimension node query to a CTE
Expand Down Expand Up @@ -2136,10 +2141,21 @@ async def build_ast(
# Apply pushdown filters if possible
apply_filters_to_node(node, query, to_filter_asts(filters))

# Add new CTEs while deduplicating by CTE name
existing_cte_names = {cte.alias_or_name.identifier() for cte in query.ctes}
for cte in new_cte_mapping.values():
query.ctes.extend(cte.ctes)
# Add nested CTEs from this CTE, deduplicating
for nested_cte in cte.ctes: # pragma: no branch
nested_cte_id = nested_cte.alias_or_name.identifier()
if nested_cte_id not in existing_cte_names: # pragma: no branch
query.ctes.append(nested_cte)
existing_cte_names.add(nested_cte_id)
cte.ctes = []
query.ctes.append(cte)
# Add the CTE itself if not already present
cte_id = cte.alias_or_name.identifier()
if cte_id not in existing_cte_names: # pragma: no branch
query.ctes.append(cte)
existing_cte_names.add(cte_id)
query.select.add_aliases_to_unnamed_columns()
ctes_mapping.update(new_cte_mapping)
return query
Expand Down
Loading