44import typing as t
55import logging
66from dataclasses import dataclass
7+ from collections import defaultdict
78
89from rich .console import Console as RichConsole
910from rich .tree import Tree
2223 PlanEvaluator ,
2324)
2425from sqlmesh .core .state_sync import StateReader
25- from sqlmesh .core .snapshot .definition import SnapshotInfoMixin , SnapshotNameVersionLike
26+ from sqlmesh .core .snapshot .definition import (
27+ Snapshot ,
28+ SnapshotInfoMixin ,
29+ SnapshotNameVersionLike ,
30+ )
2631from sqlmesh .utils import Verbosity , rich as srich , to_snake_case
2732from sqlmesh .utils .date import to_ts
2833from sqlmesh .utils .errors import SQLMeshError
@@ -76,8 +81,10 @@ class ExplainableRestatementStage(stages.RestatementStage):
7681 of what might happen when they ask for the plan to be explained
7782 """
7883
79- snapshot_intervals_to_clear : t .Dict [str , SnapshotIntervalClearRequest ]
80- """Which snapshots from other environments would have intervals cleared as part of restatement, keyed by name"""
84+ snapshot_intervals_to_clear : t .Dict [
85+ str , t .List [t .Tuple [Snapshot , SnapshotIntervalClearRequest ]]
86+ ]
87+ """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""
8188
8289 @classmethod
8390 def from_restatement_stage (
@@ -86,17 +93,30 @@ def from_restatement_stage(
8693 state_reader : StateReader ,
8794 plan : EvaluatablePlan ,
8895 ) -> ExplainableRestatementStage :
96+ loaded_snapshots = {s .snapshot_id : s for s in stage .all_snapshots .values ()}
97+
8998 all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions (
9099 state_reader = state_reader ,
91100 prod_restatements = plan .restatements ,
92101 disable_restatement_models = plan .disabled_restatement_models ,
93- loaded_snapshots = { s . snapshot_id : s for s in stage . all_snapshots . values ()} ,
102+ loaded_snapshots = loaded_snapshots ,
94103 )
95104
105+ # extend loaded_snapshots with the remaining full Snapshot objects from all_restatement_intervals
106+ # so that we can generate physical table names for them while explaining what's going on
107+ remaining_snapshot_ids_to_load = set (all_restatement_intervals ).difference (loaded_snapshots )
108+ loaded_snapshots .update (
109+ state_reader .get_snapshots (snapshot_ids = remaining_snapshot_ids_to_load )
110+ )
111+
112+ snapshot_intervals_to_clear = defaultdict (list )
113+ for snapshot_id , clear_request in all_restatement_intervals .items ():
114+ snapshot_intervals_to_clear [clear_request .snapshot .name ].append (
115+ (loaded_snapshots [snapshot_id ], clear_request )
116+ )
117+
96118 return cls (
97- snapshot_intervals_to_clear = {
98- s .snapshot .name : s for s in all_restatement_intervals .values ()
99- },
119+ snapshot_intervals_to_clear = snapshot_intervals_to_clear ,
100120 all_snapshots = stage .all_snapshots ,
101121 )
102122
@@ -199,15 +219,50 @@ def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage
def visit_restatement_stage(
    self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
) -> Tree:
    """Render a restatement stage as a tree of the intervals that will be invalidated.

    For a plain ``stages.RestatementStage`` (or an explainable stage with nothing to
    clear) only the header node is returned. For an ``ExplainableRestatementStage`` one
    child node is added per model name, and beneath it one leaf per
    (environment, physical table) pair whose intervals would be cleared.

    Args:
        stage: The restatement stage to describe.

    Returns:
        The populated ``Tree``.
    """
    tree = Tree(
        "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
        "This only affects state and will not clear physical data from the tables until the next plan for each environment"
    )

    if isinstance(stage, ExplainableRestatementStage) and (
        snapshot_intervals := stage.snapshot_intervals_to_clear
    ):
        for name, requests in snapshot_intervals.items():
            display_name = model_display_name(
                name, self.environment_naming_info, self.default_catalog, self.dialect
            )

            # group by environment for the console output
            by_environment: t.Dict[t.Optional[str], t.List[Snapshot]] = defaultdict(list)

            interval_start = None
            interval_end = None

            for snapshot, clear_request in requests:
                # used for the top level tree node; the interval of the last request
                # in the list is the one that ends up being displayed
                interval_start, interval_end = clear_request.interval

                if clear_request.sorted_environment_names:
                    # snapshot is promoted in these environments
                    for env in clear_request.sorted_environment_names:
                        by_environment[env].append(snapshot)
                else:
                    # snapshot is not currently promoted in any environment
                    by_environment[None].append(snapshot)

            # Only skip when no request supplied an interval. Compare against None
            # explicitly so that a falsy-but-valid boundary (e.g. 0) is still rendered.
            if interval_start is None or interval_end is None:
                continue

            node = tree.add(f"{display_name} [{to_ts(interval_start)} - {to_ts(interval_end)}]")

            for env_name, snapshots_to_clear in by_environment.items():
                env_label = env_name or "(no env)"
                for snapshot in snapshots_to_clear:
                    # note: we don't need a DeployabilityIndex and can just hardcode is_deployable=True.
                    # The reason is that non-deployable data can never be restated so we only need to
                    # bother clearing intervals for the deployable version of the table
                    physical_table_name = snapshot.table_name(True)
                    node.add(f"{env_label} -> {physical_table_name}")

    return tree
213268
0 commit comments