diff --git a/pyproject.toml b/pyproject.toml index 6823f7750b..4b526527fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -225,7 +225,8 @@ module = [ "pydantic_core.*", "dlt.*", "bigframes.*", - "json_stream.*" + "json_stream.*", + "duckdb.*" ] ignore_missing_imports = true diff --git a/sqlmesh/core/plan/stages.py b/sqlmesh/core/plan/stages.py index 0d829a6739..29b34d0fe0 100644 --- a/sqlmesh/core/plan/stages.py +++ b/sqlmesh/core/plan/stages.py @@ -12,6 +12,7 @@ Snapshot, SnapshotTableInfo, SnapshotId, + snapshots_to_dag, ) @@ -248,6 +249,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: stored_snapshots = self.state_reader.get_snapshots(plan.environment.snapshots) snapshots = {**new_snapshots, **stored_snapshots} snapshots_by_name = {s.name: s for s in snapshots.values()} + dag = snapshots_to_dag(snapshots.values()) all_selected_for_backfill_snapshots = { s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name) @@ -271,8 +273,15 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots deployability_index = DeployabilityIndex.all_deployable() + snapshot_ids_with_schema_migration = [ + s.snapshot_id for s in snapshots.values() if s.requires_schema_migration_in_prod + ] + # Include all upstream dependencies of snapshots that require schema migration to make sure + # the upstream tables are created before the schema updates are applied snapshots_with_schema_migration = [ - s for s in snapshots.values() if s.requires_schema_migration_in_prod + snapshots[s_id] + for s_id in dag.subdag(*snapshot_ids_with_schema_migration) + if snapshots[s_id].supports_schema_migration_in_prod ] snapshots_to_intervals = self._missing_intervals( diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 9522366721..0d64736ffb 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1477,19 +1477,19 @@ def expiration_ts(self) -> int: check_categorical_relative_expression=False, ) + @property + def supports_schema_migration_in_prod(self) -> bool: + """Returns whether or not this snapshot supports schema migration when deployed to production.""" + return self.is_paused and self.is_model and not self.is_symbolic + @property def requires_schema_migration_in_prod(self) -> bool: """Returns whether or not this snapshot requires a schema migration when deployed to production.""" - return ( - self.is_paused - and self.is_model - and self.is_materialized - and ( - (self.previous_version and self.previous_version.version == self.version) - or self.model.forward_only - or bool(self.model.physical_version) - or not self.virtual_environment_mode.is_full - ) + return self.supports_schema_migration_in_prod and ( + (self.previous_version and self.previous_version.version == self.version) + or self.model.forward_only + or bool(self.model.physical_version) + or not self.virtual_environment_mode.is_full ) @property diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 8528dd4d1c..4d5023e901 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -489,15 +489,14 @@ def migrate( allow_destructive_snapshots = allow_destructive_snapshots or set() allow_additive_snapshots = allow_additive_snapshots or set() snapshots_by_name = {s.name: s for s in snapshots.values()} - snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects] with self.concurrent_context(): # Only migrate snapshots for which there's an existing data object concurrent_apply_to_snapshots( - snapshots_with_data_objects, + snapshots_by_name.values(), lambda s: self._migrate_snapshot( s, snapshots_by_name, - target_data_objects[s.snapshot_id], + target_data_objects.get(s.snapshot_id), allow_destructive_snapshots, allow_additive_snapshots, self.get_adapter(s.model_gateway), @@ -1059,7 +1058,7 @@ def _migrate_snapshot( adapter: EngineAdapter, deployability_index: DeployabilityIndex, ) -> None: - if not snapshot.requires_schema_migration_in_prod: + if not snapshot.is_model or snapshot.is_symbolic: return deployability_index = DeployabilityIndex.all_deployable() @@ -1081,6 +1080,10 @@ def _migrate_snapshot( ): table_exists = False + rendered_physical_properties = snapshot.model.render_physical_properties( + **render_kwargs + ) + if table_exists: self._migrate_target_table( target_table_name=target_table_name, @@ -1088,13 +1091,21 @@ def _migrate_snapshot( snapshots=snapshots, deployability_index=deployability_index, render_kwargs=render_kwargs, - rendered_physical_properties=snapshot.model.render_physical_properties( - **render_kwargs - ), + rendered_physical_properties=rendered_physical_properties, allow_destructive_snapshots=allow_destructive_snapshots, allow_additive_snapshots=allow_additive_snapshots, run_pre_post_statements=True, ) + else: + self._execute_create( + snapshot=snapshot, + table_name=snapshot.table_name(is_deployable=True), + is_table_deployable=True, + deployability_index=deployability_index, + create_render_kwargs=render_kwargs, + rendered_physical_properties=rendered_physical_properties, + dry_run=True, + ) def _migrate_target_table( self, diff --git a/tests/core/test_context.py b/tests/core/test_context.py index a9d6f7967f..b7ce64eb4c 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -1593,7 +1593,7 @@ def test_raw_code_handling(sushi_test_dbt_context: Context): hook = model.render_pre_statements()[0] assert ( hook.sql() - == f'''CREATE TABLE "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"''' + == f'''CREATE TABLE IF NOT EXISTS "t" AS SELECT 'Length is {raw_code_length}' AS "length_col"''' ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index c2c11ced80..0d6990e17b 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -942,6 +942,26 @@ def test_forward_only_parent_created_in_dev_child_created_in_prod( context.apply(plan) +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_forward_only_view_migration( + init_and_plan_context: t.Callable, +): + context, plan = init_and_plan_context("examples/sushi") + context.apply(plan) + + model = context.get_model("sushi.top_waiters") + assert model.kind.is_view + model = add_projection_to_model(t.cast(SqlModel, model)) + context.upsert_model(model) + + # Apply a forward-only plan + context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True, forward_only=True) + + # Make sure that the new column got reflected in the view schema + df = context.fetchdf("SELECT one FROM sushi.top_waiters LIMIT 1") + assert len(df) == 1 + + @time_machine.travel("2023-01-08 00:00:00 UTC") def test_new_forward_only_model(init_and_plan_context: t.Callable): context, _ = init_and_plan_context("examples/sushi") diff --git a/tests/core/test_plan_stages.py b/tests/core/test_plan_stages.py index 444ce1bb9b..1a049490fd 100644 --- a/tests/core/test_plan_stages.py +++ b/tests/core/test_plan_stages.py @@ -1661,16 +1661,17 @@ def test_build_plan_stages_indirect_non_breaking_view_migration( stages = build_plan_stages(plan, state_reader, None) # Verify stages - assert len(stages) == 8 + assert len(stages) == 9 assert isinstance(stages[0], CreateSnapshotRecordsStage) assert isinstance(stages[1], PhysicalLayerSchemaCreationStage) assert isinstance(stages[2], BackfillStage) assert isinstance(stages[3], EnvironmentRecordUpdateStage) - assert isinstance(stages[4], UnpauseStage) - assert isinstance(stages[5], BackfillStage) - assert isinstance(stages[6], VirtualLayerUpdateStage) - assert isinstance(stages[7], FinalizeEnvironmentStage) + assert isinstance(stages[4], MigrateSchemasStage) + assert isinstance(stages[5], UnpauseStage) + assert isinstance(stages[6], BackfillStage) + assert isinstance(stages[7], VirtualLayerUpdateStage) + assert isinstance(stages[8], FinalizeEnvironmentStage) def test_build_plan_stages_virtual_environment_mode_filtering( diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 6a39f600de..8003c6014e 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -1402,7 +1402,13 @@ def test_migrate_view( evaluator = SnapshotEvaluator(adapter) evaluator.migrate([snapshot], {}) - adapter.cursor.execute.assert_not_called() + adapter.cursor.execute.assert_has_calls( + [ + call( + f'CREATE OR REPLACE VIEW "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" ("c", "a") AS SELECT "c" AS "c", "a" AS "a" FROM "tbl" AS "tbl"' + ), + ] + ) def test_migrate_snapshot_data_object_type_mismatch( diff --git a/tests/fixtures/dbt/sushi_test/models/model_with_raw_code.sql b/tests/fixtures/dbt/sushi_test/models/model_with_raw_code.sql index 386e7f40ef..1424f6e970 100644 --- a/tests/fixtures/dbt/sushi_test/models/model_with_raw_code.sql +++ b/tests/fixtures/dbt/sushi_test/models/model_with_raw_code.sql @@ -1,6 +1,6 @@ {{ config( - pre_hook=['CREATE TABLE t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col'] + pre_hook=['CREATE TABLE IF NOT EXISTS t AS SELECT \'Length is {{ model.raw_code|length }}\' AS length_col'] ) }}