From 65a4beb1051adeb14b5878af0725deb9151dd475 Mon Sep 17 00:00:00 2001 From: June Kim Date: Mon, 11 May 2026 20:30:28 -0700 Subject: [PATCH 1/2] Fix UNION column alias aggregation When the same alias appears in multiple UNION branches, aggregate all source columns into a list instead of overwriting with the last occurrence. Before: SELECT a.A as M FROM tab1 a UNION SELECT b.B as M FROM tab2 b columns_aliases = {"M": "tab2.B"} # Lost tab1.A After: columns_aliases = {"M": ["tab1.A", "tab2.B"]} Implementation: - Modified _Collector.add_alias to check for existing alias entries - When duplicate found, convert single value to list or append to list - Deduplicates identical sources (same alias pointing to same column) Fixes #401 --- sql_metadata/column_extractor.py | 15 ++++++++++++++- test/test_unions.py | 17 +++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/sql_metadata/column_extractor.py b/sql_metadata/column_extractor.py index 5c3fee42..19ee4726 100644 --- a/sql_metadata/column_extractor.py +++ b/sql_metadata/column_extractor.py @@ -200,7 +200,20 @@ def add_alias(self, name: str, target: Any, clause: str) -> None: if clause: self.alias_dict.setdefault(clause, UniqueList()).append(name) if target is not None: - self.alias_map[name] = target + if name in self.alias_map: + # Alias already exists — aggregate targets into a list + existing = self.alias_map[name] + if isinstance(existing, list): + # Already a list — append new target + if target not in existing: + existing.append(target) + else: + # Single value — convert to list + if existing != target: + self.alias_map[name] = [existing, target] + else: + # First occurrence — store as-is + self.alias_map[name] = target # --------------------------------------------------------------------------- diff --git a/test/test_unions.py b/test/test_unions.py index f0506bed..c4ee03c4 100644 --- a/test/test_unions.py +++ b/test/test_unions.py @@ -1,6 +1,23 @@ from sql_metadata import Parser +def test_union_column_aliases(): + # https://github.com/macbre/sql-metadata/issues/401 + # When UNION combines queries with the same alias, + # columns_aliases should aggregate all source columns + query = """ + select a.A as M + from tab1 a + union all + select b.B as M + from tab2 b + """ + parser = Parser(query) + assert parser.columns_aliases == {"M": ["tab1.A", "tab2.B"]} + assert parser.columns == ["tab1.A", "tab2.B"] + assert parser.tables == ["tab1", "tab2"] + + def test_union(): query = """ SELECT From 4ca0cd614a1708768da47de58ae82a9334ab4387 Mon Sep 17 00:00:00 2001 From: June Kim Date: Wed, 13 May 2026 17:31:33 -0700 Subject: [PATCH 2/2] Fix nested-list and UniqueList TypeError in alias aggregation Per @collerek review: the prior add_alias logic produced nested lists when a scalar target was followed by a list target, and raised TypeError when both targets were UniqueList (unhashable in 'not in'). Adopt the reviewer's suggested implementation: normalize existing into a UniqueList, extend with target (list or scalar), collapse to scalar when len==1. Add regression tests for both cases. --- sql_metadata/column_extractor.py | 21 ++++++--------------- test/test_unions.py | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/sql_metadata/column_extractor.py b/sql_metadata/column_extractor.py index 19ee4726..1efdb8d8 100644 --- a/sql_metadata/column_extractor.py +++ b/sql_metadata/column_extractor.py @@ -199,21 +199,12 @@ def add_alias(self, name: str, target: Any, clause: str) -> None: self.alias_names.append(name) if clause: self.alias_dict.setdefault(clause, UniqueList()).append(name) - if target is not None: - if name in self.alias_map: - # Alias already exists — aggregate targets into a list - existing = self.alias_map[name] - if isinstance(existing, list): - # Already a list — append new target - if target not in existing: - existing.append(target) - else: - # Single value — convert to list - if existing != target: - self.alias_map[name] = [existing, target] - else: - # First occurrence — store as-is - self.alias_map[name] = target + if target is None: + return + existing = self.alias_map.get(name, []) + merged = UniqueList(existing if isinstance(existing, list) else [existing]) + merged.extend(target if isinstance(target, list) else [target]) + self.alias_map[name] = merged if len(merged) > 1 else merged[0] # --------------------------------------------------------------------------- diff --git a/test/test_unions.py b/test/test_unions.py index c4ee03c4..47c74278 100644 --- a/test/test_unions.py +++ b/test/test_unions.py @@ -18,6 +18,24 @@ def test_union_column_aliases(): assert parser.tables == ["tab1", "tab2"] +def test_union_alias_with_expression_targets(): + # Regression: scalar then list-target must not nest + q1 = """ + SELECT a AS x FROM t1 + UNION ALL + SELECT b + c AS x FROM t2 + """ + assert Parser(q1).columns_aliases == {"x": ["a", "b", "c"]} + + # Regression: list then list-target must not raise TypeError on UniqueList + q2 = """ + SELECT a + b AS x FROM t1 + UNION ALL + SELECT c + d AS x FROM t2 + """ + assert Parser(q2).columns_aliases == {"x": ["a", "b", "c", "d"]} + + def test_union(): query = """ SELECT