Skip to content

Commit ad66273

Browse files
committed
Fix: Improve the inference of the default plan start for new models for which cron is not aligned with interval unit boundary (#3758)
1 parent 3266db8 commit ad66273

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

sqlmesh/core/context.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ def run(
637637
True if the run was successful, False otherwise.
638638
"""
639639
environment = environment or self.config.default_target_environment
640+
environment = Environment.sanitize_name(environment)
640641
if not skip_janitor and environment.lower() == c.PROD:
641642
self._run_janitor()
642643

@@ -1321,14 +1322,16 @@ def plan_builder(
13211322
):
13221323
if model_name not in snapshots:
13231324
continue
1324-
interval_unit = snapshots[model_name].node.interval_unit
1325+
node = snapshots[model_name].node
1326+
interval_unit = node.interval_unit
13251327
default_start = min(
13261328
default_start or sys.maxsize,
13271329
to_timestamp(
13281330
interval_unit.cron_prev(
13291331
interval_unit.cron_floor(
1330-
max_interval_end_per_model.get(model_name, default_end),
1331-
estimate=True,
1332+
max_interval_end_per_model.get(
1333+
model_name, node.cron_floor(default_end)
1334+
),
13321335
),
13331336
estimate=True,
13341337
)

tests/core/test_integration.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,72 @@ def test_cron_not_aligned_with_day_boundary(
701701
]
702702

703703

704+
@time_machine.travel("2023-01-08 00:00:00 UTC")
705+
def test_cron_not_aligned_with_day_boundary_new_model(init_and_plan_context: t.Callable):
706+
context, _ = init_and_plan_context("examples/sushi")
707+
708+
existing_model = context.get_model("sushi.waiter_revenue_by_day")
709+
existing_model = SqlModel.parse_obj(
710+
{
711+
**existing_model.dict(),
712+
"kind": existing_model.kind.copy(update={"forward_only": True}),
713+
}
714+
)
715+
context.upsert_model(existing_model)
716+
717+
plan = context.plan_builder("prod", skip_tests=True).build()
718+
context.apply(plan)
719+
720+
# Add a new model and make a change to a forward-only model.
721+
# The cron of the new model is not aligned with the day boundary.
722+
new_model = load_sql_based_model(
723+
d.parse(
724+
"""
725+
MODEL (
726+
name memory.sushi.new_model,
727+
kind FULL,
728+
cron '0 8 * * *',
729+
start '2023-01-01',
730+
);
731+
732+
SELECT 1 AS one;
733+
"""
734+
)
735+
)
736+
context.upsert_model(new_model)
737+
738+
existing_model = add_projection_to_model(t.cast(SqlModel, existing_model), literal=True)
739+
context.upsert_model(existing_model)
740+
741+
plan = context.plan_builder("dev", skip_tests=True, enable_preview=True).build()
742+
assert plan.missing_intervals == [
743+
SnapshotIntervals(
744+
snapshot_id=context.get_snapshot(
745+
"memory.sushi.new_model", raise_if_missing=True
746+
).snapshot_id,
747+
intervals=[(to_timestamp("2023-01-06"), to_timestamp("2023-01-07"))],
748+
),
749+
SnapshotIntervals(
750+
snapshot_id=context.get_snapshot(
751+
"sushi.top_waiters", raise_if_missing=True
752+
).snapshot_id,
753+
intervals=[
754+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
755+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
756+
],
757+
),
758+
SnapshotIntervals(
759+
snapshot_id=context.get_snapshot(
760+
"sushi.waiter_revenue_by_day", raise_if_missing=True
761+
).snapshot_id,
762+
intervals=[
763+
(to_timestamp("2023-01-06"), to_timestamp("2023-01-07")),
764+
(to_timestamp("2023-01-07"), to_timestamp("2023-01-08")),
765+
],
766+
),
767+
]
768+
769+
704770
@time_machine.travel("2023-01-08 00:00:00 UTC")
705771
def test_forward_only_monthly_model(init_and_plan_context: t.Callable):
706772
context, _ = init_and_plan_context("examples/sushi")

0 commit comments

Comments
 (0)