Skip to content

Commit ce791bd

Browse files
committed
Fix: Exclude pending restatement intervals when fetching max interval end for a snapshot (#3661)
1 parent 17a2e4f commit ce791bd

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1177,7 +1177,11 @@ def max_interval_end_per_model(
11771177
.from_(exp.to_table(self.intervals_table).as_(table_alias))
11781178
.where(where, copy=False)
11791179
.where(
1180-
exp.and_(exp.to_column("is_dev").not_(), exp.to_column("is_removed").not_()),
1180+
exp.and_(
1181+
exp.to_column("is_dev").not_(),
1182+
exp.to_column("is_removed").not_(),
1183+
exp.to_column("is_pending_restatement").not_(),
1184+
),
11811185
copy=False,
11821186
)
11831187
.group_by(name_col, version_col, copy=False)

tests/core/test_state_sync.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2212,6 +2212,63 @@ def test_max_interval_end_per_model(
22122212
assert state_sync.max_interval_end_per_model(environment_name, set()) == {}
22132213

22142214

2215+
def test_max_interval_end_per_model_with_pending_restatements(
2216+
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
2217+
) -> None:
2218+
snapshot = make_snapshot(
2219+
SqlModel(
2220+
name="a",
2221+
cron="@daily",
2222+
query=parse_one("select 1, ds"),
2223+
),
2224+
)
2225+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
2226+
2227+
state_sync.push_snapshots([snapshot])
2228+
2229+
state_sync.add_interval(snapshot, "2023-01-01", "2023-01-01")
2230+
state_sync.add_interval(snapshot, "2023-01-02", "2023-01-02")
2231+
state_sync.add_interval(snapshot, "2023-01-03", "2023-01-03")
2232+
# Add a pending restatement interval
2233+
state_sync.add_snapshots_intervals(
2234+
[
2235+
SnapshotIntervals(
2236+
name=snapshot.name,
2237+
identifier=snapshot.identifier,
2238+
version=snapshot.version,
2239+
intervals=[],
2240+
dev_intervals=[],
2241+
pending_restatement_intervals=[
2242+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05"))
2243+
],
2244+
)
2245+
]
2246+
)
2247+
2248+
snapshot = state_sync.get_snapshots([snapshot.snapshot_id])[snapshot.snapshot_id]
2249+
assert snapshot.intervals == [(to_timestamp("2023-01-01"), to_timestamp("2023-01-04"))]
2250+
assert snapshot.pending_restatement_intervals == [
2251+
(to_timestamp("2023-01-04"), to_timestamp("2023-01-05"))
2252+
]
2253+
2254+
environment_name = "test_max_interval_end_for_environment"
2255+
2256+
state_sync.promote(
2257+
Environment(
2258+
name=environment_name,
2259+
snapshots=[snapshot.table_info],
2260+
start_at="2023-01-01",
2261+
end_at="2023-01-03",
2262+
plan_id="test_plan_id",
2263+
previous_finalized_snapshots=[],
2264+
)
2265+
)
2266+
2267+
assert state_sync.max_interval_end_per_model(environment_name) == {
2268+
snapshot.name: to_timestamp("2023-01-04")
2269+
}
2270+
2271+
22152272
def test_max_interval_end_per_model_ensure_finalized_snapshots(
22162273
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
22172274
) -> None:

0 commit comments

Comments
 (0)