Skip to content

Commit 87161bd

Browse files
committed
Fix: Unexpected backfill of a parent when an interval outside the parent's range is restated for a child
1 parent 77ce71c commit 87161bd

File tree

2 files changed: +71 -12 lines changed

sqlmesh/core/snapshot/definition.py

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -2081,16 +2081,20 @@ def missing_intervals(
20812081
continue
20822082
snapshot_end_date = existing_interval_end
20832083

2084+
snapshot_start_date = max(
2085+
to_datetime(snapshot_start_date),
2086+
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
2087+
)
2088+
if snapshot_start_date > to_datetime(snapshot_end_date):
2089+
continue
2090+
20842091
missing_interval_end_date = snapshot_end_date
20852092
node_end_date = snapshot.node.end
20862093
if node_end_date and (to_datetime(node_end_date) < to_datetime(snapshot_end_date)):
20872094
missing_interval_end_date = node_end_date
20882095

20892096
intervals = snapshot.missing_intervals(
2090-
max(
2091-
to_datetime(snapshot_start_date),
2092-
to_datetime(start_date(snapshot, snapshots, cache, relative_to=snapshot_end_date)),
2093-
),
2097+
snapshot_start_date,
20942098
missing_interval_end_date,
20952099
execution_time=execution_time,
20962100
deployability_index=deployability_index,
@@ -2295,14 +2299,16 @@ def start_date(
22952299
if not isinstance(snapshots, dict):
22962300
snapshots = {snapshot.snapshot_id: snapshot for snapshot in snapshots}
22972301

2298-
earliest = snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now()))
2299-
2300-
for parent in snapshot.parents:
2301-
if parent in snapshots:
2302-
earliest = min(
2303-
earliest,
2304-
start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to),
2305-
)
2302+
parent_starts = [
2303+
start_date(snapshots[parent], snapshots, cache=cache, relative_to=relative_to)
2304+
for parent in snapshot.parents
2305+
if parent in snapshots
2306+
]
2307+
earliest = (
2308+
min(parent_starts)
2309+
if parent_starts
2310+
else snapshot.node.cron_prev(snapshot.node.cron_floor(relative_to or now()))
2311+
)
23062312

23072313
cache[key] = earliest
23082314
return earliest

tests/core/integration/test_restatement.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1880,3 +1880,56 @@ def _run_restatement_plan(tmp_path: Path, config: Config, q: queue.Queue):
18801880
assert len(model_a.intervals)
18811881

18821882
set_console(orig_console)
1883+
1884+
1885+
@time_machine.travel("2023-01-08 15:00:00 UTC")
1886+
def test_restatement_plan_outside_parent_date_range(init_and_plan_context: t.Callable):
1887+
context, _ = init_and_plan_context("examples/sushi")
1888+
1889+
context.upsert_model("sushi.items", start="2023-01-06")
1890+
context.upsert_model("sushi.orders", start="2023-01-06")
1891+
# One of the parents should derive the start from its own parents for the issue
1892+
# to reproduce
1893+
context.upsert_model("sushi.order_items", start=None)
1894+
context.upsert_model("sushi.waiter_revenue_by_day", start="2023-01-01", audits=[])
1895+
1896+
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
1897+
1898+
restated_snapshot = context.get_snapshot("sushi.waiter_revenue_by_day")
1899+
downstream_snapshot = context.get_snapshot("sushi.top_waiters")
1900+
1901+
plan = context.plan_builder(
1902+
restate_models=["sushi.waiter_revenue_by_day"],
1903+
start="2023-01-01",
1904+
end="2023-01-01",
1905+
min_intervals=0,
1906+
).build()
1907+
assert plan.snapshots != context.snapshots
1908+
1909+
assert plan.requires_backfill
1910+
assert plan.restatements == {
1911+
restated_snapshot.snapshot_id: (to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
1912+
downstream_snapshot.snapshot_id: (to_timestamp("2023-01-01"), to_timestamp("2023-01-09")),
1913+
}
1914+
assert plan.missing_intervals == [
1915+
SnapshotIntervals(
1916+
snapshot_id=downstream_snapshot.snapshot_id,
1917+
intervals=[
1918+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
1919+
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
1920+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
1921+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05")),
1922+
(to_timestamp("2023-01-05"), to_timestamp("2023-01-06")),
1923+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
1924+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
1925+
],
1926+
),
1927+
SnapshotIntervals(
1928+
snapshot_id=restated_snapshot.snapshot_id,
1929+
intervals=[
1930+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
1931+
],
1932+
),
1933+
]
1934+
1935+
context.apply(plan)

0 commit comments

Comments
 (0)