Skip to content

Commit d774db9

Browse files
authored
Fix: Make sure that changes to seed models are reflected in the dev-only VDE mode (#5282)
1 parent 7f5adcf commit d774db9

File tree

5 files changed

+100
-18
lines changed

5 files changed

+100
-18
lines changed

sqlmesh/core/console.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2076,7 +2076,7 @@ def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[st
20762076
if text_diff:
20772077
self._print("")
20782078
self._print(Syntax(text_diff, "sql", word_wrap=True))
2079-
self._print(tree)
2079+
self._print(tree)
20802080

20812081
def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
20822082
"""Displays the models with missing dates."""

sqlmesh/core/plan/common.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
def should_force_rebuild(old: Snapshot, new: Snapshot) -> bool:
77
if new.is_view and new.is_indirect_non_breaking and not new.is_forward_only:
8-
# View models always need to be rebuilt to reflect updated upstream dependencies.
8+
# View models always need to be rebuilt to reflect updated upstream dependencies
9+
return True
10+
if new.is_seed:
11+
# Seed models always need to be rebuilt to reflect changes in the seed file
912
return True
1013
return is_breaking_kind_change(old, new)
1114

sqlmesh/core/scheduler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,10 +474,17 @@ def run_merged_intervals(
474474
execution_time=execution_time,
475475
)
476476

477+
# We only need to create physical tables if the snapshot is not representative or if it
478+
# needs backfill
479+
snapshots_to_create_candidates = [
480+
s
481+
for s in selected_snapshots
482+
if not deployability_index.is_representative(s) or s in batched_intervals
483+
]
477484
snapshots_to_create = {
478485
s.snapshot_id
479486
for s in self.snapshot_evaluator.get_snapshots_to_create(
480-
selected_snapshots, deployability_index
487+
snapshots_to_create_candidates, deployability_index
481488
)
482489
}
483490

sqlmesh/core/snapshot/evaluator.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,21 +2102,18 @@ def create(
21022102
return
21032103

21042104
super().create(table_name, model, is_table_deployable, render_kwargs, **kwargs)
2105-
if is_table_deployable:
2106-
# For seeds we insert data at the time of table creation.
2107-
try:
2108-
for index, df in enumerate(model.render_seed()):
2109-
if index == 0:
2110-
self._replace_query_for_model(
2111-
model, table_name, df, render_kwargs, **kwargs
2112-
)
2113-
else:
2114-
self.adapter.insert_append(
2115-
table_name, df, target_columns_to_types=model.columns_to_types
2116-
)
2117-
except Exception:
2118-
self.adapter.drop_table(table_name)
2119-
raise
2105+
# For seeds we insert data at the time of table creation.
2106+
try:
2107+
for index, df in enumerate(model.render_seed()):
2108+
if index == 0:
2109+
self._replace_query_for_model(model, table_name, df, render_kwargs, **kwargs)
2110+
else:
2111+
self.adapter.insert_append(
2112+
table_name, df, target_columns_to_types=model.columns_to_types
2113+
)
2114+
except Exception:
2115+
self.adapter.drop_table(table_name)
2116+
raise
21202117

21212118
def insert(
21222119
self,

tests/core/test_integration.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2918,6 +2918,81 @@ def test_virtual_environment_mode_dev_only_model_kind_change_manual_categorizati
29182918
]
29192919

29202920

2921+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2922+
def test_virtual_environment_mode_dev_only_seed_model_change(
2923+
init_and_plan_context: t.Callable,
2924+
):
2925+
context, _ = init_and_plan_context(
2926+
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
2927+
)
2928+
context.load()
2929+
context.plan("prod", auto_apply=True, no_prompts=True)
2930+
2931+
seed_model = context.get_model("sushi.waiter_names")
2932+
with open(seed_model.seed_path, "a") as fd:
2933+
fd.write("\n123,New Test Name")
2934+
2935+
context.load()
2936+
seed_model_snapshot = context.get_snapshot("sushi.waiter_names")
2937+
plan = context.plan_builder("dev").build()
2938+
assert plan.directly_modified == {seed_model_snapshot.snapshot_id}
2939+
assert len(plan.missing_intervals) == 2
2940+
context.apply(plan)
2941+
2942+
actual_seed_df_in_dev = context.fetchdf("SELECT * FROM sushi__dev.waiter_names WHERE id = 123")
2943+
assert actual_seed_df_in_dev.to_dict("records") == [{"id": 123, "name": "New Test Name"}]
2944+
actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123")
2945+
assert actual_seed_df_in_prod.empty
2946+
2947+
plan = context.plan_builder("prod").build()
2948+
assert plan.directly_modified == {seed_model_snapshot.snapshot_id}
2949+
assert len(plan.missing_intervals) == 1
2950+
assert plan.missing_intervals[0].snapshot_id == seed_model_snapshot.snapshot_id
2951+
context.apply(plan)
2952+
2953+
actual_seed_df_in_prod = context.fetchdf("SELECT * FROM sushi.waiter_names WHERE id = 123")
2954+
assert actual_seed_df_in_prod.to_dict("records") == [{"id": 123, "name": "New Test Name"}]
2955+
2956+
2957+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2958+
def test_virtual_environment_mode_dev_only_model_change_downstream_of_seed(
2959+
init_and_plan_context: t.Callable,
2960+
):
2961+
"""This test covers a scenario when a model downstream of a seed model is modified and explicitly selected
2962+
causing an (unhydrated) seed model to sourced from the state. If SQLMesh attempts to create
2963+
a table for the unchanged seed model, it will fail because the seed model is not hydrated.
2964+
"""
2965+
context, _ = init_and_plan_context(
2966+
"examples/sushi", config="test_config_virtual_environment_mode_dev_only"
2967+
)
2968+
context.load()
2969+
context.plan("prod", auto_apply=True, no_prompts=True)
2970+
2971+
# Make sure that a different version of the seed model is loaded
2972+
seed_model = context.get_model("sushi.waiter_names")
2973+
seed_model = seed_model.copy(update={"stamp": "force new version"})
2974+
context.upsert_model(seed_model)
2975+
2976+
# Make a change to the downstream model
2977+
model = context.get_model("sushi.waiter_as_customer_by_day")
2978+
model = model.copy(update={"stamp": "force new version"})
2979+
context.upsert_model(model)
2980+
2981+
# It is important to clear the cache so that the hydrated seed model is not sourced from the cache
2982+
context.clear_caches()
2983+
2984+
# Make sure to use the selector so that the seed model is sourced from the state
2985+
plan = context.plan_builder("dev", select_models=[model.name]).build()
2986+
assert len(plan.directly_modified) == 1
2987+
assert list(plan.directly_modified)[0].name == model.fqn
2988+
assert len(plan.missing_intervals) == 1
2989+
assert plan.missing_intervals[0].snapshot_id.name == model.fqn
2990+
2991+
# Make sure there's no error when applying the plan
2992+
context.apply(plan)
2993+
context.plan("prod", auto_apply=True, no_prompts=True)
2994+
2995+
29212996
@time_machine.travel("2023-01-08 15:00:00 UTC")
29222997
def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
29232998
context, plan = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)