Skip to content

Commit 7777138

Browse files
eakmanrq authored and izeigerman committed
fix: always set column order scd type 2 (#2155)
1 parent 0a893bc commit 7777138

File tree

3 files changed

+127
-36
lines changed

3 files changed

+127
-36
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1278,6 +1278,7 @@ def _scd_type_2(
12781278
select_source_columns: t.List[t.Union[str, exp.Alias]] = [
12791279
col for col in unmanaged_columns if col != updated_at_name
12801280
]
1281+
table_columns = [exp.column(c, quoted=True) for c in columns_to_types]
12811282
if updated_at_name:
12821283
select_source_columns.append(
12831284
exp.cast(updated_at_name, time_data_type).as_(updated_at_name)
@@ -1393,14 +1394,14 @@ def _scd_type_2(
13931394
# Historical Records that Do Not Change
13941395
.with_(
13951396
"static",
1396-
self._select_columns(columns_to_types)
1397+
exp.select(*table_columns)
13971398
.from_(target_table)
13981399
.where(f"{valid_to_name} IS NOT NULL"),
13991400
)
14001401
# Latest Records that can be updated
14011402
.with_(
14021403
"latest",
1403-
self._select_columns(columns_to_types)
1404+
exp.select(*table_columns)
14041405
.from_(target_table)
14051406
.where(f"{valid_to_name} IS NULL"),
14061407
)
@@ -1520,14 +1521,14 @@ def _scd_type_2(
15201521
.from_("joined")
15211522
.where(updated_row_filter),
15221523
)
1523-
.select("*")
1524+
.select(*table_columns)
15241525
.from_("static")
15251526
.union(
1526-
"SELECT * FROM updated_rows",
1527+
exp.select(*table_columns).from_("updated_rows"),
15271528
distinct=False,
15281529
)
15291530
.union(
1530-
"SELECT * FROM inserted_rows",
1531+
exp.select(*table_columns).from_("inserted_rows"),
15311532
distinct=False,
15321533
)
15331534
)

tests/core/engine_adapter/test_base.py

Lines changed: 102 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1128,15 +1128,30 @@ def test_scd_type_2_by_time(make_mocked_engine_adapter: t.Callable):
11281128
"test_updated_at" > "t_test_updated_at"
11291129
)
11301130
SELECT
1131-
*
1131+
"id",
1132+
"name",
1133+
"price",
1134+
"test_updated_at",
1135+
"test_valid_from",
1136+
"test_valid_to"
11321137
FROM "static"
11331138
UNION ALL
11341139
SELECT
1135-
*
1140+
"id",
1141+
"name",
1142+
"price",
1143+
"test_updated_at",
1144+
"test_valid_from",
1145+
"test_valid_to"
11361146
FROM "updated_rows"
11371147
UNION ALL
11381148
SELECT
1139-
*
1149+
"id",
1150+
"name",
1151+
"price",
1152+
"test_updated_at",
1153+
"test_valid_from",
1154+
"test_valid_to"
11401155
FROM "inserted_rows"
11411156
"""
11421157
).sql()
@@ -1300,15 +1315,30 @@ def test_scd_type_2_by_time_no_invalidate_hard_deletes(make_mocked_engine_adapte
13001315
"test_updated_at" > "t_test_updated_at"
13011316
)
13021317
SELECT
1303-
*
1318+
"id",
1319+
"name",
1320+
"price",
1321+
"test_updated_at",
1322+
"test_valid_from",
1323+
"test_valid_to"
13041324
FROM "static"
13051325
UNION ALL
13061326
SELECT
1307-
*
1327+
"id",
1328+
"name",
1329+
"price",
1330+
"test_updated_at",
1331+
"test_valid_from",
1332+
"test_valid_to"
13081333
FROM "updated_rows"
13091334
UNION ALL
13101335
SELECT
1311-
*
1336+
"id",
1337+
"name",
1338+
"price",
1339+
"test_updated_at",
1340+
"test_valid_from",
1341+
"test_valid_to"
13121342
FROM "inserted_rows"
13131343
"""
13141344
).sql()
@@ -1500,15 +1530,33 @@ def test_merge_scd_type_2_pandas(make_mocked_engine_adapter: t.Callable):
15001530
"test_updated_at" > "t_test_updated_at"
15011531
)
15021532
SELECT
1503-
*
1533+
"id1",
1534+
"id2",
1535+
"name",
1536+
"price",
1537+
"test_updated_at",
1538+
"test_valid_from",
1539+
"test_valid_to"
15041540
FROM "static"
15051541
UNION ALL
15061542
SELECT
1507-
*
1543+
"id1",
1544+
"id2",
1545+
"name",
1546+
"price",
1547+
"test_updated_at",
1548+
"test_valid_from",
1549+
"test_valid_to"
15081550
FROM "updated_rows"
15091551
UNION ALL
15101552
SELECT
1511-
*
1553+
"id1",
1554+
"id2",
1555+
"name",
1556+
"price",
1557+
"test_updated_at",
1558+
"test_valid_from",
1559+
"test_valid_to"
15121560
FROM "inserted_rows"
15131561
"""
15141562
).sql()
@@ -1686,15 +1734,27 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
16861734
)
16871735
)
16881736
SELECT
1689-
*
1737+
"id",
1738+
"name",
1739+
"price",
1740+
"test_valid_from",
1741+
"test_valid_to"
16901742
FROM "static"
16911743
UNION ALL
16921744
SELECT
1693-
*
1745+
"id",
1746+
"name",
1747+
"price",
1748+
"test_valid_from",
1749+
"test_valid_to"
16941750
FROM "updated_rows"
16951751
UNION ALL
16961752
SELECT
1697-
*
1753+
"id",
1754+
"name",
1755+
"price",
1756+
"test_valid_from",
1757+
"test_valid_to"
16981758
FROM "inserted_rows"
16991759
"""
17001760
).sql()
@@ -1886,15 +1946,27 @@ def test_scd_type_2_by_column_star_check(make_mocked_engine_adapter: t.Callable)
18861946
)
18871947
)
18881948
SELECT
1889-
*
1949+
"id",
1950+
"name",
1951+
"price",
1952+
"test_valid_from",
1953+
"test_valid_to"
18901954
FROM "static"
18911955
UNION ALL
18921956
SELECT
1893-
*
1957+
"id",
1958+
"name",
1959+
"price",
1960+
"test_valid_from",
1961+
"test_valid_to"
18941962
FROM "updated_rows"
18951963
UNION ALL
18961964
SELECT
1897-
*
1965+
"id",
1966+
"name",
1967+
"price",
1968+
"test_valid_from",
1969+
"test_valid_to"
18981970
FROM "inserted_rows"
18991971
"""
19001972
).sql()
@@ -2071,15 +2143,27 @@ def test_scd_type_2_by_column_no_invalidate_hard_deletes(make_mocked_engine_adap
20712143
)
20722144
)
20732145
SELECT
2074-
*
2146+
"id",
2147+
"name",
2148+
"price",
2149+
"test_valid_from",
2150+
"test_valid_to"
20752151
FROM "static"
20762152
UNION ALL
20772153
SELECT
2078-
*
2154+
"id",
2155+
"name",
2156+
"price",
2157+
"test_valid_from",
2158+
"test_valid_to"
20792159
FROM "updated_rows"
20802160
UNION ALL
20812161
SELECT
2082-
*
2162+
"id",
2163+
"name",
2164+
"price",
2165+
"test_valid_from",
2166+
"test_valid_to"
20832167
FROM "inserted_rows"
20842168
"""
20852169
).sql()

tests/core/engine_adapter/test_spark.py

Lines changed: 19 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -759,19 +759,25 @@ def check_table_exists(table_name: exp.Table) -> bool:
759759
`test_updated_at`,
760760
`test_valid_from`,
761761
`test_valid_to`
762-
FROM (
763-
SELECT
764-
*
765-
FROM `static`
766-
UNION ALL
767-
SELECT
768-
*
769-
FROM `updated_rows`
770-
UNION ALL
771-
SELECT
772-
*
773-
FROM `inserted_rows`
774-
) AS `_subquery`
762+
FROM `static`
763+
UNION ALL
764+
SELECT
765+
`id`,
766+
`name`,
767+
`price`,
768+
`test_updated_at`,
769+
`test_valid_from`,
770+
`test_valid_to`
771+
FROM `updated_rows`
772+
UNION ALL
773+
SELECT
774+
`id`,
775+
`name`,
776+
`price`,
777+
`test_updated_at`,
778+
`test_valid_from`,
779+
`test_valid_to`
780+
FROM `inserted_rows`
775781
""",
776782
dialect="spark",
777783
).sql(dialect="spark"),

0 commit comments

Comments
 (0)