Skip to content

Commit e716c97

Browse files
committed
Fix: User-provided plan end must take precedence over the max interval end per model (#3042)
1 parent fe059b9 commit e716c97

File tree

6 files changed

+130
-4
lines changed

6 files changed

+130
-4
lines changed

sqlmesh/core/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1134,7 +1134,7 @@ def plan_builder(
11341134
# to prevent unintended evaluation of the entire DAG.
11351135
default_end: t.Optional[int] = None
11361136
max_interval_end_per_model: t.Optional[t.Dict[str, int]] = None
1137-
if not run:
1137+
if not run and not end:
11381138
models_for_interval_end: t.Optional[t.Set[str]] = None
11391139
if backfill_models is not None:
11401140
models_for_interval_end = set()

sqlmesh/core/plan/builder.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,12 @@ def build(self) -> Plan:
257257
dag, earliest_interval_start(filtered_snapshots.values())
258258
)
259259

260+
interval_end_per_model = self._interval_end_per_model
261+
if interval_end_per_model and self.override_end:
262+
# If the end date was provided explicitly by a user, then interval end for each individual
263+
# model should be ignored.
264+
interval_end_per_model = None
265+
260266
plan = Plan(
261267
context_diff=self._context_diff,
262268
plan_id=self._plan_id,
@@ -275,7 +281,7 @@ def build(self) -> Plan:
275281
ignored=ignored,
276282
deployability_index=deployability_index,
277283
restatements=restatements,
278-
interval_end_per_model=self._interval_end_per_model,
284+
interval_end_per_model=interval_end_per_model,
279285
selected_models_to_backfill=self._backfill_models,
280286
models_to_backfill=models_to_backfill,
281287
effective_from=self._effective_from,

sqlmesh/core/snapshot/definition.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,9 +1628,14 @@ def missing_intervals(
16281628
for snapshot in snapshots:
16291629
if not snapshot.evaluatable:
16301630
continue
1631-
interval = restatements.get(snapshot.snapshot_id)
16321631
snapshot_start_date = start_dt
1633-
snapshot_end_date = interval_end_per_model.get(snapshot.name, end_date)
1632+
snapshot_end_date: TimeLike = end_date
1633+
1634+
existing_interval_end = interval_end_per_model.get(snapshot.name)
1635+
if existing_interval_end and existing_interval_end > to_timestamp(snapshot_start_date):
1636+
snapshot_end_date = existing_interval_end
1637+
1638+
interval = restatements.get(snapshot.snapshot_id)
16341639
if interval:
16351640
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in interval)
16361641
snapshot = snapshot.copy()

tests/core/test_integration.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,22 @@ def test_select_unchanged_model_for_backfill(init_and_plan_context: t.Callable):
12581258
assert {o.name for o in schema_objects} == {"waiter_revenue_by_day", "top_waiters"}
12591259

12601260

1261+
@freeze_time("2023-01-08 15:00:00")
1262+
def test_max_interval_end_per_model_not_applied_when_end_is_provided(
1263+
init_and_plan_context: t.Callable,
1264+
):
1265+
context, plan = init_and_plan_context("examples/sushi")
1266+
context.apply(plan)
1267+
1268+
with freeze_time("2023-01-09 00:00:00"):
1269+
context.run()
1270+
1271+
plan = context.plan(
1272+
no_prompts=True, restate_models=["*"], start="2023-01-09", end="2023-01-09"
1273+
)
1274+
context.apply(plan)
1275+
1276+
12611277
@freeze_time("2023-01-08 15:00:00")
12621278
def test_select_models_for_backfill(init_and_plan_context: t.Callable):
12631279
context, _ = init_and_plan_context("examples/sushi")

tests/core/test_plan.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2423,3 +2423,45 @@ def test_plan_start_when_preview_enabled(make_snapshot, mocker: MockerFixture):
24232423
enable_preview=True,
24242424
)
24252425
assert plan_builder.build().start == default_start_for_preview
2426+
2427+
2428+
def test_interval_end_per_model(make_snapshot):
2429+
snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds")))
2430+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
2431+
2432+
new_snapshot = make_snapshot(SqlModel(name="a", query=parse_one("select 2, ds")))
2433+
2434+
context_diff = ContextDiff(
2435+
environment="test_environment",
2436+
is_new_environment=True,
2437+
is_unfinalized_environment=False,
2438+
normalize_environment_name=True,
2439+
create_from="prod",
2440+
added=set(),
2441+
removed_snapshots={},
2442+
modified_snapshots={new_snapshot.name: (new_snapshot, snapshot)},
2443+
snapshots={new_snapshot.snapshot_id: new_snapshot},
2444+
new_snapshots={new_snapshot.snapshot_id: new_snapshot},
2445+
previous_plan_id=None,
2446+
previously_promoted_snapshot_ids=set(),
2447+
previous_finalized_snapshots=None,
2448+
)
2449+
2450+
plan_builder = PlanBuilder(
2451+
context_diff,
2452+
DuckDBEngineAdapter.SCHEMA_DIFFER,
2453+
interval_end_per_model={snapshot.name: to_timestamp("2023-01-09")},
2454+
)
2455+
assert plan_builder.build().interval_end_per_model == {
2456+
snapshot.name: to_timestamp("2023-01-09")
2457+
}
2458+
2459+
# User-provided end should take precedence.
2460+
plan_builder = PlanBuilder(
2461+
context_diff,
2462+
DuckDBEngineAdapter.SCHEMA_DIFFER,
2463+
interval_end_per_model={snapshot.name: to_timestamp("2023-01-09")},
2464+
end="2023-01-10",
2465+
is_dev=True,
2466+
)
2467+
assert plan_builder.build().interval_end_per_model is None

tests/core/test_snapshot.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1989,3 +1989,60 @@ def test_snapshot_pickle_intervals(make_snapshot):
19891989
assert not loaded_snapshot.dev_intervals
19901990
assert len(snapshot.intervals) > 0
19911991
assert len(snapshot.dev_intervals) > 0
1992+
1993+
1994+
def test_missing_intervals_interval_end_per_model(make_snapshot):
1995+
snapshot_a = make_snapshot(
1996+
SqlModel(
1997+
name="a",
1998+
start="2023-01-04",
1999+
query=parse_one("SELECT 1"),
2000+
),
2001+
version="a",
2002+
)
2003+
2004+
snapshot_b = make_snapshot(
2005+
SqlModel(
2006+
name="b",
2007+
start="2023-01-04",
2008+
query=parse_one("SELECT 2"),
2009+
),
2010+
version="b",
2011+
)
2012+
2013+
assert missing_intervals(
2014+
[snapshot_a, snapshot_b],
2015+
start="2023-01-04",
2016+
end="2023-01-10",
2017+
interval_end_per_model={
2018+
snapshot_a.name: to_timestamp("2023-01-09"),
2019+
snapshot_b.name: to_timestamp("2023-01-06"),
2020+
},
2021+
) == {
2022+
snapshot_a: [
2023+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
2024+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
2025+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
2026+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
2027+
(to_timestamp("2023-01-08"), to_timestamp("2023-01-09")),
2028+
],
2029+
snapshot_b: [
2030+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
2031+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
2032+
],
2033+
}
2034+
2035+
assert missing_intervals(
2036+
[snapshot_a, snapshot_b],
2037+
start="2023-01-08",
2038+
end="2023-01-08",
2039+
interval_end_per_model={
2040+
snapshot_a.name: to_timestamp("2023-01-09"),
2041+
snapshot_b.name: to_timestamp(
2042+
"2023-01-06"
2043+
), # The interval end is before the start. This should be ignored.
2044+
},
2045+
) == {
2046+
snapshot_a: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
2047+
snapshot_b: [(to_timestamp("2023-01-08"), to_timestamp("2023-01-09"))],
2048+
}

0 commit comments

Comments
 (0)