Skip to content

Commit b38bc85

Browse files
committed
Fix: Take max interval end for a model into account even when there's a restatement interval (#3913)
1 parent 3111e5c commit b38bc85

File tree

3 files changed

+35
-13
lines changed

3 files changed

+35
-13
lines changed

sqlmesh/core/plan/builder.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,6 @@ def _build_dag(self) -> DAG[SnapshotId]:
292292
def _build_restatements(
293293
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
294294
) -> t.Dict[SnapshotId, Interval]:
295-
def is_restateable_snapshot(snapshot: Snapshot) -> bool:
296-
if not self._is_dev and snapshot.disable_restatement:
297-
return False
298-
return not snapshot.is_symbolic and not snapshot.is_seed
299-
300295
restate_models = self._restate_models
301296
if restate_models == set():
302297
# This is a warning but we print this as error since the Console is lacking API for warnings.

sqlmesh/core/snapshot/definition.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1745,14 +1745,14 @@ def missing_intervals(
17451745
snapshot = snapshot.copy()
17461746
snapshot.intervals = snapshot.intervals.copy()
17471747
snapshot.remove_interval(restated_interval)
1748-
else:
1749-
existing_interval_end = interval_end_per_model.get(snapshot.name)
1750-
if existing_interval_end:
1751-
if to_timestamp(snapshot_start_date) >= existing_interval_end:
1752-
# The start exceeds the provided interval end, so we can skip this snapshot
1753-
# since it doesn't have missing intervals by definition
1754-
continue
1755-
snapshot_end_date = existing_interval_end
1748+
1749+
existing_interval_end = interval_end_per_model.get(snapshot.name)
1750+
if existing_interval_end:
1751+
if to_timestamp(snapshot_start_date) >= existing_interval_end:
1752+
# The start exceeds the provided interval end, so we can skip this snapshot
1753+
# since it doesn't have missing intervals by definition
1754+
continue
1755+
snapshot_end_date = existing_interval_end
17561756

17571757
missing_interval_end_date = snapshot_end_date
17581758
node_end_date = snapshot.node.end

tests/core/test_integration.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4572,6 +4572,33 @@ def test_restatement_of_full_model_with_start(init_and_plan_context: t.Callable)
45724572
assert waiter_by_day_interval == (to_timestamp("2023-01-07"), to_timestamp("2023-01-08"))
45734573

45744574

4575+
@time_machine.travel("2023-01-08 15:00:00 UTC")
4576+
def test_restatement_shouldnt_backfill_beyond_prod_intervals(init_and_plan_context: t.Callable):
4577+
context, _ = init_and_plan_context("examples/sushi")
4578+
4579+
model = context.get_model("sushi.top_waiters")
4580+
context.upsert_model(SqlModel.parse_obj({**model.dict(), "cron": "@hourly"}))
4581+
4582+
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
4583+
context.run()
4584+
4585+
with time_machine.travel("2023-01-09 02:00:00 UTC"):
4586+
# It's time to backfill the waiter_revenue_by_day model but it hasn't run yet
4587+
restatement_plan = context.plan(
4588+
restate_models=["sushi.waiter_revenue_by_day"],
4589+
no_prompts=True,
4590+
skip_tests=True,
4591+
)
4592+
intervals_by_id = {i.snapshot_id: i for i in restatement_plan.missing_intervals}
4593+
# Make sure the intervals don't go beyond the prod intervals
4594+
assert intervals_by_id[context.get_snapshot("sushi.top_waiters").snapshot_id].intervals[-1][
4595+
1
4596+
] == to_timestamp("2023-01-08 15:00:00 UTC")
4597+
assert intervals_by_id[
4598+
context.get_snapshot("sushi.waiter_revenue_by_day").snapshot_id
4599+
].intervals[-1][1] == to_timestamp("2023-01-08 00:00:00 UTC")
4600+
4601+
45754602
def initial_add(context: Context, environment: str):
45764603
assert not context.state_reader.get_environment(environment)
45774604

0 commit comments

Comments
 (0)