1010from unittest import mock
1111from unittest .mock import patch
1212import logging
13+ from IPython .utils .capture import capture_output
14+
1315
1416import time_machine
1517from pytest_mock .plugin import MockerFixture
@@ -3833,36 +3835,45 @@ def test_external_model_freshness(ctx: TestContext, mocker: MockerFixture, tmp_p
38333835 if not adapter .SUPPORTS_EXTERNAL_MODEL_FRESHNESS :
38343836 pytest .skip ("This test only runs for engines that support external model freshness" )
38353837
3836- def _run_plan (
3837- sqlmesh_context : Context , restate_models : t .Optional [t .List [str ]] = None
3838- ) -> PlanResults :
3839- plan : Plan = sqlmesh_context .plan (
3840- auto_apply = True , no_prompts = True , restate_models = restate_models
3838+ def _assert_snapshot_last_altered_ts (context : Context , snapshot_id : str , timestamp : datetime ):
3839+ from sqlmesh .utils .date import to_datetime
3840+
3841+ snapshot = context .state_sync .get_snapshots ([snapshot_id ])[snapshot_id ]
3842+ assert to_datetime (snapshot .last_altered_ts ).replace (microsecond = 0 ) == timestamp .replace (
3843+ microsecond = 0
38413844 )
3842- return PlanResults .create (plan , ctx , schema )
38433845
38443846 import sqlmesh
38453847
38463848 spy = mocker .spy (sqlmesh .core .snapshot .evaluator .SnapshotEvaluator , "evaluate" )
38473849
38483850 def _assert_model_evaluation (lambda_func , was_evaluated , day_delta = 0 ):
3849- call_count_before = spy .call_count
3850- logger = logging .getLogger ("sqlmesh.core.scheduler" )
3851-
3852- with time_machine .travel (now (minute_floor = False ) + timedelta (days = day_delta )):
3853- with mock .patch .object (logger , "info" ) as mock_logger :
3854- lambda_func ()
3855-
3856- evaluation_skipped_log = any (
3857- "Skipping evaluation for snapshot" in call [0 ][0 ] for call in mock_logger .call_args_list
3858- )
3859-
3851+ spy .reset_mock ()
3852+ timestamp = now (minute_floor = False ) + timedelta (days = day_delta )
3853+ with time_machine .travel (timestamp , tick = False ):
3854+ with capture_output () as output :
3855+ plan_or_run_result = lambda_func ()
3856+
3857+ evaluate_function_called = spy .call_count == 1
3858+ signal_was_checked = "Checking signals for" in output .stdout
3859+ restatement_plan = isinstance (plan_or_run_result , Plan ) and plan_or_run_result .restatements
3860+ if restatement_plan :
3861+ # Restatement plans exclude this signal so we expect the actual evaluation
3862+ # to happen but not through the signal
3863+ assert evaluate_function_called
3864+ assert not signal_was_checked
3865+ return
3866+
3867+ # All other cases (e.g normal plans or runs) will check the freshness signal
3868+ assert signal_was_checked
38603869 if was_evaluated :
3861- assert not evaluation_skipped_log
3862- assert spy . call_count == call_count_before + 1
3870+ assert "All ready" in output . stdout
3871+ assert evaluate_function_called
38633872 else :
3864- assert evaluation_skipped_log
3865- assert spy .call_count == call_count_before
3873+ assert "None ready" in output .stdout
3874+ assert not evaluate_function_called
3875+
3876+ return timestamp , plan_or_run_result
38663877
38673878 # Create & initialize schema
38683879 schema = ctx .add_test_suffix (TEST_SCHEMA )
@@ -3899,7 +3910,10 @@ def _assert_model_evaluation(lambda_func, was_evaluated, day_delta=0):
38993910 MODEL (
39003911 name { model_name } ,
39013912 start '2024-01-01',
3902- kind FULL
3913+ kind FULL,
3914+ signals (
3915+ freshness(),
3916+ )
39033917 );
39043918
39053919 SELECT col1 * col2 AS col FROM { external_table1 } , { external_table2 } ;
@@ -3911,23 +3925,47 @@ def _set_config(gateway: str, config: Config) -> None:
39113925
39123926 context = ctx .create_context (path = tmp_path , config_mutator = _set_config )
39133927
3914- # Case 1: Model is evaluated on first insertion
3915- _assert_model_evaluation (lambda : _run_plan (context ), was_evaluated = True )
3928+ # Case 1: Model is evaluated for the first plan
3929+ prod_plan_ts , prod_plan = _assert_model_evaluation (
3930+ lambda : context .plan (auto_apply = True , no_prompts = True ), was_evaluated = True
3931+ )
3932+
3933+ prod_snapshot_id = next (iter (prod_plan .context_diff .new_snapshots ))
3934+ _assert_snapshot_last_altered_ts (context , prod_snapshot_id , prod_plan_ts )
39163935
39173936 # Case 2: Model is NOT evaluated on run if external models are not fresh
3918- _assert_model_evaluation (lambda : context .run (), was_evaluated = False , day_delta = 2 )
3937+ _assert_model_evaluation (lambda : context .run (), was_evaluated = False , day_delta = 1 )
39193938
3920- # Case 3: Model is evaluated on run if any external model is fresh
3921- adapter .execute (f"INSERT INTO { external_table2 } (col2) VALUES (3)" , quote_identifiers = False )
3939+ # Case 3: Differentiate last_altered_ts between snapshots with shared version
3940+ # For instance, creating a FORWARD_ONLY change in dev (reusing the version but creating a dev preview) should not cause
3941+ # the prod snapshot's last_altered_ts to be updated when fetched from the state sync
3942+ model_path .write_text (model_path .read_text ().replace ("col1 * col2" , "col1 + col2" ))
3943+ context .load ()
3944+ dev_plan_ts = now (minute_floor = False ) + timedelta (days = 2 )
3945+ with time_machine .travel (dev_plan_ts , tick = False ):
3946+ dev_plan = context .plan (
3947+ environment = "dev" , forward_only = True , auto_apply = True , no_prompts = True
3948+ )
3949+
3950+ context .state_sync .clear_cache ()
3951+ dev_snapshot_id = next (iter (dev_plan .context_diff .new_snapshots ))
3952+ _assert_snapshot_last_altered_ts (context , dev_snapshot_id , dev_plan_ts )
3953+ _assert_snapshot_last_altered_ts (context , prod_snapshot_id , prod_plan_ts )
39223954
3955+ # Case 4: Model is evaluated on run if any external model is fresh
3956+ adapter .execute (f"INSERT INTO { external_table2 } (col2) VALUES (3)" , quote_identifiers = False )
39233957 _assert_model_evaluation (lambda : context .run (), was_evaluated = True , day_delta = 2 )
39243958
3925- # Case 4: Model is evaluated on a restatement plan even if the external model is not fresh
3959+ # Case 5: Model is evaluated if changed (case 3) even if the external model is not fresh
3960+ model_path .write_text (model_path .read_text ().replace ("col1 + col2" , "col1 * col2 * 5" ))
3961+ context .load ()
39263962 _assert_model_evaluation (
3927- lambda : _run_plan ( context , restate_models = [ model_name ] ), was_evaluated = True , day_delta = 3
3963+ lambda : context . plan ( auto_apply = True , no_prompts = True ), was_evaluated = True , day_delta = 3
39283964 )
39293965
3930- # Case 5: Model is evaluated if changed even if the external model is not fresh
3931- model_path .write_text (model_path .read_text ().replace ("col1 * col2" , "col1 + col2" ))
3932- context .load ()
3933- _assert_model_evaluation (lambda : _run_plan (context ), was_evaluated = True , day_delta = 2 )
3966+ # Case 6: Model is evaluated on a restatement plan even if the external model is not fresh
3967+ _assert_model_evaluation (
3968+ lambda : context .plan (restate_models = [model_name ], auto_apply = True , no_prompts = True ),
3969+ was_evaluated = True ,
3970+ day_delta = 4 ,
3971+ )
0 commit comments