Skip to content

Commit 959b606

Browse files
committed
Fix: Make sure that pending restatement intervals are always recorded last during compaction (#3862)
1 parent 70ddf0c commit 959b606

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,9 @@ def _push_snapshot_intervals(
13301330
new_intervals.append(
13311331
_interval_to_df(snapshot, start_ts, end_ts, is_dev=True, is_compacted=True)
13321332
)
1333+
1334+
# Make sure that all pending restatement intervals are recorded last
1335+
for snapshot in snapshots:
13331336
for start_ts, end_ts in snapshot.pending_restatement_intervals:
13341337
new_intervals.append(
13351338
_interval_to_df(

tests/core/test_state_sync.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2854,3 +2854,56 @@ def test_compact_intervals_pending_restatement_shared_version(
28542854
assert snapshots[snapshot_b.snapshot_id].intervals == [
28552855
(to_timestamp("2020-01-01"), to_timestamp("2020-01-06")),
28562856
]
2857+
2858+
2859+
@time_machine.travel("2020-01-05 00:00:00 UTC")
2860+
def test_compact_intervals_pending_restatement_many_snapshots_same_version(
2861+
state_sync: EngineAdapterStateSync,
2862+
make_snapshot: t.Callable,
2863+
get_snapshot_intervals: t.Callable,
2864+
) -> None:
2865+
snapshots = [
2866+
make_snapshot(
2867+
SqlModel(
2868+
name="a",
2869+
cron="@daily",
2870+
query=parse_one(f"select {i}, ds"),
2871+
),
2872+
version="a",
2873+
)
2874+
for i in range(100)
2875+
]
2876+
2877+
state_sync.push_snapshots(snapshots)
2878+
2879+
for snapshot in snapshots:
2880+
state_sync.add_interval(snapshot, "2020-01-01", "2020-01-01")
2881+
state_sync.add_interval(snapshot, "2020-01-02", "2020-01-02")
2882+
state_sync.add_interval(snapshot, "2020-01-03", "2020-01-03")
2883+
state_sync.add_interval(snapshot, "2020-01-04", "2020-01-04")
2884+
2885+
pending_restatement_intervals = [
2886+
(to_timestamp("2020-01-03"), to_timestamp("2020-01-05")),
2887+
]
2888+
state_sync.add_snapshots_intervals(
2889+
[
2890+
SnapshotIntervals(
2891+
name=snapshots[0].name,
2892+
identifier=snapshots[0].identifier,
2893+
version=snapshots[0].version,
2894+
intervals=[],
2895+
dev_intervals=[],
2896+
pending_restatement_intervals=pending_restatement_intervals,
2897+
)
2898+
]
2899+
)
2900+
2901+
# Because of the number of snapshots requiring compaction, some compacted records will have different creation
2902+
# timestamps.
2903+
state_sync.compact_intervals()
2904+
2905+
assert state_sync.get_snapshots([snapshots[0].snapshot_id])[
2906+
snapshots[0].snapshot_id
2907+
].pending_restatement_intervals == [
2908+
(to_timestamp("2020-01-03"), to_timestamp("2020-01-05")),
2909+
]

0 commit comments

Comments
 (0)