Skip to content

Commit d820ce3

Browse files
authored
Fix: Invalid fallback start date for snapshots with missing start (#1504)
1 parent 7c95bc1 commit d820ce3

File tree

5 files changed

+44
-8
lines changed

5 files changed

+44
-8
lines changed

sqlmesh/core/context.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import traceback
3939
import typing as t
4040
import unittest.result
41+
from datetime import timedelta
4142
from io import StringIO
4243
from pathlib import Path
4344
from types import MappingProxyType
@@ -89,7 +90,7 @@
8990
from sqlmesh.core.user import User
9091
from sqlmesh.utils import UniqueKeyDict, env_vars, sys_path
9192
from sqlmesh.utils.dag import DAG
92-
from sqlmesh.utils.date import TimeLike, now_ds
93+
from sqlmesh.utils.date import TimeLike, now_ds, to_date
9394
from sqlmesh.utils.errors import (
9495
ConfigError,
9596
MissingDependencyError,
@@ -775,6 +776,7 @@ def plan(
775776
# If no end date is specified, use the max interval end from prod
776777
# to prevent unintended evaluation of the entire DAG.
777778
default_end = self.state_sync.max_interval_end_for_environment(c.PROD) if is_dev else None
779+
default_start = to_date(default_end) - timedelta(days=1) if default_end else None
778780

779781
plan = Plan(
780782
context_diff=self._context_diff(
@@ -797,6 +799,7 @@ def plan(
797799
auto_categorization_enabled=not no_auto_categorization,
798800
effective_from=effective_from,
799801
include_unmodified=include_unmodified,
802+
default_start=default_start,
800803
default_end=default_end,
801804
)
802805

sqlmesh/core/plan/definition.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class Plan:
7171
effective_from: The effective date from which to apply forward-only changes on production.
7272
include_unmodified: Indicates whether to include unmodified nodes in the target development environment.
7373
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
74+
default_start: The default plan start to use if not specified.
7475
default_end: The default plan end to use if not specified.
7576
"""
7677

@@ -92,6 +93,7 @@ def __init__(
9293
auto_categorization_enabled: bool = True,
9394
effective_from: t.Optional[TimeLike] = None,
9495
include_unmodified: bool = False,
96+
default_start: t.Optional[TimeLike] = None,
9597
default_end: t.Optional[TimeLike] = None,
9698
):
9799
self.context_diff = context_diff
@@ -109,7 +111,9 @@ def __init__(
109111
self.include_unmodified = include_unmodified
110112
self._restate_models = set(restate_models or [])
111113
self._effective_from: t.Optional[TimeLike] = None
112-
self._start = start if start or not (is_dev and forward_only) else yesterday_ds()
114+
self._start = (
115+
start if start or not (is_dev and forward_only) else (default_start or yesterday_ds())
116+
)
113117
self._end = end if end or not is_dev else (default_end or now())
114118
self._execution_time = execution_time or now()
115119
self._apply = apply

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,9 +1253,6 @@ def earliest_start_date(
12531253
snapshots = list(snapshots)
12541254
earliest = to_datetime(yesterday().date())
12551255
if snapshots:
1256-
for snapshot in snapshots:
1257-
if not snapshot.parents and not snapshot.node.start:
1258-
cache[snapshot.name] = earliest
12591256
return min(start_date(snapshot, snapshots, cache) or earliest for snapshot in snapshots)
12601257
return earliest
12611258

tests/core/test_context.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import pathlib
2-
from datetime import date
2+
from datetime import date, timedelta
33
from tempfile import TemporaryDirectory
44
from unittest.mock import call
55

@@ -19,7 +19,7 @@
1919
from sqlmesh.core.environment import Environment
2020
from sqlmesh.core.model import load_sql_based_model
2121
from sqlmesh.core.plan import BuiltInPlanEvaluator, Plan
22-
from sqlmesh.utils.date import yesterday_ds
22+
from sqlmesh.utils.date import now, to_date, yesterday_ds
2323
from sqlmesh.utils.errors import ConfigError
2424
from tests.utils.test_filesystem import create_temp_file
2525

@@ -425,3 +425,22 @@ def test_janitor(sushi_context, mocker: MockerFixture) -> None:
425425
],
426426
any_order=True,
427427
)
428+
429+
430+
def test_plan_default_end(sushi_context_pre_scheduling: Context):
431+
prod_plan = sushi_context_pre_scheduling.plan("prod", no_prompts=True)
432+
# Simulate that the prod is 3 days behind.
433+
plan_end = to_date(now()) - timedelta(days=3)
434+
prod_plan._end = plan_end
435+
sushi_context_pre_scheduling.apply(prod_plan)
436+
437+
dev_plan = sushi_context_pre_scheduling.plan(
438+
"test_env", no_prompts=True, include_unmodified=True, skip_backfill=True, auto_apply=True
439+
)
440+
assert dev_plan.end == plan_end
441+
442+
forward_only_dev_plan = sushi_context_pre_scheduling.plan(
443+
"test_env_forward_only", no_prompts=True, include_unmodified=True, forward_only=True
444+
)
445+
assert forward_only_dev_plan.end == plan_end
446+
assert forward_only_dev_plan._start == plan_end

tests/core/test_snapshot.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import typing as t
33
from copy import deepcopy
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta
55
from pathlib import Path
66

77
import pytest
@@ -11,6 +11,7 @@
1111

1212
from sqlmesh.core.audit import StandaloneAudit
1313
from sqlmesh.core.config import AutoCategorizationMode, CategorizerConfig
14+
from sqlmesh.core.context import Context
1415
from sqlmesh.core.dialect import parse, parse_one
1516
from sqlmesh.core.environment import EnvironmentNamingInfo
1617
from sqlmesh.core.model import (
@@ -30,6 +31,7 @@
3031
SnapshotChangeCategory,
3132
SnapshotFingerprint,
3233
categorize_change,
34+
earliest_start_date,
3335
fingerprint_from_node,
3436
has_paused_forward_only,
3537
)
@@ -1239,3 +1241,14 @@ def test_multi_interval_merge(make_snapshot):
12391241

12401242
assert b_start == to_timestamp("2023-01-01 00:15:00")
12411243
assert b_end == to_timestamp("2023-01-01 00:45:00")
1244+
1245+
1246+
def test_earliest_start_date(sushi_context: Context):
1247+
model_name = "sushi.waiter_names"
1248+
assert sushi_context.snapshots[model_name].node.start is None
1249+
1250+
cache: t.Dict[str, datetime] = {}
1251+
earliest_start_date(sushi_context.snapshots.values(), cache)
1252+
1253+
# Make sure that the default value for a snapshot with a missing start is not cached.
1254+
assert model_name not in cache

0 commit comments

Comments
 (0)