Skip to content

Commit 3c7779b

Browse files
committed
Fix: Check whether the expired snapshot is among previously finalized snapshots if the environment is not finalized
1 parent 5435ff8 commit 3c7779b

File tree

2 files changed

+83
-1
lines changed

2 files changed

+83
-1
lines changed

sqlmesh/core/state_sync/db/snapshot.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,12 @@ def get_expired_snapshots(
185185
promoted_snapshot_ids = {
186186
snapshot.snapshot_id
187187
for environment in environments
188-
for snapshot in environment.snapshots
188+
for snapshot in (
189+
environment.snapshots
190+
if environment.finalized_ts is not None
191+
# If the environment is not finalized, check both the current snapshots and the previous finalized snapshots
192+
else [*environment.snapshots, *(environment.previous_finalized_snapshots or [])]
193+
)
189194
}
190195

191196
if promoted_snapshot_ids:

tests/core/state_sync/test_state_sync.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,6 +1638,83 @@ def test_delete_expired_snapshots_promoted(
16381638
assert not state_sync.get_snapshots(all_snapshots)
16391639

16401640

1641+
def test_delete_expired_snapshots_previous_finalized_snapshots(
1642+
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
1643+
):
1644+
"""Test that expired snapshots are protected if they are part of previous finalized snapshots
1645+
in a non-finalized environment."""
1646+
now_ts = now_timestamp()
1647+
1648+
# Create an old snapshot that will be expired
1649+
old_snapshot = make_snapshot(
1650+
SqlModel(
1651+
name="a",
1652+
query=parse_one("select a, ds"),
1653+
),
1654+
)
1655+
old_snapshot.ttl = "in 10 seconds"
1656+
old_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1657+
1658+
# Create a new snapshot
1659+
new_snapshot = make_snapshot(
1660+
SqlModel(
1661+
name="a",
1662+
query=parse_one("select a, b, ds"),
1663+
),
1664+
)
1665+
new_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1666+
1667+
state_sync.push_snapshots([old_snapshot, new_snapshot])
1668+
1669+
# Promote the old snapshot to an environment and finalize it
1670+
env = Environment(
1671+
name="test_environment",
1672+
snapshots=[old_snapshot.table_info],
1673+
start_at="2022-01-01",
1674+
end_at="2022-01-01",
1675+
plan_id="test_plan_id",
1676+
previous_plan_id="test_plan_id",
1677+
)
1678+
state_sync.promote(env)
1679+
state_sync.finalize(env)
1680+
1681+
# Verify old snapshot is not cleaned up because it's in a finalized environment
1682+
assert not _get_cleanup_tasks(state_sync)
1683+
1684+
# Now promote the new snapshot to the same environment (this simulates a new plan)
1685+
# The environment will have previous_finalized_snapshots set to the old snapshot
1686+
# and will not be finalized yet
1687+
env = Environment(
1688+
name="test_environment",
1689+
snapshots=[new_snapshot.table_info],
1690+
previous_finalized_snapshots=[old_snapshot.table_info],
1691+
start_at="2022-01-01",
1692+
end_at="2022-01-01",
1693+
plan_id="new_plan_id",
1694+
previous_plan_id="test_plan_id",
1695+
)
1696+
state_sync.promote(env)
1697+
1698+
# Manually update the snapshot's updated_ts to simulate expiration
1699+
state_sync.engine_adapter.execute(
1700+
f"UPDATE sqlmesh._snapshots SET updated_ts = {now_ts - 15000} WHERE name = '{old_snapshot.name}' AND identifier = '{old_snapshot.identifier}'"
1701+
)
1702+
1703+
# The old snapshot should still not be cleaned up because it's part of
1704+
# previous_finalized_snapshots in a non-finalized environment
1705+
assert not _get_cleanup_tasks(state_sync)
1706+
state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
1707+
assert state_sync.snapshots_exist([old_snapshot.snapshot_id]) == {old_snapshot.snapshot_id}
1708+
1709+
# Once the environment is finalized, the expired snapshot should be removed successfully
1710+
state_sync.finalize(env)
1711+
assert _get_cleanup_tasks(state_sync) == [
1712+
SnapshotTableCleanupTask(snapshot=old_snapshot.table_info, dev_table_only=False),
1713+
]
1714+
state_sync.delete_expired_snapshots(batch_range=ExpiredBatchRange.all_batch_range())
1715+
assert not state_sync.snapshots_exist([old_snapshot.snapshot_id])
1716+
1717+
16411718
def test_delete_expired_snapshots_dev_table_cleanup_only(
16421719
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
16431720
):

0 commit comments

Comments
 (0)