Skip to content

Commit fcc082b

Browse files
committed
Fix: Support for unique key expressions for SCD2 by column models (#3075)
1 parent 0181bb5 commit fcc082b

File tree

2 files changed

+197
-4
lines changed

2 files changed

+197
-4
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1429,11 +1429,12 @@ def remove_managed_columns(
14291429
)
14301430
row_value_check = exp.or_(*row_check_conditions)
14311431
unique_key_conditions = []
1432-
for col in unique_key:
1433-
t_col = col.copy()
1434-
t_col.this.set("this", f"t_{col.name}")
1432+
for key in unique_key:
1433+
t_key = key.copy()
1434+
for col in t_key.find_all(exp.Column):
1435+
col.this.set("this", f"t_{col.name}")
14351436
unique_key_conditions.extend(
1436-
[t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()]
1437+
[t_key.is_(exp.Null()).not_(), key.is_(exp.Null()).not_()]
14371438
)
14381439
unique_key_check = exp.and_(*unique_key_conditions)
14391440
# unique_key_check is saying "if the row is updated"

tests/core/engine_adapter/test_base.py

Lines changed: 192 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1719,6 +1719,198 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
17191719
)
17201720

17211721

1722+
def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callable):
1723+
adapter = make_mocked_engine_adapter(EngineAdapter)
1724+
1725+
adapter.scd_type_2_by_column(
1726+
target_table="target",
1727+
source_table=t.cast(exp.Select, parse_one("SELECT id_a, id_b, name, price FROM source")),
1728+
unique_key=[exp.func("CONCAT", exp.column("id_a"), exp.column("id_b"))],
1729+
valid_from_col=exp.column("test_VALID_from", quoted=True),
1730+
valid_to_col=exp.column("test_valid_to", quoted=True),
1731+
check_columns=[exp.column("name"), exp.column("price")],
1732+
columns_to_types={
1733+
"id_a": exp.DataType.build("VARCHAR"),
1734+
"id_b": exp.DataType.build("VARCHAR"),
1735+
"name": exp.DataType.build("VARCHAR"),
1736+
"price": exp.DataType.build("DOUBLE"),
1737+
"test_VALID_from": exp.DataType.build("TIMESTAMP"),
1738+
"test_valid_to": exp.DataType.build("TIMESTAMP"),
1739+
},
1740+
execution_time=datetime(2020, 1, 1, 0, 0, 0),
1741+
)
1742+
1743+
assert (
1744+
adapter.cursor.execute.call_args[0][0]
1745+
== parse_one(
1746+
"""
1747+
CREATE OR REPLACE TABLE "target" AS
1748+
WITH "source" AS (
1749+
SELECT DISTINCT ON (CONCAT("id_a", "id_b"))
1750+
TRUE AS "_exists",
1751+
"id_a",
1752+
"id_b",
1753+
"name",
1754+
"price",
1755+
FROM (
1756+
SELECT
1757+
"id_a",
1758+
"id_b",
1759+
"name",
1760+
"price"
1761+
FROM "source"
1762+
) AS "raw_source"
1763+
), "static" AS (
1764+
SELECT
1765+
"id_a",
1766+
"id_b",
1767+
"name",
1768+
"price",
1769+
"test_VALID_from",
1770+
"test_valid_to",
1771+
TRUE AS "_exists"
1772+
FROM "target"
1773+
WHERE
1774+
NOT "test_valid_to" IS NULL
1775+
), "latest" AS (
1776+
SELECT
1777+
"id_a",
1778+
"id_b",
1779+
"name",
1780+
"price",
1781+
"test_VALID_from",
1782+
"test_valid_to",
1783+
TRUE AS "_exists"
1784+
FROM "target"
1785+
WHERE
1786+
"test_valid_to" IS NULL
1787+
), "deleted" AS (
1788+
SELECT
1789+
"static"."id_a",
1790+
"static"."id_b",
1791+
"static"."name",
1792+
"static"."price",
1793+
"static"."test_VALID_from",
1794+
"static"."test_valid_to"
1795+
FROM "static"
1796+
LEFT JOIN "latest"
1797+
ON CONCAT("static"."id_a", "static"."id_b") = CONCAT("latest"."id_a", "latest"."id_b")
1798+
WHERE
1799+
"latest"."test_valid_to" IS NULL
1800+
), "latest_deleted" AS (
1801+
SELECT
1802+
TRUE AS "_exists",
1803+
CONCAT("id_a", "id_b") AS "_key0",
1804+
MAX("test_valid_to") AS "test_valid_to"
1805+
FROM "deleted"
1806+
GROUP BY
1807+
CONCAT("id_a", "id_b")
1808+
), "joined" AS (
1809+
SELECT
1810+
"source"."_exists" AS "_exists",
1811+
"latest"."id_a" AS "t_id_a",
1812+
"latest"."id_b" AS "t_id_b",
1813+
"latest"."name" AS "t_name",
1814+
"latest"."price" AS "t_price",
1815+
"latest"."test_VALID_from" AS "t_test_VALID_from",
1816+
"latest"."test_valid_to" AS "t_test_valid_to",
1817+
"source"."id_a" AS "id_a",
1818+
"source"."id_b" AS "id_b",
1819+
"source"."name" AS "name",
1820+
"source"."price" AS "price"
1821+
FROM "latest"
1822+
LEFT JOIN "source"
1823+
ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b")
1824+
UNION ALL
1825+
SELECT
1826+
"source"."_exists" AS "_exists",
1827+
"latest"."id_a" AS "t_id_a",
1828+
"latest"."id_b" AS "t_id_b",
1829+
"latest"."name" AS "t_name",
1830+
"latest"."price" AS "t_price",
1831+
"latest"."test_VALID_from" AS "t_test_VALID_from",
1832+
"latest"."test_valid_to" AS "t_test_valid_to",
1833+
"source"."id_a" AS "id_a",
1834+
"source"."id_b" AS "id_b",
1835+
"source"."name" AS "name",
1836+
"source"."price" AS "price"
1837+
FROM "latest"
1838+
RIGHT JOIN "source"
1839+
ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b")
1840+
WHERE
1841+
"latest"."_exists" IS NULL
1842+
), "updated_rows" AS (
1843+
SELECT
1844+
COALESCE("joined"."t_id_a", "joined"."id_a") AS "id_a",
1845+
COALESCE("joined"."t_id_b", "joined"."id_b") AS "id_b",
1846+
COALESCE("joined"."t_name", "joined"."name") AS "name",
1847+
COALESCE("joined"."t_price", "joined"."price") AS "price",
1848+
COALESCE("t_test_VALID_from", CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AS "test_VALID_from",
1849+
CASE
1850+
WHEN "joined"."_exists" IS NULL
1851+
OR (
1852+
(
1853+
NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL
1854+
)
1855+
AND (
1856+
"name" <> "t_name"
1857+
OR (
1858+
"t_name" IS NULL AND NOT "name" IS NULL
1859+
)
1860+
OR (
1861+
NOT "t_name" IS NULL AND "name" IS NULL
1862+
)
1863+
OR "price" <> "t_price"
1864+
OR (
1865+
"t_price" IS NULL AND NOT "price" IS NULL
1866+
)
1867+
OR (
1868+
NOT "t_price" IS NULL AND "price" IS NULL
1869+
)
1870+
)
1871+
)
1872+
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
1873+
ELSE "t_test_valid_to"
1874+
END AS "test_valid_to"
1875+
FROM "joined"
1876+
LEFT JOIN "latest_deleted"
1877+
ON CONCAT("joined"."id_a", "joined"."id_b") = "latest_deleted"."_key0"
1878+
), "inserted_rows" AS (
1879+
SELECT
1880+
"id_a",
1881+
"id_b",
1882+
"name",
1883+
"price",
1884+
CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from",
1885+
CAST(NULL AS TIMESTAMP) AS "test_valid_to"
1886+
FROM "joined"
1887+
WHERE
1888+
(
1889+
NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL
1890+
)
1891+
AND (
1892+
"name" <> "t_name"
1893+
OR (
1894+
"t_name" IS NULL AND NOT "name" IS NULL
1895+
)
1896+
OR (
1897+
NOT "t_name" IS NULL AND "name" IS NULL
1898+
)
1899+
OR "price" <> "t_price"
1900+
OR (
1901+
"t_price" IS NULL AND NOT "price" IS NULL
1902+
)
1903+
OR (
1904+
NOT "t_price" IS NULL AND "price" IS NULL
1905+
)
1906+
)
1907+
)
1908+
SELECT CAST("id_a" AS VARCHAR) AS "id_a", CAST("id_b" AS VARCHAR) AS "id_b", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery"
1909+
"""
1910+
).sql()
1911+
)
1912+
1913+
17221914
def test_scd_type_2_truncate(make_mocked_engine_adapter: t.Callable):
17231915
adapter = make_mocked_engine_adapter(EngineAdapter)
17241916

0 commit comments

Comments
 (0)