Skip to content

Commit 5d08cdf

Browse files
committed
Fix: The inference of the default plan start (#3628)
1 parent 008f7d7 commit 5d08cdf

File tree

3 files changed

+76
-16
lines changed

3 files changed

+76
-16
lines changed

sqlmesh/core/context.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import traceback
4242
import typing as t
4343
import unittest.result
44-
from datetime import date, timedelta
4544
from functools import cached_property
4645
from io import StringIO
4746
from pathlib import Path
@@ -112,7 +111,7 @@
112111
from sqlmesh.core.user import User
113112
from sqlmesh.utils import UniqueKeyDict, sys_path
114113
from sqlmesh.utils.dag import DAG
115-
from sqlmesh.utils.date import TimeLike, now_ds, to_date
114+
from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp
116115
from sqlmesh.utils.errors import (
117116
CircuitBreakerError,
118117
ConfigError,
@@ -1266,6 +1265,10 @@ def plan_builder(
12661265
or (backfill_models is not None and not backfill_models),
12671266
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
12681267
)
1268+
modified_model_names = {
1269+
*context_diff.modified_snapshots,
1270+
*[s.name for s in context_diff.added],
1271+
}
12691272

12701273
if (
12711274
is_dev
@@ -1275,15 +1278,12 @@ def plan_builder(
12751278
):
12761279
# Only backfill modified and added models.
12771280
# This ensures that no models outside the impacted sub-DAG(s) will be backfilled unexpectedly.
1278-
backfill_models = {
1279-
*context_diff.modified_snapshots,
1280-
*[s.name for s in context_diff.added],
1281-
} or None
1281+
backfill_models = modified_model_names or None
12821282

12831283
# If no end date is specified, use the max interval end from prod
12841284
# to prevent unintended evaluation of the entire DAG.
12851285
default_end: t.Optional[int] = None
1286-
default_start: t.Optional[date] = None
1286+
default_start: t.Optional[int] = None
12871287
max_interval_end_per_model: t.Optional[t.Dict[str, int]] = None
12881288
if not run and not end:
12891289
models_for_interval_end: t.Optional[t.Set[str]] = None
@@ -1308,9 +1308,25 @@ def plan_builder(
13081308
)
13091309
if max_interval_end_per_model:
13101310
default_end = max(max_interval_end_per_model.values())
1311-
default_start = to_date(min(max_interval_end_per_model.values())) - timedelta(
1312-
days=1
1313-
)
1311+
# Infer the default start by finding the smallest interval start that corresponds to the default end.
1312+
for model_name in (
1313+
backfill_models or modified_model_names or max_interval_end_per_model
1314+
):
1315+
if model_name not in snapshots:
1316+
continue
1317+
interval_unit = snapshots[model_name].node.interval_unit
1318+
default_start = min(
1319+
default_start or sys.maxsize,
1320+
to_timestamp(
1321+
interval_unit.cron_prev(
1322+
interval_unit.cron_floor(
1323+
max_interval_end_per_model.get(model_name, default_end),
1324+
estimate=True,
1325+
),
1326+
estimate=True,
1327+
)
1328+
),
1329+
)
13141330

13151331
return self.PLAN_BUILDER_TYPE(
13161332
context_diff=context_diff,

tests/core/test_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ def test_plan_default_end(sushi_context_pre_scheduling: Context):
852852
).build()
853853
assert forward_only_dev_plan.end is not None
854854
assert to_date(make_inclusive_end(forward_only_dev_plan.end)) == plan_end
855-
assert forward_only_dev_plan.start == plan_end
855+
assert to_timestamp(forward_only_dev_plan.start) == to_timestamp(plan_end)
856856

857857

858858
@pytest.mark.slow

tests/core/test_integration.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def test_forward_only_plan_with_effective_date(context_fixture: Context, request
100100
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
101101
== SnapshotChangeCategory.FORWARD_ONLY
102102
)
103-
assert plan.start == to_date("2023-01-07")
103+
assert to_timestamp(plan.start) == to_timestamp("2023-01-07")
104104
assert plan.missing_intervals == [
105105
SnapshotIntervals(
106106
snapshot_id=top_waiters_snapshot.snapshot_id,
@@ -335,7 +335,7 @@ def test_forward_only_model_regular_plan_preview_enabled(init_and_plan_context:
335335
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
336336
== SnapshotChangeCategory.FORWARD_ONLY
337337
)
338-
assert plan.start == to_date("2023-01-07")
338+
assert to_timestamp(plan.start) == to_timestamp("2023-01-07")
339339
assert plan.missing_intervals == [
340340
SnapshotIntervals(
341341
snapshot_id=top_waiters_snapshot.snapshot_id,
@@ -472,7 +472,7 @@ def test_full_history_restatement_model_regular_plan_preview_enabled(
472472
== SnapshotChangeCategory.FORWARD_ONLY
473473
)
474474

475-
assert plan.start == to_date("2023-01-07")
475+
assert to_timestamp(plan.start) == to_timestamp("2023-01-07")
476476
assert plan.missing_intervals == [
477477
SnapshotIntervals(
478478
snapshot_id=active_customers_snapshot.snapshot_id,
@@ -690,6 +690,50 @@ def test_cron_not_aligned_with_day_boundary(
690690
]
691691

692692

693+
@time_machine.travel("2023-01-08 00:00:00 UTC")
694+
def test_forward_only_monthly_model(init_and_plan_context: t.Callable):
695+
context, _ = init_and_plan_context("examples/sushi")
696+
697+
model = context.get_model("sushi.waiter_revenue_by_day")
698+
model = SqlModel.parse_obj(
699+
{
700+
**model.dict(),
701+
"kind": model.kind.copy(update={"forward_only": True}),
702+
"cron": "0 0 1 * *",
703+
"start": "2022-01-01",
704+
"audits": [],
705+
}
706+
)
707+
context.upsert_model(model)
708+
709+
plan = context.plan_builder("prod", skip_tests=True).build()
710+
context.apply(plan)
711+
712+
waiter_revenue_by_day_snapshot = context.get_snapshot(model.name, raise_if_missing=True)
713+
assert waiter_revenue_by_day_snapshot.intervals == [
714+
(to_timestamp("2022-01-01"), to_timestamp("2023-01-01"))
715+
]
716+
717+
model = add_projection_to_model(t.cast(SqlModel, model), literal=True)
718+
context.upsert_model(model)
719+
720+
waiter_revenue_by_day_snapshot = context.get_snapshot(
721+
"sushi.waiter_revenue_by_day", raise_if_missing=True
722+
)
723+
724+
plan = context.plan_builder(
725+
"dev", select_models=[model.name], skip_tests=True, enable_preview=True
726+
).build()
727+
assert to_timestamp(plan.start) == to_timestamp("2022-12-01")
728+
assert to_timestamp(plan.end) == to_timestamp("2023-01-08")
729+
assert plan.missing_intervals == [
730+
SnapshotIntervals(
731+
snapshot_id=waiter_revenue_by_day_snapshot.snapshot_id,
732+
intervals=[(to_timestamp("2022-12-01"), to_timestamp("2023-01-01"))],
733+
),
734+
]
735+
736+
693737
@time_machine.travel("2023-01-08 15:00:00 UTC")
694738
def test_forward_only_parent_created_in_dev_child_created_in_prod(
695739
init_and_plan_context: t.Callable,
@@ -915,7 +959,7 @@ def test_non_breaking_change_after_forward_only_in_dev(
915959
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
916960
== SnapshotChangeCategory.FORWARD_ONLY
917961
)
918-
assert plan.start == to_date("2023-01-07")
962+
assert to_timestamp(plan.start) == to_timestamp("2023-01-07")
919963
assert plan.missing_intervals == [
920964
SnapshotIntervals(
921965
snapshot_id=top_waiters_snapshot.snapshot_id,
@@ -947,7 +991,7 @@ def test_non_breaking_change_after_forward_only_in_dev(
947991
plan.context_diff.snapshots[top_waiters_snapshot.snapshot_id].change_category
948992
== SnapshotChangeCategory.NON_BREAKING
949993
)
950-
assert plan.start == to_timestamp("2023-01-01")
994+
assert to_timestamp(plan.start) == to_timestamp("2023-01-01")
951995
assert plan.missing_intervals == [
952996
SnapshotIntervals(
953997
snapshot_id=top_waiters_snapshot.snapshot_id,

0 commit comments

Comments
 (0)