Skip to content

Commit ec5e085

Browse files
authored
Fix: Streamline execution of pre- / post- statements when creating a physical table (#3837)
1 parent beaebe3 commit ec5e085

File tree

3 files changed

+85
-51
lines changed

3 files changed

+85
-51
lines changed

sqlmesh/core/renderer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ def _render(
114114
self._model_fqn,
115115
snapshots={self._model_fqn: this_snapshot} if this_snapshot else None,
116116
deployability_index=deployability_index,
117+
table_mapping=table_mapping,
117118
)
118119

119120
expressions = [self._expression]

sqlmesh/core/snapshot/evaluator.py

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -757,25 +757,14 @@ def _create_snapshot(
757757
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
758758

759759
adapter = self._get_adapter(snapshot.model.gateway)
760-
common_render_kwargs: t.Dict[str, t.Any] = dict(
760+
create_render_kwargs: t.Dict[str, t.Any] = dict(
761761
engine_adapter=adapter,
762762
snapshots=parent_snapshots_by_name(snapshot, snapshots),
763763
runtime_stage=RuntimeStage.CREATING,
764+
deployability_index=deployability_index,
764765
)
765-
pre_post_render_kwargs = dict(
766-
**common_render_kwargs,
767-
deployability_index=deployability_index.with_deployable(snapshot),
768-
)
769-
create_render_kwargs = dict(**common_render_kwargs, deployability_index=deployability_index)
770-
771-
# It can still be useful for some strategies to know if the snapshot was actually deployable
772-
is_snapshot_deployable = deployability_index.is_deployable(snapshot)
773-
is_snapshot_representative = deployability_index.is_representative(snapshot)
774-
775-
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
776766

777767
with adapter.transaction(), adapter.session(snapshot.model.session_properties):
778-
adapter.execute(snapshot.model.render_pre_statements(**pre_post_render_kwargs))
779768
rendered_physical_properties = snapshot.model.render_physical_properties(
780769
**create_render_kwargs
781770
)
@@ -796,18 +785,16 @@ def _create_snapshot(
796785

797786
logger.info(f"Cloning table '{source_table_name}' into '{target_table_name}'")
798787

799-
evaluation_strategy.create(
788+
self._execute_create(
789+
snapshot=snapshot,
800790
table_name=tmp_table_name,
801-
model=snapshot.model,
802791
is_table_deployable=False,
803-
render_kwargs=dict(
804-
table_mapping={snapshot.name: tmp_table_name},
805-
**create_render_kwargs,
806-
),
807-
is_snapshot_deployable=is_snapshot_deployable,
808-
is_snapshot_representative=is_snapshot_representative,
809-
physical_properties=rendered_physical_properties,
792+
deployability_index=deployability_index,
793+
create_render_kwargs=create_render_kwargs,
794+
rendered_physical_properties=rendered_physical_properties,
795+
dry_run=True,
810796
)
797+
811798
try:
812799
adapter.clone_table(target_table_name, snapshot.table_name(), replace=True)
813800
alter_expressions = adapter.get_alter_expressions(
@@ -828,7 +815,7 @@ def _create_snapshot(
828815
if (
829816
is_table_deployable
830817
and snapshot.model.forward_only
831-
and not is_snapshot_representative
818+
and not deployability_index.is_representative(snapshot)
832819
):
833820
logger.info(
834821
"Skipping creation of the deployable table '%s' for the forward-only model %s. "
@@ -838,19 +825,16 @@ def _create_snapshot(
838825
)
839826
continue
840827

841-
evaluation_strategy.create(
828+
self._execute_create(
829+
snapshot=snapshot,
842830
table_name=snapshot.table_name(is_deployable=is_table_deployable),
843-
model=snapshot.model,
844831
is_table_deployable=is_table_deployable,
845-
render_kwargs=create_render_kwargs,
846-
is_snapshot_deployable=is_snapshot_deployable,
847-
is_snapshot_representative=is_snapshot_representative,
832+
deployability_index=deployability_index,
833+
create_render_kwargs=create_render_kwargs,
834+
rendered_physical_properties=rendered_physical_properties,
848835
dry_run=dry_run,
849-
physical_properties=rendered_physical_properties,
850836
)
851837

852-
adapter.execute(snapshot.model.render_post_statements(**pre_post_render_kwargs))
853-
854838
if on_complete is not None:
855839
on_complete(snapshot)
856840

@@ -871,10 +855,9 @@ def _migrate_snapshot(
871855
if not needs_migration:
872856
return
873857

874-
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
875-
876858
target_table_name = snapshot.table_name()
877859
if adapter.table_exists(target_table_name):
860+
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
878861
tmp_table_name = snapshot.table_name(is_deployable=False)
879862
logger.info(
880863
"Migrating table schema from '%s' to '%s'",
@@ -894,25 +877,25 @@ def _migrate_snapshot(
894877
target_table_name,
895878
snapshot.snapshot_id,
896879
)
880+
deployability_index = DeployabilityIndex.all_deployable()
897881
render_kwargs: t.Dict[str, t.Any] = dict(
898882
engine_adapter=adapter,
899883
snapshots=parent_snapshots_by_name(snapshot, snapshots),
900884
runtime_stage=RuntimeStage.CREATING,
901-
deployability_index=DeployabilityIndex.all_deployable(),
885+
deployability_index=deployability_index,
902886
)
903887
with adapter.transaction(), adapter.session(snapshot.model.session_properties):
904-
adapter.execute(snapshot.model.render_pre_statements(**render_kwargs))
905-
evaluation_strategy.create(
888+
self._execute_create(
889+
snapshot=snapshot,
906890
table_name=target_table_name,
907-
model=snapshot.model,
908891
is_table_deployable=True,
909-
render_kwargs=render_kwargs,
910-
is_snapshot_deployable=True,
911-
is_snapshot_representative=True,
892+
deployability_index=deployability_index,
893+
create_render_kwargs=render_kwargs,
894+
rendered_physical_properties=snapshot.model.render_physical_properties(
895+
**render_kwargs
896+
),
912897
dry_run=False,
913-
physical_properties=snapshot.model.render_physical_properties(**render_kwargs),
914898
)
915-
adapter.execute(snapshot.model.render_post_statements(**render_kwargs))
916899

917900
def _promote_snapshot(
918901
self,
@@ -1085,6 +1068,40 @@ def _get_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
10851068
raise SQLMeshError(f"Gateway '{gateway}' not found in the available engine adapters.")
10861069
return self.adapter
10871070

1071+
def _execute_create(
1072+
self,
1073+
snapshot: Snapshot,
1074+
table_name: str,
1075+
is_table_deployable: bool,
1076+
deployability_index: DeployabilityIndex,
1077+
create_render_kwargs: t.Dict[str, t.Any],
1078+
rendered_physical_properties: t.Dict[str, exp.Expression],
1079+
dry_run: bool,
1080+
) -> None:
1081+
adapter = self._get_adapter(snapshot.model.gateway)
1082+
evaluation_strategy = _evaluation_strategy(snapshot, adapter)
1083+
1084+
# It can still be useful for some strategies to know if the snapshot was actually deployable
1085+
is_snapshot_deployable = deployability_index.is_deployable(snapshot)
1086+
is_snapshot_representative = deployability_index.is_representative(snapshot)
1087+
1088+
create_render_kwargs = {
1089+
**create_render_kwargs,
1090+
"table_mapping": {snapshot.name: table_name},
1091+
}
1092+
adapter.execute(snapshot.model.render_pre_statements(**create_render_kwargs))
1093+
evaluation_strategy.create(
1094+
table_name=table_name,
1095+
model=snapshot.model,
1096+
is_table_deployable=is_table_deployable,
1097+
render_kwargs=create_render_kwargs,
1098+
is_snapshot_deployable=is_snapshot_deployable,
1099+
is_snapshot_representative=is_snapshot_representative,
1100+
dry_run=dry_run,
1101+
physical_properties=rendered_physical_properties,
1102+
)
1103+
adapter.execute(snapshot.model.render_post_statements(**create_render_kwargs))
1104+
10881105

10891106
def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) -> EvaluationStrategy:
10901107
klass: t.Type

tests/core/test_snapshot_evaluator.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,22 +1616,26 @@ def test_drop_clone_in_dev_when_migration_fails(mocker: MockerFixture, adapter_m
16161616
)
16171617

16181618

1619-
def test_create_clone_in_dev_self_referencing(mocker: MockerFixture, adapter_mock, make_snapshot):
1619+
@pytest.mark.parametrize("use_this_model", [True, False])
1620+
def test_create_clone_in_dev_self_referencing(
1621+
mocker: MockerFixture, adapter_mock, make_snapshot, use_this_model: bool
1622+
):
16201623
adapter_mock.SUPPORTS_CLONING = True
16211624
adapter_mock.get_alter_expressions.return_value = []
16221625
evaluator = SnapshotEvaluator(adapter_mock)
16231626

1627+
from_table = "test_schema.test_model" if not use_this_model else "@this_model"
16241628
model = load_sql_based_model(
16251629
parse( # type: ignore
1626-
"""
1630+
f"""
16271631
MODEL (
16281632
name test_schema.test_model,
16291633
kind INCREMENTAL_BY_TIME_RANGE (
16301634
time_column ds
16311635
)
16321636
);
16331637
1634-
SELECT 1::INT as a, ds::DATE FROM test_schema.test_model;
1638+
SELECT 1::INT as a, ds::DATE FROM {from_table};
16351639
"""
16361640
),
16371641
)
@@ -1664,10 +1668,15 @@ def test_create_clone_in_dev_self_referencing(mocker: MockerFixture, adapter_moc
16641668
)
16651669

16661670
# Make sure the dry run references the correct ("...__schema_migration_source") table.
1671+
table_alias = (
1672+
"test_model"
1673+
if not use_this_model
1674+
else f"test_schema__test_model__{snapshot.version}__dev__schema_migration_source"
1675+
)
16671676
dry_run_query = adapter_mock.fetchall.call_args[0][0].sql()
16681677
assert (
16691678
dry_run_query
1670-
== f'SELECT CAST(1 AS INT) AS "a", CAST("ds" AS DATE) AS "ds" FROM "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}__dev__schema_migration_source" AS "test_model" /* test_schema.test_model */ WHERE FALSE LIMIT 0'
1679+
== f'SELECT CAST(1 AS INT) AS "a", CAST("ds" AS DATE) AS "ds" FROM "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}__dev__schema_migration_source" AS "{table_alias}" /* test_schema.test_model */ WHERE FALSE LIMIT 0'
16711680
)
16721681

16731682

@@ -2800,7 +2809,7 @@ def blocking_value(evaluator):
28002809
assert results[0].blocking
28012810

28022811

2803-
def test_create_post_statements_use_deployable_table(
2812+
def test_create_post_statements_use_non_deployable_table(
28042813
mocker: MockerFixture, adapter_mock, make_snapshot
28052814
):
28062815
evaluator = SnapshotEvaluator(adapter_mock)
@@ -2826,7 +2835,7 @@ def test_create_post_statements_use_deployable_table(
28262835
snapshot = make_snapshot(model)
28272836
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
28282837

2829-
expected_call = f'CREATE INDEX IF NOT EXISTS "test_idx" ON "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" /* test_schema.test_model */("a" NULLS FIRST)'
2838+
expected_call = f'CREATE INDEX IF NOT EXISTS "test_idx" ON "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}__dev" /* test_schema.test_model */("a" NULLS FIRST)'
28302839

28312840
evaluator.create([snapshot], {}, DeployabilityIndex.none_deployable())
28322841

@@ -2890,7 +2899,7 @@ def model_with_statements(context, **kwargs):
28902899
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
28912900

28922901
evaluator.create([snapshot], {}, DeployabilityIndex.none_deployable())
2893-
expected_call = f'CREATE INDEX IF NOT EXISTS "idx" ON "sqlmesh__db"."db__test_model__{snapshot.version}" /* db.test_model */("id")'
2902+
expected_call = f'CREATE INDEX IF NOT EXISTS "idx" ON "sqlmesh__db"."db__test_model__{snapshot.version}__dev" /* db.test_model */("id")'
28942903

28952904
call_args = adapter_mock.execute.call_args_list
28962905
pre_calls = call_args[0][0][0]
@@ -2952,12 +2961,19 @@ def test_on_virtual_update_statements(mocker: MockerFixture, adapter_mock, make_
29522961
call_args = adapter_mock.execute.call_args_list
29532962
post_calls = call_args[1][0][0]
29542963
assert len(post_calls) == 1
2964+
assert (
2965+
post_calls[0].sql(dialect="postgres")
2966+
== f'CREATE INDEX IF NOT EXISTS "test_idx" ON "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}__dev" /* test_schema.test_model */("a")'
2967+
)
2968+
2969+
post_calls = call_args[3][0][0]
2970+
assert len(post_calls) == 1
29552971
assert (
29562972
post_calls[0].sql(dialect="postgres")
29572973
== f'CREATE INDEX IF NOT EXISTS "test_idx" ON "sqlmesh__test_schema"."test_schema__test_model__{snapshot.version}" /* test_schema.test_model */("a")'
29582974
)
29592975

2960-
on_virtual_update_calls = call_args[2][0][0]
2976+
on_virtual_update_calls = call_args[4][0][0]
29612977
assert (
29622978
on_virtual_update_calls[0].sql(dialect="postgres")
29632979
== 'GRANT SELECT ON VIEW "test_schema__test_env"."test_model" /* test_schema.test_model */ TO ROLE "admin"'
@@ -3029,7 +3045,7 @@ def model_with_statements(context, **kwargs):
30293045
)
30303046

30313047
call_args = adapter_mock.execute.call_args_list
3032-
on_virtual_update_call = call_args[2][0][0][0]
3048+
on_virtual_update_call = call_args[4][0][0][0]
30333049
assert (
30343050
on_virtual_update_call.sql(dialect="postgres")
30353051
== 'CREATE INDEX IF NOT EXISTS "idx" ON "db"."test_model_3" /* db.test_model_3 */("id")'

0 commit comments

Comments
 (0)