diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py
index 38f662c76f..f04ad4fd71 100644
--- a/sqlmesh/core/engine_adapter/base.py
+++ b/sqlmesh/core/engine_adapter/base.py
@@ -551,11 +551,13 @@ def replace_query(
                     target_table,
                     source_queries,
                     target_columns_to_types,
+                    **kwargs,
                 )
         return self._insert_overwrite_by_condition(
             target_table,
             source_queries,
             target_columns_to_types,
+            **kwargs,
         )
 
     def create_index(
@@ -1614,7 +1616,7 @@ def _insert_overwrite_by_time_partition(
         **kwargs: t.Any,
     ) -> None:
         return self._insert_overwrite_by_condition(
-            table_name, source_queries, target_columns_to_types, where
+            table_name, source_queries, target_columns_to_types, where, **kwargs
         )
 
     def _values_to_sql(
diff --git a/sqlmesh/core/engine_adapter/mssql.py b/sqlmesh/core/engine_adapter/mssql.py
index a85ba6d94f..359d1f0818 100644
--- a/sqlmesh/core/engine_adapter/mssql.py
+++ b/sqlmesh/core/engine_adapter/mssql.py
@@ -423,7 +423,9 @@ def _insert_overwrite_by_condition(
         insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
         **kwargs: t.Any,
     ) -> None:
-        if not where or where == exp.true():
+        # passed as table_properties here, not physical_properties; an explicit None is tolerated
+        use_merge_strategy = (kwargs.get("table_properties") or {}).get("mssql_merge_exists")
+        if (not where or where == exp.true()) and not use_merge_strategy:
             # this is a full table replacement, call the base strategy to do DELETE+INSERT
             # which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
             return EngineAdapter._insert_overwrite_by_condition(
@@ -436,7 +438,7 @@ def _insert_overwrite_by_condition(
                 **kwargs,
             )
 
-        # For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
+        # For conditional overwrites or when mssql_merge_exists is set use MERGE
         return super()._insert_overwrite_by_condition(
             table_name=table_name,
             source_queries=source_queries,
diff --git a/tests/core/engine_adapter/test_mssql.py b/tests/core/engine_adapter/test_mssql.py
index a405bb7576..bf28157d00 100644
--- a/tests/core/engine_adapter/test_mssql.py
+++ b/tests/core/engine_adapter/test_mssql.py
@@ -9,14 +9,14 @@
 from sqlglot import expressions as exp
 from sqlglot import parse_one
 
+from pathlib import Path
+from sqlmesh import model
 from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
-from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory
+from sqlmesh.core.snapshot import SnapshotEvaluator, SnapshotChangeCategory, Snapshot
 from sqlmesh.core.model import load_sql_based_model
+from sqlmesh.core.model.kind import SCDType2ByTimeKind
 from sqlmesh.core import dialect as d
-from sqlmesh.core.engine_adapter.shared import (
-    DataObject,
-    DataObjectType,
-)
+from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType, SourceQuery
 from sqlmesh.utils.date import to_ds
 from tests.core.engine_adapter import to_sql_calls
 
@@ -916,3 +916,91 @@ def test_replace_query_strategy(adapter: MSSQLEngineAdapter, mocker: MockerFixtu
         "TRUNCATE TABLE [test_table];",
         "INSERT INTO [test_table] ([a], [b]) SELECT [a] AS [a], [b] AS [b] FROM [db].[upstream_table] AS [upstream_table];",
     ]
+
+
+def test_mssql_merge_exists_switches_strategy_from_truncate_to_merge(
+    make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
+):
+    """where=None delegates to EngineAdapter unless mssql_merge_exists is set."""
+    adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
+
+    query = exp.select("*").from_("source")
+    source_queries = [SourceQuery(query_factory=lambda: query)]
+
+    # Test WITHOUT mssql_merge_exists, should use DELETE+INSERT strategy
+    base_insert_overwrite = mocker.patch(
+        "sqlmesh.core.engine_adapter.base.EngineAdapter._insert_overwrite_by_condition"
+    )
+
+    adapter._insert_overwrite_by_condition(
+        table_name="target",
+        source_queries=source_queries,
+        target_columns_to_types={
+            "id": exp.DataType.build("INT"),
+            "value": exp.DataType.build("VARCHAR"),
+        },
+        where=None,
+    )
+
+    # Should call base DELETE+INSERT strategy
+    assert base_insert_overwrite.called
+    base_insert_overwrite.reset_mock()
+
+    # Test WITH mssql_merge_exists uses MERGE strategy
+    super_insert_overwrite = mocker.patch(
+        "sqlmesh.core.engine_adapter.base.EngineAdapterWithIndexSupport._insert_overwrite_by_condition"
+    )
+
+    adapter._insert_overwrite_by_condition(
+        table_name="target",
+        source_queries=source_queries,
+        target_columns_to_types={
+            "id": exp.DataType.build("INT"),
+            "value": exp.DataType.build("VARCHAR"),
+        },
+        where=None,
+        table_properties={"mssql_merge_exists": True},
+    )
+
+    # Should call super's MERGE strategy, not base DELETE+INSERT
+    assert super_insert_overwrite.called
+    assert not base_insert_overwrite.called
+
+
+def test_python_scd2_model_preserves_physical_properties(make_snapshot):
+    """physical_properties on a Python SCD2 model carry over to its snapshot node."""
+    @model(
+        "test_schema.python_scd2_with_mssql_merge",
+        kind=SCDType2ByTimeKind(
+            unique_key=["id"],
+            valid_from_name="valid_from",
+            valid_to_name="valid_to",
+            updated_at_name="updated_at",
+        ),
+        columns={
+            "id": "INT",
+            "value": "VARCHAR",
+            "updated_at": "TIMESTAMP",
+            "valid_from": "TIMESTAMP",
+            "valid_to": "TIMESTAMP",
+        },
+        physical_properties={"mssql_merge_exists": True},
+    )
+    def python_scd2_model(context, **kwargs):
+        import pandas as pd
+
+        return pd.DataFrame(
+            {"id": [1, 2], "value": ["a", "b"], "updated_at": ["2024-01-01", "2024-01-02"]}
+        )
+
+    m = model.get_registry()["test_schema.python_scd2_with_mssql_merge"].model(
+        module_path=Path("."),
+        path=Path("."),
+        dialect="tsql",
+    )
+
+    # verify model has physical_properties that trigger merge strategy
+    assert "mssql_merge_exists" in m.physical_properties
+    snapshot: Snapshot = make_snapshot(m)
+    assert snapshot.node.physical_properties == m.physical_properties
+    assert snapshot.node.physical_properties.get("mssql_merge_exists")