Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3043,10 +3043,17 @@ def _get_plan_default_start_end(
modified_model_names: t.Set[str],
execution_time: t.Optional[TimeLike] = None,
) -> t.Tuple[t.Optional[int], t.Optional[int]]:
if not max_interval_end_per_model:
# exclude seeds so their stale interval ends does not become the default plan end date
# when they're the only ones that contain intervals in this plan
non_seed_interval_ends = {
model_fqn: end
for model_fqn, end in max_interval_end_per_model.items()
if model_fqn not in snapshots or not snapshots[model_fqn].is_seed
}
if not non_seed_interval_ends:
return None, None

default_end = to_timestamp(max(max_interval_end_per_model.values()))
default_end = to_timestamp(max(non_seed_interval_ends.values()))
default_start: t.Optional[int] = None
# Infer the default start by finding the smallest interval start that corresponds to the default end.
for model_name in backfill_models or modified_model_names or max_interval_end_per_model:
Expand Down
66 changes: 66 additions & 0 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,72 @@ def test_plan_start_ahead_of_end(copy_to_temp_path):
context.close()


@pytest.mark.slow
def test_plan_seed_model_excluded_from_default_end(copy_to_temp_path: t.Callable):
path = copy_to_temp_path("examples/sushi")
with time_machine.travel("2024-06-01 00:00:00 UTC"):
context = Context(paths=path, gateway="duckdb_persistent")
context.plan("prod", no_prompts=True, auto_apply=True)
max_ends = context.state_sync.max_interval_end_per_model("prod")
seed_fqns = [k for k in max_ends if "waiter_names" in k]
assert len(seed_fqns) == 1
assert max_ends[seed_fqns[0]] == to_timestamp("2024-06-01")
context.close()

with time_machine.travel("2026-03-01 00:00:00 UTC"):
context = Context(paths=path, gateway="duckdb_persistent")

# a model that depends on this seed but has no interval in prod yet so only the seed would contribute to max_interval_end_per_model
context.upsert_model(
load_sql_based_model(
parse(
"""
MODEL(
name sushi.waiter_summary,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ds
),
start '2025-01-01',
cron '@daily'
);

SELECT
id,
name,
@start_ds AS ds
FROM
sushi.waiter_names
WHERE
@start_ds BETWEEN @start_ds AND @end_ds
"""
),
default_catalog=context.default_catalog,
)
)

# the seed's interval end would still be 2024-06-01
max_ends = context.state_sync.max_interval_end_per_model("prod")
seed_fqns = [k for k in max_ends if "waiter_names" in k]
assert len(seed_fqns) == 1
assert max_ends[seed_fqns[0]] == to_timestamp("2024-06-01")

# the plan start date 2025-01-01 is after the seeds end date but shouldnt cause the plan to fail
plan = context.plan(
"dev", start="2025-01-01", no_prompts=True, select_models=["*waiter_summary"]
)

# the end should fall back to execution_time rather than seeds end
assert plan.models_to_backfill == {
'"duckdb"."sushi"."waiter_names"',
'"duckdb"."sushi"."waiter_summary"',
}
assert plan.provided_end is None
assert plan.provided_start == "2025-01-01"
assert to_timestamp(plan.end) == to_timestamp("2026-03-01")
assert to_timestamp(plan.start) == to_timestamp("2025-01-01")
context.close()


@pytest.mark.slow
def test_schema_error_no_default(sushi_context_pre_scheduling) -> None:
context = sushi_context_pre_scheduling
Expand Down