5454
5555if t .TYPE_CHECKING :
5656 from sqlmesh .core .context import ExecutionContext
57+ from sqlmesh .core ._typing import TableName
58+ from sqlmesh .core .engine_adapter import EngineAdapter
5759
5860logger = logging .getLogger (__name__ )
5961SnapshotToIntervals = t .Dict [Snapshot , Intervals ]
@@ -188,6 +190,46 @@ def merged_missing_intervals(
188190 }
189191 return snapshots_to_intervals
190192
193+ def can_skip_evaluation (self , snapshot : Snapshot , snapshots : t .Dict [str , Snapshot ]) -> bool :
194+ if not snapshot .last_altered_ts :
195+ return False
196+
197+ from collections import defaultdict
198+
199+ parent_snapshots = {p for p in snapshots .values () if p .name != snapshot .name }
200+ if len (parent_snapshots ) != len (snapshot .node .depends_on ):
201+ # The mismatch can happen if e.g an external model is not registered in the project
202+ return False
203+
204+ adapter_to_parent_snapshots : t .Dict [EngineAdapter , t .List [Snapshot ]] = defaultdict (list )
205+
206+ for parent_snapshot in parent_snapshots :
207+ if not parent_snapshot .is_external :
208+ return False
209+
210+ adapter = self .snapshot_evaluator .get_adapter (parent_snapshot .model_gateway )
211+ if not adapter .SUPPORTS_EXTERNAL_MODEL_FRESHNESS :
212+ return False
213+
214+ adapter_to_parent_snapshots [adapter ].append (parent_snapshot )
215+
216+ if not adapter_to_parent_snapshots :
217+ return False
218+
219+ external_models_freshness : t .List [int ] = []
220+
221+ for adapter , adapter_snapshots in adapter_to_parent_snapshots .items ():
222+ table_names : t .List [TableName ] = [
223+ exp .to_table (parent_snapshot .name , parent_snapshot .node .dialect )
224+ for parent_snapshot in adapter_snapshots
225+ ]
226+ external_models_freshness .extend (adapter .get_external_model_freshness (table_names ))
227+
228+ return all (
229+ snapshot .last_altered_ts > external_model_freshness
230+ for external_model_freshness in external_models_freshness
231+ )
232+
191233 def evaluate (
192234 self ,
193235 snapshot : Snapshot ,
@@ -200,6 +242,7 @@ def evaluate(
200242 allow_destructive_snapshots : t .Optional [t .Set [str ]] = None ,
201243 allow_additive_snapshots : t .Optional [t .Set [str ]] = None ,
202244 target_table_exists : t .Optional [bool ] = None ,
245+ is_restatement_plan : bool = False ,
203246 ** kwargs : t .Any ,
204247 ) -> t .List [AuditResult ]:
205248 """Evaluate a snapshot and add the processed interval to the state sync.
@@ -224,6 +267,14 @@ def evaluate(
224267
225268 snapshots = parent_snapshots_by_name (snapshot , self .snapshots )
226269
270+ if not is_restatement_plan and self .can_skip_evaluation (snapshot , snapshots ):
271+ logger .info (f"""
272+ Skipping evaluation for snapshot { snapshot .name } as it depends on external models
273+ that have not been updated since the last run.
274+ """ )
275+
276+ return []
277+
227278 is_deployable = deployability_index .is_deployable (snapshot )
228279
229280 wap_id = self .snapshot_evaluator .evaluate (
@@ -251,7 +302,9 @@ def evaluate(
251302 ** kwargs ,
252303 )
253304
254- self .state_sync .add_interval (snapshot , start , end , is_dev = not is_deployable )
305+ self .state_sync .add_interval (
306+ snapshot , start , end , is_dev = not is_deployable , last_altered_ts = now_timestamp ()
307+ )
255308 return audit_results
256309
257310 def run (
@@ -421,6 +474,7 @@ def run_merged_intervals(
421474 run_environment_statements : bool = False ,
422475 audit_only : bool = False ,
423476 auto_restatement_triggers : t .Dict [SnapshotId , t .List [SnapshotId ]] = {},
477+ is_restatement_plan : bool = False ,
424478 ) -> t .Tuple [t .List [NodeExecutionFailedError [SchedulingUnit ]], t .List [SchedulingUnit ]]:
425479 """Runs precomputed batches of missing intervals.
426480
@@ -526,6 +580,7 @@ def run_node(node: SchedulingUnit) -> None:
526580 allow_destructive_snapshots = allow_destructive_snapshots ,
527581 allow_additive_snapshots = allow_additive_snapshots ,
528582 target_table_exists = snapshot .snapshot_id not in snapshots_to_create ,
583+ is_restatement_plan = is_restatement_plan ,
529584 )
530585
531586 evaluation_duration_ms = now_timestamp () - execution_start_ts
0 commit comments