Skip to content

Commit 2c72f44

Browse files
authored
Fix: Never ignore forward-only snapshots when building a plan (#2227)
1 parent 7997477 commit 2c72f44

File tree

2 files changed

+143
-17
lines changed

2 files changed

+143
-17
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 39 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -187,25 +187,37 @@ def build(self) -> Plan:
187187

188188
self._apply_effective_from()
189189

190-
dag, ignored = self._build_filtered_dag()
190+
dag = self._build_dag()
191191
directly_modified, indirectly_modified = self._build_directly_and_indirectly_modified(dag)
192-
models_to_backfill = self._build_models_to_backfill(dag)
193192

194193
self._categorize_snapshots(dag, directly_modified, indirectly_modified)
195194
self._adjust_new_snapshot_intervals()
196195

196+
deployability_index = (
197+
DeployabilityIndex.create(self._context_diff.snapshots.values())
198+
if self._is_dev
199+
else DeployabilityIndex.all_deployable()
200+
)
201+
202+
filtered_dag, ignored = self._build_filtered_dag(dag, deployability_index)
203+
204+
# Exclude ignored snapshots from the modified sets.
205+
directly_modified = {s_id for s_id in directly_modified if s_id not in ignored}
206+
for s_id in list(indirectly_modified):
207+
if s_id in ignored:
208+
indirectly_modified.pop(s_id, None)
209+
else:
210+
indirectly_modified[s_id] = {
211+
s_id for s_id in indirectly_modified[s_id] if s_id not in ignored
212+
}
213+
197214
filtered_snapshots = {
198215
s.snapshot_id: s
199216
for s in self._context_diff.snapshots.values()
200217
if s.snapshot_id not in ignored
201218
}
202219

203-
deployability_index = (
204-
DeployabilityIndex.create(filtered_snapshots)
205-
if self._is_dev
206-
else DeployabilityIndex.all_deployable()
207-
)
208-
220+
models_to_backfill = self._build_models_to_backfill(filtered_dag)
209221
restatements = self._build_restatements(
210222
dag, earliest_interval_start(filtered_snapshots.values())
211223
)
@@ -236,25 +248,35 @@ def build(self) -> Plan:
236248
self._latest_plan = plan
237249
return plan
238250

239-
def _build_filtered_dag(self) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
251+
def _build_dag(self) -> DAG[SnapshotId]:
252+
dag: DAG[SnapshotId] = DAG()
253+
for s_id, context_snapshot in self._context_diff.snapshots.items():
254+
dag.add(s_id, context_snapshot.parents)
255+
return dag
256+
257+
def _build_filtered_dag(
258+
self, full_dag: DAG[SnapshotId], deployability_index: DeployabilityIndex
259+
) -> t.Tuple[DAG[SnapshotId], t.Set[SnapshotId]]:
240260
ignored_snapshot_ids: t.Set[SnapshotId] = set()
241-
full_dag: DAG[SnapshotId] = DAG()
242261
filtered_dag: DAG[SnapshotId] = DAG()
243262
cache: t.Optional[t.Dict[str, datetime]] = {}
244-
for s_id, context_snapshot in self._context_diff.snapshots.items():
245-
full_dag.add(s_id, context_snapshot.parents)
246263
for s_id in full_dag:
247264
snapshot = self._context_diff.snapshots.get(s_id)
248265
# If the snapshot doesn't exist then it must be an external model
249266
if not snapshot:
250267
continue
251-
if snapshot.is_valid_start(
268+
269+
is_deployable = deployability_index.is_deployable(s_id)
270+
is_valid_start = snapshot.is_valid_start(
252271
self._start, start_date(snapshot, self._context_diff.snapshots.values(), cache)
253-
) and set(snapshot.parents).isdisjoint(ignored_snapshot_ids):
254-
filtered_dag.add(snapshot.snapshot_id, snapshot.parents)
272+
)
273+
if not is_deployable or (
274+
is_valid_start and set(snapshot.parents).isdisjoint(ignored_snapshot_ids)
275+
):
276+
filtered_dag.add(s_id, snapshot.parents)
255277
else:
256-
ignored_snapshot_ids.add(snapshot.snapshot_id)
257-
return (filtered_dag, ignored_snapshot_ids)
278+
ignored_snapshot_ids.add(s_id)
279+
return filtered_dag, ignored_snapshot_ids
258280

259281
def _build_restatements(
260282
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike

tests/core/test_plan.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,13 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
14151415
'"a_child"',
14161416
'"b"',
14171417
]
1418+
assert dev_plan_start_aligned.directly_modified == {
1419+
snapshot.snapshot_id,
1420+
snapshot_child.snapshot_id,
1421+
unrelated_snapshot.snapshot_id,
1422+
}
1423+
assert dev_plan_start_aligned.indirectly_modified == {}
1424+
14181425
dev_plan_start_ahead_of_model = PlanBuilder(
14191426
context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
14201427
).build()
@@ -1425,6 +1432,103 @@ def test_dev_plan_depends_past(make_snapshot, mocker: MockerFixture):
14251432
snapshot.snapshot_id,
14261433
snapshot_child.snapshot_id,
14271434
]
1435+
assert dev_plan_start_ahead_of_model.directly_modified == {unrelated_snapshot.snapshot_id}
1436+
assert dev_plan_start_ahead_of_model.indirectly_modified == {}
1437+
1438+
1439+
def test_dev_plan_depends_past_non_deployable(make_snapshot, mocker: MockerFixture):
1440+
snapshot = make_snapshot(
1441+
SqlModel(
1442+
name="a",
1443+
# Self-referencing query, so the model depends_on_past.
1444+
query=parse_one("select 1, ds FROM a"),
1445+
start="2023-01-01",
1446+
kind=IncrementalByTimeRangeKind(time_column="ds"),
1447+
),
1448+
)
1449+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1450+
1451+
updated_snapshot = make_snapshot(
1452+
SqlModel(
1453+
**{
1454+
**snapshot.model.dict(),
1455+
"query": parse_one("select 1, ds, 2 FROM a"),
1456+
}
1457+
),
1458+
)
1459+
updated_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
1460+
1461+
snapshot_child = make_snapshot(
1462+
SqlModel(
1463+
name="a_child",
1464+
query=parse_one("select 1, ds FROM a"),
1465+
start="2023-01-01",
1466+
kind=IncrementalByTimeRangeKind(time_column="ds"),
1467+
),
1468+
nodes={'"a"': updated_snapshot.model},
1469+
)
1470+
snapshot_child.categorize_as(SnapshotChangeCategory.BREAKING)
1471+
unrelated_snapshot = make_snapshot(
1472+
SqlModel(
1473+
name="b",
1474+
query=parse_one("select 1, ds"),
1475+
start="2023-01-01",
1476+
kind=IncrementalByTimeRangeKind(time_column="ds"),
1477+
),
1478+
)
1479+
unrelated_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1480+
1481+
assert updated_snapshot.depends_on_past
1482+
assert not snapshot_child.depends_on_past
1483+
assert not unrelated_snapshot.depends_on_past
1484+
assert snapshot_child.model.depends_on == {'"a"'}
1485+
assert snapshot_child.parents == (updated_snapshot.snapshot_id,)
1486+
assert unrelated_snapshot.model.depends_on == set()
1487+
1488+
context_diff = ContextDiff(
1489+
environment="test_environment",
1490+
is_new_environment=True,
1491+
is_unfinalized_environment=False,
1492+
create_from="prod",
1493+
added={snapshot_child.snapshot_id, unrelated_snapshot.snapshot_id},
1494+
removed_snapshots={},
1495+
modified_snapshots={snapshot.name: (updated_snapshot, snapshot)},
1496+
snapshots={
1497+
updated_snapshot.snapshot_id: updated_snapshot,
1498+
snapshot_child.snapshot_id: snapshot_child,
1499+
unrelated_snapshot.snapshot_id: unrelated_snapshot,
1500+
},
1501+
new_snapshots={
1502+
updated_snapshot.snapshot_id: snapshot,
1503+
snapshot_child.snapshot_id: snapshot_child,
1504+
unrelated_snapshot.snapshot_id: unrelated_snapshot,
1505+
},
1506+
previous_plan_id=None,
1507+
previously_promoted_snapshot_ids=set(),
1508+
previous_finalized_snapshots=None,
1509+
)
1510+
1511+
dev_plan_start_aligned = PlanBuilder(
1512+
context_diff, start="2023-01-01", end="2023-01-10", is_dev=True
1513+
).build()
1514+
assert len(dev_plan_start_aligned.new_snapshots) == 3
1515+
assert sorted([x.name for x in dev_plan_start_aligned.new_snapshots]) == [
1516+
'"a"',
1517+
'"a_child"',
1518+
'"b"',
1519+
]
1520+
1521+
# There should be no ignored snapshots because all changes are non-deployable.
1522+
dev_plan_start_ahead_of_model = PlanBuilder(
1523+
context_diff, start="2023-01-02", end="2023-01-10", is_dev=True
1524+
).build()
1525+
assert len(dev_plan_start_ahead_of_model.new_snapshots) == 3
1526+
assert sorted([x.name for x in dev_plan_start_ahead_of_model.new_snapshots]) == [
1527+
'"a"',
1528+
'"a_child"',
1529+
'"b"',
1530+
]
1531+
assert not dev_plan_start_ahead_of_model.ignored
14281532

14291533

14301534
def test_restatement_intervals_after_updating_start(sushi_context: Context):

0 commit comments

Comments
 (0)