Skip to content

Commit 87c2ebc

Browse files
committed
Chore: Treat the target environment as missing during planning if it's expired (#3051)
1 parent 43af1bb commit 87c2ebc

File tree

6 files changed

+78
-15
lines changed

6 files changed

+78
-15
lines changed

sqlmesh/core/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,6 +1249,7 @@ def invalidate_environment(self, name: str, sync: bool = False) -> None:
12491249
sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
12501250
be deleted asynchronously by the janitor process.
12511251
"""
1252+
name = Environment.sanitize_name(name)
12521253
self.state_sync.invalidate_environment(name)
12531254
if sync:
12541255
self._cleanup_environments()

sqlmesh/core/context_diff.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def create(
8787
environment = environment.lower()
8888
env = state_reader.get_environment(environment)
8989

90-
if env is None:
90+
if env is None or env.expired:
9191
env = state_reader.get_environment(create_from.lower())
9292
is_new_environment = True
9393
previously_promoted_snapshot_ids = set()

sqlmesh/core/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from sqlmesh.core.config import EnvironmentSuffixTarget
1111
from sqlmesh.core.snapshot import SnapshotId, SnapshotTableInfo
1212
from sqlmesh.utils import word_characters_only
13-
from sqlmesh.utils.date import TimeLike
13+
from sqlmesh.utils.date import TimeLike, now_timestamp
1414
from sqlmesh.utils.pydantic import PydanticModel, field_validator
1515

1616
T = t.TypeVar("T", bound="EnvironmentNamingInfo")
@@ -152,3 +152,7 @@ def naming_info(self) -> EnvironmentNamingInfo:
152152
catalog_name_override=self.catalog_name_override,
153153
normalize_name=self.normalize_name,
154154
)
155+
156+
@property
157+
def expired(self) -> bool:
158+
return self.expiration_ts is not None and self.expiration_ts <= now_timestamp()

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -284,19 +284,20 @@ def promote(
284284
)
285285
!= table_infos[name].qualified_view_name.for_environment(environment.naming_info)
286286
}
287-
if environment.previous_plan_id != existing_environment.plan_id:
288-
raise SQLMeshError(
289-
f"Plan '{environment.plan_id}' is no longer valid for the target environment '{environment.name}'. "
290-
f"Expected previous plan ID: '{environment.previous_plan_id}', actual previous plan ID: '{existing_environment.plan_id}'. "
291-
"Please recreate the plan and try again"
292-
)
293-
if no_gaps_snapshot_names != set():
294-
snapshots = self._get_snapshots(environment.snapshots).values()
295-
self._ensure_no_gaps(
296-
snapshots,
297-
existing_environment,
298-
no_gaps_snapshot_names,
299-
)
287+
if not existing_environment.expired:
288+
if environment.previous_plan_id != existing_environment.plan_id:
289+
raise SQLMeshError(
290+
f"Plan '{environment.plan_id}' is no longer valid for the target environment '{environment.name}'. "
291+
f"Expected previous plan ID: '{environment.previous_plan_id}', actual previous plan ID: '{existing_environment.plan_id}'. "
292+
"Please recreate the plan and try again"
293+
)
294+
if no_gaps_snapshot_names != set():
295+
snapshots = self._get_snapshots(environment.snapshots).values()
296+
self._ensure_no_gaps(
297+
snapshots,
298+
existing_environment,
299+
no_gaps_snapshot_names,
300+
)
300301
demoted_snapshots = set(existing_environment.snapshots) - set(environment.snapshots)
301302
# Update the updated_at attribute.
302303
self._update_snapshots(demoted_snapshots)

tests/core/test_integration.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,6 +1559,37 @@ def test_restatement_plan_ignores_changes(init_and_plan_context: t.Callable):
15591559
context.apply(plan)
15601560

15611561

1562+
@freeze_time("2023-01-08 15:00:00")
1563+
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
1564+
context, plan = init_and_plan_context("examples/sushi")
1565+
context.apply(plan)
1566+
1567+
model = context.get_model("sushi.waiter_revenue_by_day")
1568+
context.upsert_model(add_projection_to_model(t.cast(SqlModel, model)))
1569+
1570+
modified_models = {model.fqn, context.get_model("sushi.top_waiters").fqn}
1571+
1572+
plan = context.plan("dev", no_prompts=True)
1573+
assert plan.has_changes
1574+
assert set(plan.context_diff.modified_snapshots) == modified_models
1575+
assert plan.missing_intervals
1576+
context.apply(plan)
1577+
1578+
# Make sure there are no changes when comparing against the existing environment.
1579+
plan = context.plan("dev", no_prompts=True)
1580+
assert not plan.has_changes
1581+
assert not plan.context_diff.modified_snapshots
1582+
assert not plan.missing_intervals
1583+
1584+
# Invalidate the environment and make sure that the plan detects the changes.
1585+
context.invalidate_environment("dev")
1586+
plan = context.plan("dev", no_prompts=True)
1587+
assert plan.has_changes
1588+
assert set(plan.context_diff.modified_snapshots) == modified_models
1589+
assert not plan.missing_intervals
1590+
context.apply(plan)
1591+
1592+
15621593
def test_plan_twice_with_star_macro_yields_no_diff(tmp_path: Path):
15631594
init_example_project(tmp_path, dialect="duckdb")
15641595

tests/core/test_state_sync.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,32 @@ def test_promote_snapshots_parent_plan_id_mismatch(
862862
state_sync.promote(stale_new_environment)
863863

864864

865+
def test_promote_environment_expired(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
866+
snapshot = make_snapshot(
867+
SqlModel(
868+
name="a",
869+
query=parse_one("select 1, ds"),
870+
),
871+
)
872+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
873+
874+
state_sync.push_snapshots([snapshot])
875+
promote_snapshots(state_sync, [snapshot], "dev")
876+
state_sync.invalidate_environment("dev")
877+
878+
new_environment = Environment(
879+
name="dev",
880+
snapshots=[snapshot.table_info],
881+
start_at="2022-01-01",
882+
end_at="2022-01-01",
883+
plan_id="new_plan_id",
884+
previous_plan_id=None, # No previous plan ID since it's technically a new environment
885+
)
886+
887+
# This call shouldn't fail.
888+
state_sync.promote(new_environment)
889+
890+
865891
def test_promote_snapshots_no_gaps(state_sync: EngineAdapterStateSync, make_snapshot: t.Callable):
866892
model = SqlModel(
867893
name="a",

0 commit comments

Comments
 (0)