Skip to content

Commit 8ab080a

Browse files
committed
Fix: Child snapshot targeting a wrong parent forward-only snapshot table when the parent was created in dev and the child in prod (#2019)
1 parent 7f0daee commit 8ab080a

File tree

9 files changed

+88
-25
lines changed

9 files changed

+88
-25
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,32 +83,39 @@ def evaluate(
8383
all_names = {
8484
s.name for s in snapshots.values() if plan.is_selected_for_backfill(s.name)
8585
}
86-
deployability_index = DeployabilityIndex.create(snapshots)
86+
deployability_index_for_evaluation = DeployabilityIndex.create(snapshots)
87+
deployability_index_for_creation = deployability_index_for_evaluation
8788
if plan.is_dev:
8889
before_promote_snapshots = all_names
8990
after_promote_snapshots = set()
9091
else:
9192
before_promote_snapshots = {
9293
s.name
9394
for s in snapshots.values()
94-
if deployability_index.is_representative(s)
95+
if deployability_index_for_evaluation.is_representative(s)
9596
and plan.is_selected_for_backfill(s.name)
9697
}
9798
after_promote_snapshots = all_names - before_promote_snapshots
98-
deployability_index = DeployabilityIndex.all_deployable()
99+
deployability_index_for_evaluation = DeployabilityIndex.all_deployable()
99100

100101
update_intervals_for_new_snapshots(plan.new_snapshots, self.state_sync)
101102

102-
self._push(plan, deployability_index)
103+
self._push(plan, deployability_index_for_creation)
103104
self._restate(plan)
104105
self._backfill(
105-
plan, before_promote_snapshots, deployability_index, circuit_breaker=circuit_breaker
106+
plan,
107+
before_promote_snapshots,
108+
deployability_index_for_evaluation,
109+
circuit_breaker=circuit_breaker,
106110
)
107111
promotion_result = self._promote(plan, before_promote_snapshots)
108112
self._backfill(
109-
plan, after_promote_snapshots, deployability_index, circuit_breaker=circuit_breaker
113+
plan,
114+
after_promote_snapshots,
115+
deployability_index_for_evaluation,
116+
circuit_breaker=circuit_breaker,
110117
)
111-
self._update_views(plan, promotion_result, deployability_index)
118+
self._update_views(plan, promotion_result, deployability_index_for_evaluation)
112119

113120
if not plan.requires_backfill:
114121
self.console.log_success("Virtual Update executed successfully")

sqlmesh/core/snapshot/definition.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ class DeployabilityIndex(PydanticModel, frozen=True):
10621062

10631063
indexed_ids: t.FrozenSet[str]
10641064
is_opposite_index: bool = False
1065-
representative_shared_version_ids: t.Optional[t.FrozenSet[str]] = None
1065+
representative_shared_version_ids: t.FrozenSet[str] = frozenset()
10661066

10671067
@field_validator("indexed_ids", "representative_shared_version_ids", mode="before")
10681068
@classmethod
@@ -1109,10 +1109,7 @@ def is_representative(self, snapshot: SnapshotIdLike) -> bool:
11091109
True if the snapshot is representative, False otherwise.
11101110
"""
11111111
snapshot_id = self._snapshot_id_key(snapshot.snapshot_id)
1112-
representative = (
1113-
self.representative_shared_version_ids is not None
1114-
and snapshot_id in self.representative_shared_version_ids
1115-
)
1112+
representative = snapshot_id in self.representative_shared_version_ids
11161113
return representative or self.is_deployable(snapshot)
11171114

11181115
def with_non_deployable(self, snapshot: SnapshotIdLike) -> DeployabilityIndex:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ def _create_snapshot(
586586
self.adapter.drop_table(tmp_table_name)
587587
else:
588588
table_deployability_flags = [False]
589-
if not snapshot.is_indirect_non_breaking:
589+
if not snapshot.is_indirect_non_breaking and not snapshot.is_forward_only:
590590
table_deployability_flags.append(True)
591591
for is_table_deployable in table_deployability_flags:
592592
evaluation_strategy.create(

sqlmesh/schedulers/airflow/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class PlanDagSpec(PydanticModel):
8383
environment_expiration_ts: t.Optional[int] = None
8484
dag_start_ts: t.Optional[int] = None
8585
deployability_index: DeployabilityIndex = DeployabilityIndex.all_deployable()
86+
deployability_index_for_creation: DeployabilityIndex = DeployabilityIndex.all_deployable()
8687
no_gaps_snapshot_names: t.Optional[t.Set[str]] = None
8788

8889

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
172172
(create_start_task, create_end_task) = self._create_creation_tasks(
173173
plan_dag_spec.new_snapshots,
174174
plan_dag_spec.ddl_concurrent_tasks,
175-
plan_dag_spec.deployability_index,
175+
plan_dag_spec.deployability_index_for_creation,
176176
)
177177

178178
(

sqlmesh/schedulers/airflow/plan.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ def create_plan_dag_spec(
114114
for s, interval in intervals_to_remove:
115115
all_snapshots[s.snapshot_id].remove_interval(interval)
116116

117-
initial_deployability_index = DeployabilityIndex.create(all_snapshots)
118-
deployability_index = (
119-
initial_deployability_index if request.is_dev else DeployabilityIndex.all_deployable()
117+
deployability_index_for_creation = DeployabilityIndex.create(all_snapshots)
118+
deployability_index_for_evaluation = (
119+
deployability_index_for_creation if request.is_dev else DeployabilityIndex.all_deployable()
120120
)
121121

122122
if not request.skip_backfill:
@@ -125,7 +125,7 @@ def create_plan_dag_spec(
125125
start=request.environment.start_at,
126126
end=end,
127127
execution_time=now(),
128-
deployability_index=deployability_index,
128+
deployability_index=deployability_index_for_evaluation,
129129
restatements=request.restatements,
130130
ignore_cron=True,
131131
)
@@ -136,13 +136,17 @@ def create_plan_dag_spec(
136136
common.BackfillIntervalsPerSnapshot(
137137
snapshot_id=s.snapshot_id,
138138
intervals=intervals,
139-
before_promote=request.is_dev or initial_deployability_index.is_representative(s),
139+
before_promote=request.is_dev or deployability_index_for_creation.is_representative(s),
140140
)
141141
for s, intervals in backfill_batches.items()
142142
]
143143

144144
no_gaps_snapshot_names = (
145-
{s.name for s in all_snapshots.values() if initial_deployability_index.is_representative(s)}
145+
{
146+
s.name
147+
for s in all_snapshots.values()
148+
if deployability_index_for_creation.is_representative(s)
149+
}
146150
if request.no_gaps and not request.is_dev
147151
else None
148152
if request.no_gaps
@@ -170,7 +174,8 @@ def create_plan_dag_spec(
170174
forward_only=request.forward_only,
171175
environment_expiration_ts=request.environment.expiration_ts,
172176
dag_start_ts=to_timestamp(now_dt),
173-
deployability_index=deployability_index,
177+
deployability_index=deployability_index_for_evaluation,
178+
deployability_index_for_creation=deployability_index_for_creation,
174179
no_gaps_snapshot_names=no_gaps_snapshot_names,
175180
)
176181

tests/core/test_integration.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,60 @@ def test_forward_only_model_regular_plan(init_and_plan_context: t.Callable):
283283
assert not prod_df["event_date"].tolist()
284284

285285

286+
@freeze_time("2023-01-08 15:00:00")
287+
def test_forward_only_parent_created_in_dev_child_created_in_prod(
288+
init_and_plan_context: t.Callable,
289+
):
290+
context, plan = init_and_plan_context("examples/sushi")
291+
context.apply(plan)
292+
293+
waiter_revenue_by_day_model = context.get_model("sushi.waiter_revenue_by_day")
294+
waiter_revenue_by_day_model = add_projection_to_model(
295+
t.cast(SqlModel, waiter_revenue_by_day_model)
296+
)
297+
forward_only_kind = waiter_revenue_by_day_model.kind.copy(update={"forward_only": True})
298+
waiter_revenue_by_day_model = waiter_revenue_by_day_model.copy(
299+
update={"kind": forward_only_kind}
300+
)
301+
context.upsert_model(waiter_revenue_by_day_model)
302+
303+
waiter_revenue_by_day_snapshot = context.get_snapshot(
304+
waiter_revenue_by_day_model, raise_if_missing=True
305+
)
306+
top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True)
307+
308+
plan = context.plan("dev", no_prompts=True, skip_tests=True)
309+
assert len(plan.new_snapshots) == 2
310+
assert (
311+
plan.context_diff.snapshots[waiter_revenue_by_day_snapshot.snapshot_id].change_category
312+
== SnapshotChangeCategory.FORWARD_ONLY
313+
)
314+
assert (
315+
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
316+
== SnapshotChangeCategory.FORWARD_ONLY
317+
)
318+
assert plan.start == to_datetime("2023-01-01")
319+
assert not plan.missing_intervals
320+
321+
context.apply(plan)
322+
323+
# Update the child to refer to a newly added column.
324+
top_waiters_model = context.get_model("sushi.top_waiters")
325+
top_waiters_model = add_projection_to_model(t.cast(SqlModel, top_waiters_model), literal=False)
326+
context.upsert_model(top_waiters_model)
327+
328+
top_waiters_snapshot = context.get_snapshot("sushi.top_waiters", raise_if_missing=True)
329+
330+
plan = context.plan("prod", no_prompts=True, skip_tests=True)
331+
assert len(plan.new_snapshots) == 1
332+
assert (
333+
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
334+
== SnapshotChangeCategory.NON_BREAKING
335+
)
336+
337+
context.apply(plan)
338+
339+
286340
@freeze_time("2023-01-08 15:00:00")
287341
def test_plan_set_choice_is_reflected_in_missing_intervals(init_and_plan_context: t.Callable):
288342
context, plan = init_and_plan_context("examples/sushi")

tests/core/test_snapshot_evaluator.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -954,10 +954,6 @@ def test_forward_only_snapshot_for_added_model(mocker: MockerFixture, adapter_mo
954954
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}__temp",
955955
**common_create_args,
956956
),
957-
call(
958-
f"sqlmesh__test_schema.test_schema__test_model__{snapshot.version}",
959-
**common_create_args,
960-
),
961957
]
962958
)
963959

tests/schedulers/airflow/test_plan.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ def test_create_plan_dag_spec(
181181
forward_only=True,
182182
dag_start_ts=to_timestamp("2023-01-01"),
183183
no_gaps_snapshot_names=expected_no_gaps_snapshot_names,
184+
deployability_index_for_creation=DeployabilityIndex.all_deployable()
185+
if not paused_forward_only
186+
else DeployabilityIndex.none_deployable(),
184187
)
185188

186189
state_sync_mock.get_snapshots.assert_called_once()

0 commit comments

Comments
 (0)