From 24015033f2cf6b47805d856b160192ec2d5f7076 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Wed, 17 Sep 2025 10:13:43 -0700 Subject: [PATCH] Fix: Make table schema migration retriable --- sqlmesh/core/engine_adapter/base.py | 3 +++ sqlmesh/core/engine_adapter/databricks.py | 1 + sqlmesh/core/engine_adapter/snowflake.py | 1 + sqlmesh/core/snapshot/evaluator.py | 14 ++++++++++++-- sqlmesh/utils/errors.py | 4 ++++ tests/core/engine_adapter/test_base.py | 2 +- tests/core/engine_adapter/test_databricks.py | 2 +- tests/core/engine_adapter/test_snowflake.py | 6 +++--- tests/core/test_snapshot_evaluator.py | 4 +--- 9 files changed, 27 insertions(+), 10 deletions(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index e046dc9b4d..d8747c979d 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1044,6 +1044,7 @@ def clone_table( target_table_name: TableName, source_table_name: TableName, replace: bool = False, + exists: bool = True, clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, **kwargs: t.Any, ) -> None: @@ -1053,6 +1054,7 @@ def clone_table( target_table_name: The name of the table that should be created. source_table_name: The name of the source table that should be cloned. replace: Whether or not to replace an existing table. + exists: Indicates whether to include the IF NOT EXISTS check. """ if not self.SUPPORTS_CLONING: raise NotImplementedError(f"Engine does not support cloning: {type(self)}") @@ -1063,6 +1065,7 @@ def clone_table( this=exp.to_table(target_table_name), kind="TABLE", replace=replace, + exists=exists, clone=exp.Clone( this=exp.to_table(source_table_name), **(clone_kwargs or {}), diff --git a/sqlmesh/core/engine_adapter/databricks.py b/sqlmesh/core/engine_adapter/databricks.py index 2571cb7214..946a7bdf74 100644 --- a/sqlmesh/core/engine_adapter/databricks.py +++ b/sqlmesh/core/engine_adapter/databricks.py @@ -299,6 +299,7 @@ def clone_table( target_table_name: TableName, source_table_name: TableName, replace: bool = False, + exists: bool = True, clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, **kwargs: t.Any, ) -> None: diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 8a6f5e2fcc..c6b0e71ac3 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -610,6 +610,7 @@ def clone_table( target_table_name: TableName, source_table_name: TableName, replace: bool = False, + exists: bool = True, clone_kwargs: t.Optional[t.Dict[str, t.Any]] = None, **kwargs: t.Any, ) -> None: diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 3e622d2dd1..90a774af2a 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -32,6 +32,7 @@ from sqlglot import exp, select from sqlglot.executor import execute +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_not_exception_type from sqlmesh.core import constants as c from sqlmesh.core import dialect as d @@ -76,6 +77,7 @@ from sqlmesh.utils.errors import ( ConfigError, DestructiveChangeError, + MigrationNotSupportedError, SQLMeshError, format_destructive_change_msg, format_additive_change_msg, @@ -1035,7 +1037,6 @@ def _clone_snapshot_in_dev( adapter.clone_table( target_table_name, snapshot.table_name(), - replace=True, rendered_physical_properties=rendered_physical_properties, ) self._migrate_target_table( @@ -1111,6 +1112,15 @@ def _migrate_snapshot( dry_run=True, ) + # Retry in case when the table is migrated concurrently from another plan application + @retry( + reraise=True, + stop=stop_after_attempt(5), + wait=wait_exponential(min=1, max=16), + retry=retry_if_not_exception_type( + (DestructiveChangeError, AdditiveChangeError, MigrationNotSupportedError) + ), + ) def _migrate_target_table( self, target_table_name: str, @@ -2671,7 +2681,7 @@ def migrate( ) if len(potential_alter_operations) > 0: # this can happen if a user changes a managed model and deliberately overrides a plan to be forward only, eg `sqlmesh plan --forward-only` - raise SQLMeshError( + raise MigrationNotSupportedError( f"The schema of the managed model '{target_table_name}' cannot be updated in a forward-only fashion." ) diff --git a/sqlmesh/utils/errors.py b/sqlmesh/utils/errors.py index d90965c25c..ca3e1bfb05 100644 --- a/sqlmesh/utils/errors.py +++ b/sqlmesh/utils/errors.py @@ -151,6 +151,10 @@ class AdditiveChangeError(SQLMeshError): pass +class MigrationNotSupportedError(SQLMeshError): + pass + + class NotificationTargetError(SQLMeshError): pass diff --git a/tests/core/engine_adapter/test_base.py b/tests/core/engine_adapter/test_base.py index 220c3291f7..140fac43eb 100644 --- a/tests/core/engine_adapter/test_base.py +++ b/tests/core/engine_adapter/test_base.py @@ -3347,7 +3347,7 @@ def test_clone_table(make_mocked_engine_adapter: t.Callable): adapter.clone_table("target_table", "source_table") adapter.cursor.execute.assert_called_once_with( - "CREATE TABLE `target_table` CLONE `source_table`" + "CREATE TABLE IF NOT EXISTS `target_table` CLONE `source_table`" ) diff --git a/tests/core/engine_adapter/test_databricks.py b/tests/core/engine_adapter/test_databricks.py index fcd7aec0fa..f482361c3c 100644 --- a/tests/core/engine_adapter/test_databricks.py +++ b/tests/core/engine_adapter/test_databricks.py @@ -106,7 +106,7 @@ def test_clone_table(mocker: MockFixture, make_mocked_engine_adapter: t.Callable adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog") adapter.clone_table("target_table", "source_table") adapter.cursor.execute.assert_called_once_with( - "CREATE TABLE `target_table` SHALLOW CLONE `source_table`" + "CREATE TABLE IF NOT EXISTS `target_table` SHALLOW CLONE `source_table`" ) diff --git a/tests/core/engine_adapter/test_snowflake.py b/tests/core/engine_adapter/test_snowflake.py index 9a1e068aa6..75ce8edbe0 100644 --- a/tests/core/engine_adapter/test_snowflake.py +++ b/tests/core/engine_adapter/test_snowflake.py @@ -688,7 +688,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter, default_catalog="test_catalog") adapter.clone_table("target_table", "source_table") adapter.cursor.execute.assert_called_once_with( - 'CREATE TABLE "target_table" CLONE "source_table"' + 'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"' ) # Validate with transient type we create the clone table accordingly @@ -700,7 +700,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab "target_table", "source_table", rendered_physical_properties=rendered_physical_properties ) adapter.cursor.execute.assert_called_once_with( - 'CREATE TRANSIENT TABLE "target_table" CLONE "source_table"' + 'CREATE TRANSIENT TABLE IF NOT EXISTS "target_table" CLONE "source_table"' ) # Validate other engine adapters would work as usual even when we pass the properties @@ -710,7 +710,7 @@ def test_clone_table(mocker: MockerFixture, make_mocked_engine_adapter: t.Callab "target_table", "source_table", rendered_physical_properties=rendered_physical_properties ) adapter.cursor.execute.assert_called_once_with( - 'CREATE TABLE "target_table" CLONE "source_table"' + 'CREATE TABLE IF NOT EXISTS "target_table" CLONE "source_table"' ) diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 8003c6014e..660eafac70 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1678,7 +1678,6 @@ def test_create_clone_in_dev(mocker: MockerFixture, adapter_mock, make_snapshot) adapter_mock.clone_table.assert_called_once_with( f"sqlmesh__test_schema.test_schema__test_model__{snapshot.dev_version}__dev", f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", - replace=True, rendered_physical_properties={}, ) @@ -1701,7 +1700,7 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m adapter_mock.get_alter_operations.return_value = [] evaluator = SnapshotEvaluator(adapter_mock) - adapter_mock.alter_table.side_effect = Exception("Migration failed") + adapter_mock.alter_table.side_effect = DestructiveChangeError("Migration failed") model = load_sql_based_model( parse( # type: ignore @@ -1728,7 +1727,6 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m adapter_mock.clone_table.assert_called_once_with( f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__dev", f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}", - replace=True, rendered_physical_properties={}, )