diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index f7aea5cff1..cac9c42378 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -2185,7 +2185,13 @@ def _get_target_and_source_columns( if model.on_destructive_change.is_ignore or model.on_additive_change.is_ignore: # We need to identify the columns that are only in the source so we create an empty table with # the user query to determine that - with self.adapter.temp_table(model.ctas_query(**render_kwargs)) as temp_table: + temp_table_name = exp.table_( + "diff", + db=model.physical_schema, + ) + with self.adapter.temp_table( + model.ctas_query(**render_kwargs), name=temp_table_name + ) as temp_table: source_columns = list(self.adapter.columns(temp_table)) else: source_columns = None diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index f3a06c83bd..348bddc32b 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -3,6 +3,7 @@ from typing_extensions import Self from unittest.mock import call, patch, Mock +import contextlib import re import logging import pytest @@ -2077,6 +2078,68 @@ def columns(table_name): ) +def test_temp_table_includes_schema_for_ignore_changes( + mocker: MockerFixture, + make_snapshot, + make_mocked_engine_adapter, +): + """Test that temp table creation includes the physical schema when on_destructive_change or on_additive_change is IGNORE.""" + # Create a model with on_destructive_change=IGNORE to trigger temp table creation + model = SqlModel( + name="test_schema.test_model", + kind=IncrementalByTimeRangeKind( + time_column="a", on_destructive_change=OnDestructiveChange.IGNORE + ), + query=parse_one("SELECT c, a FROM tbl WHERE ds BETWEEN @start_ds and @end_ds"), + ) + snapshot = make_snapshot(model, version="1") + snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + # Set up the mocked adapter + adapter = make_mocked_engine_adapter(EngineAdapter) + adapter.with_settings = lambda **kwargs: adapter # type: ignore + adapter.table_exists = lambda _: True # type: ignore + + # Mock columns method to return existing columns + def columns(table_name): + return { + "c": exp.DataType.build("int"), + "a": exp.DataType.build("int"), + } + + adapter.columns = columns # type: ignore + + # Create a mock for the temp_table context manager + temp_table_name_captured = None + + @contextlib.contextmanager + def mock_temp_table(query_or_df, name="diff", **kwargs): + nonlocal temp_table_name_captured + temp_table_name_captured = exp.to_table(name) + # Return a table that temp_table would normally return + yield exp.table_("__temp_diff_12345", db=temp_table_name_captured.db) + + adapter.temp_table = mock_temp_table # type: ignore + adapter.insert_append = lambda *args, **kwargs: None # type: ignore + + evaluator = SnapshotEvaluator(adapter) + + # Call the append method which will trigger _get_target_and_source_columns + evaluator.evaluate( + snapshot, + start="2020-01-01", + end="2020-01-02", + execution_time="2020-01-02", + snapshots={}, + ) + + # Verify that temp_table was called with a name that includes the schema + assert temp_table_name_captured is not None + assert temp_table_name_captured.name == "diff" + assert temp_table_name_captured.db == model.physical_schema + assert str(temp_table_name_captured.db) == "sqlmesh__test_schema" + + def test_forward_only_snapshot_for_added_model(mocker: MockerFixture, adapter_mock, make_snapshot): adapter_mock.SUPPORTS_CLONING = False evaluator = SnapshotEvaluator(adapter_mock)