diff --git a/sqlmesh/core/engine_adapter/fabric.py b/sqlmesh/core/engine_adapter/fabric.py index 8e2fb0e496..4968a65804 100644 --- a/sqlmesh/core/engine_adapter/fabric.py +++ b/sqlmesh/core/engine_adapter/fabric.py @@ -7,7 +7,6 @@ from functools import cached_property from sqlglot import exp from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result -from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter from sqlmesh.core.engine_adapter.shared import ( InsertOverwriteStrategy, @@ -19,7 +18,7 @@ logger = logging.getLogger(__name__) -class FabricEngineAdapter(LogicalMergeMixin, MSSQLEngineAdapter): +class FabricEngineAdapter(MSSQLEngineAdapter): """ Adapter for Microsoft Fabric. """ diff --git a/tests/core/engine_adapter/test_fabric.py b/tests/core/engine_adapter/test_fabric.py index 6b80ef7337..c3b8c52ec5 100644 --- a/tests/core/engine_adapter/test_fabric.py +++ b/tests/core/engine_adapter/test_fabric.py @@ -2,6 +2,7 @@ import typing as t +import pandas as pd # noqa: TID253 import pytest from pytest_mock import MockerFixture from sqlglot import exp, parse_one @@ -88,3 +89,145 @@ def test_replace_query(adapter: FabricEngineAdapter, mocker: MockerFixture): "TRUNCATE TABLE [test_table];", "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];", ] + + +def test_merge_pandas( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable +): + mocker.patch( + "sqlmesh.core.engine_adapter.fabric.FabricEngineAdapter.table_exists", + return_value=False, + ) + + adapter = make_mocked_engine_adapter(FabricEngineAdapter) + + temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table") + table_name = "target" + temp_table_id = "abcdefgh" + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + + df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]}) + + # 1 key + adapter.merge( + target_table=table_name, + source_table=df, + target_columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), + }, + unique_key=[exp.to_identifier("id")], + ) + adapter._connection_pool.get().bulk_copy.assert_called_with( + f"__temp_target_{temp_table_id}", [(1, 1, 4), (2, 2, 5), (3, 3, 6)] + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INT, [ts] DATETIME2(6), [val] INT)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts], CAST([val] AS INT) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + # 2 keys + adapter.cursor.reset_mock() + adapter._connection_pool.get().reset_mock() + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + adapter.merge( + target_table=table_name, + source_table=df, + target_columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), + }, + unique_key=[exp.to_identifier("id"), exp.to_column("ts")], + ) + adapter._connection_pool.get().bulk_copy.assert_called_with( + f"__temp_target_{temp_table_id}", [(1, 1, 4), (2, 2, 5), (3, 3, 6)] + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INT, [ts] DATETIME2(6), [val] INT)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts], CAST([val] AS INT) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + +def test_merge_exists( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable +): + mocker.patch( + "sqlmesh.core.engine_adapter.fabric.FabricEngineAdapter.table_exists", + return_value=False, + ) + + adapter = make_mocked_engine_adapter(FabricEngineAdapter) + + temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table") + table_name = "target" + temp_table_id = "abcdefgh" + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + + df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]}) + + # regular implementation + adapter.merge( + target_table=table_name, + source_table=df, + target_columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), + }, + unique_key=[exp.to_identifier("id")], + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INT, [ts] DATETIME2(6), [val] INT)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts], CAST([val] AS INT) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + # merge exists implementation + adapter.cursor.reset_mock() + adapter._connection_pool.get().reset_mock() + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + adapter.merge( + target_table=table_name, + source_table=df, + target_columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + "val": exp.DataType.build("int"), + }, + unique_key=[exp.to_identifier("id")], + physical_properties={"mssql_merge_exists": True}, + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INT, [ts] DATETIME2(6), [val] INT)');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts], CAST([val] AS INT) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ] + + # merge exists and all model columns are keys + adapter.cursor.reset_mock() + adapter._connection_pool.get().reset_mock() + temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id) + adapter.merge( + target_table=table_name, + source_table=df, + target_columns_to_types={ + "id": exp.DataType.build("int"), + "ts": exp.DataType.build("TIMESTAMP"), + }, + unique_key=[exp.to_identifier("id"), exp.to_column("ts")], + physical_properties={"mssql_merge_exists": True}, + ) + + assert to_sql_calls(adapter) == [ + f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INT, [ts] DATETIME2(6))');""", + f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INT) AS [id], CAST([ts] AS DATETIME2(6)) AS [ts] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN NOT MATCHED THEN INSERT ([id], [ts]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts]);", + f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];", + ]