Skip to content

Commit f33490a

Browse files
committed
PR feedback
1 parent b7dac9a commit f33490a

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

sqlmesh/core/plan/common.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,20 @@ def identify_restatement_intervals_across_snapshot_versions(
134134
unique_snapshot_names = set(snapshot_id.name for snapshot_id in snapshot_intervals_to_clear)
135135

136136
current_ts = current_ts or now_timestamp()
137-
all_matching_snapshot_ids = state_reader.get_snapshot_ids_by_names(
138-
snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
139-
)
137+
all_matching_non_prod_snapshots = {
138+
s.snapshot_id: s
139+
for s in state_reader.get_snapshots_by_names(
140+
snapshot_names=unique_snapshot_names, current_ts=current_ts, exclude_expired=True
141+
)
142+
# Don't clear intervals for a snapshot if it shares the same physical version with prod.
143+
# Otherwise, prod will be affected by what should be a dev operation
144+
if s.name_version not in prod_name_versions
145+
}
140146

141147
# identify the ones that we haven't picked up yet, which are the ones that don't exist in any environment
142-
if remaining_snapshot_ids := all_matching_snapshot_ids.difference(snapshot_intervals_to_clear):
148+
if remaining_snapshot_ids := set(all_matching_non_prod_snapshots).difference(
149+
snapshot_intervals_to_clear
150+
):
143151
# these snapshot ids exist in isolation and may be related to a downstream dependency of the :prod_restatements,
144152
# rather than directly related, so we can't simply look up the interval to clear based on :prod_restatements.
145153
# To figure out the interval that should be cleared, we can match to the existing list based on name
@@ -156,18 +164,19 @@ def identify_restatement_intervals_across_snapshot_versions(
156164

157165
snapshot_name_to_widest_interval[s_id.name] = (next_start, next_end)
158166

167+
# we need to fetch the full Snapshots to get access to the SnapshotTableInfo objects
168+
# required by StateSync.remove_intervals()
169+
# but at this point we have minimized the list by excluding the ones that are already present in prod
170+
# and also excluding the ones we have already matched earlier while traversing the environment DAGs
159171
remaining_snapshots = state_reader.get_snapshots(snapshot_ids=remaining_snapshot_ids)
160172
for remaining_snapshot_id, remaining_snapshot in remaining_snapshots.items():
161-
# Don't clear intervals for a snapshot if it shares the same physical version with prod.
162-
# Otherwise, prod will be affected by what should be a dev operation
163-
if remaining_snapshot.name_version in prod_name_versions:
164-
continue
165-
166173
snapshot_intervals_to_clear[remaining_snapshot_id] = SnapshotIntervalClearRequest(
167174
table_info=remaining_snapshot.table_info,
168175
interval=snapshot_name_to_widest_interval[remaining_snapshot_id.name],
169176
)
170177

178+
loaded_snapshots.update(remaining_snapshots)
179+
171180
# for any affected full_history_restatement_only snapshots, we need to widen the intervals being restated to
172181
# include the whole time range for that snapshot. This requires a call to state to load the full snapshot record,
173182
# so we only do it if necessary

0 commit comments

Comments
 (0)